社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

Python 神器 Celery 源码阅读 (1)

Python编程时光 • 2 年前 • 270 次点击  

Celery是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的工具。Celery 是一款消息队列工具,可用于处理实时数据以及任务调度。

Celery在github上有18k的star和4.1k的fork,非常受欢迎;截止当前总共发布了216个版本,最近一个版本是19天前,更新非常活跃。如果你需要一个python实现的任务调度框架,首推就是它了。

从这周开始,我们一起阅读celery的源码,学习如何使用celery,了解分布式任务调度框架是如何构建,深入celery的实现细节。celery代码量比较大,预计需要3~4周的时间吧。话不多说,一起开始,本周内容主要包括下面几个部分:

  • 任务应用场景
  • celery的项目结构
  • promise库的实现
  • 小结
  • 小技巧

另外非常重要的一点是,考虑到之前的文章,在源码和解读上做的不够好,这次做一个小小的改进。就是我会把项目源码增加我自己的注释,上传到github上。这样想了解细节实现的小伙伴请使用【阅读原文】。我的源码阅读项目 yuanmahui,也欢迎大家在github上点赞支持❤。

celery应用场景

我们有这样一个flask实现的web服务,其中hello接口,需要进行外部调用(比如发送发送短信验证码之类)。这里我们使用 time.sleep 模拟这个耗时的操作:

import time

from flask import Flask

app = Flask(__name__)


@app.route('/')
def hello():
    time.sleep(1)  # 模拟一下耗时操作
    return 'Hello, World!'

可以使用ab来验证这个接口的耗时情况:

# ab -n 10 -c 5 http://127.0.0.1:5000/
...
Requests per second:    3.30 [#/sec] (mean)
Time per request:       1513.100 [ms] (mean)
Time per request:       302.620 [ms] (mean, across all concurrent requests)
Transfer rate:          0.54 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.1      0       0
Processing:  1007 1008   1.2   1008    1010
Waiting:     1005 1007   1.0   1007    1008
Total:       1007 1008   1.2   1009    1010
...

测试显示这个接口的平均耗时需要1.5秒左右,响应缓慢。同时,这样的接口还会导致前端页面的卡顿。要解决这个问题,就可以使用任务调度的方式,把这个耗时操作转换成背景任务,同时及时返回http响应, 调整方法如下:

....

@app.route('/')
def hello():
    # time.sleep(1)
    do_task(id(request))  # 启动背景任务
    return 'Hello, World!'


def do_task(index):
    t = threading.Thread(target=lambda idx: time.sleep(1), args=(index,))
    t.start()

do_task中新开了一个任务线程去执行这个 time.sleep 操作,当前的线程在启动任务线程后立即返回。再次使用ab对接口耗时进行测试:

...
Time per request:       6.304 [ms] (mean)
...

可以发现使用任务方式后,hello接口的响应效率有了巨大的提升。当然这个简单的任务调度还有2个问题:

  1. 任务执行的结果没法返回给前端
  2. 任务和web服务在一个进程里执行,效率不会太高 Celery的分布式任务调度就可以比较好的解决这2个问题。

celery的项目结构

celery我们选用 5.0.5 版本,首先requirements/default.txt文件描述主要依赖下面几个库:

  • billiard celery项目提供的一个多进程池的实现
  • kombu celery项目提供的一个消息库,可以对接不同的消息队列,比如RabbitMQ,Redis
  • vine celery项目提供的一个promise实现,可以处理任务的组合和pipline等

celery支持下面3种工作模式:

  • beat 使用定时心跳的方式启动
  • multi 使用集群方式启动,会形成多个工作进程
  • worker 普通的工作进程方式启动

celery任务执行的结果也支持多种存储方式:

  • Mongo
  • Redis
  • Elasticsearch
  • ...

celery的并发也支持多种实现

  • 多进程的fork
  • gevent
  • 多线程
  • eventlet
  • ...

celery支持工作流

  • 可以根据函数签名进行调度
  • 可以支持链式任务
  • 可以支持分组,和弦...

celery的项目结构就简单介绍这些,后续章节再进行详细介绍。

promise库的实现

promise在异步任务中非常重要,所以celery有个vine项目实现了promise功能,在开始celery之前,我们先扫清这些外围障碍。

promise 简介

Promise是一个对象,它代表了一个异步操作的最终完成或者失败。我觉得MDN中的介绍非常好,我们先了解它,再对比看看Python中如何实现它。比如有一个创建音频文件的操作,成功和失败的时候使用不同的输出:

// 成功的回调函数
function successCallback(result) {
  console.log("音频文件创建成功: " + result);
}

// 失败的回调函数
function failureCallback(error) {
  console.log("音频文件创建失败: " + error);
}

createAudioFileAsync(audioSettings, successCallback, failureCallback)

传统的方式就是使用callback方式调用,把正确和错误的回调传入执行函数createAudioFileAsync中。如果使用Promise方式就会变成:

# 创建一个Promise对象
const promise = createAudioFileAsync(audioSettings);
# 执行这个Promise对象
promise.then(successCallback, failureCallback);

虽然上面代码使用的是JavaScript,我相信熟悉python的你也可以正确理解。就上面的例子,还不容易看出Promise的优点。继续看下面的例子:

doSomething(function(result) {
  doSomethingElse(result, function(newResult) {
    doThirdThing(newResult, function(finalResult) {
      console.log('Got the final result: ' + finalResult);
    }, failureCallback);
  }, failureCallback);
}, failureCallback);

这里是一个多重回调的实现,先处理doSomething,收到结果后再处理doSomethingElse,最后再执行doThirdThing。仅仅3层回调还可以接收,如果回调多了以后就形成回调地狱,代码丑陋且难用。如果使用Promise实现,就会变成:

doSomething().then(function(result) {
  return doSomethingElse(result);
})
.then(function(newResult) {
  return doThirdThing(newResult);
})
.then(function(finalResult) {
  console.log('Got the final result: ' + finalResult);
})
.catch(failureCallback);

可以发现使用Promise方式后,代码会变扁平,非常清爽。这里展示了Promise的2个特点:

  • then函数执行后返回的是一个Promise对象
  • Promise可以进行链式(chain)调用

简单理解Promise后,我们再回头看doSomething,doSomethingElse和doThirdThing,如果这是3个任务,需要按顺序调度执行,后者需要前者的执行结果作为参数?所以可以知道,从逻辑上讲,Promise功能对于任务调度,非常重要。这种实现方式,和语言其实无关。

Promise的实现

先单元测试用例看从promise的使用:

def test_signal(self):
    # mock一个函数
    callback = Mock(name='callback')
    # 创建一个Promise对象
    a = promise()
    # 使用then函数添加callback
    a.then(callback)
    # 执行Promise对象
    a(42)
    # 函数被调用,参数为42
    callback.assert_called_once_with(42)

不难看出python版本的Promise使用和JavaScript版本没有太大区别。promise的构造函数如下:

class promise:
    ...
    def __init__(self, fun=None, args=None, kwargs=None,
             callback=None, on_error=None, weak=False,
             ignore_result=False):
    self.weak = weak
    self.ignore_result = ignore_result
    # 要执行的函数
    self.fun = self._get_fun_or_weakref(fun=fun, weak=weak)
    # 注意位置参数是元祖,这样才可以在call里叠加
    self.args = args or ()
    # 关键字参数是字典
    self.kwargs = kwargs or {}
    # ready,failed,cancelled 三个状态,默认都是false
    self.ready = False
    self.failed = False
    self.value = None
    self.reason = None
    # Optimization
     # Most promises will only have one callback, so we optimize for this
    # case by using a list only when there are multiple callbacks.
    #   s(calar) pending / l(ist) pending
    # 单个callback/多个callback
    self._svpending = None
    self._lvpending = None
    self.on_error = on_error
    self.cancelled = False
    # 可见callback可以通过参数传递,也可以通过then函数传递
    if callback is not None:
        self.then(callback)

每个Promise有3个状态位:ready,cancelled和failed,默认都是false。

重点就是then函数:

def then(self, callback, on_error=None):
    # callback是普通函数,就用promise再嵌套一下
    if not isinstance(callback, Thenable):
        callback = promise(callback, on_error=on_error)
    if self.cancelled:
        callback.cancel()
        return callback
    if self.failed:
        callback.throw(self.reason)
    elif self.ready:
        args, kwargs = self.value
        callback(*args, **kwargs)
    if self._lvpending is None:
        svpending = self._svpending
        if svpending is not None:
            self._svpending, self._lvpending = None, deque([svpending])
        else:
            # 初始复制callback给_svpending
            # 就是一种递归
            self._svpending = callback
            return callback
    # 添加到右侧
    self._lvpending.append(callback)
    # 返回的是一个promise可以继续then,实现a.then(fun_x).then(fun_y).then(fun_z)这样的链式调用
    return callback

在JavaScript版本已经介绍过Promise的2个特性就是,then返回一个新的Promise对象,又由于返回的是Promise对象,又可以继续执行then函数添加新的callback链式调用。

Promise对象的执行就在魔法函数 *__call__:

def __call__(self, *args, **kwargs):
    retval = None
    if self.cancelled:
        return
    # 叠加参数
    final_args = self.args + args if args else self.args
    final_kwargs = dict(self.kwargs, **kwargs) if kwargs else self.kwargs
    # self.fun may be a weakref
    fun = self._fun_is_alive(self.fun)
    if fun is not None:
        try:
            if self.ignore_result:
                fun(*final_args, **final_kwargs)
                ca = ()
                ck = {}
            else:
                # 执行函数
                retval = fun(*final_args, **final_kwargs)
                self.value = (ca, ck) = (retval,), {}
        except Exception:
            # 异常
            return self.throw()
    else:
        self.value = (ca, ck) = final_args, final_kwargs
    # 更改ready状态
    self.ready = True
    svpending = self._svpending
    # 执行callback,把fun执行的结果往callback里传入这样形成pipeline
    if svpending is not None:
        try:
            svpending(*ca, **ck)
        finally:
            self._svpending = None
    else:
        lvpending = self._lvpending
        try:
            while lvpending:
                # 从左开始执行
                p = lvpending.popleft()
                p(*ca, **ck)
        finally:
            self._lvpending = None
    return retval

上述代码主要步骤有:

  • 合并Promise对象的参数和函数调用参数
  • 如果有初始函数,则执行初始函数
  • 更改Promise对象的ready状态
  • 执行callback

链式调用的示例,请看:

def test_chained(self):

    def add(x, y):
        return x + y

    def pow2(x):
        return x ** 2

    adder = Mock(name='adder')
    adder.side_effect = add

    power = Mock(name='multiplier')
    power.side_effect = pow2

    final = Mock(name='final')

    p = promise()
    # 链式调用(注意是有序的)
    p.then(adder).then(power).then(final)

    p(42, 42)
    assert p.value == ((42, 42), {})
    adder.assert_called_with(42, 42)
    power.assert_called_with(84)
    final.assert_called_with(7056)

Barrier的实现

vine还提供了一个叫做barrier的实现,处理多个Promise对象的串行化,下面是单元测试:




    
class test_barrier:

    def setup(self):
        self.m1, self.m2, self.m3 = Mock(), Mock(), Mock()
        self.ps = [promise(self.m1), promise(self.m2), promise(self.m3)]

    def test_evaluate(self):
        # 需要执行4才才变成ready
        x = barrier(self.ps)
        x()
        assert not x.ready
        x()
        assert not x.ready
        x.add(promise())
        x()
        assert not x.ready
        x()
        assert x.ready
        x()
        x()
        # 已经执行完成继续添加会报错
        with pytest.raises(ValueError):
            x.add(promise())

使用barrier后,4个Promise对象的调用可以串行化,并且可以单步执行。执行一次x()消耗一个Promise。

barrier的构造函数:

class barrier:
    
    def __init__(self, promises=None, args=None, kwargs=None,
                 callback=None, size=None):
        # Promise的实现
        self.p = promise()
        self.args = args or ()
        self.kwargs = kwargs or {}
        self._value = 0
        self.size = size or 0
        if not self.size and promises:
            # iter(l) calls len(l) so generator wrappers
            # can only return NotImplemented in the case the
            # generator is not fully consumed yet.
            plen = promises.__len__()
            if plen is not NotImplemented:
                self.size = plen
        self.ready = self.failed = False
        self.reason = None
        self.cancelled = False
        self.finalized = False
        # 列表推导式
        [self.add_noincr(p) for p in promises or []]
        self.finalized = bool(promises or self.size)
        if callback:
            self.then(callback)

barrier重点是默认有一个promise实现,用来作为整个批处理的尾部。参数中的promises列表通过add_noincr函数形成调用链:

def add_noincr(self, p):
    if not self.cancelled:
        # 已经完成了就不能够再添加了
        if self.ready:
            raise ValueError('Cannot add promise to full barrier')
        # 其实就是then().then().then() 添加到自己之前,自己主要执行最开始定义的callback
        p.then(self)

每执行一次进行计数,直到执行完成后更改状态和执行自身(尾部)的Promise对象

def __call__(self, *args, **kwargs):
    # 判断是否已经执行完成:ready和cancelled
    if not self.ready and not self.cancelled:
        self._value += 1
        if self.finalized and self._value >= self.size:
            self.ready = True
            self.p(*self.args, **self.kwargs)

小结

我们了解的celery是python实现的一个任务调度系统,在github上广受欢迎,更新活跃。学习可以使用任务调度方式,帮助我们处理web服务中一些耗时任务。简单了解celery项目的一些特点,从celery的依赖项目vine开始,了解Promise在任务调度系统中的应用。最后从vine项目源码中学习,如何创建一个Promise系统。

小技巧

对于抽象的实现,在Python中除了可以使用继承方式,还可以使用组合Mixin。比如下面:

class Thenable(Callable, metaclass=abc.ABCMeta):  # pragma: no cover
    ...
    @abc.abstractmethod
    def then(self, on_success, on_error=None):
        """成功和失败的2个回调"""
        raise NotImplementedError()
    
class CanThen:

    def then(self, x, y):
        pass
        
assert isinstance(CanThen(), Thenable)

可以看到CanThen实现then函数后,就可以被认定为Thenable的实现,但是CanThen并未继承自Thenable。这个魔法主要是由ABCMeta的register和__subclasshook__两个方法实现:

class Thenable(Callable, metaclass=abc.ABCMeta): 
    
    ...
    
    @classmethod
    def __subclasshook__(cls, C):
        # 也由ABCMeta提供
        if cls is Thenable:
            if any('then' in B.__dict__ for B in C.__mro__):
                return True
        return NotImplemented

    @classmethod
    def register(cls, other):
        # overide to return other so `register` can be used as a decorator
        # 这个register方法是由ABCMeta提供,其实现类使用装饰器方式
        # https://docs.python.org/zh-cn/3/library/abc.html
        type(cls).register(cls, other)
        return other

@Thenable.register
class promise:
    pass

assert isinstance(promise(lambda x: x), Thenable)

promise类经过Thenable.register类装饰圈注释一下后,就可以被认定位Thenable的实现,并不需要显示的编写继承。

参考链接

  • Celery中文手册 https://www.celerycn.io/
  • JavaScript的promise实现 https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Guide/Using_promises
  • 源码汇 https://github.com/game404/yuanmahui





Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/120532
 
270 次点击