V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
sgld
V2EX  ›  Python

关于 Python 协程的 event loop 与 future

  •  
  •   sgld · 15 天前 · 1619 次点击

    最近在看这个视频

    Python AsyncIO 从入门到放弃

    前面 生成器、yield from 、native coroutine 还理解的上,到了后面两节,event loop 与 future 的时候就有点懵。 event loop 还好,轮循检查有没有待执行任务,然后执行任务

    但是结合了 future 以后,感觉脑子就转不过来了,啥时候让出权限,啥时候任务加进时间循环,啥时候恢复执行。

    单独看片段还理解,连一起有点晕了。

    想问问大佬们有没有纯文字 + 代码版的相关解释或者博文、教程之类的。

    17 条回复    2025-04-26 23:14:11 +08:00
    cyaki
        1
    cyaki  
       14 天前 via Android
    写 curio 的作者油管上有不少关于这块的
    lxy42
        2
    lxy42  
       14 天前 via Android
    500 Lines or Less
    A Web Crawler With asyncio Coroutines

    A. Jesse Jiryu Davis and Guido van Rossum

    https://aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html
    thevita
        3
    thevita  
       14 天前
    分享一下我的看法,我理解这些概念不太喜欢先深入细节,有全局视角再看细节

    无栈协程的核心就是 把顺序代码变成一种状态机,不同语言的实现差异很大,但逻辑差不多

    (其实我们如果不用 coroutine, 写事件驱动应用 就是手写这个状态机)

    await 就是状态转移点,从一个 await 完成 到下一个代码路径上的 await 就是一次状态转移

    将这一小段代码封装起来 就叫 task, 这就是 事件循环执行的基本单元(不同语言实现也不一样,python 应该是依靠 生成器状态机来实现,rust ,c++ 则靠编译器)

    future/awaitable 作用是管理 task 之间的依赖关系,在某个 task 的 future done 的时候,将依赖它的 task 放进就绪队列等待执行(不同实现也不一样,比如 直接通过 callback )

    所以:


    - 啥时候让出权限: 一个 task 完成的时候
    - 啥时候任务加进事件循环: 这个任务的依赖 future done 的时候 (实现可以都不一样,单实践效果一定是这样的)
    - 啥时候恢复执行: 进如 ready 队列了,就等待执行了,自于啥时候执行,就是 队列和 调度器的实现了,也都不一样

    ----


    正好前段时间看了 foundationdb ,他们自己实现了一个 叫 flow 的语言,在 < c++20 上实现了无栈协程,它的编译器会把 flow 的代码编译成 C++ 的状态机,可以清晰的看到怎么把代码转成状态机
    009694
        4
    009694  
       14 天前
    你执行 await 语句的时候,就会出让执行权。也就是说,如果你在一组代码里面没有任何 await 语句的话,这段代码是完全同步的。 当你 await 的 io 事件发生之后,执行权就回到你的下一行代码了。 future 对象就是一个预留的桩子 告诉你“你委托我执行的异步代码,等下就用这个 future 对象获取结果”。
    Trim21
        5
    Trim21  
       14 天前
    曾经尝试过搓一个 eventloop ,然后因为懒得像 uvloop 一样搓到跟 asyncio 100%兼容所以就放弃了。

    如果你不考虑网络 IO 的话,事件循环本身是非常简单的,实际上就是 event loop 上面的 call_soon 、call_at 以及 call_later 三个方法... 你可以继承一下现有的事件循环,然后在这三个方法上打 log ,然后写一段简单的 async/await 程序,就能看到你生成 future 之类的对象到底干了什么。

    python 的 Future 对象有一个 c 版本,也有一个 python 版本的,你可以直接去看源码。看看他什么时候调用我前面说的几个方法。
    songray
        7
    songray  
       14 天前
    没那么复杂,比如我们有一个 async 函数 foo ,代码执行到 await 的时候,控制权就从 foo 函数让出到别的代码块了,同时向待完成列表里插入 foo 。
    等到 foo 的 await 任务完成后,就会向 eventloop 中插入类似于 “foo 已经完成啦,你应该继续 foo 的后续操作”的 task 。
    等到 eventloop 循环到这个 task ,就会恢复上下文(也可以说是状态)到 foo ,这也就是为啥无栈协程也可以看做是一种状态机。
    希望我的解释比较明朗。
    sgld
        8
    sgld  
    OP
       14 天前
    @thevita 感谢大佬,我理解一下
    sgld
        9
    sgld  
    OP
       14 天前
    @Trim21 源码太复杂了,有涉及到很多情况,现在在看上面视频里面 up 写的简易版本的,
    sgld
        10
    sgld  
    OP
       14 天前
    用 ai 辅助修改了下代码,然后加了一些 print ,直观的了解了下执行流程

    import time
    import random
    from collections import deque
    from itertools import count
    import heapq

    count_id = count(1)

    class Future:
    def __init__(self):
    self._done = False
    self._result = None
    self._callbacks = []
    self._cancelled = False
    self._id = f'Future-{next(count_id)}'

    def add_done_callback(self, fn):
    if self._done:
    fn(self)
    else:
    self._callbacks.append(fn)

    def set_result(self, result):
    self._result = result
    self._done = True
    for cb in self._callbacks:
    cb(self)

    def __await__(self):
    if not self._done:
    print(f"Future {self._id} is not done, waiting...")
    # 这里的 self 是一个 Future 对象, 需要在调用时使用 await 关键字
    yield self
    return self._result

    class Task(Future):
    def __init__(self, coro):
    super().__init__()
    self.coro = coro
    print(f"任务初始化, 任务 ID: {self._id}")
    loop.call_soon(self.run)

    def run(self):
    try:
    result = self.coro.send(None)
    except StopIteration as e:
    """执行"""
    print(f"任务 {self._id} 执行完毕, 结果: {e.value}")
    self.set_result(e.value)
    else:
    if isinstance(result, Future):
    result.add_done_callback(self._wakeup)
    print(f"Task {self._id} is waiting for Future {result._id}")
    def _wakeup(self, future: Future):
    """
    This method is called when the future is done.
    It schedules the task to run again.
    """
    print(f"等待完成, 唤醒任务 {self._id}, 结果: {future._result}")
    loop.call_soon(self.run)

    class EventLoop:
    def __init__(self):
    self._ready = deque()
    self._scheduled = []
    self._stopped = False

    def call_soon(self, callback, *args):
    self._ready.append((callback, args))

    def call_later(self, delay, callback, *args):
    heapq.heappush(self._scheduled,
    (time.time() + delay, callback, args))

    def stop(self):
    self._stopped = True

    def create_task(self, coro):
    return Task(coro)

    def run_forever(self):
    while not self._stopped:
    self.run_once()

    def run_once(self):
    now = time.time()
    while self._scheduled and self._scheduled[0][0] <= now:
    _, cb, args = heapq.heappop(self._scheduled)
    self._ready.append((cb, args))

    num = len(self._ready)
    for _ in range(num):

    # 取出一个任务, 执行它
    cb, args = self._ready.popleft()
    print(f"----> 执行 {cb}({args}) ---->")
    cb(*args)

    async def smallrun():
    print("Start smallrun")

    # 创建一个 Future 对象
    # 代表一个将来的结果, 但是现在还不知道结果是什么
    fut = Future()
    print(f"Future {fut._id} created")
    # 功能模拟 --- 随机延迟, 模拟 IO 操作
    # IO 结束以后, 调用 fut.set_result(None)
    delay = random.random()
    loop.call_later(delay, fut.set_result, None)

    await fut
    print("End smallrun after", delay)
    return delay

    async def bigrun():
    print("Start bigrun")
    delay = await smallrun()
    print("End bigrun with", delay*10)
    return delay * 10

    async def main_task():
    print("Main task start")
    result = await bigrun()
    print("Final result:", result)

    if __name__ == "__main__":
    loop = EventLoop()
    loop.create_task(main_task())

    # 2.1 秒后停止事件循环
    loop.call_later(2.2, loop.stop)
    loop.run_forever()
    sgld
        11
    sgld  
    OP
       14 天前
    @lxy42 好的好的,我看看这个,🙏感谢资源
    sgld
        12
    sgld  
    OP
       14 天前
    @cyaki 是哪位呀
    cyaki
        13
    cyaki  
       14 天前
    @sgld David Beazley, 在 youtube 上搜就行
    iorilu
        14
    iorilu  
       13 天前
    有本书很好 Python Concurrency with asyncio

    当然这书内容太多, 不需要都看, 我也只看了一小部分足够了

    除非是想独自开发 asyncio 相关的框架, 一般人了解下就行了
    kivmi
        15
    kivmi  
       11 天前
    @thevita CV 大神
    kivmi
        16
    kivmi  
       11 天前
    @009694 future + callback 更好理解吧
    kivmi
        17
    kivmi  
       11 天前
    @songray 正解
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1818 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 16:19 · PVG 00:19 · LAX 09:19 · JFK 12:19
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.