社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

Python源码学习Schedule

GoT阳仔 • 6 年前 • 96 次点击  
阅读 28

Python源码学习Schedule

关于我
编程界的一名小小程序猿,目前在一个创业团队任team lead,技术栈涉及Android、Python、Java和Go,这个也是我们团队的主要技术栈。 联系:hylinux1024@gmail.com

上一篇介绍了一个简单的Python调度器的使用,后来我翻阅了一下它的源码,惊奇的发现核心库才一个文件,代码量短短700行不到。这是绝佳的学习材料。
让我喜出望外的是这个库的作者竟然就是我最近阅读的一本书《Python Tricks》的作者!现在就让我们看看大神的实现思路。

0x00 准备

项目地址

github.com/dbader/sche…

将代码checkout到本地

环境

PyCharm+venv+Python3

0x01 用法

这个在上一篇也介绍过了,非常简单

import schedule

# 定义需要执行的方法
def job():
    print("a simple scheduler in python.")

# 设置调度的参数,这里是每2秒执行一次
schedule.every(2).seconds.do(job)

if __name__ == '__main__':
    while True:
        schedule.run_pending()

# 执行结果
a simple scheduler in python.
a simple scheduler in python.
a simple scheduler in python.
...
复制代码

这个库的文档也很详细,可以浏览 schedule.readthedocs.io/ 了解库的大概用法

0x02 项目结构

(venv) ➜  schedule git:(master) tree -L 2
.
...
├── requirements-dev.txt
├── schedule
│   └── __init__.py
├── setup.py
├── test_schedule.py
├── tox.ini
└── venv
    ├── bin
    ├── include
    ├── lib
    ├── pip-selfcheck.json
    └── pyvenv.cfg

8 directories, 18 files

复制代码
  • schedule目录下就一个__init__.py文件,这是我们需要重点学习的地方。
  • setup.py文件是发布项目的配置文件
  • test_schedule.py是单元测试文件,一开始除了看文档外,也可以从单元测试中入手,了解这个库的使用
  • requirements-dev.txt 开发环境的依赖库文件,如果核心的库是不需要第三方的依赖的,但是单元测试需要
  • venv是我checkout后创建的,原本的项目是没有的

0x03 schedule

我们知道__init__.py是定义Python包必需的文件。在这个文件中定义方法、类都可以在使用import命令时导入到工程项目中,然后使用。

schedule 源码

以下是schedule会用到的模块,都是Python内部的模块。

import collections
import datetime
import functools
import logging
import random
import re
import time

logger = logging.getLogger('schedule')
复制代码

然后定义了一个日志打印工具实例

接着是定义了该模块的3个异常类的结构体系,是由Exception派生出来的,分别是ScheduleErrorScheduleValueErrorIntervalError

class ScheduleError(Exception):
    """Base schedule exception"""
    pass

class ScheduleValueError(ScheduleError):
    """Base schedule value error"""
    pass

class IntervalError(ScheduleValueError):
    """An improper interval was used"""
    pass

复制代码

还定义了一个CancelJob的类,用于取消调度器的继续执行

class CancelJob(object):
    """
    Can be returned from a job to unschedule itself.
    """
    pass

复制代码

例如在自定义的需要被调度方法中返回这个CancelJob类就可以实现一次性的任务

# 定义需要执行的方法
def job():
    print("a simple scheduler in python.")
    # 返回CancelJob可以停止调度器的后续执行
    return schedule.CancelJob
复制代码

接着就是这个库的两个核心类SchedulerJob

class Scheduler(object):
    """
    Objects instantiated by the :class:`Scheduler <Scheduler>` are
    factories to create jobs, keep record of scheduled jobs and
    handle their execution.
    """
    
class Job(object):
    """
    A periodic job as used by :class:`Scheduler`.

    :param interval: A quantity of a certain time unit
    :param scheduler: The :class:`Scheduler <Scheduler>` instance that
                      this job will register itself with once it has
                      been fully configured in :meth:`Job.do()`.

    Every job runs at a given fixed time interval that is defined by:

    * a :meth:`time unit <Job.second>`
    * a quantity of `time units` defined by `interval`

    A job is usually created and returned by :meth:`Scheduler.every`
    method, which also defines its `interval`.
    """
复制代码

Scheduler是调度器的实现类,它负责调度任务(job)的创建和执行。

Job则是对需要执行任务的抽象。

这两个类是这个库的核心,后面我们还会看到详细的分析。
接下来就是默认调度器default_scheduler和任务列表jobs的创建。

# The following methods are shortcuts for not having to
# create a Scheduler instance:

#: Default :class:`Scheduler <Scheduler>` object
default_scheduler = Scheduler()

#: Default :class:`Jobs <Job>` list
jobs = default_scheduler.jobs  # todo: should this be a copy, e.g. jobs()?
复制代码

在执行import schedule后,就默认创建了default_scheduler。而Scheduler 的构造方法为

def __init__(self):
    self.jobs = []
复制代码

在执行初始化时,调度器就创建了一个空的任务列表。

在文件的最后定义了一些链式调用的方法,使用起来也是非常人性化的,值得学习。 这里的方法都定义在模块下,而且都是封装了default_scheduler实例的调用。

def every(interval=1):
    """Calls :meth:`every <Scheduler.every>` on the
    :data:`default scheduler instance <default_scheduler>`.
    """
    return default_scheduler.every(interval)


def run_pending():
    """Calls :meth:`run_pending <Scheduler.run_pending>` on the
    :data:`default scheduler instance <default_scheduler>`.
    """
    default_scheduler.run_pending()


def run_all(delay_seconds=0):
    """Calls :meth:`run_all <Scheduler.run_all>` on the
    :data:`default scheduler instance <default_scheduler>`.
    """
    default_scheduler.run_all(delay_seconds=delay_seconds)


def clear(tag=None):
    """Calls :meth:`clear <Scheduler.clear>` on the
    :data:`default scheduler instance <default_scheduler>`.
    """
    default_scheduler.clear(tag)


def cancel_job(job):
    """Calls :meth:`cancel_job <Scheduler.cancel_job>` on the
    :data:`default scheduler instance <default_scheduler>`.
    """
    default_scheduler.cancel_job(job)


def next_run():
    """Calls :meth:`next_run <Scheduler.next_run>` on the
    :data:`default scheduler instance <default_scheduler>`.
    """
    return default_scheduler.next_run


def idle_seconds():
    """Calls :meth:`idle_seconds <Scheduler.idle_seconds>` on the
    :data:`default scheduler instance <default_scheduler>`.
    """
    return default_scheduler.idle_seconds
复制代码

我们看下入口方法run_pending(),从本文一开头的Demo可以知道这个是启动调度器的方法。这里它执行了default_scheduler中的方法。

default_scheduler.run_pending()
复制代码

所以我们就把目光定位到Scheduler类的相应方法

def run_pending(self):
    """
    Run all jobs that are scheduled to run.

    Please note that it is *intended behavior that run_pending()
    does not run missed jobs*. For example, if you've registered a job
    that should run every minute and you only call run_pending()
    in one hour increments then your job won't be run 60 times in
    between but only once.
    """
    runnable_jobs = (job for job in self.jobs if job.should_run)
    for job in sorted(runnable_jobs):
        self._run_job(job)
复制代码

这个方法中首先从jobs列表将需要执行的任务过滤后放在runnable_jobs列表,然后将其排序后顺序执行内部的_run_job(job)方法

def _run_job(self, job):
    ret = job.run()
    if isinstance(ret, CancelJob) or ret is CancelJob:
        self.cancel_job(job)
复制代码

_run_job方法中就调用了job类中的run方法,并根据返回值判断是否需要取消任务。

这时候我们要看下Job类的实现逻辑。

首先我们要看下Job是什么时候创建的。还是从Demo中的代码入手

schedule.every(2).seconds.do(job)
复制代码

这里先执行了schedule.every()方法

def every(interval=1):
    """Calls :meth:`every <Scheduler.every>` on the
    :data:`default scheduler instance <default_scheduler>`.
    """
    return default_scheduler.every(interval)
复制代码

这个方法就是scheduler类中的every方法

def every(self, interval=1):
    """
    Schedule a new periodic job.

    :param interval: A quantity of a certain time unit
    :return: An unconfigured :class:`Job <Job>`
    """
    job = Job(interval, self)
    return job
复制代码

在这里创建了一个任务job,并将参数intervalscheduler实例传入到构造方法中,最后返回job实例用于实现链式调用。

跳转到Job的构造方法

def __init__(self, interval, scheduler=None):
    self.interval = interval  # pause interval * unit between runs
    self.latest = None  # upper limit to the interval
    self.job_func = None  # the job job_func to run
    self.unit = None  # time units, e.g. 'minutes', 'hours', ...
    self.at_time = None  # optional time at which this job runs
    self.last_run = None  # datetime of the last run
    self.next_run = None  # datetime of the next run
    self.period = None  # timedelta between runs, only valid for
    self.start_day = None  # Specific day of the week to start on
    self.tags = set()  # unique set of tags for the job
    self.scheduler = scheduler  # scheduler to register with
复制代码

主要初始化了间隔时间配置、需要执行的方法、调度器各种时间单位等。

执行every方法之后又调用了seconds这个属性方法

@property
def seconds(self):
    self.unit = 'seconds'
    return self
复制代码

设置了时间单位,这个设置秒,当然还有其它类似的属性方法minuteshoursdays等等。

最后就是执行了do方法

def do(self, job_func, *args, **kwargs):
    """
    Specifies the job_func that should be called every time the
    job runs.

    Any additional arguments are passed on to job_func when
    the job runs.

    :param job_func: The function to be scheduled
    :return: The invoked job instance
    """
    self.job_func = functools.partial(job_func, *args, **kwargs)
    try:
        functools.update_wrapper(self.job_func, job_func)
    except AttributeError:
        # job_funcs already wrapped by functools.partial won't have
        # __name__, __module__ or __doc__ and the update_wrapper()
        # call will fail.
        pass
    self._schedule_next_run()
    self.scheduler.jobs.append(self)
    return self
复制代码

在这里使用functools工具的中的偏函数partial将我们自定义的方法封装成可调用的对象

然后就调用_schedule_next_run方法,它主要是对时间的解析,按照时间对job排序,我觉得这个方法是本项目中的技术点,逻辑也是稍微复杂一丢丢,仔细阅读就可以看懂,主要是对时间datetime的使用。由于篇幅,这里就不再贴出代码。

这里就完成了任务job的添加。然后在调用run_pending方法中就可以让任务执行。

0x04 总结一下

schedule 库定义两个核心类SchedulerJob。在导入包时就默认创建一个Scheduler对象,并初始化任务列表。
schedule模块提供了链式调用的接口,在配置schedule参数时,就会创建任务对象job,并会将job添加到任务列表中,最后在执行run_pending方法时,就会调用我们自定义的方法。 这个库的核心思想是使用面向对象方法,对事物能够准确地抽象,它总体的逻辑并不复杂,是学习源码很不错的范例。

0x05 学习资料

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/35992
 
96 次点击