异步IO和协程
基础知识
进程
操作系统分配资源的最小单位。
线程
程序执行的最小单位。共享进程级的资源。切换由操作系统负责调度
协程
由程序员自己管理的轻量级线程,对内核来说不可见。
同步IO
do _something()
f = open('file', 'r')
r = f.read() # 线程阻塞,等待IO操作结果
# IO操作完成后线程才能继续执行
do_something()
异步IO
异步IO模型需要一个消息循环,在消息循环中,主线程不断重复“读取消息-处理消息”这一过程。
loop = get_event_loop()
while True:
event = loop.get_event()
process_event(event)
消息模型其实早在应用在桌面应用程序中了。一个GUI程序的主线程就负责不停地读取消息并处理消息。所有的键盘、鼠标等消息都被发送到GUI程序的消息队列中,然后由GUI程序的主线程处理。 在“发出IO请求”到收到“IO完成”的这段时间里,同步IO模型下,主线程只能挂起,但异步IO模型下,主线程并没有休息,而是在消息循环中继续处理其他消息。这样,在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。
asyncio
asyncio
的编程模型就是一个消息循环。从asyncio
模块中直接获取一个EventLoop
的引用,然后把需要执行的协程扔到EventLoop
中执行,就实现了异步IO。
注意:这里的coroutine是没有运行的,扔到事件循环中才能运行,在事件循环中运行的称作task。
import asyncio
@asyncio.coroutine
def hello():
print("Hello world!")
# 异步调用asyncio.sleep(1):
r = yield from asyncio.sleep(1)
print("Hello again!")
# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
loop.run_until_complete(hello())
loop.close()
解析:
@asyncio.coroutine
把一个generator
标记为coroutine
类型。
然后,我们就把这个coroutine
扔到EventLoop
中执行。
hello()
会首先打印出Hello world!
,然后,yield from
语法可以让我们方便地调用另一个generator
。
由于asyncio.sleep()
也是一个coroutine,所以线程不会等待asyncio.sleep()
,而是直接中断并执行下一个消息循环。
当asyncio.sleep()
返回时,线程就可以从yield from
拿到返回值(此处是None),然后接着执行下一行语句。
生成器的嵌套
yield from
是生成器的嵌套,python早先利用生成器的嵌套实现coroutine。
- 调用方:调用委托生成器的代码
- 委托生成器:包含yield from表达式的生成器函数
- 子生成器:yield from右边的生成器函数
yield from
将实现两个生成器的嵌套,分别是委托生成器和子生成器。其中委托生成器,只起一个桥梁作用,它建立的是一个双向通道,它并没有权利也没有办法,对子生成器yield
回来的内容做拦截,而是将子生成器yield
的数据传递给调用方。只有子生成器return
后,yield from
左边的变量才会被赋值,委托生成器才能继续进行。yield from
可以让我们避免让我们自己处理各种料想不到的异常,而让我们专注于业务代码的实现。
协程 vs 生成器
- 生成器以迭代的方式生产数据
- 协程消耗数据
- 协程和迭代无必然联系
协程 vs 对象
协程看起来类似一个处理数据的对象。
- 协程是一个function定义
- 协程更快
新API
async
和await
是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:
- 把
@asyncio.coroutine
替换为async
; - 把
yield from
替换为await
。 可以被await的对象是awaitable
类型对象,其实现了__await__
方法。
协程中的异常
异常延迟抛出:
协程中的异常有时可能在程序结束时才抛出。原因是python垃圾回收机制。消息循环的异常处理器只有在Task
被释放时才执行。当用户代码中Task
被引用时,直到程序结束时才释放,进行垃圾回收,触发异常处理器。
异常及时处理方法:
- 对关心异常的
Task
,主动await
- 推荐采用callback接口,当
Task
finished的时候自动执行回调函数,处理异常。用Task.result()
判断是否发生异常
常用代码
async for
async for
用于遍历异步迭代器,异步迭代器依次返还可等待对象,调用者从每个可等待对象中检索结果。
异步迭代器实现了__aiter__()
方法和__anext__
方法。__anext__
方法返回可等待对象,直到迭代结束返回StopAsyncIteration
。
for循环假定每个可等待对象都会有返回值,async for
会await
可等待对象,获得其返回值。
和普通的for循环用法接近,但不可以互换。示例:
...
# traverse an asynchronous iterator
async for item in async_iterator:
print(item)
# build a list of results
results = [item async for item async_iterator]
以下示例按顺序打印0~9。
# SuperFastPython.com
# example of async for with an asynchronous generator
import asyncio
# define an asynchronous generator
async def async_generator():
# normal loop
for i in range(10):
# block to simulate doing work
await asyncio.sleep(1)
# yield the result
yield i
# define a simple coroutine
async def custom_coroutine():
# asynchronous for loop
async for item in async_generator():
# report the result
print(item)
# start
asyncio.run(custom_coroutine())
async with
创建和使用异步的上下文管理器。
异步上下文管理器能够等待enter和exit方法,其实现了**__aenter__()和__aexit__()**方法。
示例:
...
# create and use an asynchronous context manager
async with AsyncContextManager() as manager:
...
等价于:
...
# create or enter the async context manager
manager = await AsyncContextManager()
try:
# do something
...
finally:
# close or exit the context manager
await manager.close()
和普通的with语句类似,但是不可互换。
# SuperFastPython.com
# example of async with and an asynchronous context manager
import asyncio
# define an asynchronous context manager
class CustomContextManager:
# enter the async context manager
async def __aenter__(self):
# report a message
print('>entering the context manager')
# block for a moment
await asyncio.sleep(0.5)
# exit the async context manager
async def __aexit__(self, exc_type, exc, tb):
# report a message
print('>exiting the context manager')
# block for a moment
await asyncio.sleep(0.5)
# define a simple coroutine
async def custom_coroutine():
# report a message
print('before the context manager')
# create and use the asynchronous context manager
async with CustomContextManager() as manager:
# report the result
print(f'within the manager')
# report a message
print('after the context manager')
# start
asyncio.run(custom_coroutine())