版本 3.7.5
taskList = []
task = asyncio.create_task(nested())
task1 = asyncio.create_task(nested())
taskList.append(task)
taskList.append(task1)
await asyncio.wait_for(taskList timeout=10)
然而 wait_for 不支持 task list.
请问怎么创建一堆 task,给每个 task 设置一个超时时间,然后并发运行呢。
如果一个 async 函数里有 time.sleep() ,那么 wait_for 设置的超时时间就无效,请问这个怎么解决?当然正确的方法是用 asyncio.sleep().然而第三方的库里面,它用的是 time.sleep()呢,那么无法 wait_for()超时中断了吗,task 有没有 kill 机制呢
import asyncio
async def nested():
await asyncio.sleep(10000)
async def main():
taskList = []
waitList = []
task = asyncio.create_task(nested())
task1 = asyncio.create_task(nested())
t1 = asyncio.create_task(asyncio.wait_for(task,timeout=10))
t2 = asyncio.create_task(asyncio.wait_for(task1,timeout=10))
waitList.append(t1)
waitList.append(t2)
await asyncio.gather(*waitList)
asyncio.run(main())
想到一种套娃实现。。可以了。。
1
wwqgtxx 2019-12-07 10:43:02 +08:00 1
在 async 函数中如果使用了 time.sleep(),无论如何都不可能中断的,也不可能 kill,除非你 kill 掉这个 thread 再重启一个 loop
如果一定要用该第三方库,请调用 https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor 把它单独丢到一个线程中(注意,python 中依然没有办法超时 Kill 一个 thread,这是非法的,当然你可以换一个 ProcessPoolExecutor 来把任务丢到一个新的进程,这样去 kill ) |
2
ClericPy 2019-12-07 10:43:38 +08:00 1
简单等待
coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED) 并发运行 aws 指定的 可等待对象 并阻塞线程直到满足 return_when 指定的条件。 返回两个 Task/Future 集合: (done, pending)。 用法: done, pending = await asyncio.wait(aws) 如指定 timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数。 请注意此函数不会引发 asyncio.TimeoutError。当超时发生时,未完成的 Future 或 Task 将在指定秒数后被返回。 time.sleep 本来就不是异步的, 非要用, 有三个方法吧 1. 模仿 gevent 猴子补丁把 time.sleep 给 hack 掉, 不建议 2. 动态语言可以加载以后动态修改原始函数, 把第三方的那个函数给修改掉成协程版本 3. 这种本来就不是异步函数的, 直接套个 executor 然后 await loop.run_in_executor 就可以了, 这个最保险 |
3
qcts33 2019-12-07 11:06:15 +08:00
等待一组 task 的话可以用 asyncio.gather()或 asyncio.as_completed()啊
不要用 time.sleep(),这个是不是异步的,应该用专门的 asyncio.sleep() |
5
pmispig OP |
6
pmispig OP @ClericPy 0.0 刚重新看了下文档,可以设置 timeout,与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象。
|
7
keepeye 2019-12-07 11:41:49 +08:00
taskList = []
task = asyncio.wait_for(asyncio.create_task(nested()), 10) task1 = asyncio.wait_for(asyncio.create_task(nested()), 10) taskList.append(task) taskList.append(task1) await asyncio.gather(*taskList) |
8
ClericPy 2019-12-07 11:45:36 +08:00
@pmispig #6 对的, asyncio 里大部分设计都比较明确, 就是 wait 和 gather 以及是否默认 cancel 这部分比较混乱. 又懒得装 trio 那些, 只能硬着头皮看源码看文档了
|
9
pmispig OP ```
import asyncio async def nested(): await asyncio.sleep(10000) async def main(): taskList = [] waitList = [] task = asyncio.create_task(nested()) task1 = asyncio.create_task(nested()) t1 = asyncio.create_task(asyncio.wait_for(task,timeout=10)) t2 = asyncio.create_task(asyncio.wait_for(task1,timeout=10)) waitList.append(t1) waitList.append(t2) await asyncio.gather(*waitList) asyncio.run(main()) ``` 想了一种套娃实现... |
10
0Y89tX3MgR4I 2019-12-07 12:44:31 +08:00
用 asyncio 不爽的一个地方就是所有的组件都要换成支持 asyncio 的
|
11
0Y89tX3MgR4I 2019-12-07 12:44:52 +08:00
@0Y89tX3MgR4I 不支持的只能放到一个单独的线程里面
|
12
jingcoco 2019-12-07 14:25:54 +08:00
小白,用 python 3.8 运行楼主的 报错 ........
|
13
pmispig OP @jingcoco concurrent.futures._base.TimeoutError 报这个是正常的,目的就是为了引出这个异常
|
14
wwqgtxx 2019-12-07 15:22:45 +08:00 via iPhone 1
我自己写过一个给 task 加上 timeout 的辅助函数,你可以参考一下,不过这样输出的会是 asyncio.CancelledError
https://github.com/wwqgtxx/wwqLyParse/blob/master/wwqLyParse/common/asyncio.py#L182 |
15
wwqgtxx 2019-12-07 15:29:17 +08:00 via iPhone
不过其实 python 官方的 wait_for 的实现也是类似思路
https://github.com/python/cpython/blob/3.8/Lib/asyncio/tasks.py#L434 |
17
ClericPy 2019-12-07 18:22:29 +08:00
@pmispig #16 可以借鉴 aiohttp 依赖的 async-timeout
wait 之所以不主动 cancel 看它 api 就明白了, 区分了 pending 和 done, pending 的自己 cancel 就好了 |