社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

Python学习之路37-使用asyncio包处理并发

VPointer • 7 年前 • 578 次点击  
阅读 13

Python学习之路37-使用asyncio包处理并发

《流畅的Python》笔记。

本篇主要讨论asyncio包,这个包使用事件循环驱动的协程实现并发。

1. 前言

本篇主要介绍如果使用asyncio包将上一篇中线程版的“国旗下载”程序改为协程版本,通过异步非阻塞来实现并发。

说实话,我在读这部分内容的时候是懵逼的,书中阻塞非阻塞、同步异步的概念和我之前的理解有很大差异。之前一直以为同步就意味着阻塞,异步就意味着非阻塞。但其实,阻塞非阻塞与同步异步并没有本质的联系。

同步(Synchronizing)异步(Asynchronizing)是对指令而言的,也就是程序(理解成“函数”会更好一些)。以含有I/O操作的函数为例(被调用方),如果这个函数要等到I/O操作结束,获取了数据,才返回到调用方,这就叫同步(绝大部分函数都同步);反之,不等I/O执行完毕就返回到调用方,获取的数据以其他方式转给调用方,这就叫异步。

阻塞(Blocking)非阻塞(Non-Blocking)是对进程线程而言(为了简洁,只以“线程”为例)。因为某些原因(比如I/O),线程被挂起(被移出CPU),这就叫阻塞;反之,即使因为这些原因,线程依然不被挂起(不被移出CPU),这就叫非阻塞。

可见,这两组概念一共可以组成四种不同情况:同步阻塞(常见),同步非阻塞(不常见),异步阻塞(不常见),异步非阻塞(常见)。

仍以上述I/O函数为例:

  • 如果这个函数的I/O请求已发出,只是单纯地在等服务器发回数据,线程也只是单纯地在等这个函数返回结果,CPU将会把这个线程挂起,这就叫做同步阻塞
  • 如果这个函数中调用的是一个执行复杂计算的子函数,此时,函数依然在等结果没有返回,但线程并不是没有运行,不会被CPU挂起,这就叫做同步非阻塞(“CPU以轮询的方式查看I/O是否结束”更能说明这种情况,但这已是很古老的方式了);
  • 如果这个函数在I/O请求没得到结果之前就返回了,但线程依然在等这个结果(在函数体之外等待使用这个数据),这就叫异步阻塞
  • 如果这个函数在没得到结果之前返回了,线程继续执行其他函数,这就叫做异步非阻塞。更具体一点,这种情况对应的是使用回调实现异步非阻塞的情况;而Python中还有一种情况,也是本篇要讲的,就是使用协程实现异步非阻塞:协程在得到结果前依然不返回,但线程并没有等待,而是去执行其他协程。协程看起来就像同步一样。

由于之前并没有遇到代码世界中的同步非阻塞异步阻塞这两种情况,所以我也不确定上述这两种情况的例子是否准确,欢迎大佬留言指导。但这四种情况在现实生活中就很常见了,下面举个在某处看到的例子:

  • 老张把一普通水壶接上水放火上,眼睛直勾勾盯着等水开,不干其他事,这叫同步阻塞
  • 老张依然用一普通水壶烧水,但把水壶放火上后去客厅看电视,时不时回来看水烧好了没有,这叫同步非阻塞
  • 老张用一能响的水壶烧水,没盯着看,但也没干其他事,只是在那儿发愣。水烧好后,壶可劲儿的响,老张一惊,取走水壶,这叫异步阻塞
  • 老张用一能响的水壶烧水,把壶放火上后去客厅看电视,等壶响了再去拿壶,这叫异步非阻塞

从这四个例子可以看出,阻不阻塞是对老张而言的,在计算机中对应的就是进程线程;同步异步是对水壶而言的,在计算机中对应的就是函数。

有了上述概念后,我们接下来将使用asyncio包,将之前下载国旗的程序改为协程版本。

2. 异步

之前我们使用线程实现了并发下载数据,它是同步阻塞的,因为一到I/O操作,线程就被阻塞,然后调入新的线程。现在,我们将实现一个异步非阻塞版本。但从上述介绍知道,异步有两种方式:回调和协程。本文并不会实现回调版本的“下载国旗”,提出回调只是为了和协程进行比较。

2.1 回调

举个例子说明回调。在调用函数A时除了传入必要的参数外,还传入一个参数:函数B。A中有一些费时的操作,比如I/O,A在没得到结果之前就返回,而将等待结果以及进行后续处理的事情交给函数B。这个过程就是回调,函数B就称为回调函数

这种编程方式不太符合人的思维习惯,代码也不易于理解,情况一复杂,就很可能遇到**“回调地狱”**:多层嵌套回调。下面是一个JavaScript中使用回调的例子,它嵌套了3层:

// 代码2.1
api_call1(request1, function (response1){  // 多么痛的领悟
    var request2 = step1(response1);  // 第一步
    api_call2(request2, function (response2){
        var request3 = step2(response2);  // 第二步
        api_call3(request3, function (response3){
            step(response3);  // 第三步
        })
    })
})
复制代码

api_call1api_call2api_call3都是库函数,用于异步获取结果。JavaScript中常用匿名函数作为回调函数。下面我们使用Python来实现上述代码,上述三个匿名函数分别命名为stage1stage2stage3

# 代码2.2
def stage1(response1):
    request2 = step1(response1)
    api_call2(request2, stage2)

def stage2(response2):
    request3 = step2(response2)
    api_call3(request3, stage3)

def stage3(response3):
    step3(response3)

api_call1(request1, stage1)  # 代码从这里开始执行
复制代码

可见,即使用Python写,也不容易理解,这要是再多嵌套几层,不逼疯已经不错了。而且,如果要在stage2中使用request2,还得使用闭包,这就又变成了嵌套定义函数的情况。并且上述代码还没有考虑抛出异常的情况:在基于回调的API中,这个问题的解决办法是为每个异步调用注册两个回调,一个用于处理操作成功时返回的结果,一个用于处理错误。可以看出,一旦涉及错误处理,回调将更可怕。

2.2 协程

现在我们用协程来改写上述代码:

# 代码2.3
import asyncio

@asyncio.coroutine
def three_stages


    
(request1):
    response1 = yield from api_call1(request1)
    request2 = step1(response1)
    response2 = yield from api_call2(request2)
    request3 = step2(response2)
    response3 = yield from api_call3(request3)
    step3(response3)

loop = asyncio.get_event_loop()
loop.create_task(three_stages(request1))
复制代码

与前面两个版本的回调相比,这个版本的代码将3个步骤依次写在同一函数中,易于理解,这样看起来是不是也更像同步函数?如果要处理异常,只需要相应的yield from语句处添加try/except即可。

但也别急着把这称为“协程天堂”,因为:

  • 不能使用常规函数,必须使用协程,而且要习惯yield from语句;
  • 不能直接调用协程。即,不能像直接调用api_call1(request1)那样直接调用three_stages(request1),必须使用事件循环(上面的loop)来驱动协程。

但不管怎样,代码读起来和写起来比回调简单多了,尤其是嵌套回调。

小技巧:读协程的代码时,为了便于理解代码的意思,可以直接将yield from关键字忽略掉。

2.3 下载国旗批量版

下面我们开始实现协程版本的“下载国旗”。

为了将其改为协程版本,我们不能使用之前的requests包,因为它会阻塞线程,改为使用aiohttp包。为了尽量保持代码的简洁,这里不处理异常。下方是完整的代码,代码中我们使用了新语法。以下代码的基本思路是:在一个单线程程序中使用主循环一次激活队列中的协程,各个协程向前执行几步,然后把控制权让给主循环,主循环再激活队列中的下一个协程

# 代码2.4
import aiohttp, os, sys, time, asyncio   # 代码中请勿这么写,这里只是为了减少行数

POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
BASE_URL = "http://flupy.org/data/flags"
DEST_DIR = "downloads/"

def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, "wb") as fp: 
        fp.write(img)

def show(text):
    print(text, end=" ")
    sys.stdout.flush()

async def get_flag(cc):   # aiohttp只支持TCP和UDP请求
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    async with aiohttp.ClientSession() as session: # <1> 开启一个会话
        async with session.get(url) as resp:   # 发送请求
            image = await resp.read()   # 读取请求
    return image

async def download_one(cc):
    image = await get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + ".gif")
    return cc

def download_many(cc_list):
    loop = asyncio.get_event_loop()   # 获取事件循环
    to_do = [download_one(cc) for cc in sorted(cc_list)]  # 生成协程列表
    wait_coro = asyncio.wait(to_do)   # 将协程包装成Task类,wait_coro并不是运行结果!而是协程!
    res, _ = loop.run_until_complete(wait_coro) # 驱动每个协程运行
    loop.close()   # 循环结束
    return len(res)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = "\n{} flags downloaded in {:.2f}s"
    print(msg.format(count, elapsed))

if __name__ == "__main__":
    main(download_many)

# 结果:
VN TR FR DE IN ID RU NG CN EG BR MX PH CD IR PK ET JP BD US 
20 flags downloaded in 1.27s
复制代码

解释:

①这里使用了新的语法async/await。再Python3.5之前,如果想定义一个协程只能延用函数的定义方式def,然后在定义体里面使用yieldyield from。如果想把一个函数更明确地声明为协程(或者说异步函数),还可以使用asyncio中的coroutine装饰器,但这么做是不是挺麻烦的?从Python3.5起,可以明确**使用async来定义协程(异步函数)**和异步生成器。使用async则可以省略掉@asyncio.coroutine装饰器;在用async修饰的协程的定义体中可以使用yield关键字,但不能使用yield from,它必须被替换为await,即使yield from后面只是一个普通的生成器;从由async修饰的协程或生成器中获取数据时,必须使用await

②如果要使用@asyncio.coroutine装饰器明确声明协程,那么在协程定义体内部只能使用yield from,不能使用yield,因为使用到yield的地方已经在asyncio中全部封装成了函数或者方法。最新版的@asyncio.coroutine也可以装饰async修饰的协程,这种情况下coroutine不做任何事,只是原封不动的返回被装饰的协程。

③ <1>处的代码之所以改用async with(异步上下文管理器),是因为新版asyncio并不支持书中的旧语法yield from aiohttp.request("GET", url)。关于async/awaitasync with/async for的相关内容将在后续文章中介绍,这里只需要知道async对应于@asyncio.coroutineawait对应于yield from即可。

④我们将get_flag改成了协程版本,并使用aiohttp来实现异步请求;download_one函数也随之变成了协程版本。

download_many只是一个普通函数,它要驱动协程运行。在这个函数中,我们通过asyncio.get_event_loop()创建事件循环(实质就是一个线程)来驱动协程的运行。接着生成含20个download_one协程的协程列表to_do,随后再调用asyncio.wait(to_do)将这个协程列表包装成一个wait协程,取名为wait_corowait协程会将to_do中所有的协程包装成Task对象(Future的子类),再形成列表。最后,我们通过loop.run_until_complete(wait_coro)驱动协程wait_coro运行。整个的驱动链是这样的:loop.run_until_complete驱动协程wait_corowait_coro再在内部驱动20个协程。

wait协程最后会返回一个元组,第一个元素是完成的协程数,第二个是未完成的协程数loop.run_until_complete返回传入的协程的返回值(实际代码是Future.result())。有点绕,其实就是wait_coro最后返回一个元组给run_until_completerun_until_complete再把这个值返回给调用方。

⑦在上一篇中,我们知道concurrent.futures中有一个Future,且通过它的result方法获取最后运行的结果;在asyncio包中,不光有Future,还有它的子类Task,但获取结果通常并不是调用result方法,而是通过yield fromawait,即yield from future获取结果。asyncio.Future类的result方法没有参数,不能设置超时时间;如果调用resultfuture还未运行完毕,它并不会阻塞去等待结果,而是抛出asyncio.InvalidStateError异常。

2.4 下载国旗改进版

上一篇中,我们除了使用Executor.map()批量处理线程之外,我们还使用了concurrent.futures.as_completed()挨个迭代运行完的线程返回的结果。asyncio也实现了这个方法,我们将使用这个函数改写上方的代码。

还有一个问题:我们往往只关注了网络I/O请求,常常忽略本地的I/O操作。线程版本中的save_flag函数也是会阻塞线程的,因为它操作了磁盘。但由于图片太小,速度太快,我们感觉并不明显,如果换成更高像素的图片,这种速度差异就会很明显。我们将会以某种方式使其避免阻塞线程。下面是改写的代码:

# 代码2.5
import asyncio, os, sys, time, aiohttp

async def download_one(cc, semaphore):
    async with semaphore:
        image = await get_flag(cc)
    loop = asyncio.get_event_loop()
    loop.run_in_executor(None, save_flag, image, cc + ".gif")
    return cc

async def download_coro(cc_list, concur_req):


    

    semaphore = asyncio.Semaphore(concur_req)  # 它是一个信号量,用于控制并发量
    to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
    to_do_iter = asyncio.as_completed(to_do)
    for future in to_do_iter:
        res = await future
        print("Downloaded", res)

def download_many(cc_list, concur_req):  # 变化不大
    loop = asyncio.get_event_loop()
    coro = download_coro(cc_list, concur_req)
    loop.run_until_complete(coro)
    loop.close()

if __name__ == "__main__":
    t0 = time.time()
    download_many(POP20_CC, 1000)  # 第二个参数表示最大并发数
    print("\nDone! Time elapsed {:.2f}s.".format(time.time() - t0))

# 结果:
Downloaded BD
Downloaded CN
-- snip --
Downloaded US

Done! Time elapsed 1.21s.
复制代码

上述代码有3个地方值得关注:

  • asyncio.as_completed()元素为协程的可迭代对象为参数,但自身并不是协程,只是一个生成器。它在内部将传入的协程包装成Task,然后返回一个生成器,产出协程的返回值。这个生成器按协程完成的顺序生成值(先完成先产出),而不是按协程在迭代器中的顺序生成值。
  • asyncio.Semaphore是个信号量类,内部维护这一个计数器,调用它的acquire方法(这个方法是个协程),计数器减一;对其调用release方法(这个方法不是协程),计数器加一;当计数器为0时,会阻塞调用这个方法的协程。
  • 我们将save_flag函数放到了其他线程中,loop.run_in_executor()的第一个参数是Executor实例,如果为None,则使用事件循环的默认ThreadPoolExecutor实例。余下的参数是可调用对象,以及可调用对象的位置参数。

3. 总结

本章开篇介绍了阻塞非阻塞、同步异步的概念,然后介绍了异步的两种实现方式:回调和协程。并通过代码比较了回调和协程的实现方式。然后我们使用asyncioaiohttp两个库,将之前线程版本的下载国旗程序改为了协程版本。可惜我也是刚接触协程不久,写的内容不一定准确,尤其是关于asyncio的内容,这个库之前是一点都没接触过。后面我会专门研究Python中的协程,以及asyncio的实现,争取把这部分内容彻底搞懂。


迎大家关注我的微信公众号"代码港" & 个人网站 www.vpointer.net ~


今天看啥 - 高品质阅读平台
本文地址:http://www.jintiankansha.me/t/LXtrnuKrNP
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/23002
 
578 次点击