基础知识

image

进程

操作系统分配资源的最小单位。

线程

程序执行的最小单位。共享进程级的资源。切换由操作系统负责调度

协程

由程序员自己管理的轻量级线程,对内核来说不可见。

同步IO

do _something()
f = open('file', 'r')
r = f.read()  # 线程阻塞,等待IO操作结果
# IO操作完成后线程才能继续执行
do_something()

异步IO

异步IO模型需要一个消息循环,在消息循环中,主线程不断重复“读取消息-处理消息”这一过程。

loop = get_event_loop()
while True:
    event = loop.get_event()
    process_event(event)

消息模型其实早在应用在桌面应用程序中了。一个GUI程序的主线程就负责不停地读取消息并处理消息。所有的键盘、鼠标等消息都被发送到GUI程序的消息队列中,然后由GUI程序的主线程处理。 在“发出IO请求”到收到“IO完成”的这段时间里,同步IO模型下,主线程只能挂起,但异步IO模型下,主线程并没有休息,而是在消息循环中继续处理其他消息。这样,在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。

asyncio

asyncio的编程模型就是一个消息循环。从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。

注意:这里的coroutine是没有运行的,扔到事件循环中才能运行,在事件循环中运行的称作task。

import asyncio

@asyncio.coroutine
def hello():
    print("Hello world!")
    # 异步调用asyncio.sleep(1):
    r = yield from asyncio.sleep(1)
    print("Hello again!")

# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
loop.run_until_complete(hello())
loop.close()

解析:

@asyncio.coroutine把一个generator标记为coroutine类型。

然后,我们就把这个coroutine扔到EventLoop中执行。 hello()会首先打印出Hello world!,然后,yield from语法可以让我们方便地调用另一个generator

由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。

asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。

异步原理:

所有协程都运行在同一个线程里,由事件循环统一调度。当一个协程在等待 I/O 时,会主动交出当前线程的执行权,回到事件循环。事件循环则利用这段等待时间去执行其他协程,并依赖操作系统的通知来感知 I/O 完成,进而通过回调机制唤醒相应的协程

生成器的嵌套

yield from是生成器的嵌套,python早先利用生成器的嵌套实现coroutine。

  • 调用方:调用委托生成器的代码
  • 委托生成器:包含yield from表达式的生成器函数
  • 子生成器:yield from右边的生成器函数 yield from将实现两个生成器的嵌套,分别是委托生成器和子生成器。其中委托生成器,只起一个桥梁作用,它建立的是一个双向通道,它并没有权利也没有办法,对子生成器yield回来的内容做拦截,而是将子生成器yield的数据传递给调用方。只有子生成器return后,yield from左边的变量才会被赋值,委托生成器才能继续进行。yield from可以让我们避免让我们自己处理各种料想不到的异常,而让我们专注于业务代码的实现。

协程 vs 生成器

  • 生成器以迭代的方式生产数据
  • 协程消耗数据
  • 协程和迭代无必然联系

协程 vs 对象

协程看起来类似一个处理数据的对象。

  • 协程是一个function定义
  • 协程更快

新API

asyncawait是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:

  1. @asyncio.coroutine替换为async
  2. yield from替换为await

新增概念:

  • 可以被await的对象是awaitable类型对象,其实现了__await__方法,本质是代码逻辑。
  • Future对象,是一个用来存放“未来结果”的容器。Future 实现了底层协议,允许协程 await 它。当你 await future 时,协程会暂停,直到有人调用 future.set_result() 给它填入结果。
  • 协程和Future都可以被await,await协程是将控制权在当前线程内从父协程转移到子协程,await Future基于回调的异步挂起,控制权立即从协程转移回事件循环。

简单说:

  • 事件循环是调度者;
  • 协程是执行者;
  • Future是令牌,是媒介;

实现一个协程: 异步等待

asyncio.sleep(1)这个最简单的异步操作,它背后的核心代码大致如下:

async def sleep(delay, result=None):
    # 1. 获取当前正在运行的事件循环
    loop = events.get_running_loop()
    
    # 2. 创建一个 Future 对象
    # 这个 Future 就是用来“挂起”当前协程的占位符
    future = loop.create_future()
    
    # 3. 核心魔法:注册一个定时器
    # 告诉事件循环:"请在 delay 秒后,执行 _set_result_unless_cancelled 函数,
    # 并把 future 和 result 作为参数传给它"
    # h 是一个 TimerHandle(定时器句柄),代表这个闹钟
    h = loop.call_later(delay, futures._set_result_unless_cancelled, future, result)
    
    # 4. 等待 Future 完成
    # 这里会触发协程挂起,控制权交还给事件循环
    try:
        return await future
    finally:
        # 5. 清理工作
        # 无论是因为超时醒来,还是被外部取消了,都要取消这个闹钟
        # 防止任务被取消后,闹钟响了还在尝试唤醒一个已经死掉的协程
        h.cancel()

流程梳理:

  1. 当其他协程调用 await asyncio.sleep(1)。
  2. sleep 函数内部创建了一个 Future,并调用 await future。
  3. 协程暂停,并把“唤醒自己”的回调注册到这个 Future 上,然后交还控制权。
  4. 事件循环在delay秒后,执行定时器回调 futures._set_result_unless_cancelled(future, result)。
  5. 这个回调函数内部会调用 future.set_result(result)。
  6. set_result 触发了回调链,最终调用了协程的“唤醒方法”,协程被重新加入事件循环的待执行队列。
  7. 事件循环在下一轮调度中,恢复该协程的执行。

实现一个协程:异步网络接收。

async def my_raw_recv(sock, nbytes):
    """
    这是一个完全手写的异步 I/O 函数。
    它没有使用 asyncio.open_connection,而是直接操作 socket 和 event loop。
    """
    # 1. 获取当前运行的事件循环
    loop = asyncio.get_running_loop()
    
    # 2. 创建一个 Future,用来代表“未来的数据结果”
    # 这就是 await 等待的那个东西
    future = loop.create_future()
    
    # 3. 核心魔法:
    # 告诉事件循环:“请监听这个 socket 的读取事件。
    # 一旦有数据了,请把数据填进 future,并标记 future 为完成。”
    # reader=sock.recv 是同步回调,但 loop.sock_recv 是异步注册
    loop.add_reader(sock.fileno(), lambda: future.set_result(sock.recv(nbytes)))
    
    # 4. 挂起当前协程,等待 future 完成
    # 当事件循环检测到 sock 可读时,future 会 set_result,这里就会醒来
    data = await future
    
    # 5.  cleanup:数据拿到了,把监听取消掉(防止重复触发)
    loop.remove_reader(sock.fileno())
    
    return data

当使用各种异步io库时,内部其实就是做了上面这些事

  1. 创建 Future。
  2. 注册 I/O 事件(通过 add_reader 或定时器)。
  3. 返回 Future 供 await。

协程中的异常

异常延迟抛出: 协程中的异常有时可能在程序结束时才抛出。原因是python垃圾回收机制。消息循环的异常处理器只有在Task被释放时才执行。当用户代码中Task被引用时,直到程序结束时才释放,进行垃圾回收,触发异常处理器。 异常及时处理方法:

  • 对关心异常的Task,主动await
  • 推荐采用callback接口,当Taskfinished的时候自动执行回调函数,处理异常。用Task.result()判断是否发生异常

常用代码

async for

async for用于遍历异步迭代器,异步迭代器依次返还可等待对象,调用者从每个可等待对象中检索结果。

异步迭代器实现了__aiter__()方法和__anext__方法。__anext__方法返回可等待对象,直到迭代结束返回StopAsyncIteration

for循环假定每个可等待对象都会有返回值,async forawait 可等待对象,获得其返回值。

和普通的for循环用法接近,但不可以互换。示例:

...
# traverse an asynchronous iterator
async for item in async_iterator:
    print(item)

# build a list of results
results = [item async for item async_iterator]

以下示例按顺序打印0~9。

# SuperFastPython.com
# example of async for with an asynchronous generator
import asyncio

# define an asynchronous generator
async def async_generator():
    # normal loop
    for i in range(10):
        # block to simulate doing work
        await asyncio.sleep(1)
        # yield the result
        yield i

# define a simple coroutine
async def custom_coroutine():
    # asynchronous for loop
    async for item in async_generator():
        # report the result
        print(item)

# start
asyncio.run(custom_coroutine())

async with

创建和使用异步的上下文管理器。

异步上下文管理器能够等待enter和exit方法,其实现了**__aenter__()__aexit__()**方法。

示例:

...
# create and use an asynchronous context manager
async with AsyncContextManager() as manager:
    ...

等价于:

...
# create or enter the async context manager
manager = await AsyncContextManager()
try:
    # do something
        ...
finally:
    # close or exit the context manager
    await manager.close()

和普通的with语句类似,但是不可互换。

# SuperFastPython.com
# example of async with and an asynchronous context manager
import asyncio

# define an asynchronous context manager
class CustomContextManager:
    # enter the async context manager
    async def __aenter__(self):
        # report a message
        print('>entering the context manager')
        # block for a moment
        await asyncio.sleep(0.5)

    # exit the async context manager
    async def __aexit__(self, exc_type, exc, tb):
        # report a message
        print('>exiting the context manager')
        # block for a moment
        await asyncio.sleep(0.5)

# define a simple coroutine
async def custom_coroutine():
    # report a message
    print('before the context manager')
    # create and use the asynchronous context manager
    async with CustomContextManager() as manager:
        # report the result
        print(f'within the manager')
    # report a message
    print('after the context manager')

# start
asyncio.run(custom_coroutine())

<
Previous Post
并发执行
>
Next Post
描述器