异步IO和协程
基础知识

进程
操作系统分配资源的最小单位。
线程
程序执行的最小单位。共享进程级的资源。切换由操作系统负责调度
协程
由程序员自己管理的轻量级线程,对内核来说不可见。
同步IO
do _something()
f = open('file', 'r')
r = f.read() # 线程阻塞,等待IO操作结果
# IO操作完成后线程才能继续执行
do_something()
异步IO
异步IO模型需要一个消息循环,在消息循环中,主线程不断重复“读取消息-处理消息”这一过程。
loop = get_event_loop()
while True:
event = loop.get_event()
process_event(event)
消息模型其实早在应用在桌面应用程序中了。一个GUI程序的主线程就负责不停地读取消息并处理消息。所有的键盘、鼠标等消息都被发送到GUI程序的消息队列中,然后由GUI程序的主线程处理。 在“发出IO请求”到收到“IO完成”的这段时间里,同步IO模型下,主线程只能挂起,但异步IO模型下,主线程并没有休息,而是在消息循环中继续处理其他消息。这样,在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。
asyncio
asyncio的编程模型就是一个消息循环。从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。
注意:这里的coroutine是没有运行的,扔到事件循环中才能运行,在事件循环中运行的称作task。
import asyncio
@asyncio.coroutine
def hello():
print("Hello world!")
# 异步调用asyncio.sleep(1):
r = yield from asyncio.sleep(1)
print("Hello again!")
# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
loop.run_until_complete(hello())
loop.close()
解析:
@asyncio.coroutine把一个generator标记为coroutine类型。
然后,我们就把这个coroutine扔到EventLoop中执行。
hello()会首先打印出Hello world!,然后,yield from语法可以让我们方便地调用另一个generator。
由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。
当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。
异步原理:
所有协程都运行在同一个线程里,由事件循环统一调度。当一个协程在等待 I/O 时,会主动交出当前线程的执行权,回到事件循环。事件循环则利用这段等待时间去执行其他协程,并依赖操作系统的通知来感知 I/O 完成,进而通过回调机制唤醒相应的协程
生成器的嵌套
yield from是生成器的嵌套,python早先利用生成器的嵌套实现coroutine。
- 调用方:调用委托生成器的代码
- 委托生成器:包含yield from表达式的生成器函数
- 子生成器:yield from右边的生成器函数
yield from将实现两个生成器的嵌套,分别是委托生成器和子生成器。其中委托生成器,只起一个桥梁作用,它建立的是一个双向通道,它并没有权利也没有办法,对子生成器yield回来的内容做拦截,而是将子生成器yield的数据传递给调用方。只有子生成器return后,yield from左边的变量才会被赋值,委托生成器才能继续进行。yield from可以让我们避免让我们自己处理各种料想不到的异常,而让我们专注于业务代码的实现。
协程 vs 生成器
- 生成器以迭代的方式生产数据
- 协程消耗数据
- 协程和迭代无必然联系
协程 vs 对象
协程看起来类似一个处理数据的对象。
- 协程是一个function定义
- 协程更快
新API
async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:
- 把
@asyncio.coroutine替换为async; - 把
yield from替换为await。
新增概念:
- 可以被await的对象是
awaitable类型对象,其实现了__await__方法,本质是代码逻辑。 - Future对象,是一个用来存放“未来结果”的容器。Future 实现了底层协议,允许协程 await 它。当你 await future 时,协程会暂停,直到有人调用 future.set_result() 给它填入结果。
- 协程和Future都可以被await,await协程是将控制权在当前线程内从父协程转移到子协程,await Future基于回调的异步挂起,控制权立即从协程转移回事件循环。
简单说:
- 事件循环是调度者;
- 协程是执行者;
- Future是令牌,是媒介;
实现一个协程: 异步等待
asyncio.sleep(1)这个最简单的异步操作,它背后的核心代码大致如下:
async def sleep(delay, result=None):
# 1. 获取当前正在运行的事件循环
loop = events.get_running_loop()
# 2. 创建一个 Future 对象
# 这个 Future 就是用来“挂起”当前协程的占位符
future = loop.create_future()
# 3. 核心魔法:注册一个定时器
# 告诉事件循环:"请在 delay 秒后,执行 _set_result_unless_cancelled 函数,
# 并把 future 和 result 作为参数传给它"
# h 是一个 TimerHandle(定时器句柄),代表这个闹钟
h = loop.call_later(delay, futures._set_result_unless_cancelled, future, result)
# 4. 等待 Future 完成
# 这里会触发协程挂起,控制权交还给事件循环
try:
return await future
finally:
# 5. 清理工作
# 无论是因为超时醒来,还是被外部取消了,都要取消这个闹钟
# 防止任务被取消后,闹钟响了还在尝试唤醒一个已经死掉的协程
h.cancel()
流程梳理:
- 当其他协程调用 await asyncio.sleep(1)。
- sleep 函数内部创建了一个 Future,并调用 await future。
- 协程暂停,并把“唤醒自己”的回调注册到这个 Future 上,然后交还控制权。
- 事件循环在delay秒后,执行定时器回调 futures._set_result_unless_cancelled(future, result)。
- 这个回调函数内部会调用 future.set_result(result)。
- set_result 触发了回调链,最终调用了协程的“唤醒方法”,协程被重新加入事件循环的待执行队列。
- 事件循环在下一轮调度中,恢复该协程的执行。
实现一个协程:异步网络接收。
async def my_raw_recv(sock, nbytes):
"""
这是一个完全手写的异步 I/O 函数。
它没有使用 asyncio.open_connection,而是直接操作 socket 和 event loop。
"""
# 1. 获取当前运行的事件循环
loop = asyncio.get_running_loop()
# 2. 创建一个 Future,用来代表“未来的数据结果”
# 这就是 await 等待的那个东西
future = loop.create_future()
# 3. 核心魔法:
# 告诉事件循环:“请监听这个 socket 的读取事件。
# 一旦有数据了,请把数据填进 future,并标记 future 为完成。”
# reader=sock.recv 是同步回调,但 loop.sock_recv 是异步注册
loop.add_reader(sock.fileno(), lambda: future.set_result(sock.recv(nbytes)))
# 4. 挂起当前协程,等待 future 完成
# 当事件循环检测到 sock 可读时,future 会 set_result,这里就会醒来
data = await future
# 5. cleanup:数据拿到了,把监听取消掉(防止重复触发)
loop.remove_reader(sock.fileno())
return data
当使用各种异步io库时,内部其实就是做了上面这些事
- 创建 Future。
- 注册 I/O 事件(通过 add_reader 或定时器)。
- 返回 Future 供 await。
协程中的异常
异常延迟抛出:
协程中的异常有时可能在程序结束时才抛出。原因是python垃圾回收机制。消息循环的异常处理器只有在Task被释放时才执行。当用户代码中Task被引用时,直到程序结束时才释放,进行垃圾回收,触发异常处理器。
异常及时处理方法:
- 对关心异常的
Task,主动await - 推荐采用callback接口,当
Taskfinished的时候自动执行回调函数,处理异常。用Task.result()判断是否发生异常
常用代码
async for
async for用于遍历异步迭代器,异步迭代器依次返还可等待对象,调用者从每个可等待对象中检索结果。
异步迭代器实现了__aiter__()方法和__anext__方法。__anext__方法返回可等待对象,直到迭代结束返回StopAsyncIteration 。
for循环假定每个可等待对象都会有返回值,async for会await 可等待对象,获得其返回值。
和普通的for循环用法接近,但不可以互换。示例:
...
# traverse an asynchronous iterator
async for item in async_iterator:
print(item)
# build a list of results
results = [item async for item async_iterator]
以下示例按顺序打印0~9。
# SuperFastPython.com
# example of async for with an asynchronous generator
import asyncio
# define an asynchronous generator
async def async_generator():
# normal loop
for i in range(10):
# block to simulate doing work
await asyncio.sleep(1)
# yield the result
yield i
# define a simple coroutine
async def custom_coroutine():
# asynchronous for loop
async for item in async_generator():
# report the result
print(item)
# start
asyncio.run(custom_coroutine())
async with
创建和使用异步的上下文管理器。
异步上下文管理器能够等待enter和exit方法,其实现了**__aenter__()和__aexit__()**方法。
示例:
...
# create and use an asynchronous context manager
async with AsyncContextManager() as manager:
...
等价于:
...
# create or enter the async context manager
manager = await AsyncContextManager()
try:
# do something
...
finally:
# close or exit the context manager
await manager.close()
和普通的with语句类似,但是不可互换。
# SuperFastPython.com
# example of async with and an asynchronous context manager
import asyncio
# define an asynchronous context manager
class CustomContextManager:
# enter the async context manager
async def __aenter__(self):
# report a message
print('>entering the context manager')
# block for a moment
await asyncio.sleep(0.5)
# exit the async context manager
async def __aexit__(self, exc_type, exc, tb):
# report a message
print('>exiting the context manager')
# block for a moment
await asyncio.sleep(0.5)
# define a simple coroutine
async def custom_coroutine():
# report a message
print('before the context manager')
# create and use the asynchronous context manager
async with CustomContextManager() as manager:
# report the result
print(f'within the manager')
# report a message
print('after the context manager')
# start
asyncio.run(custom_coroutine())