tooldelta_thread
attribute
PT= ParamSpec('PT')
attribute
VT= TypeVar('VT')
attribute
RT= TypeVar('RT')
attribute
threads_listlist[ToolDeltaThread]
= []
attribute
createThread= ToolDeltaThread
func
get_threads_list() -> list[createThread]
返回使用 createThread 创建的全线程列表。
Returns
list[tooldelta.utils.tooldelta_thread.createThread]
func
thread_func(usage, thread_level=ToolDeltaThread.PLUGIN)
在事件方法可能执行较久会造成堵塞时使用,方便快捷地创建一个新线程,例如:
@thread_func("一个会卡一分钟的线程")
def on_inject(self):
...
param
usagestr
param
thread_level= ToolDeltaThread.PLUGIN
Returns
None
func
thread_gather(funcs_and_args) -> list[VT]
使用回调线程作为并行器的伪异步执行器 可以用于并发获取多个阻塞方法的返回
>>> results = thread_gather([
(requests.get, ("https://www.github.com",)),
(time.sleep, (1,)),
(sum, (1, 3, 5))
])
>>> results
[\<Response 200>, None, 9]
Args: funcs_and_args (list[tuple[Callable[..., VT], tuple]]): (方法, 参数) 的列表 Returns
Returns: list[VT]: 方法的返回 (按传入方法的顺序依次返回其结果)
param
funcs_and_argslist[tuple[Callable[..., VT], tuple]]
Returns
list[tooldelta.utils.tooldelta_thread.VT]
func
force_stop_normal_threads()
强制终止非系统级线程, 仅系统内部调用
Returns
None