V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
waibunleung
V2EX  ›  Python

python3.7 中的 async/await 以及 asyncio 问题

  •  
  •   waibunleung · 2019-07-16 23:36:22 +08:00 · 5648 次点击
    这是一个创建于 1959 天前的主题,其中的信息可能已经有所发展或是发生改变。

    有一段代码:

    import asyncio
    
    async def crawl_page(url):
        print('crawling {}'.format(url))
        sleep_time = int(url.split('_')[-1])
        await asyncio.sleep(sleep_time)
        print('OK {}'.format(url))
    
    async def main(urls):
        tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
        for task in tasks:
            await task
    
    %time asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))
    
    ########## 输出 ##########
    
    crawling url_1
    crawling url_2
    crawling url_3
    crawling url_4
    OK url_1
    OK url_2
    OK url_3
    OK url_4
    Wall time: 3.99 s
    

    我想问在 main 函数中的 for 循环处,原意是等待所有任务结束。但是遇到第一个 await 时不会直接跳出整个 for 循环吗?还是说只是会跳过当前的一轮循环?还是说 for 循环对于 await 其实有特别的处理对待?

    我也知道这个和 python 的事件循环有关系,但是在网上找了不少资料都没有很能说清楚个大概的,希望 v 友们能给我解个惑,python 的事件循环是怎么样的?

    第 1 条附言  ·  2019-07-17 11:07:32 +08:00
    我又换了一段代码:
    ```python
    import asyncio
    import time

    async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))

    async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    # for task in tasks:
    # await task
    await tasks[2]
    await tasks[1]
    await tasks[0]
    await tasks[3]
    start = time.time()
    asyncio.run(main(['url_1', 'url_2', 'url_4', 'url_3']))
    end = time.time()
    print('wall time:'+ str(end - start))

    ########## output ###########

    crawling url_1
    crawling url_2
    crawling url_4
    crawling url_3
    OK url_1
    OK url_2
    OK url_3
    OK url_4
    wall time:4.005656003952026
    ```
    大家可以仔细看看,url 的顺序我是调换了一下的。其实和 for 循环没有太大关系
    第 2 条附言  ·  2019-07-17 11:22:43 +08:00
    import asyncio
    import time

    async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))

    async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    # for task in tasks:
    # await task
    await tasks[2]
    print('will print after tasks[2] awaited?')
    await tasks[1]
    await tasks[0]
    print('will print after tasks[0] awaited?')
    await tasks[3]
    start = time.time()
    asyncio.run(main(['url_1', 'url_2', 'url_4', 'url_3']))
    end = time.time()
    print('wall time:'+ str(end - start))

    ###### output #######

    crawling url_1
    crawling url_2
    crawling url_4
    crawling url_3
    OK url_1
    OK url_2
    OK url_3
    OK url_4
    will print after tasks[2] awaited?
    will print after tasks[0] awaited?
    wall time:4.002974987030029


    有没有人能解释一下这段代码的运行逻辑?
    48 条回复    2019-07-23 14:10:44 +08:00
    ysc3839
        1
    ysc3839  
       2019-07-16 23:41:17 +08:00
    https://docs.python.org/zh-cn/3/library/asyncio-task.html
    我估计 await task 不会等待 task 执行完的。
    junkun
        2
    junkun  
       2019-07-16 23:43:25 +08:00
    await 会跳出循环吗? await 不会跳出循环吧。
    ysc3839
        3
    ysc3839  
       2019-07-16 23:52:29 +08:00
    @junkun 楼主的意思是 await 类似 yield 那种效果吧,在函数中间返回。
    reus
        4
    reus  
       2019-07-16 23:55:09 +08:00
    await 就是挂起等待,task 执行完,再继续 await 后面的,是不是在 for 循环里都没有任何区别
    Vegetable
        5
    Vegetable  
       2019-07-17 00:10:19 +08:00
    这代码问题挺大的,我看了很久才看出来他到底是什么意思,可以说属于奇淫技巧。
    当然了,我看懂以后就能明白咋回事了。

    任务不是在 main 中的 await 里执行的,这里只是在检测任务是不是完成了。create_task 之后,任务就会开始执行,所以 tasks 生成之后就开始执行任务了,作为测试,可以在 for 循环前添加一个 await asyncio.sleep(10)来验证。创建完 tasks 之后使用 for 循环去 await 任务,已经完成的就会进入下一次循环,没完成的会阻塞 for 循环,最后所有任务都完成了才能走完循环结束任务。

    我挺不喜欢这个写法的
    Vegetable
        6
    Vegetable  
       2019-07-17 00:13:11 +08:00
    验证代码
    import asyncio

    async def task():
    print("开始")
    await asyncio.sleep(5)
    print("结束")

    async def main():
    tasks = [asyncio.create_task(task()) for i in range(3)]
    await asyncio.sleep(10) # 这一行会阻塞程序结束,但是不会影响开始和结束输出
    for t in tasks:
    await t
    asyncio.run(main())
    so1n
        7
    so1n  
       2019-07-17 00:13:31 +08:00 via Android
    await 是主动挂起等待,这时要是有别的协程再跑就跑的协程,但你这里没有。for 循环替换成 asynico.wait(task)即可
    ClericPy
        8
    ClericPy  
       2019-07-17 00:43:32 +08:00
    很多地方协程里 Task 和 Future 的设计复用了(甚至原样用)多线程的 concurrent.futures 那套
    Task 类创建以后就开始执行了,Future 则不会
    你挨个 await 用来等他们全跑完原则上没什么毛病,不过可能会有一些异常如果不是 return exception 的话会打断 for Loop
    所以可以考虑用下原生的 asyncio.wait 等方法来实现
    wwqgtxx
        9
    wwqgtxx  
       2019-07-17 05:22:02 +08:00 via iPhone
    @Vegetable 用 create_task 去创建任务而不是直接在 main 中 await 是一种很常见的操作,并不算什么非常规写法,你可以大概类比到多线程编程中开多个子线程然后挨个 wait 它们结束。至于你直接在主 task 中 await 就变成串行执行了,完全改变了程序的本意
    wwqgtxx
        10
    wwqgtxx  
       2019-07-17 05:28:44 +08:00 via iPhone
    回答一下楼主的问题,你这里的 await 其实内部是转化为 yield from 了,但是这个机制是给 asyncio 的 eventloop 使用的,在你 await 的时候会把控制权给别的 task,当别的 task 出现 await 或者说执行完成的时候再回到这个地方接着执行(会恢复现场),直到你当前 tast 结束( return 或者是抛异常)
    建议楼主先学习一下 python 的生成器,自己用 yield 和 yield from 配合.send()来模仿一下 asyncio 的原理就能深入的了解你想知道的事件循环到底是怎么回事了
    metaclass
        11
    metaclass  
       2019-07-17 06:19:24 +08:00
    楼主,你这样写实际上每个 task 之间还是 blocking 的,因为你放到 for 循环里去 await,执行完一个再执行另一个。这个写法是不对的

    如果要异步多个 async task,需要用 gather():
    https://docs.python.org/3/library/asyncio-task.html#asyncio.gather

    asyncio.gather()实际上和 JavaScript 的 Promise.all()类似:
    https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Promise/all
    wwqgtxx
        12
    wwqgtxx  
       2019-07-17 07:05:35 +08:00 via iPhone
    @metaclass 请不要误导人,在 create_task 之后就不是了,请老老实实看 create_task 的说明文档
    jaskle
        13
    jaskle  
       2019-07-17 07:15:26 +08:00 via Android
    async,await,yield 这不是 js 语法?
    wwqgtxx
        14
    wwqgtxx  
       2019-07-17 07:16:28 +08:00 via iPhone
    @metaclass 如果你仔细看过 asycnio.gather 就会发现它的内部调用了 ensure_future
    https://github.com/python/cpython/blob/3.7/Lib/asyncio/tasks.py#L746
    而 ensure_future 内部会调用 create_task
    https://github.com/python/cpython/blob/3.7/Lib/asyncio/tasks.py#L608
    关于 create_task 的文档在这里
    https://docs.python.org/3/library/asyncio-eventloop.html#creating-futures-and-tasks
    其中明确提到了 Schedule the execution of a Coroutines. Return a Task object.
    而且楼主的实验也证明了他创建的 task 是交替执行的
    wwqgtxx
        15
    wwqgtxx  
       2019-07-17 07:18:26 +08:00 via iPhone
    @jaskle c#也有这种用法,又不是 js 家的专利🙄
    wwqgtxx
        16
    wwqgtxx  
       2019-07-17 07:31:20 +08:00 via iPhone
    keysona
        17
    keysona  
       2019-07-17 08:25:32 +08:00
    当我碰到用 python 的异步 io 出问题的帖子,我都要回复:用 go 吧,python 的异步你用着难受。
    keepeye
        18
    keepeye  
       2019-07-17 08:36:52 +08:00
    ```
    import asyncio

    wg = 0

    async def crawl_page(url):
    global wg
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))
    wg -= 1


    async def main(urls):
    global wg
    for url in urls:
    wg+=1
    asyncio.ensure_future(crawl_page(url))
    while wg > 0:
    asyncio.sleep(0.1)
    ```
    Vegetable
        19
    Vegetable  
       2019-07-17 09:44:45 +08:00
    @wwqgtxx #9 这是不是应该直接用 asyncio.gather(*task)?我没看出来有什么别的好处,代码也更多
    qq976739120
        20
    qq976739120  
       2019-07-17 09:48:05 +08:00
    按照我的经验来看...如果不是闲的蛋疼,不要用 asyncio,gevent 一把梭就好
    Torpedo
        21
    Torpedo  
       2019-07-17 10:14:39 +08:00
    楼主,你不写 for,tasks 也会执行。
    await 只是等一个异步执行完成,至于这个异步什么时候开始,和 await 没关系
    tisswb
        22
    tisswb  
       2019-07-17 10:15:01 +08:00
    await 不会跳出循环,而是告诉程序 task,你去干活吧,做完了跟我说,我也要忙别的了。
    tisswb
        23
    tisswb  
       2019-07-17 10:18:10 +08:00
    补充一下,就算你不写 for 循环,task 也还是执行的,但是这种情况下,main 结束的话 task 就会被终止,有两种方法解决,1 )手写延迟固定时间; 2 )使用 await,让 main 等各个 task 出结果,都结束在结束自己。
    waibunleung
        24
    waibunleung  
    OP
       2019-07-17 10:31:07 +08:00
    @metaclass 按照你这么说是 blocking 的话,那最后的运行时间应该大于 4s 才对,但是很明显运行时间取决于耗时最大的那个任务
    congeec
        25
    congeec  
       2019-07-17 10:35:01 +08:00
    for loop 并不是 event loop
    waibunleung
        26
    waibunleung  
    OP
       2019-07-17 10:37:36 +08:00
    至于为什么我觉得他会跳出整个 for 循环或者觉得 for 循环对协程会有特别处理,是因为我类比了 nodejs 的 await/async 机制...另外大家有关于 python 的 eventloop 相关介绍吗?
    Vegetable
        27
    Vegetable  
       2019-07-17 10:42:28 +08:00
    @keepeye 强行 WaitGroup,这样的缺点是需要在写任务的时候就开始考虑并发执行的问题,如果是同一个函数还好,不同类型的任务不在一个函数里定义,不方便 wg.done(),就需要再包装一次,所以还是 asyncio.gather 好一点
    xxxy
        28
    xxxy  
       2019-07-17 10:42:42 +08:00
    ```
    async function sleep(interval) {
    return new Promise(resolve => {
    setTimeout(resolve, interval);
    })
    }

    async function f(i) {
    console.log(`crawl ${i}`)
    await sleep(1000)
    console.log(`ok ${i}`)
    }


    async function f1() {
    for (let i=0;i<5;i++){
    await f(i)
    }
    }

    f1()
    ```
    谁能解释下为什么楼主的程序跟 js 的这段运行结果不一样吗?
    waibunleung
        29
    waibunleung  
    OP
       2019-07-17 10:45:45 +08:00
    @wwqgtxx
    > 你这里的 await 其实内部是转化为 yield from 了,但是这个机制是给 asyncio 的 eventloop 使用的,在你 await 的时候会把控制权给别的 task,当别的 task 出现 await 或者说执行完成的时候再回到这个地方接着执行(会恢复现场),直到你当前 tast 结束( return 或者是抛异常)

    这么说的话,在 for 循环中遇到 await 时,for 循环所在的主协程会挂起去执行别的 task,那这个时候整个 for 循环会被 block 住不会往下继续执行吧?等到所有任务完成或者 await 之后才往下面执行 for 循环后面的代码?
    waibunleung
        30
    waibunleung  
    OP
       2019-07-17 10:46:21 +08:00
    @congeec 在?既然要评论,就将话说得更具体一些咯
    waibunleung
        31
    waibunleung  
    OP
       2019-07-17 10:51:43 +08:00
    @Vegetable 所以按照你的意思,for 循环里 await,它会阻塞当前正在 await 的任务直到它完成才进到下一轮循环去?
    congeec
        32
    congeec  
       2019-07-17 11:11:28 +08:00   ❤️ 1
    ````
    tasks = [task1, task2, task2]
    for t in tasks:
    await t
    ```

    完全等价于

    ````
    tasks = [task1, task2, task2]
    await tasks[0]
    await tasks[1]
    await tasks[2]
    ```
    这样执行顺序是同步的你能理解吧。其实并不能,因为你可能不知道 Task/Future 和 coroutine 的区别。task 被创建的那一刻就已经开始执行了,你 await 的只不过是他的结果 Task.result()。所以如果你加副作用,比如说 print(),打印出来的结果可能是乱序的。

    coroutine 就不一样

    ```
    coros = [coro1, coro2, coro3]
    await corps[0]
    await corps[1]
    await corps[3]
    ```
    这三个 corotines 绝对是按顺序执行


    好了,再来说 for loop 和 event loop。
    你把 for loop 展开就是几个普通的语句放在一起,没啥好说的

    有意思的是 event loop。看下面这些代码。
    ```
    async def coro1:
    await asyncio.sleep(xxx)
    await asyncio.sleep(xxx)

    sync def coro2:
    await asyncio.sleep(xxx)
    await asyncio.sleep(xxx)

    asyncio.gather(coro1, coro2)
    ```

    这儿有两个 coroutines,哪个先执行完呢?不知道。每个 await 那儿都会让出( yield from 的语法糖嘛)控制权。python 知道的是每个 coroutine 的状态( Ready/NotReady )。event loop 会不断的轮询( polls )这些 coroutines,如果状态是 NotReady,就去看看其他的 coroutine,如果是 Ready 就执行下一行呗。

    例子里用了 状态机+polling。具体实现取决于平台,我也不知道。
    waibunleung
        33
    waibunleung  
    OP
       2019-07-17 11:23:11 +08:00
    @wwqgtxx 你可以解释一下我 append 的第二段代码的运行逻辑吗?
    waibunleung
        34
    waibunleung  
    OP
       2019-07-17 11:26:09 +08:00
    @congeec 哥,会说话就多说点昂~ 好像有点眉目了
    waibunleung
        35
    waibunleung  
    OP
       2019-07-17 11:26:38 +08:00
    @waibunleung 能解释一下我 append 的第二段代码是逻辑吗?
    lolizeppelin
        36
    lolizeppelin  
       2019-07-17 12:48:34 +08:00
    这问题论坛上是问不清楚的.

    你真要搞懂直接把 eventlet 的源码读懂就明白了

    所有的异步都一个卵模型,套其他语言有是一样

    你可以简单理解为所有的异步语法都是生成一个"微线程"被丢到调度队列里
    await 语法导致你当前代码块立刻被挂起(变成"微线程"),然后切换到主循环里去了,主循环按照队列的顺序选择执行的“微线程”
    切换回来的时候就是你 await 对象完成的时候

    说白了都是排序,所有的任务都到队列里排序,等待被调度,整个异步循环就是不停的 goto 来 goto 去,从一个代码片段跳到另外一个片段
    wwqgtxx
        37
    wwqgtxx  
       2019-07-17 12:51:51 +08:00 via iPhone
    @Vegetable #19 没有任何好处,只不过可以作为底层实现的一种方式,gather 内部是创建了一个新的 future 配合 done_callback 来解决这个问题
    wwqgtxx
        38
    wwqgtxx  
       2019-07-17 12:55:26 +08:00 via iPhone
    @waibunleung 对于协程来说,本来就是只有在 await 的时候才会把当前 task 阻塞,并执行其他 task,或者当前 task return 了
    waibunleung
        39
    waibunleung  
    OP
       2019-07-17 13:00:52 +08:00
    @wwqgtxx 你可以解释一下我 append 的第二段代码的运行逻辑吗?为什么两次打印会在最后才出现?
    wwqgtxx
        40
    wwqgtxx  
       2019-07-17 14:28:44 +08:00 via iPhone
    @waibunleung 没有问题呀,await 是等待另一个 task 结束,并不是等待另一个 task 阻塞
    wwqgtxx
        41
    wwqgtxx  
       2019-07-17 14:31:34 +08:00 via iPhone
    await 的意思是阻塞自己,等待别人结束
    在调度器看来,你调用了 await 就把你当前的任务暂停,然后去做别的事,当你等待的任务结束了再择机继续执行当前任务(注意不是立刻执行,是择机执行)
    silentsee
        42
    silentsee  
       2019-07-17 14:40:31 +08:00
    @waibunleung 我怎么感觉这个是个 bug。。。我把 await tasks[0]放到 await tasks[2]前面就能提前输出了。。。
    wwqgtxx
        43
    wwqgtxx  
       2019-07-18 00:03:10 +08:00
    @silentsee 本质上协程和线程调度一样,在没有锁、等待条件这些控制因素下并不保证调度顺序,所以在实现上无论如何实现都不属于 bug
    waibunleung
        44
    waibunleung  
    OP
       2019-07-18 15:59:47 +08:00
    @wwqgtxx 那这么说的话不应该在 await 自己等待别人的时候输出两句 print 吗?为什么是最后才输出呢?
    wwqgtxx
        45
    wwqgtxx  
       2019-07-18 16:50:36 +08:00 via iPhone
    @waibunleung 你自己都在等待别人了,怎么还能同时输出呢
    waibunleung
        46
    waibunleung  
    OP
       2019-07-18 16:57:58 +08:00
    @wwqgtxx 你说的有点道理,我再梳理一下
    dingyaguang117
        47
    dingyaguang117  
       2019-07-23 13:44:12 +08:00
    最后那段代码的运行结果跟我的理解不一样啊

    感觉在 node 中应该不是这个结果,虽然我也没试过
    dingyaguang117
        48
    dingyaguang117  
       2019-07-23 14:10:44 +08:00
    我的理解大概是这样的,await 只保证同一段代码前后执行顺序,但是不能保证各个协程同时 await 时候的顺序
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3912 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 00:56 · PVG 08:56 · LAX 16:56 · JFK 19:56
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.