ToolDelta ICU

tooldelta_thread

attributePT
= ParamSpec('PT')
attributeVT
= TypeVar('VT')
attributeRT
= TypeVar('RT')
attributethreads_listlist[ToolDeltaThread]
= []
attributecreateThread
= ToolDeltaThread
funcget_threads_list() -> list[createThread]

返回使用 createThread 创建的全线程列表。

Returns

list[tooldelta.utils.tooldelta_thread.createThread]
functhread_func(usage, thread_level=ToolDeltaThread.PLUGIN)

在事件方法可能执行较久会造成堵塞时使用,方便快捷地创建一个新线程,例如:

@thread_func("一个会卡一分钟的线程")
def on_inject(self):
    ...
paramusagestr
paramthread_level
= ToolDeltaThread.PLUGIN

Returns

None
functhread_gather(funcs_and_args) -> list[VT]

使用回调线程作为并行器的伪异步执行器 可以用于并发获取多个阻塞方法的返回

>>> results = thread_gather([
    (requests.get, ("https://www.github.com",)),
    (time.sleep, (1,)),
    (sum, (1, 3, 5))
])
>>> results
[\<Response 200>, None, 9]

Args: funcs_and_args (list[tuple[Callable[..., VT], tuple]]): (方法, 参数) 的列表 Returns

Returns: list[VT]: 方法的返回 (按传入方法的顺序依次返回其结果)

paramfuncs_and_argslist[tuple[Callable[..., VT], tuple]]

Returns

list[tooldelta.utils.tooldelta_thread.VT]
funcforce_stop_normal_threads()

强制终止非系统级线程, 仅系统内部调用

Returns

None