社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

最新的 Python 异步到底是怎么实现的?本篇为你深度解析!

进击的Coder • 5 年前 • 472 次点击  

转载来源

公众号:Python 学习开发

阅读本文大概需要 9 分钟。


简述

PEP492 引入了对 Python 3.5 的原生协程和 async/await 句法的支持。本次提案添加了对异步生成器的支持进而来扩展 Python 的异步功能。

理论和目标

常规生成器(在 PEP 255 中引入)的实现,使得编写复杂数据变得更优雅,它们的行为类似于迭代器。
当时没有提供async for使用的异步生成器。 编写异步数据生成器变得非常复杂,因为必须定义一个实现 __aiter__ 和 __anext__ 的方法,才能在 async for 语句中使用它。
为了说明异步生成器的重要性,专门做了性能测试,测试结果表明使用异步生成器要比使用异步迭代器快 2 倍多。
下面的代码是演示了在迭代的过程中等待几秒:

class Ticker:
    """Yield numbers from 0 to `to` every `delay` seconds."""

    def __init__(self, delay, to):
        self.delay = delay
        self.i = 0
        self.to = to

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= self.to:
            raise StopAsyncIteration
        self.i += 1
        if i:
            await asyncio.sleep(self.delay)
        return i

我们那可以使用下面的代码实现同样的功能:

async def ticker(delay, to):
    """Yield numbers from 0 to `to` every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

详细说明

异步生成器

我们直到在函数中使用一个或多个 yield 该函数将变成一个生成器。

def func():            # 方法
    return

def genfunc():         # 生成器方法
    yield

我们提议使用类似的功能实现下面异步生成器:

async def coro():      # 一个协程方法
    await smth()

async def asyncgen():  # 一个异步生成器方法
    await smth()
    yield 42

调用异步生成器函数的结果是异步生成器对象,它实现了 PEP 492 中定义的异步迭代协议。
注意:在异步生成器中使用非空 return 语句会引发 SyntaxError 错误。

对异步迭代协议的支持

该协议需要实现两种特殊方法:
__aiter__ 方法返回一个异步迭代器。
__anext__ 方法返回一个 awaitable 对象,它使用 StopIteration 异常来捕获 yield 的值,使用 StopAsyncIteration 异常来表示迭代结束。
异步生成器定义了这两种方法。 让我们实现一个一个简单的异步生成器:

import asyncio
async def genfunc():
    yield 1
    yield 2

gen = genfunc()

async def start():
    assert gen.__aiter__() is gen
    assert await gen.__anext__() == 1
    assert await gen.__anext__() == 2
    await gen.__anext__()  # This line will raise StopAsyncIteration.

if __name__ == '__main__':
    asyncio.run(start())

终止

PEP 492 提到需要使用事件循环或调度程序来运行协程。 因为异步生成器是在协程使用的,所以还需要创建一个事件循环来运行。
异步生成器可以有 try..finally 块,也可以用 async with 异步上下文管理代码快。 重要的是提供一种保证,即使在部分迭代时,也可以进行垃圾收集,生成器可以安全终止。

async def square_series(con, to):
    async with con.transaction():
        cursor = con.cursor(
            'SELECT generate_series(0, $1) AS i', to)
        async for row in cursor:
            yield row['i'] ** 2

async for  i in square_series(con, 1000):
    if i == 100:
        break

上面代码演示了异步生成器在 async with中 使用,然后使用 async for 对异步生成器对象进行迭代处理,同时我们也可以设置一个中断条件。
square_series() 生成器将被垃圾收集,并没有异步关闭生成器的机制,Python 解释器将无法执行任何操作。
为了解决这个问题,这里提出以下改进建议:
1.在异步生成器上实现一个 aclose 方法,返回一个特殊 awaittable 对象。 当 awaitable 抛出 GeneratorExit 异常的时候,抛出到挂起的生成器中并对其进行迭代,直到发生 GeneratorExit 或 StopAsyncIteration。这就是在常规函数中使用 close 方法关闭对象一样,只不过 aclose 需要一个事件循环去执行。
2.不要在异步生成器中使用 yield 语句,只能用 await。
3.在sys模块中加两个方法:set_asyncgen_hooks() and get_asyncgen_hooks().
sys.set_asyncgen_hooks() 背后的思想是允许事件循环拦截异步生成器的迭代和终结,这样最终用户就不需要关心终结问题了,一切正常。
sys.set_asyncgen_hooks() 可以结束两个参数
firstiter:一个可调用的,当第一次迭代异步生成器时将调用它。
finalizer:一个可调用的,当异步生成器即将被 GC 时将被调用。
当第一迭代异步生成器时,它会引用到当前的 finalizer。
当异步生成器即将被垃圾收集时,它会调用其缓存的 finalizer。假想在事件循环激活异步生成器开始迭代的时候, finalizer 将调用一个 aclose() 方法.
例如,以下是如何修改 asyncio 以允许安全地完成异步生成器:

# asyncio/base_events.py

class BaseEventLoop:

    def run_forever(self):
        ...
        old_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(finalizer=self._finalize_asyncgen)
        try:
            ...
        finally:
            sys.set_asyncgen_hooks(*old_hooks)
            ...

    def _finalize_asyncgen(self, gen):
        self.create_task(gen.aclose())

第二个参数 firstiter,允许事件循环维护在其控制下实例化的弱异步生成器集。这使得可以实现“shutdown”机制,来安全地打开的生成器并关闭事件循环。
sys.set_asyncgen_hooks() 是特定线程,因此在多个事件循环并行的时候是安全的。
sys.get_asyncgen_hooks() 返回一个带有 firstiter 和 finalizer 字段的类似于类的结构。

asyncio

asyncio 事件循环将使用 sys.set_asyncgen_hooks() API 来维护所有被调度的弱异步生成器,并在生成器被垃圾回收时侯调度它们的 aclose() 方法。
为了确保 asyncio 程序可以可靠地完成所有被调度的异步生成器,我们建议添加一个新的事件循环协程方法 loop.shutdown_asyncgens()。 该方法将使用 aclose() 调用关闭所有当前打开的异步生成器。
在调用loop.shutdown_asyncgens() 方法之后,首次迭代新的异步生成器,事件循环就会发出警告。 我们的想法是,在请求关闭所有异步生成器之后,程序不应该执行迭代新异步生成器的代码。
下面是一个关于如何使用 Ashutdown_asyncgens 的例子:

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())#关闭所有异步迭代器
    loop.close()

异步生成器对象

该对象以标准 Python 生成器对象为模型。 本质上异步生成器的行为复制了同步生成器的行为,唯一的区别在于 API 是异步的。
定义了以下方法和属性:
1.agen.__aiter__(): 返回 agen.
2.agen.__anext__(): 返回一个 awaitable 对象, 调用一次异步生成器的元素。
3.agen.asend(val): 返回一个 awaitable 对象,它在 agen 生成器中推送 val对象。 当 agen 还没迭代时,val 必须为 None。
上面的方法类似同步生成器的使用。
代码例子:




    
import asyncio


async def gen():
    await asyncio.sleep(0.1)
    v = yield 42
    print(v)
    await asyncio.sleep(0.2)



async def start():
    g = gen()

    await g.asend(None)  # Will return 42 after sleeping
    # for 0.1 seconds.

    await g.asend('hello')  # Will print 'hello' and
    # raise StopAsyncIteration
    # (after sleeping for 0.2 seconds.)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

4.agen.athrow(typ, [val, [tb]]): 返回一个 awaitable 对象, 这会向 agen 生成器抛出一个异常。
代码如下:

import asyncio


async def gen():
    try:
        await asyncio.sleep(0.1)
        yield 'hello'
    except IndexError:
        await asyncio.sleep(0.2)
        yield 'world'


async def start ():
    g = gen()
    v = await g.asend(None)
    print(v)  # Will print 'hello' after
    # sleeping for 0.1 seconds.

    v = await g.athrow(IndexError)
    print(v)  # Will print 'world' after
    # $ sleeping 0.2 seconds.


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

5.agen.aclose(): 返回一个 awaitable 对象, 调用该方法会抛出一个异常给生成器。

import asyncio


async def gen():
    try:
        await asyncio.sleep(0.1)
        v = yield 42
        print(v)
        await asyncio.sleep(0.2)
    except:
         print("运行结束"


async def start():
    g = gen()
    v=await g.asend(None)
    print(v)
    await g.aclose() #不做异常处理会报错


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

6.agen.__name__ and agen.__qualname__:可以返回异步生成器函数的名字。

async def gen():
    try:
        await  asyncio.sleep(0.1)
        v = yield 42
        print(v)
        await asyncio.sleep(0.2)
    except:
         print("运行结束")

async def start():
    g = gen()
    print(g.__aiter__())#输出async_generator对象
    print(g.__name__)#输出gen
    print(g.__qualname__)#输出gen

其他的方法

agen.ag_await: 正等待的对象(None). 类似当前可用的 gi_yieldfrom for generators and cr_await for coroutines.
agen.ag_frame, agen.ag_running, and agen.ag_code: 同生成器一样

StopIteration and StopAsyncIteration 被替换为 RuntimeError,并且不上抛。

源码实现细节

异步生成器对象(PyAsyncGenObject)与 PyGenObject 共享结构布局。 除此之外,参考实现还引入了三个新对象:
PyAsyncGenASend:实现 __anext__ 和 asend() 方法的等待对象。
PyAsyncGenAThrow:实现 athrow() 和 aclose() 方法的等待对象。
PyAsyncGenWrappedValue:来自异步生成器的每个直接生成的对象都隐式地装入此结构中。 这就是生成器实现如何使用常规迭代协议从使用异步迭代协议生成的对象中分离出的对象。 PyAsyncGenASend和 PyAsyncGenAThrow 是 awaitable 对象(它们有 __await__ 方法返回 self)类似于 coroutine 的对象(实现__iter__,__ next__,send() 和 throw() 方法)。 本质上,它们控制异步生成器的迭代方式。

PyAsyncGenASend and PyAsyncGenAThrow

PyAsyncGenASend 类似生成器对象驱动 __anext__ and asend() 方法,实装了异步迭代协议。
agen.asend(val) 和 agen.__anext__() 返回一个 PyAsyncGenASend 对象的一个引用。 (它将引用保存回父类 agen 对象。)
数据流定义如下:
1.首次调用 PyAsyncGenASend.send(val) 时, val将 推入到父类 agen 对象 (PyGenObject 利用现有对象。)
对 PyAsyncGenASend 对象进行后续迭代,将 None 推送到 agen。
2.首次调用 _PyAsyncGenWrappedValue 对象时,它将被拆箱,并且以未被装饰的值作为参数会引发 StopIteration 异常。
3.异步生成器中的 return 语句引发 StopAsyncIteration 异常,该异常通过 PyAsyncGenASend.send() 和 PyAsyncGenASend.throw() 方法传播。
4.PyAsyncGenAThrow与PyAsyncGenASend非常相似。 唯一的区别是PyAsyncGenAThrow.send() 在第一次调用时会向父类 agen 对象抛出异常(而不是将值推入其中。)

新的标准库方法和Types

1.types.AsyncGeneratorType -- 判断是否是异步生成器对象
2.sys.set_asyncgen_hooks() 和 sys.get_asyncgen_hooks()--
在事件循环中设置异步生成器终结器和迭代拦截器。
3.inspect.isasyncgen() 和 inspect.isasyncgenfunction() :方法内省。
4.asyncio 加入新方法: loop.shutdown_asyncgens().
5.collections.abc.AsyncGenerator: 抽象基类的添加。

是否支持向后兼容

该提案完全支持向后兼容
在 python3.5,async def 里使用 yield 会报错,因此在 python3.6 引入了安全的异步生成器

性能展示

常规生成器




    
import time


def gen():
    i = 0
    while i 100000000:
        yield i
        i += 1


if __name__ == '__main__':
    start = time.time()
    list(gen())
    end = time.time()
    print("totals time"end - start)

输出

totals time 14.837260007858276

15s 左右

异步迭代器的改进

import time
import asyncio

N = 10 ** 7
class AIter:
    def __init__(self):
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= N:
            raise StopAsyncIteration
        self.i += 1
        return i

async def start():
    [_ async for _ in AIter()]
if __name__ == '__main__':
    s=time.time()
    loop=asyncio.get_event_loop()
    try:
     loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
    e=time.time()
    print("total time",e-s)

输出

total time 5.441649913787842

很明显迭代异步生成器的速度比迭代普通生成器不只是快了两倍。
我们可以做一个更简单的异步生成器

import time
import asyncio

async def ticker(delay, to):
    """Yield numbers from 0 to `to` every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

async def start():
    async for item in ticker(0.000001,100):
        print(item)
if __name__ == '__main__':
    s=time.time()
    loop=asyncio.get_event_loop()
    try:
     loop.run_until_complete(start())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
    e=time.time()
    print("total time",e-s)

设计中要注意的事项

内建函数: aiter() and anext()
最初,PEP 492 将 __aiter__ 定义为应返回等待对象的方法,从而产生异步迭代器。
但是,在CPython 3.5.2中,重新定义了 __aiter__ 可以直接返回异步迭代器。
为了避免破坏向后兼容性,决定 Python 3.6 将支持两种方式:__aiter__ 仍然可以在发出 DeprecationWarning 时返回等待状态。由于 Python 3.6 中 __aiter__ 的这种双重性质,我们无法添加内置的 aiter()  的同步实现。 因此,建议等到 Python 3.7。

异步list/dict/set 推导式

将放在单独的 pep 中也就是后来的 pep530.

异步 yield from

对于异步生成器,yield from 也不那么重要,因为不需要提供在协程之上实现另一个协同程序协议的机制。为了组合异步生成器,可以使用 async for简化这个过程:

async def g1():
    yield 1
    yield 2

async def g2():
    async for v in g1():
        yield v

为了 asend() 和 athrow() 是必须的

它们可以使用异步生成器实现类似于 contextlib.contextmanager 的概念。 例如,可以实现以下模式:

@async_context_manager
async def ctx():
    await open()
    try:
        yield
    finally:
        await close()

async with ctx():
    await ...

另一个原因是从 __anext__ 对象返回的对象来推送数据并将异常抛出到异步生成器中,很难正确地执行此操作。 添加显式的asend()和athrow()更获取异常后的数据。
在实现方面,asend() 是 __anext__ 更通用的版本,而 athrow() 与 aclose() 非常相似。 因此,为异步生成器定义这些方法不会增加任何额外的复杂性。

代码示例

async def ticker(delay, to):
    for i in range(to):
        yield i
        await asyncio.sleep(delay)


async def run():
    async for i in ticker(110):
        print(i)


import asyncio
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(run())
finally:
    loop.close()

这代码将打出 0-9,每个数字之间的间隔为 1s。

提议者

Guido, 2016 年9 月 6 日

参考资料

[1]    https://github.com/1st1/cpython/tree/async_gen
[2]    https://mail.python.org/pipermail/python-dev/2016-September/146267.html
[3]    http://bugs.python.org/issue28003

版权声明

本文章翻译整理自,pep525
Source: https://github.com/python/peps/blob/master/pep-0525.txt
本文由 CXA 翻译自: 2019 年 1 月 25 日,翻译能力有限还请大家多多指教。

推荐阅读

1

跟繁琐的命令行说拜拜!Gerapy分布式爬虫管理框架来袭!

2

跟繁琐的模型说拜拜!深度学习脚手架 ModelZoo 来袭!

3

只会用Selenium爬网页?Appium爬App了解一下

4

妈妈再也不用担心爬虫被封号了!手把手教你搭建Cookies池


崔庆才

静觅博客博主,《Python3网络爬虫开发实战》作者

隐形字

个人公众号:进击的Coder

长按识别二维码关注

这里“阅读原文”,查看更多


今天看啥 - 高品质阅读平台
本文地址:http://www.jintiankansha.me/t/sa1r0rQUWC
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/28425
 
472 次点击