tooldelta_thread
attributePT= ParamSpec('PT')attributeVT= TypeVar('VT')attributeRT= TypeVar('RT')attributethreads_listlist[ToolDeltaThread]= []attributecreateThread= ToolDeltaThreadfuncget_threads_list() -> list[createThread]返回使用 createThread 创建的全线程列表。
Returns
list[tooldelta.utils.tooldelta_thread.createThread]functhread_func(usage, thread_level=ToolDeltaThread.PLUGIN)在事件方法可能执行较久会造成堵塞时使用,方便快捷地创建一个新线程,例如:
@thread_func("一个会卡一分钟的线程")
def on_inject(self):
...paramusagestrparamthread_level= ToolDeltaThread.PLUGINReturns
Nonefuncthread_gather(funcs_and_args) -> list[VT]使用回调线程作为并行器的伪异步执行器 可以用于并发获取多个阻塞方法的返回
>>> results = thread_gather([
(requests.get, ("https://www.github.com",)),
(time.sleep, (1,)),
(sum, (1, 3, 5))
])
>>> results
[\<Response 200>, None, 9]Args: funcs_and_args (list[tuple[Callable[..., VT], tuple]]): (方法, 参数) 的列表 Returns
Returns: list[VT]: 方法的返回 (按传入方法的顺序依次返回其结果)
paramfuncs_and_argslist[tuple[Callable[..., VT], tuple]]Returns
list[tooldelta.utils.tooldelta_thread.VT]funcforce_stop_normal_threads()强制终止非系统级线程, 仅系统内部调用
Returns
None