V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
dongcheng
V2EX  ›  Python

问一个关于 asyncio 的问题,当多个 task 运行,其中一个正确返回后,怎么取消其他协程?

  •  
  •   dongcheng · 2021-01-22 10:33:25 +08:00 · 2839 次点击
    这是一个创建于 1401 天前的主题,其中的信息可能已经有所发展或是发生改变。

    一个简单例子 import asyncio import time

    特殊函数

    async def get_request(url): print('正在下载',url) # time.sleep(2) 不支持异步的模块 会中断整个的异步效果 await asyncio.sleep(2) print('下载完成',url) return 'page_text'

    def parse(task): print(task.result())

    start = time.time() urls = ['www.xxx1.com','www.xxx2.com','www.xxx3.com']

    tasks = [] #存放多任务 for url in urls: # 调用特殊函数 func = get_request(url)

    # 创建任务对象
    task = asyncio.ensure_future(func)
    
    # 给任务对象绑定回调函数
    task.add_done_callback(parse)
    tasks.append(task)
    

    创建事件循环对象

    loop = asyncio.get_event_loop()

    执行任务

    loop.run_until_complete(asyncio.wait(tasks)) print('总耗时:',time.time()-start) #2.0015313625335693

    可以在 get_request 判断任务是否成功,但是如何判断成功之后取消其他任务呢?

    谢谢

    23 条回复    2021-01-22 14:38:17 +08:00
    chanchancl
        1
    chanchancl  
       2021-01-22 10:46:51 +08:00
    将 func 包装成一个 function

    在结尾的地方,将 tasks 中,除了自己的 task 都 cancle 掉
    pursuer
        2
    pursuer  
       2021-01-22 10:52:53 +08:00
    如果想要让协程提前结束等待,可以使用 asyncio.wait(...,return_when='FIRST_COMPLETED')或者更灵活的方案 asyncio.Future,然后处理下资源释放
    dongcheng
        3
    dongcheng  
    OP
       2021-01-22 11:02:28 +08:00
    @chanchancl 这个 func 能获取到 task 吗?
    sujin190
        4
    sujin190  
       2021-01-22 11:08:05 +08:00   ❤️ 4
    你是不是想错了,对于协程来说,如果设计到 io,那么就算能取消协程其实并不能取消 io 操作

    比如 http 请求,就算你要取消协程其实并不能取消已经发送出去的 http 请求,取消协程完全没有意义,如果你能取消 io,比如关闭 http 连接来取消 http 请求,那么你应该通过关闭 http 请求的方式来触发协程返回,即先取消 io 操作再由 io 完成来触发协程返回,而不能倒过来

    对于非 io 请求,再整个协程完成前会独占线程,并不会调度到其他协程,所以你自然也不能用其他协程来取消当前正在运行的协程了
    keepeye
        5
    keepeye  
       2021-01-22 11:10:10 +08:00
    不是有 tasks 吗?遍历一下都 cancel 掉
    fiveelementgid
        6
    fiveelementgid  
       2021-01-22 11:10:13 +08:00 via Android
    看见 Task 和 continuation 还激动了一下以为有. NET 玩家一起研究 TAP 了,一看是 Python,告辞
    sujin190
        7
    sujin190  
       2021-01-22 11:12:28 +08:00
    对于非 io 请求协程再补充一点,似乎协程再创建的时候就会进行首次运行,没 io 操作,所以首次运行必然独占线程一直到运行完成,所以这种情况下完全做不了中途取消的操作
    dongcheng
        8
    dongcheng  
    OP
       2021-01-22 11:20:08 +08:00
    @sujin190 应该可以取消 task 吧。只是有局限。

    @keepeye asyncio.Task.all_tasks()可以获取 all task 。但是现在有个需求是嵌套的 task,可以取消所有 task,不能取消第 2 层的 task (保留第一层)
    sujin190
        9
    sujin190  
       2021-01-22 11:26:56 +08:00
    @dongcheng #8 但是没意义,不符合协程原理实现,从程序的可靠性严谨性来说也不应该出现这种设计,不关闭 io 而取消上层协程是很不科学的,很容易导致 io 、连接泄露啥的,而且还不容易排查是啥问题
    xiaoHuang3
        10
    xiaoHuang3  
       2021-01-22 11:48:58 +08:00
    @sujin190 附议~,我觉得楼主这种需求根本不适合用 asyncio 来做- -
    crclz
        11
    crclz  
       2021-01-22 12:10:44 +08:00
    @fiveelementgid .NET 的设计就很清晰,TAP 、cancellation token

    如果是.NET 的话,应该把一个 CancellationTokenSource ( cts )的 CancellationToken 传给所有异步方法,异步方法会返回 task,再等 Task.WhenAny 返回后,再取消 cts 。

    由于 CancellationToken 是层层深入的( HTTP 、TCP ),所以底层负责 TCP 的代码也会取消正在进行的操作,然后会扔出一个取消异常,最后捕获这个异常就行了(在异步方法中)。

    我想 python 的问题就是,底层的 api 都没有提供 cancellation token 的参数,导致无法让取消操作层层传播。
    abersheeran
        12
    abersheeran  
       2021-01-22 12:21:18 +08:00   ❤️ 1
    写过相关代码,在我的项目里使用过很久,一切良好。

    https://gist.github.com/abersheeran/bd2372bb35fec859d7fca453ca5f7826
    abersheeran
        13
    abersheeran  
       2021-01-22 12:32:01 +08:00
    刚看到楼主在 #8 说有嵌套 Task 。很遗憾,asyncio 不支持嵌套取消。可以试试 trio,宣传上说是解决了这个问题。我也没试过。
    optional
        14
    optional  
       2021-01-22 12:33:41 +08:00 via Android
    从设计上来说,这是不可取消的,如果有 transaction 对象之类,可以标记一下
    fiveelementgid
        15
    fiveelementgid  
       2021-01-22 13:01:58 +08:00 via Android
    @crclz Sure,我昨晚刚看完印象深刻,特地还去扒了一下源码。最底下就是 new 一个新的 Token 和上层传下来的 Token 然后 Link,整层整层地控制
    dongcheng
        16
    dongcheng  
    OP
       2021-01-22 13:30:11 +08:00
    @abersheeran 感谢分享代码。研究了下 asyncio.as_completed(tasks)可以实现类似的功能
    zhuangzhuang1988
        17
    zhuangzhuang1988  
       2021-01-22 13:36:50 +08:00
    @crclz 微软还是最牛的
    cancal 进度报告是 api 中设计的
    xuboying
        18
    xuboying  
       2021-01-22 13:41:24 +08:00
    @abersheeran #12 个人感觉这个代码片段用类来实现可读性可能会更好一点。loop = loop or asyncio.get_running_loop()学到了。之前写的时候还没这个函数。。。
    imn1
        19
    imn1  
       2021-01-22 13:44:33 +08:00
    刚好有类似需求
    没找到满意的解决方案
    目前是用异步队列,当满足一定条件就清空队列,但条件满足时,已经并行启动的不能取消,要继续工作,只是清空了未启动的队列
    js8510
        20
    js8510  
       2021-01-22 14:19:27 +08:00
    同 @sujin190 。 我猜测可能想的是 multiprocessing 而不是 asyncio.
    如果 pre request/ post response 有 cpu heavy 的处理。并发倒是有意义的。

    你可以 map_async 。如果 i/o 还没发生,或者已经发生,你可以 terminate 掉其他 process. 如果正在 i/o blocking 你还是要 wait 然后 kill 掉。

    这样调度有开销,要具体分析。。简单的 post 请求就返回应该不值得。
    zeroDev
        21
    zeroDev  
       2021-01-22 14:30:44 +08:00 via Android
    用协程里的 active 来判断
    zeroDev
        22
    zeroDev  
       2021-01-22 14:31:59 +08:00 via Android
    另外,我感觉对于你这个需求,不应该进行并发请求。
    muzuiget
        23
    muzuiget  
       2021-01-22 14:38:17 +08:00
    老生常谈,从外部取消相当于强杀进程,好容易造成数据一致性问题。

    正确姿势就是线程 /协程自己在某些关键点判断某个共享变量状态,然后自己优雅退出。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1110 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 23:48 · PVG 07:48 · LAX 15:48 · JFK 18:48
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.