最近在看这个视频
前面 生成器、yield from 、native coroutine 还理解的上,到了后面两节,event loop 与 future 的时候就有点懵。 event loop 还好,轮循检查有没有待执行任务,然后执行任务
但是结合了 future 以后,感觉脑子就转不过来了,啥时候让出权限,啥时候任务加进时间循环,啥时候恢复执行。
单独看片段还理解,连一起有点晕了。
想问问大佬们有没有纯文字 + 代码版的相关解释或者博文、教程之类的。
![]() |
1
cyaki 14 天前 via Android
写 curio 的作者油管上有不少关于这块的
|
2
lxy42 14 天前 via Android
500 Lines or Less
A Web Crawler With asyncio Coroutines A. Jesse Jiryu Davis and Guido van Rossum https://aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html |
3
thevita 14 天前
分享一下我的看法,我理解这些概念不太喜欢先深入细节,有全局视角再看细节
无栈协程的核心就是 把顺序代码变成一种状态机,不同语言的实现差异很大,但逻辑差不多 (其实我们如果不用 coroutine, 写事件驱动应用 就是手写这个状态机) await 就是状态转移点,从一个 await 完成 到下一个代码路径上的 await 就是一次状态转移 将这一小段代码封装起来 就叫 task, 这就是 事件循环执行的基本单元(不同语言实现也不一样,python 应该是依靠 生成器状态机来实现,rust ,c++ 则靠编译器) future/awaitable 作用是管理 task 之间的依赖关系,在某个 task 的 future done 的时候,将依赖它的 task 放进就绪队列等待执行(不同实现也不一样,比如 直接通过 callback ) 所以: - 啥时候让出权限: 一个 task 完成的时候 - 啥时候任务加进事件循环: 这个任务的依赖 future done 的时候 (实现可以都不一样,单实践效果一定是这样的) - 啥时候恢复执行: 进如 ready 队列了,就等待执行了,自于啥时候执行,就是 队列和 调度器的实现了,也都不一样 ---- 正好前段时间看了 foundationdb ,他们自己实现了一个 叫 flow 的语言,在 < c++20 上实现了无栈协程,它的编译器会把 flow 的代码编译成 C++ 的状态机,可以清晰的看到怎么把代码转成状态机 |
![]() |
4
009694 14 天前
你执行 await 语句的时候,就会出让执行权。也就是说,如果你在一组代码里面没有任何 await 语句的话,这段代码是完全同步的。 当你 await 的 io 事件发生之后,执行权就回到你的下一行代码了。 future 对象就是一个预留的桩子 告诉你“你委托我执行的异步代码,等下就用这个 future 对象获取结果”。
|
![]() |
5
Trim21 14 天前
曾经尝试过搓一个 eventloop ,然后因为懒得像 uvloop 一样搓到跟 asyncio 100%兼容所以就放弃了。
如果你不考虑网络 IO 的话,事件循环本身是非常简单的,实际上就是 event loop 上面的 call_soon 、call_at 以及 call_later 三个方法... 你可以继承一下现有的事件循环,然后在这三个方法上打 log ,然后写一段简单的 async/await 程序,就能看到你生成 future 之类的对象到底干了什么。 python 的 Future 对象有一个 c 版本,也有一个 python 版本的,你可以直接去看源码。看看他什么时候调用我前面说的几个方法。 |
![]() |
6
coldear 14 天前
|
![]() |
7
songray 14 天前
没那么复杂,比如我们有一个 async 函数 foo ,代码执行到 await 的时候,控制权就从 foo 函数让出到别的代码块了,同时向待完成列表里插入 foo 。
等到 foo 的 await 任务完成后,就会向 eventloop 中插入类似于 “foo 已经完成啦,你应该继续 foo 的后续操作”的 task 。 等到 eventloop 循环到这个 task ,就会恢复上下文(也可以说是状态)到 foo ,这也就是为啥无栈协程也可以看做是一种状态机。 希望我的解释比较明朗。 |
![]() |
10
sgld OP 用 ai 辅助修改了下代码,然后加了一些 print ,直观的了解了下执行流程
import time import random from collections import deque from itertools import count import heapq count_id = count(1) class Future: def __init__(self): self._done = False self._result = None self._callbacks = [] self._cancelled = False self._id = f'Future-{next(count_id)}' def add_done_callback(self, fn): if self._done: fn(self) else: self._callbacks.append(fn) def set_result(self, result): self._result = result self._done = True for cb in self._callbacks: cb(self) def __await__(self): if not self._done: print(f"Future {self._id} is not done, waiting...") # 这里的 self 是一个 Future 对象, 需要在调用时使用 await 关键字 yield self return self._result class Task(Future): def __init__(self, coro): super().__init__() self.coro = coro print(f"任务初始化, 任务 ID: {self._id}") loop.call_soon(self.run) def run(self): try: result = self.coro.send(None) except StopIteration as e: """执行""" print(f"任务 {self._id} 执行完毕, 结果: {e.value}") self.set_result(e.value) else: if isinstance(result, Future): result.add_done_callback(self._wakeup) print(f"Task {self._id} is waiting for Future {result._id}") def _wakeup(self, future: Future): """ This method is called when the future is done. It schedules the task to run again. """ print(f"等待完成, 唤醒任务 {self._id}, 结果: {future._result}") loop.call_soon(self.run) class EventLoop: def __init__(self): self._ready = deque() self._scheduled = [] self._stopped = False def call_soon(self, callback, *args): self._ready.append((callback, args)) def call_later(self, delay, callback, *args): heapq.heappush(self._scheduled, (time.time() + delay, callback, args)) def stop(self): self._stopped = True def create_task(self, coro): return Task(coro) def run_forever(self): while not self._stopped: self.run_once() def run_once(self): now = time.time() while self._scheduled and self._scheduled[0][0] <= now: _, cb, args = heapq.heappop(self._scheduled) self._ready.append((cb, args)) num = len(self._ready) for _ in range(num): # 取出一个任务, 执行它 cb, args = self._ready.popleft() print(f"----> 执行 {cb}({args}) ---->") cb(*args) async def smallrun(): print("Start smallrun") # 创建一个 Future 对象 # 代表一个将来的结果, 但是现在还不知道结果是什么 fut = Future() print(f"Future {fut._id} created") # 功能模拟 --- 随机延迟, 模拟 IO 操作 # IO 结束以后, 调用 fut.set_result(None) delay = random.random() loop.call_later(delay, fut.set_result, None) await fut print("End smallrun after", delay) return delay async def bigrun(): print("Start bigrun") delay = await smallrun() print("End bigrun with", delay*10) return delay * 10 async def main_task(): print("Main task start") result = await bigrun() print("Final result:", result) if __name__ == "__main__": loop = EventLoop() loop.create_task(main_task()) # 2.1 秒后停止事件循环 loop.call_later(2.2, loop.stop) loop.run_forever() |
![]() |
14
iorilu 13 天前
有本书很好 Python Concurrency with asyncio
当然这书内容太多, 不需要都看, 我也只看了一小部分足够了 除非是想独自开发 asyncio 相关的框架, 一般人了解下就行了 |