上篇教程《深入理解Python异步编程(上)》中,我们深入学习了异步编程的相关概念,如阻塞/非阻塞、同步/异步、并行/并发;也学习了异步的难点与在Python中的发展历程,学习了asyncio
的原型,知道了事件循环(Event Loop)、回调(Callback)、协程(Coroutine)、未来对象(Future)、任务(Task)之间的联系,彻底弄清楚了异步编程是什么、为什么、怎么样。此篇教程将带领大家学习如何使用asyncio
。
提示:本文共2万余字,建议先走马观花看个大概,再细读重点关心的部分。当然,也欢迎从头到尾仔细阅读。
内容安排 上篇 了解 异步编程及其紧密相关的概念,如阻塞/非阻塞、同步/异步、并发/并行等 掌握epoll + Callback + Event loop是如何工作的 掌握 Python 是如何逐步从回调到生成器再到原生协程以支持异步编程的 中篇 掌握 协程与任务如何使用与管理(如调度与取消调度) 掌握 同步原语的使用(Lock、Event、Condition、Queue) 了解 Python3.6到3.12各版本的asyncio的变化 下篇 理解 回调、协程、绿程(Green-Thread)、线程对比总结 了解 Gevent/libev、uvloop/libuv 与asyncio的区别和联系 掌握 asyncio 在大型应用中的可扩展性和性能优化 注 :为减轻学习负担,本教程基于 Python 3.6,历史遗留的某些旧的、或不再推荐的用法将刻意避开。所以讨论细节时,除非文中特别提及其他版本做对比,可能只适应于 3.6。
注2 :本文撰写时是Python3.6发布不久,更新时已是Python3.12。这期间的变动主要是性能改进、提供了新的上下文管理器和任务处理方法,并不影响本系列文章的主线内容,本文“中篇”会在文末列出3.6到3.12各版本的主要变化。若因作者疏漏导致文中有版本不兼容的代码示例,恳请大家斧正。
1 原生协程语法 1.1 async def 在Python 3.5之后,引入了async/await
关键字。用它们定义的函数/方法被称为原生协程。需注意,确切讲async def
定义的是一个原生协程函数 ,而调用协程函数 得到的返回值才是该函数对应的协程 。不过在口头交流中,一般不会刻意区分这两者。
async def read_data(db):
pass
# 类型为 function,原生协程函数
type(read_data)
# 类型为 coroutine,调用协程函数返回的才是协程
coro = read_data(db)
type(coro)
要注意的是,async def
的函数内可以没有await
语句。另外,3.5 和 3.6 有所区别:
async def func():
yield 42 # Python 3.5 语法错误
async def func():
yield 42 # Python 3.6 正确,func() 是一个异步生成器
async def func():
yield from coro() # Python 3.5 语法错误
async def func():
yield from coro() # Python 3.6语法错误
1.2 await await
用于接受可被异步等待的 awaitable
的对象的返回值,返回值可以绑定到一个变量。例如:
async def read_data(db):
data = await db.fetch('SELECT ...')
...
可被异步等待的awaitable
对象有如下几种:
被@asyncio.coroutine
装饰的生成器函数调用后返回的生成器协程; 实现了__await__
魔术方法的对象;__await__
方法必须返回迭代器,否则会报TypeError
。实现了__await__
方法的对象也被称为类似未来对象 (Future-like)。 关键字await
后面只能跟着awaitable
对象,否则会报TypeError
。而且 await
必须在 async def
的函数内使用。那么问题来了:
async def say_hello():
print('in say_hello')
return 'Hello'
async def say_world():
print('in say_world')
return 'World'
async def say_helloworld():
print('in say_helloworld')
value = await say_hello() + await say_world()
return value
按上述代码,say_hello()
和 say_world()
都使用await
发起调用并接收返回值。那say_helloworld()
这个入口又该怎么调用?又如何接收到它的结果呢?
2 协程的调用 在《上篇》中,我们已经学习了 EventLoop、Task、Future、Coroutine
之间的联系。Future
用来放置异步操作的返回结果,Task
用来管理协程,把Coroutine
对象封装到Task
中,然后启动EventLoop
程序就开始执行了。接下来学习如何使用asyncio
来完成这一系列操作。
2.1 内置事件循环 在《上篇》中,我们学习到了如何让事件循环不用关心业务逻辑,而只做事件监听的技术操作。所以EventLoop
可以作为基础技术组件事先编写好,asyncio
就提供了对 EventLoop
的封装。
2.1.1 事件循环策略 Python 内置的事件循环比我们在《上篇》中自己写的强大。首先,它引入了事件循环策略 的概念。事件循环策略是全局对象,每进程一个。asyncio
提供了API可以程序员获取和更改每个Python进程内的事件循环策略。如此,让应用开发有了更多的选择,可以使用 asyncio
默认的策略,也可以使用第三方提供或自己编写的策略。
asyncio.get_event_loop_policy() # 获取当前的事件循环策略
asyncio.set_event_loop_policy(policy) # 设置当前的事件循环策略
我们可能听说过uvloop
是一个很高效的第三方事件循环框架,以上两个接口就给我们提供了利用uvloop
代替内置事件循环的可能。
policy = uvloop.EventLoopPolicy() # 获取 uvloop 提供的事件循环策略
asyncio.set_event_loop_policy(policy) # 用uvloop的循环策略代替内置策略
事件循环策略中,又引入了事件循环上下文 的概念。一个事件循环的上下文,就是指一个线程,即每个线程可以设置不同的事件循环 。
2.1.2 事件循环 asyncio
提供了如下三个接口来获取或设置事件循环:
default_loop = asyncio.get_event_loop() # 获取当前线程的事件循环
default_loop = asyncio.get_event_loop_policy().get_event_loop() # 与上一行等效
asyncio.set_event_loop(loop) # 为当前线程设置事件循环
asyncio.get_event_loop_policy().set_event_loop(loop) # 与上一行等效
new_loop = asyncio.new_event_loop() # 根据当前事件循环策略生成一个新的事件循环
new_loop = asyncio.get_event_loop_policy().new_event_loop() # 与上一行等效
调用 get_event_loop()
得到的是默认事件循环。如果是 Unix-like 系统,会调用asyncio.SelectorEventLoop()
得到基于epoll
或kqueue
选择机制的事件循环;如果是Windows系统,会调用asyncio.ProactorEventLoop()
得到基于IOPC
(I/O完成端口)的事件循环。
注意 ,这里我们遇到了asyncio
的第一个坑。每个线程可以设置不同的事件循环,但是每个进程又只能有一个事件循环策略。导致想要使用多种事件循环的情况混乱。具体现象下篇再解释。先记住经验1 :同一个Python进程中,最好只使用一种事件循环策略,且在多线程情况下只使用该策略生成的事件循环对象。
第二个容易被忽略的坑是,使用默认策略时,主线程能够 asyncio.get_event_loop()
得到默认事件循环,但是子线程内做此操作却不行。会报RuntimeError
,提示当前线程无事件循环。uvloop
的策略也是这样。再记住经验2 :子线程中有异步时,需在子线程内先设置事件循环,或将主线程中获取到的循环对象传递给子线程。
从经验1 和经验2 可以推导出经验3:一个Python进程最好只有一种事件循环策略,只使用一个事件循环对象。
上述经验在Python < 3.6.x 版本中有效,随着Python的发展也许这些经验会发生变化。
2.1.3 协程加入事件循环调度
在《上篇》教程中,我们在fetch()
协程内调用了 selector
并在其上面注册了事件和对应的回调,然后再把协程封装到Task
中,loop()
里使用了与fetch()
内相同的selector
,所以loop()
启动后即可监听fetch()
内的事件并持续运行。
在使用asyncio
时,没有亲自调用selector
注册事件,如何将协程封装为Task
并加入到asyncio
提供的EventLoop
上呢?
# 获取事件循环对象
loop = asyncio.get_event_loop()
loop.create_task(say_helloworld())
按上述代码,我们就将say_helloworld()
这个协程封装为了 Task
对象,并加入了loop
中开始调度。只等着loop
启动,say_helloworld()
就会被执行。
2.1.4 事件循环的启停 asyncio
的事件循环对象提供了下列方法控制它的启停和状态检查。
loop.run_forever()
:启动事件循环,直到stop()
方法被调用。若先调stop()
后调run_forever()
,则把已经加入到loop
中的任务全部执行完后退出退出;若先调run_forever()
后调 stop()
则把当前已加入loop
的任务执行完然后退出。
loop.run_until_complete(coro_or_future)
: 启动事件循环,直到传递给它的协程执行结束或未来对象set_result()
得到结果,并且返回协程或未来对象的结果,然后结束事件循环。故,此方法可以同时调度协程运行并得到返回值。如果在调用此方法前已经向loop
加入了其他任务,则其他任务也会执行。
loop.is_running()
:loop
是否在运行中。
loop.stop()
: 停止事件循环的运行,但不会立即停止,参看run_forever()
说明。
loop.is_closed()
: loop
是否已被关闭。
loop.close()
: 关闭loop
,所有已调度但未执行的任务将被强制清理,也不会等待正在执行的任务结束。这个操作是毁灭性的,需谨慎。当loop
被关闭之后,它则不可以再次运行,若要再次调度任务,必须使用新创建的loop
。若一个loop
正在运行中,则无法被关闭,调close()
是会报RuntimeError
的。
有些同学懵逼了,stop()
和 close()
为啥要存在这两种不同的方法?因为这是两个不同的概念。loop
就像汽车的引擎,stop()
是熄火,close()
是报废。当引擎出现故障时,需要把它报废,报废之后必须换新的才能用。如果只是正常的跑完了目标距离,可以熄火,有了新任务重新点火就跑。正在运行中的引擎是不可以强制报废的。
2.2 放在一起执行 现在我们终于知道了如何解决 1.2 节末尾的问题。如下代码所示:
import asyncio
# 根据当前事件循环策略获取事件循环
# 得到的是 asyncio 默认的 UnixSelectorEventLoop
loop = asyncio.get_event_loop()
...
loop.create_task(say_helloworld())
loop.run_forever()
上述代码完成了say_helloworld()
协程的调度和执行,say_hello()
和 say_world()
也会随之被调用。但是上述代码并不能得到 say_helloworld()
的返回值,怎么才能输出它的返回值'helloworld'
呢?
import asyncio
import uvloop
# 顺便演示替换事件循环的用法
# 用 uvloop 的事件循环策略作为 asyncio 的默认策略
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 根据当前事件循环策略获取事件循环
# 得到的是 uvloop.Loop
loop = asyncio.get_event_loop()
...
retval = loop.run_until_complete(say_helloworld())
print(retval)
上述代码同样完成了对 say_helloworld()
的调用,和前一种方案不同的是:事件循环被替换了,循环执行完任务以后会自动退出,接收到了协程的返回值。
2.3 总结 本节我们学习了如何定义原生协程,如何在协程内如何调用别的协程,如何在将协程封装为任务加入事件循环,如何启停事件循环,如何接收协程的返回值,如何替换asyncio
的默认事件循环。还介绍了一些应该注意的采坑经验。
到此为止,我们已经学习了利用asyncio
库调度协程运行的基本用法。接下来,我们将学习asyncio
提供的 Task
和 Future
提供了哪些接口以及如何使用。以免在面对复杂问题时不知所措。
3 Future 和 Task 3.1 回顾 asyncio 核心原理 在《上篇》中,我们学习了asyncio的核心运行原理,讲解了 EventLoop + Coroutine + Task + Future
是如何协同完成异步程序的调度和执行的。
下面我们看看《上篇》异步爬虫的关键运行过程,asyncio
的框架的核心也是如此。
asyncio 核心原理图 如上图所示,asyncio 封装了事件循环、任务对象、未来对象。程序员用协程编写需要异步处理的业务,然后把这些协程按 asyncio 提供的接口转换为Task
任务对象,再启动事件循环,当事件发生以后,给对应的Future
对象设置结果,再执行到Task
里的step()
... 这样一直循环下去,就能执行完所有异步任务。
聪明的读者看到此处,应该会有两个疑惑:
不是改为原生协程了吗,为何图上还写着 yield 解释生成器版本的协程? 在第2节的学习中,并没创建 Future
对象,怎么回事? 3.1.1 无需 EventLoop 执行原生协程的魔法 先解释第一个疑惑,为什么上述原理图中还画着生成器协程。在《上篇》教程中我们提及过,基于async/await
语法的原生协程背后和生成器协程复用着共同的实现。来看一段代码:
# 复用 1.2 节的 say_hello(), say_world(), say_helloworld()
# 新写一个 run() 函数
def run(coro):
try:
coro.send(None)
except StopIteration as e:
return e.value
result = run(say_helloworld())
print(result)
上述代码执行后会打印出HelloWorld
,没有EventLoop
也可以执行协程,我们做到了。其原因就是async/await
的背后还是生成器。可以看出,原生协程里的 return
其实是把返回值放到异常对象里,在处理异常时读出返回值。这点是不是和Tornado
一样?
本小节只是为了演示原生协程的背后和生成器的关系。实际编程过程中我们不要玩这个技巧。因为自己要正确处理协程的所有事情很不容易。还是乖乖地按照 asyncio 规定的接口去编程。
3.2 Future Future
是用于产生表示未来执行结果对象的类。所有的异步任务的返回结果,实际都会被封装到future
对象中。
3.2.1 Future与Task的瓜葛 通过《上篇》的学习,我们已经掌握了自己怎么写Future
和Task
的核心部分。重新审视一下设计理念,Future
代表的是“未来的执行结果”。Task
是为了把协程包装成任务,并驱动协程的执行,实际就是说Task
表示“未来将要执行的异步任务”。
若把“任务”和“未来对象”做完善,那“任务”当然需要知道自己是否还在执行、其结果是什么、如何注册或注销等等,“未来对象”也需要知道自己等待的结果好了没有,可以选择等或者不等这个结果。它们的行为很相似。从3.1节的原理图中可以看出Task
与Future
的关系紧密。
再审视一下代码,Task
的初始化方法中使用到了一个Future
对象,这个未来对象除了代表一个None
值,没其他用处。而且“任务”和“未来对象”那么多相似行为,Task
的主要不同只有step()
方法。
基于多种因素考虑,asyncio
里的Task
是派生自Future
的子类。3.1节第二个疑惑的答案就在此处,create_task()
的时候,实际上也创建了future
对象。
3.2.2 Future 未来对象 注意要和concurrent.futures.Future
区分开,这两者目前不完全兼容。concurrent.futures.Future
是为多线程和多进程执行异步任务而准备的,而asyncio.Future
是为单线程内用协程做异步编程而准备的。
要使用Future
对象,首先得学会创建Future
对象。显式地创建和获取它有如下两种方式。
方式1:fut = asyncio.Future(loop=None)
方式2:fut = loop.create_future()
方式1创建对象时可以传递loop
参数,若不传递则为默认值None
,将使用asyncio.get_event_loop()
获取事件循环。方式2创建的则使用调用create_future()
的loop
。
显式地创建出来的Future
对象没有和任何协程关联,只是被创建出来等待我们主动使用它。Future
也实现了__await__
方法,它们是可以被await
关键字调用的。
这两种创建未来对象的区别 在于,方式1的与其绑定的事件循环是默认用asyncio.get_event_loop()
得到的,而方式2的则是与调用create_future()
的loop
绑定。在实际使用时,根据需要选择创建方式。
下面了解 asyncio.Future
对象具有的方法:
cancel()
:取消future
并执行它的回调函数。若它已经拿到结果或已被取消,返回False
,否则,取消它并执行回调,再返回True
。
cancelled()
:若future
已被取消则返回True
。
done()
:若future
已经拿到结果,或抛了异常,或被取消成功都返回True
。
result()
:返future
代表的结果。若已被取消,则抛出CancelledError
;若还未拿到结果,则抛出InvalidStateError
;如果既拿到结果,也有异常,则抛出异常。
exception()
:返回在future
上设置的异常。如果没有则为None
。
add_done_callback(fn)
:为future
设置回调函数,这些回调函数将在future
拿到结果后执行。注意 ,这里只接受一个可调用的函数fn
,当执行回调的时候,只会给这个函数传递一个参数,就是future
自己,即fn(self)
。如果想给回调函数传递额外参数,需借助functools.partial
。
remove_done_callback(fn)
:从future
的回调函数列表里删除fn
的所有实例。如果用fn==callback
为True
,则该callback就会被移除。
set_result(result)
:为future
设置结果,并标记其状态为已完成。若future
已经被完成,则抛出InvalidStateError
。
set_exception(exception)
:为future
设置异常,并标记其状态为已完成。若future
已经被完成,则抛出InvalidStateError
。
3.2.3 Future 的使用场景 通过第2节的学习,我们学会了无需显式调用Future
和Task
类,也能够调度协程到EventLoop
上运行。但有些时候不够灵活,例如,我们想在某个Future
得到结果后,不仅仅是返回结果,而且要做一个其他操作,怎么办?这时候就需要显式地使用Future
对象。
import asyncio
loop = asyncio.get_event_loop()
async def take_exam(fut):
# 进行考试
await asyncio.sleep(1)
fut.set_result(100)
return 'Exam is completed.'
def check_score(fut):
score = fut.result()
if score >= 60:
print("Passed.")
else:
print("Failed.")
fut = asyncio.Future()
fut.add_done_callback(check_score)
task = asyncio.ensure_future(take_exam(fut))
retval = loop.run_until_complete(fut)
print(task.result()) # Exam is completed.
print(fut.result()) # 100
print(retval) # 100
在上述代码中,我们定义了take_exam()
协程函数,接收一个Future
对象作为参数,在协程内会进行异步非阻塞代替非阻塞I/O操作(用sleep()
代替),当异步操作完成以后,再设置fut
对象的值,最后该协程自己也返回一个数据。
asyncio.ensure_future(coro_or_future)
函数会将传递给它的协程封装为Task
,加入EventLoop
的调度,并返回该协程的Task
对象。故而task.result()
是协程return
的值。关于ensure_future()
的更多细节在《下篇》中继续讨论。
3.3 Task Task
主要用于管理需调度到EventLoop
上执行的协程对象。它的行为与Future
很类似,它实际也是Future
的子类,所以Future
具有的操作它也有。参考 3.2.1节。
显式地创建Task
对象有如下三种方式:
方式1:task = asyncio.Task(coro, *, loop=None)
方式2:task = asyncio.ensure_future(coro, *, loop=None)
方式3:task = loop.create_task(coro)
以上三种方式都能获取封装了coro
协程的task
对象,并且把task
调度到相应的事件循环上。区别在于,方式1和方式2可以接受一个可选关键字参数loop
,会把task
调度到传递的那个loop
上去,默认为asyncio.get_event_loop()
获取的loop
。而方式3则会将task
调度到调用create_task()
的loop
上。
3.3.1 ensure_future()
与create_task()
区别 ensure_future()
里实际调用的是create_task()
,create_task()
里实际调用了Task
来创建对象。为什么有了create_task()
以后还需要ensure_future()
呢?
如果传递给这两个函数的参数是协程,它们没什么区别。如果要把Future
或Future-like
(实现了__await__
方法)的对象传递给create_task()
就不行了,它只接受协程。ensure_future()
在接收到Future
对象时,什么也不做,直接返回该Future
对象,而接收到Future-like
对象时,需要先把Future-like
对象包装到一个协程里,再把协程传给create_task()
。
3.3.2 Task 对 Future 的扩展 Task
是Future
的子类,它除了具有Future
所具有的方法外,还扩展了如下几个方法:
classmethod all_tasks(loop=None)
:类方法,用于获取已调度到loop
上的所有任务。classmethod current_task(loop=None)
:类方法,用于获取loop
上正在执行的任务。get_stack(*, limit=None)
:获取任务对象的栈帧,若该任务对象状态已经为done
,则返回空列表。print_stack(*, limit=None, file=None)
:将任务对象的栈帧输出到文件。3.3.3 Task 的使用场景 得到Task
对象以后,可以通过add_done_callback()
来为它设置回调函数。所以可以通过 Task
把 Twisted 这种回调风格的框架和 asyncio 这种原生协程的框架结合起来。
一般地,在Python 3.6及其之后,推荐尽可能使用原生协程来做异步编程。而在这种情况下,我们无需直接调用Task
类的初始化方法来得到协程的任务对象,而是用create_task()
或ensure_future()
。
3.4 总结 本节我们学习了asyncio
库中Future
和Task
的核心概念和使用方式。Future
对象作为异步操作的结果占位符,为异步编程提供了极大的灵活性和控制能力。而Task
,作为Future
的子类,进一步封装了协程的调度和管理,使得异步编程更加高效和易于管理。通过深入理解这些概念,读者可以更好地掌握如何使用asyncio
来构建高效、可读性强的异步应用。
4 协程的调度与执行 4.1 单个协程的调度 在异步编程中,理解如何正确地调度和执行单个协程是很重要的。
4.1.1 协程的定义 在Python中,协程通过async def
定义。这种定义创建了一个协程函数,调用这个函数会返回一个协程对象。
async def my_coroutine():
# 协程的逻辑
...
4.1.2 调度单个协程 Python 3.7及更高版本中,可以使用asyncio.run()
来启动和调度协程。这个函数接收一个协程对象,运行它,并自动管理事件循环。
import asyncio
async def main():
# 协程的逻辑
...
# 启动协程
asyncio.run(main())
4.1.3 协程的等待与结果获取 在协程内部,可以使用await
来等待另一个协程的结果。await
会暂停当前协程的执行,直到等待的协程完成。
async def fetch_data():
# 模拟数据获取
await asyncio.sleep(2)
return "data"
async def main():
data = await fetch_data()
print(data)
asyncio.run(main())
4.1.4 错误处理 协程中可能出现的异常可以使用try...except
块来捕获和处理。
async def might_fail():
raise Exception("出错了!")
async def main():
try:
await might_fail()
except Exception as e:
print(f"错误捕获: {e}")
asyncio.run(main())
以下是一个完整的示例,展示了从定义协程函数到启动协程,并获取结果的流程:
import asyncio
async def compute_square(number):
await asyncio.sleep(1)
return number * number
async def main():
result = await compute_square(2)
print(f"结果: {result}")
asyncio.run(main())
通过以上步骤,我们展示了如何创建、调度,并执行单个协程,同时处理可能出现的错误。这为深入理解Python中的异步编程奠定了基础。
4.2 同时执行多个协程 前文我们已经学习了Future
和Task
如何使用以及它们的区别和联系,也知道如何利用EventLoop
执行异步任务。
但仍有个问题,回到《上篇》中的爬虫代码,之所以利用异步编程提高了它的整体效率,是因为我们一次性创建了10个不同的抓取任务,EventLoop
同时监听这10个异步任务的sock
对象的状态。同理,转换到asyncio
库时,我们也需要能一次性创建多个任务,并执行。
方式1:asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
方式2:asyncio.as_completed(fs, *, loop=None, timeout=None)
方式3:asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
4.2.1 asyncio.wait 用于等待多个未来对象或协程的执行。需注意本方法也是一个协程,需要用await
关键字来调用或使用EventLoop
调度协程执行的接口来执行。
asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
futures
: 必选参数,表示多个Future
或Coroutine
对象,如果是Coroutine
则会被封装为Task
。loop
: 可选参数,调度futures
执行的事件循环对象,默认为通过get_event_loop()
获取。timeout
:可选参数,等待futures
执行的超时时间(秒),默认为没有超时限制。注意,并不会抛出超时异常,只是结束wait
。return_when
: 可选参数,表示wait
返回的时机。可以取的值为FIRST_COMPLETED
, FIRST_EXCEPTION
, ALL_COMPLETED
,分别代表“执行完一个之后”、“碰到第一个异常之后”、“所有都完成之后”。asyncio.wait
返回一个包含两个元素的元组,分别为done
和pending
状态的未来对象或协程集合。默认情况下pending状态的集合为空,因为没有超时限制且等待了所有任务执行完成。
done, pending = await asyncio.wait([say_hello(), say_world()], timeout=1.5, return_when=asyncio.FIRST_COMPLETED)
上述代码演示了如何在协程内调用wait
等待多个异步任务的执行结果。示例中设置了1.5秒的超时时间,而且wait
的返回时机为FIRST_COMPLETED
第一个完成后。
注意 :超时并非严格的指协程从头执行到尾是否超过该时间限制,某些情况下done
和pending
集合中得到的结果非预期;例如协程中有计算密集型任务耗时较多,而非阻塞异步操作的时间又恰好与超时限制一致,此时最稳妥的办法是遍历这两个集合中的任务,根据task.done()
自行判断任务的状态。当然这种极端情况很少,不过我们需要知道存在这种问题,以免遇到done
和pending
集合非预期时不知所措,更详细的部分在《下篇》中探讨。
如果要获取已经执行完成的异步任务的返回值,可以取出done
集合中的Task
对象,通过task.result()
获取。
4.2.2 asyncio.as_completed asyncio.as_completed(fs, *, loop=None, timeout=None)
此函数接收一组Future
或Coroutine
对象,返回一个迭代器。可以接收loop
和timeout
参数,默认无超时限制。
其用法如下:
async def main():
values = []
for f in asyncio.as_completed([say_hello(), say_world()], timeout=2):
ret = await f
values.append(ret)
return values
需要注意三个点,一是迭代器返回的future或task对象的顺序与传参时的顺序无关,二是超时以后会抛出asyncio.TimeoutError
异常(与asyncio.wait()
函数的超时不同),三是迭代器中的每个future
对象最好都用await
等待其结果,否则本该被异步调用的任务没有被异步调用,程序可能会有非预期行为。
当有多个协程或未来对象需一个接一个地、顺序无关地执行,且需要设置超时限制时,本函数as_completed()
适用。否则,直接用for
循环迭代协程列表一个一个地await
就可以了。
4.2.3 asyncio.gather asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
本函数接收多个协程或未来对象,并返回一个future
对象,通过await
或loop
等待该future
的结果则可得到该组异步任务的结果列表,顺序与传参时对应。当return_exceptions
为False
时,则遇到异常就抛出,否则将异常作为结果返回到结果列表中,并不抛出异常。
当有多个协程或未来对象需同时执行,也要获取其返回结果时,本函数gather()
适用。
用法如下:
results = await asyncio.gather(say_hello(), say_world(), return_exceptions=True)
# 或者
results = loop.run_until_complete(asyncio.gather(*[say_hello(),say_world()]))
4.3 总结 本节我们学习了如何利用Future
和Task
对象对协程进行调度和执行。在 asyncio 中没有显式的调度器,只需要通过create_task
、ensure_future
、run_until_complete
等即可将协程加入loop
调度执行。我们还学习了单个协程和多个协程的调度执行。
5 同步操作 虽然大多数时候我们都是在单线程内使用协程并发地执行异步任务,但即便是单线程内,协程的调度以及完成顺序难以预测和控制。而在某些场景下,我们需要让一组有关联的协程按顺序执行,这时候我们就需要把异步的变成同步的。
asyncio
提供了锁 和队列 两种机制来把多个异步任务变成同步执行的。
5.1 锁 顾名思义,是一种保护措施。在计算机术语中,锁是指用来保护临界资源的机制 。临界资源是指不能被被多个程序同时访问/操作的数据(包括用数据结构来表示的文件、网络连接等任何资源)。例如存储了库存、银行账户余额的变量,它们就是临界资源,不能够被多个程序同时去修改。
当一段程序想要访问临界资源时,必须取得保护该临界资源的锁的权限,才可以进一步操作该临界资源。站在临界资源的角度,锁是保护它的,使临界资源在整个生命周期内保持正确性。站在异步并发程序的角度,锁是一种限制它访问临界资源的同步机制,以避免并发产生的副作用。
单纯地讲“锁”这个术语,就是上述的概念,还比较基础和抽象。为了应付软件的复杂性和沟通的效率,在不同的情况下还会衍生出不同类型的锁,其命名也花样百出。有些叫法是从临界资源是什么格式的数据角度出发,例如“表锁、行锁、文件锁”;有些叫法是从保护临界资源不被什么行为操作而变更的角度出发,例如“读锁、写锁”;还有些叫法是从锁的实现原理上命名,例如“乐观锁、悲观锁、自旋锁”;还有的叫法则是从锁是否可以被多个程序共享或互斥性质角度,如“排他锁、共享锁、独占锁、互斥锁”;还有的称呼是描述并发程序在获取临界资源时而进入了某种特殊状态,如“死锁、活锁、饥饿”;还有的锁的实现依赖于分布式系统,则称之为“分布式锁”……
希望大家不要对些数不清的锁望而生畏,其实锁的基本原理、基本特征、存在的意义就本节开头表述的那一句话而已,不过是针对具体场景、具体实现、或者观察角度不同而叫法不同。
就像生活中,有“电子锁、机械锁、自行车锁、门锁、自动锁、指纹锁、U型锁、环形锁、铜锁、金锁、智能锁、愚蠢锁……”。锁是什么?锁就是锁!计算机软件中也一样。
asyncio
为协程提供了5种具体的实现,分别是Lock
、Event
、Condition
、Semaphore
、BoundedSemaphore
。下面介绍它们的作用与用法。
5.1.1 Lock
asyncio.Lock
实例化以后可以得到原始的锁对象,它只具有locked
和unlocked
两种状态。刚初始化的Lock
对象的状态是unlocked
。可以通过acquire()
方法将状态从unlocked
更改为locked
,反之,通过release()
方法可以把locked
改为unlocked
。
Lock
对象主要会用到以下三个方法:
locked()
检查锁对象是否处于locked
状态,返回True
或False
。acquire()
加锁,把锁对象改为已加锁状态,并返回True
。如果多个协程都调用acquire()
方法来加锁,那么会按照调用顺序依次加锁。release()
释放锁,把锁对象从locked
改为unlocked
状态,然后返回,以便别的协程可以加锁。如果锁对象本就处于unlocked
状态,再调用release()
会报RuntimeError
。注意,acquire()
是一个协程,需按协程的方式调用。Lock
对象还实现了上下文管理器,可以使用with
语句。
示例代码如下,请按自己的想法多做一些修改尝试然后执行观察效果:
用生活中的场景类比,几个人(不同的任务)要吃同一口锅里的饭(共享数据),但只配了一把勺子(锁),只有拿到勺子的人可以去锅里打饭吃(访问或修改共享数据),而且一个人用完勺子(释放锁)后,该让谁(在争用锁的协程)接着用,是按先来后到的顺序。
根据这个比喻大家还可以思考一下,更多的人、更多的锅、更多的勺子、有的饭只能被一个勺子打、有的饭可以同时被多个勺子打等这多种场景应该如何处理?同样可以借鉴到编程中来,以下给出Lock
的使用场景:
互斥访问共享资源 :当多个协程需要访问相同的共享资源(例如共享数据结构、文件等)时,Lock
可以防止数据竞争和不一致性,确保每次只有一个协程能访问该资源。防止资源重入 :在某些情况下,一个协程在完成前不应被再次调用。使用Lock
可以防止这种重入,确保协程安全完成其工作。5.1.2 Event
Event
是事件 的实现。这种对象可以理解为事件通知器。多个协程可以利用它等待某个事件发生并获取通知。它内部有一个标志变量,初始化时为False
,调用它的wait()
方法后进入阻塞态,等待它的标志变为True
,即等待事件的发生。当一个事件发生后,可以通过set()
方法来改变标志状态为True
。
通过上述例子,可以看出Event
的作用跟比赛用的信号弹类似,信号弹爆炸了大家都得按它行动。一颗信号弹也只能用一次。以下给出Event
的使用场景:
协程间的信号通知 :当一个协程需要等待来自另一个协程的信号时,可以使用Event
。例如,在一个数据处理流程中,数据生产者可以通过设置Event
来通知数据消费者开始处理。状态变更通知 :如果系统的某部分状态发生变化(例如配置更新、系统就绪等),并且这个状态变化需要通知给多个协程,Event
可以作为一个简单的通知机制。5.1.3 Condition
Condition
是条件变量 的实现,它可以让一个或多个协程一直等待某个条件达成以后再继续运行。Conditon
和Event
很像,都是通过一个信号让别的协程继续运行下去,但有不同点:
Event
是一次性通知所有阻塞在该事件上的协程,而Condition
可以分别通知其中的一个或多个,而未被通知的继续等待。Condition
底层实现使用了Lock
对象,而Event
没有。关于上述第1点我们参考如下示例:
如果仅按上述示例的用途,Condition
和Event
几乎没有差异,尤其是用cond.notify_all()
的时候,效果跟Event
一毛一样,这不是重复造轮子吗?那么Condition
的特别之处有啥?
源于上述第2点不同。那为啥Condition
就得依靠锁来实现呢?其实条件变量 设计的本意比Event
复杂,它通常会和三种对象关联:指示条件是否已满足的对象、串行化访问该对象的互斥对象、等待条件的条件变量 。
条件变量通常和决定是否成立的共享对象关联,所以条件变量的底层实现使用了互斥锁对象Lock
。提供了acquire()
和release()
方法,当某个协程想要改动共享数据,告诉别人条件已修改,就必须使用acquire
来取得修改权。
Condition
对象还提供了coroutine wait_for(predicate)
方法,参数predicate
需要是一个直接调用就可以得到返回值的函数。当predicate()
的返回值的布尔上下文为真时,代码将继续执行。上述例子我们还可以换成如下写法:
本小节讲解了 asyncio
的同步机制之一:条件变量(Condition)。并展示了其三种主要使用方法,每种使用方法都适用于不同的情景,需在实践中不断体会。以下给出常用场景:
生产者-消费者 问题:在生产者和消费者模式中,Condition
可以用来确保消费者只在有数据可消费时才消费,同时允许生产者在生产新数据时通知消费者。这种情况下,Condition
用于同步生产者和消费者对共享资源(如队列)的访问。等待多个协程完成特定任务 :在某些情况下,可能需要等待多个协程完成它们的任务,直到一个特定的条件被满足(如数据达到特定状态)。这时,Condition
可以用来阻塞一个协程,直到另一个协程改变了状态并通知该条件对象。顺序控制 :如果有多个协程必须按特定顺序执行,Condition
可以用来控制执行流程,确保每个协程在前一个协程完成特定任务后才开始执行。定期检查共享资源 :在某些应用中,可能需要定期检查共享资源的状态,并在状态达到特定条件时执行操作。Condition
的wait_for()
方法可以在这种场景下很好地使用,它允许协程在条件满足时被唤醒并执行相应的操作。在这些场景中使用Condition
的主要优势在于其能够提供比简单的Lock
或Event
更复杂的同步机制,允许在复杂的协程间同步场景中精确控制执行流程。
5.1.4 Semaphore
Semaphore
信号量 ,原本是铁路交通系统中的一个术语,后被荷兰计算机科学家Dijkstra引入计算机科学领域。用于控制仅支持有限个用户同时操作的共享资源 ,在计算机系统中有着广泛的应用。
信号量对象的内部会维护一个计数值,取值范围是任意正整数,默认为1。当一个协程中对信号量进行一次acquire()
,其计数值就减1,release()
一次则加1。当计数值减至0时,后来的协程将阻塞在acquire()
操作上,直到有其他协程release()
该信号量。
当信号量对象内的计数值初始化为大于等于2的正整数,称其为一般信号量 或计数信号量 ;计数器初始化为1时,则其变动范围只有{0,1}两种可能,又被称为二进制信号量 ,二进制信号量又被称为互斥锁 。虽然asyncio
中的Lock
和Semaphore
分别由两个不同的类实现,但其源代码十分相似。从原理上我们已经知道,互斥锁是信号量的一个特例。
threading
模块也提供了 Semaphore
,原理都一样,使用方法也非常相似。不再赘述。
asyncio信号量 上图示例演示了 Semaphore
对象的用法,只允许3个worker同时被调度执行,第四个和第五个worker必须等待前面的release()之后,有“名额”了才能执行。以下给出信号量的使用场景:
限制资源访问 :在有限资源(如数据库连接、网络带宽)的环境中,使用信号量可以限制同时访问这些资源的协程数量,防止资源过度使用。控制并发度 :在执行并发操作时,如API调用或网络请求,信号量可以用来控制同时运行的操作数量,以避免超出服务器或API的负载能力。5.1.5 BoundedSemaphore
BoundedSemaphore
称为有界信号量 ,它和一般信号量只有在计数值管理上存在细微差别。有界信号量的计数值的初始化值就是上限,不允许越界,而Semaphore
的计数值在初始化之后却是可以通过多次release()
来提高上限。
以下给出有界信号量的使用场景:
严格控制资源访问上限 :在需要严格限制同时访问某个资源的最大协程数时使用,确保资源不会因为程序错误而被过度使用。防止程序逻辑错误 :在设计需要精确控制资源访问计数的场景时,BoundedSemaphore
可以帮助捕捉程序中的逻辑错误,如不恰当的release()调用。资源池管理 :管理有限资源的池(例如数据库连接池),确保池中资源数量不会因错误操作而超出预设的界限。5.2 队列 队列本质是一个用于存储容器 ,它的使用模式是“存进去——取出来 ”,正是因为这样的过程,所以队列的基本职责就是“缓冲、暂存 ”。
在此基础上,让存于其中的元素们具有某种特征的存取顺序 ,就构成了不同类型的队列,常见的有先进先出队列(FIFO Queue),优先级队列(Priority Queue),后进先出队列(LIFO Queue) 。
由队列的存储 实质可以看出,存储组件(如磁盘、数据库)具有的考量点它都具有,比如需考察队列的读写能力(I/O吞吐能力)、存储容量、可靠性(数据防缺损)等等。
正因为是“存进去——取出来 ”模式,故而队列扮演的角色就是协调数据存入方与数据取出方 ,另一个称呼是数据的生产者和消费者 。那么什么时候需要用到队列?很简单,既然是协调生产者和消费者,这两者不协调的时候就让它上!
那具体该如何思考?之前我们讲过,认识事物,从其时间结构和空间结构着手 ,当生产者与消费者在时间或空间上不协调时,队列就可以发挥作用。比如,生产者与消费者的工作速率不匹配,或者它们无法在同一个时间点直接交互(比如消费者数量不够),就是时间上的不协调。具体场景如高并发应用,产生请求的速率太高(QPS很高)而处理请求的程序(服务端的线程/协程数有限),所谓削峰填谷 是也。再比如,生产者不论主动或被动地无法直接与消费者交互,这就是空间不协调,比如程序解耦 ,跨域(跨作用域、线程、进程、网络域等等,不同级别的程序,有不同级别的队列)通信等。
我们只有像上面这样探究,从根本性质出发,才不会被各种表象所迷惑,也不会人云亦云而遇到困难时手足无措。对事物的本质认识越深刻,掌握它越轻松,越能够应付千变万化。
现在我们要学习的是Python标准库asyncio
里提供的异步队列 ,无他,不过是“存——取 ”这两个操作支持异步罢了。
5.2.1 Queue
asyncio.Queue
是一个搭配协程使用的先进先出 队列,跟搭配线程使用的quque.Queue
和搭配进程使用的multiprocessing.Queue
很相似,只是搭配的对象不同,进而实现细节不同。
asyncio.Queue(maxsize=0,*,loop=None)
有2个初始化参数,loop
指明调度该队列的事件循环对象,maxsize
限制队列的最大长度 。当maxsize <= 0 时,队列无限长度;当maxsize>0时,如果已经存入队列的元素数量超过限制(队列已满),则后来的元素想put()
存入队列,则会阻塞,直到别的元素被get()
取出留出空位。
asyncio.Queue
的常用方法如下:
empty()
检查队列是否为空,为空返回True,否则返回False。full()
检查队列是否已满,满了返回True,否则返回False,无限长队列只会返回False。coroutine get()
从队列中取出元素,如果队列为空,则一直等到有元素可取。
get_nowait()
从队列中取出元素并立即返回该元素,如果队列为空则抛出QueueEmpty
异常,而不是等待有元素可取出。 coroutine put(item)
向队列中存入元素,若队满则等着,直到有空位再存入。put_nowait()
向队列中存入元素,若队列已满,则抛出QueueFull
异常。qsize()
返回队列中已有元素的数量,注意这个值并不准确,因为非阻塞队列在被统计时,也有元素在不断地进出。coroutine join()
和 task_done()
这两个方法是 Python 3.4.4 版才新加入的,它俩需要搭配使用。这两方法稍显鸡肋,后文分析。Queue
示例代码:
大家先忽略上述代码中第13、14、26、27行,即调用 task_done()
和 join()
的地方, 实践中也不必写这几行代码,它们对元素的存取并无影响 。剩下的代码很容易理解,producer 不断地存入元素,只放了5个元素后退出,而 consumer 不断地取出元素进行处理,直到队列为空时退出。
join()
和task_done()
的探讨(不关心可忽略) :Queue
内部维护了一个名为_unfinished_tasks
的计数变量,其初始值为0,每当调用put()
或者put_nowait()
时该变量的值加1,而调用task_done()
时,该变量值减1。调用join()
后,若该值大于0,则会阻塞着直到该值变为0,然后返回join()
处继续往下执行。
问题来了,名为“未完成”的计数变量其增加是因为存入元素,而较少却并不是因为其直接取出元素,通常做法是取出元素进行处理之后,调用task_done()
来减少该值。如果对这两个方法不熟悉的同学,可能并不会将它们配对使用,就会出错,比如程序执行到join()
处就不继续执行了,即便队列中元素已经取出完毕。
鸡肋的另一点原因,因为队列中不定时地有元素在进进出出,并不能确切地知道_unfinished_tasks
变量何时变为0。那么join()
阻塞后再被唤醒的时机不好控制,如果刻意控制则会使代码逻辑变得晦涩。
关于join()
和task_done()
的讨论我们就此打住,像更深入探讨的同学欢迎留言。驹哥更推荐使用asyncio.Event
或者其他同步机制来显式而明确地控制协程在某种条件下的切换时机。而join()
和task_done()
背后的实现就是基于asyncio.Event
的。
5.2.2 PriorityQueue PriorityQueue
的对象所具有的方法跟 Queue
完全一样,只是元素取出的顺序是按优先级取出。通常用 (priority number, data)
这样的二元组表示一个元素,注意,priority number
值越小优先级越高 。
PriorityQueue
底层实现依赖于heapq
,实际上,队列中个元素的比较是按**富比较(Rich-Compare)**的方式进行的,所以存入其中的元素不仅可以是上述二元组的形式,还可以是字符串、整数、小数、自定义的对象等各种凡是实现了富比较
方法的对象。
但需注意付比较的细节规则,比如(1,2,3)
不可以和(1,2,'3')
比较,因为他们的第三个元素的数据类型不同,整数无法与字符串直接比较。但(1,2)
是可以和(1,2,3)
比较的。关于富比较
的更多细节,本微信公众号"驹说码事 "以后再发文解释。在彻底理解富比较之前,要存入优先级队列的元素先纯数字或二元组表示。一般情况下我们都应该这么做,让程序足够简明。
5.2.3 LifoQueue
LIFO(Last-In-First-Out)顾名思义它是后进先出 队列。它所拥有的操作方法也跟Queue
完全一样,仅仅是元素取出的顺序是越后存入队列的越先被取出。
其底层实现依赖于内置数据类型list
,put()
方法里直接调用list.append(element)
,get()
方法则直接调用了list.pop()
。
5.2.4 异步编程中Queue的使用场景 任务调度 :在异步应用中,用于在生产者和消费者之间分发任务,特别是当任务生产和消费的速率不一致时。流量控制 :当处理高并发请求时,asyncio.Queue可以作为缓冲区,帮助控制数据流,以避免过载。异步工作流程 :用于实现复杂的异步工作流程,如管道(pipeline)模式,其中数据逐步通过一系列的处理阶段。事件处理 :在事件驱动的应用中,用于管理和分发事件消息。5.3 总结 本节内容对asyncio
支持的同步操作的总结非常全面和详尽,介绍了asyncio
中多种同步机制的作用和应用场景。从锁(Lock
)的基本概念和用途,事件(Event
)的信号通知机制,条件变量(Condition
)的更复杂同步控制,到信号量(Semaphore
和BoundedSemaphore
)的资源访问控制,以及队列(Queue
, PriorityQueue
, LifoQueue
)在异步编程中的应用,这些内容详细描述了asyncio
库中不同同步操作的特点和使用方法。每个部分都配以合适的示例和解释,有助于深入理解异步编程中同步机制的重要性和实际应用。
6 回调函数的调度和执行 即便asyncio
提供了协程,但在有些情况下还是规避不了跟普通函数交互,即用不同函数写的回调函数。而且有些第三异步库的主要编程模式还是基于回调的,为了让这类框架要与asyncio
兼容也促使asyncio
提供了调度常规函数的方法。
6.1 调度接口 asyncio
调度回调函数有以下三种方式:
方式1: EventLoop.call_soon(callback, *args)
方式2: EventLoop.call_later(delay, callback, *args)
方式3: EventLoop.call_at(when, callback, *args)
方式1 call_soon
是立即调度callback
函数到EventLoop
中,将在下一次事件循环中被执行;方式2 call_later
是延迟delay
秒后执行;方式3 call_at
是在when
时刻执行。
注意,asyncio
内部维护了一个单调递增时钟EventLoop.time()
,其实就是time.monotonic()
, call_later
的delay
参数和call_at
的when
参数都是以它为标准,而不是time.time()
。最大的影响在于call_at()
的when
参数并不能 直接传递某个具体的时间戳。
另外有个注意点,上述调度函数中的*args
是传递给callback
的参数,并不能接收关键字参数。如果要传递关键字参数时,则需使用functools.partical()
方法封一层。
三种方式的示例代码如下:
上述代码中需特别留意之处是定时调用
call_at
函数,一定不能直接传递 datetime
对象的时间戳。其实 call_later
是基于 call_at
实现的,请读者自己思考下实现方案。
6.2 周期性回调 asyncio
本身并没有提供周期性执行回调函数的方法,不像Tornado
提供了PeriodicCallback
类。按 PEP 3156
的解释,asyncio
若是提供此方法是多此一举,如果是简单情况,那么不断地call_later()
自身就好了,也可以在循环里sleep()
,总之实现起来特别容易。如果是精度要求高的复杂情况,每种情境下边界要求又不一样,所以asyncio
里提供通用方法也不合适。
实际项目中往往要求很简单,精度也只需要控制在秒级,下面给出一段代码来实现类似Tornaodo
的PeriodicCallback
。
6.3 回调处理
前面介绍了如何通过三种方式(call_soon()
, call_later()
, call_at()
)去调度回调函数,下面我们介绍协程如何被用作回调函数,以及这些回调函数被调度后,又该如何控制?比如想取消怎么办?
6.3.1 协程作回调函数 在asyncio
中,协程可以通过特定的方法被用作回调函数。这通常涉及到将协程封装成一个可以被事件循环调用的普通函数。这可以通过asyncio.ensure_future
或loop.create_task
来实现。这些函数接受一个协程对象,并返回一个Task
对象,这个Task
对象可以被当作回调函数使用。
示例代码如下:
async def my_coroutine():
# 协程逻辑
...
loop = asyncio.get_event_loop()
task = loop.create_task(my_coroutine())
loop.call_soon(task)
6.3.2 回调处理器 在Python的asyncio
模块中,Handle
和TimerHandle
是与事件循环调度回调相关的类:
asyncio.Handle
用途 :Handle是一个回调包装对象,由loop.call_soon()和loop.call_soon_threadsafe()返回。方法 :
get_context(): 返回与句柄相关联的contextvars.Context对象。这是Python 3.12中的新功能。 cancel(): 取消回调。如果回调已经被取消或执行,此方法无效。 cancelled(): 如果回调已被取消,则返回True。这是Python 3.7中的新功能。 asyncio.TimerHandle
用途 :TimerHandle是Handle的子类,由loop.call_later()和loop.call_at()返回,用于调度具有特定延迟或计划时间的回调。方法 :
when(): 返回计划的回调时间(以浮点秒为单位)。时间是绝对时间戳,使用与loop.time()相同的时间参考。这是Python 3.7中的新功能。 这俩用于管理和控制事件循环中的回调。例如,可能会使用call_later()来调度一些需要在将来某个时刻执行的操作,并使用返回的TimerHandle
来管理这个操作(如取消或查询计划时间)。
示例代码:
import asyncio
import contextvars
# 定义一个简单的回调函数
def my_callback(name):
print(f"Hello, {name}!")
# 设置一个事件循环
loop = asyncio.get_event_loop()
# 调用 call_soon() 来计划回调的执行
handle = loop.call_soon(my_callback, "Alice")
# 检查回调是否被取消
print(f"Cancelled: {handle.cancelled()}")
# 取消回调
handle.cancel()
# 使用 call_later() 来延迟回调的执行
timer_handle = loop.call_later(5, my_callback, "Bob")
# 获取计划执行的时间
print(f"Scheduled at: {timer_handle.when()}")
# 取消计划的回调
timer_handle.cancel()
# 运行一次事件循环以使调度的回调有机会执行
loop.run_until_complete(asyncio.sleep(0.1))
loop.close()
6.4 异步读文件 在asyncio中,由于直接的文件IO操作通常不是非阻塞的,因此asyncio并没有提供直接的异步文件读取API。然而,我们可以通过使用线程池来实现异步读文件的效果。asyncio提供了loop.run_in_executor方法,该方法可以在指定的执行器(例如线程池)中运行给定的函数。
代码示例如下:
import asyncio
async def read_file_async(file_path):
loop = asyncio.get_event_loop()
with open(file_path, 'r') as file:
return await loop.run_in_executor(None, file.read)
# 使用协程读取文件
async def main():
content = await read_file_async('path/to/your/file.txt')
print(content)
asyncio.run(main())
上面只是个基本示例,仿佛看不出来这种异步读文件的用处是什么。首先,在广泛以异步编程为主导的应用中,提倡每一个操作都是一部的,避免阻塞其他协程,这对提高系统的整体效率和响应性非常重要,所以把文件操作也改为非阻塞IO操作 。进一步,异步读取使得CPU可以在等待文件IO完成时执行其他任务,从而提高了资源的利用率 。在处理大文件时,异步读取可以显著提高性能。在用户界面中,也可以即时响应,避免UI卡顿,提供更流畅的用户体验 。
7 asyncio和多线程和多进程搭配 本节将学习为什么有了协程可以大幅提高程序性能的时候,还需要关心多线程和多进程?又如何结合使用?
7.1 理解使用场景 其一,对于I/O密集型或需要执行阻塞调用的任务,例如文件读写或者阻塞的网络通信,使用多线程可以避免阻塞整个事件循环,提高整体性能。
其二,在处理CPU密集型任务时,例如大规模数学计算,使用多进程可以有效利用多核处理器,因为Python的全局解释器锁(GIL)限制了同一时间只有一个线程执行Python字节码。
7.2 结合多线程使用 在使用asyncio
时,会遇到需要执行阻塞I/O操作或调用非异步兼容的库函数的情况。在这种情况下,将这些操作放在单独的线程中执行是一种有效的策略,以避免阻塞整个事件循环。asyncio
提供了loop.run_in_executor()
函数,可以线程池中执行阻塞调用。
run_in_executor
的第一个参数为None则默认在线程池中调度,当需要自定义线程池时可以再手工创建线程池对象传入。
示例代码:
import asyncio
def blocking_io():
# 执行阻塞I/O操作
print("start blocking I/O")
time.sleep(1)
print("end blocking I/O")
async def main():
loop = asyncio.get_running_loop()
# 在默认的线程池中运行阻塞I/O函数
await loop.run_in_executor(None, blocking_io)
print("main continues")
asyncio.run(main())
结合多线程使用的注意事项 需要确保线程安全尤其重要。多个线程可能会同时访问和修改共享资源,如全局变量,从而引发竞态条件。竞态条件可能导致数据不一致或程序行为异常。
可以采取的措施如下:
使用锁 :利用threading.Lock
或asyncio.Lock
(取决于是在普通线程还是在异步协程内)来确保在任何时刻只有一个线程可以访问共享资源。在读操作远多于写操作的场景中,使用读写锁(如threading.RLock)可以提高效率,因为它允许多个读操作同时进行,只在写操作时才需要独占访问。避免共享状态 :尽可能设计无状态的函数或使用局部变量(如threading.local),减少共享状态的使用,从而减少线程间的依赖和交互。线程安全的数据结构 :使用线程安全的队列(如queue.Queue)或其他数据结构(collections.deque)来管理线程间的数据交换。7.3 结合多进程使用 当您有CPU密集型任务时,如大量数学计算或数据处理,最好使用多进程来利用多核CPU。asyncio可以通过concurrent.futures.ProcessPoolExecutor
与多进程结合使用,将计算密集型任务分配到单独的进程中执行。
示例代码:
import asyncio
import concurrent.futures
import math
def compute_some_thing(numbers):
# 一些CPU密集型的计算
return [math.sqrt(number) for number in numbers]
async def main():
numbers = range(10)
loop = asyncio.get_running_loop()
# 创建一个进程池执行器
with concurrent.futures.ProcessPoolExecutor() as pool:
# 在进程池中运行计算密集型任务
result = await loop.run_in_executor(pool, compute_some_thing, numbers)
print("Custom computation results:", result)
asyncio.run(main())
结合多进程使用的注意事项 在结合多进程使用时,线程安全问题的影响会减少,因为每个进程都有自己独立的内存空间,进程间一般不会直接共享相同内存资源。因此,进程间通常不会直接引起竞态条件或数据不一致问题。但引入了新的挑战:
进程间通信 :使用适当的机制(如队列、管道)进行进程间通信。multiprocessing
模块,其中的Queue
和Pipe
等可以用于进程间的数据传递。资源共享 :如果必须共享资源(如文件、数据库),确保每个进程都正确地管理其对这些资源的访问,比如该同步的需要使用锁。同步和状态管理 :在多个进程需要协调工作时,例如在使用共享数据库或文件系统时,需要考虑合适的同步机制,比如使用文件锁或数据库事务来确保操作的原子性和一致性。数据序列化 :在进程间传递数据时,需要进行序列化和反序列化,这可能会引入性能瓶颈,尤其是大量数据时。因此,需要优化序列化过程或减少需要传输的数据量,比如使用比JSON、XML更紧凑许多的protobuff协议。8 常用的异步编程模式和最佳实践 8.1 常用编程模式 生产者-消费者模式 在异步编程中,此模式非常常见,尤其适用于数据的生产和消费速率不一致的场景。它可以有效管理任务队列,提高数据处理效率。
发布-订阅模式 这是事件驱动编程中的经典模式,适用于多个消费者对事件源的响应。在异步编程中,此模式有助于解耦事件的发布者和订阅者。
异步回调模式 前面第6节提到过的内容,尽管Python的async/await语法减少了回调的需求,但在与一些旧的异步库或API交互时,异步回调仍然很重要。
在这里有些同学可能会问:“生产者-消费者模式”和“发布-订阅模式”有什么区别?好像都会用到MQ?
确实在现代的服务端技术中,这两种模式因为高级消息队列的功能丰富已经没有了明显的界限。但在语义明确的特定的上下文中,我们却不会混用这两个术语。生产者-消费者模式更侧重于数据流的处理,数据驱动,队列用于存储待处理的数据项,消费者必须处理自己接受到的数据;发布-订阅模式侧重于消息或事件的分发,事件/消息驱动,队列用作消息代理或事件通道,订阅者只关心自己感兴趣的主题。
8.2 最佳实践和设计建议 模块化设计 :将不同的功能分割成独立的模块或协程,确保代码清晰和可维护。每个协程很小 :尽量满足单一职责原则,让每个协程(或被封装的异步任务)必须尽快返回。避免阻塞调用 :确保不在异步代码中调用阻塞操作,以避免阻塞事件循环。I/O密集型上线程池 :注意确保线程安全。详本文见第7节。CPU密集型上进程池 :注意进程间通信、资源共享和性能。详本文见第7节。利用async/await :尽可能使用async和await关键字,以增强代码的可读性和简洁性,不再推荐其他协程方案。异步上下文管理 :使用异步上下文管理器来处理资源的初始化和清理(with语句)。正确管理任务 :合理使用Task
来管理协程,并注意任务的取消和异常处理。避免死锁 :注意异步同步原语(如锁和事件)的使用,以防止死锁。异常和超时处理正确 :确保异常被捕获并处理,避免未处理的异常导致程序崩溃,确保程序在合适的时间已执行完毕和正确退出。9 Python3.6到3.12中asyncio的变化 由于本文的绝大部分内容写成于4年多以前(这也许打破了技术圈放鸽子的世界记录),为了打消各位看官们心中的疑虑(文中的内容是否过时了?)特此整理了3.6到3.12期间asyncio的变化。我们的内容时不过时的,只是性能有提升、操作接口更简单、安全性更好了。在开发中最大的变化就是某些函数的调用,要从loop.function()
改为asyncio.function
,诸如此类。
asyncio模块被正式纳入稳定API,不再是临时状态。它接收了新特性、显著的可用性和性能改进,以及大量的错误修复。 asyncio模块继续获得新特性和显著的可用性和性能改进。 asyncio和decimal模块开始支持并使用上下文变量(新的contextvars模块引入了对上下文变量的支持)。特别是,活动的小数上下文现在存储在一个上下文变量中,这允许异步代码中的小数操作使用正确的上下文。 asyncio.run()从临时API变为稳定API,用于执行协程并在自动管理事件循环时返回结果。 在Windows上,默认事件循环变为 ProactorEventLoop。 ProactorEventLoop 现在支持UDP和可以被 KeyboardInterrupt 打断。 添加了 asyncio.Task.get_coro() 和任务命名功能。 引入了支持 Happy Eyeballs 算法的 asyncio.loop.create_connection()。 移除了 reuse_address 参数的支持,因为在UDP中有重大安全隐患。 新增了 shutdown_default_executor() 协程,用于关闭默认执行者。 新增了 PidfdChildWatcher,一个特定于Linux的子进程监视器。 引入了 asyncio.to_thread() 协程,用于在单独线程中运行IO绑定函数。 更新了 asyncio.wait_for() 的取消行为。 对 ssl.SSLSocket 的不兼容方法调用现在会引发 TypeError。 添加了缺失的 connect_accepted_socket() 方法。 标记了asynchat, asyncore, smtpd这些模块为已弃用,并在这些模块中添加了导入时的DeprecationWarning。 新增了TaskGroup类,这是一个异步上下文管理器,它可以在退出时等待一组任务全部完成。对于新代码,推荐使用它而不是直接使用create_task()和gather()。 新增了timeout(),这是一个用于设置异步操作超时的异步上下文管理器。对于新代码,推荐使用它而不是直接使用wait_for()。 新增了Runner类,它公开了run()使用的机制。 在asyncio库的同步原语中新增了Barrier类和相关的BrokenBarrierError异常。 在asyncio.loop.create_connection()中新增了all_errors关键字参数,允许将多个连接错误作为ExceptionGroup抛出。 为现有基于流的连接升级到TLS新增了asyncio.StreamWriter.start_tls()方法。 向事件循环添加了原始数据报套接字函数:sock_sendto()、sock_recvfrom()和sock_recvfrom_into(),这些在SelectorEventLoop和ProactorEventLoop中都有实现。 为Task添加了cancelling()和uncancel()方法,主要用于内部使用,特别是由TaskGroup使用。
提高了写入套接字的性能。asyncio现在避免了向套接字写入时不必要的复制,并在平台支持的情况下使用sendmsg()。 添加了asyncio.eager_task_factory()和asyncio.create_eager_task_factory()函数,允许事件循环选择积极任务执行,使某些用例的速度提高2到5倍。 在Linux上,如果os.pidfd_open()可用且功能正常,asyncio默认使用asyncio.PidfdChildWatcher而不是asyncio.ThreadedChildWatcher。 事件循环现在为每个平台使用最佳可用的子进程观察器(如果支持则为asyncio.PidfdChildWatcher,否则为asyncio.ThreadedChildWatcher),因此不建议手动配置子进程观察器。 在asyncio.run()中添加了loop_factory参数,允许指定自定义事件循环工厂。 添加了asyncio.current_task()的C实现,速度提高了4到6倍。 asyncio.iscoroutine()现在对生成器返回False,因为asyncio不支持基于生成器的遗留协程。 asyncio.wait()和asyncio.as_completed()现在接受生成任务的生成器。 从Python 3.6到3.12,asyncio的主要发展方向是增强其稳定性、可用性和性能,同时不断引入新特性。重点包括改善事件循环机制、增强异步任务管理、支持更灵活的上下文变量处理,以及提升网络编程性能。这些改进使asyncio更加高效和易用。
10 总结 本文精华内容主要包括Python asyncio
库的基础概念、使用方法和最佳实践。asyncio
是Python用于编写异步并发代码的库,使用async
和await
语法。主要学习点包括:
原生协程语法 :使用async def
定义原生协程函数,await
用于等待协程执行完成。协程的调用 :通过事件循环和任务(Task
)调用和管理协程。Future和Task :Future
作为异步操作的结果占位符,Task
管理和调度协程。协程的调度与执行 :单个协程的调度、同时执行多个协程,以及相关错误处理。同步操作 :使用锁(Lock
)、事件(Event
)、条件变量(Condition
)、信号量(Semaphore
)等同步机制。队列 :使用异步队列(Queue
, PriorityQueue
, LifoQueue
)在协程之间传递数据。回调函数的调度和执行 :通过事件循环调度回调函数,处理回调函数。asyncio与多线程/多进程结合 :结合多线程处理阻塞I/O操作,多进程处理CPU密集型任务。常用异步编程模式和最佳实践 :包括生产者-消费者、发布-订阅等模式,以及异步编程的最佳实践。Python3.6至3.12中asyncio的变化 :asyncio
库的更新和改进。本文帮助读者理解和掌握Python异步编程的关键概念、技术和最佳实践,特别是对于需要处理I/O密集型或并发任务的应用程序开发者。
*END*
这里是 驹说码事,分享程序猿的码路历程
感谢您的关注