python任務調度之schedule


本文通過開源項目schedule來學習定時任務如何工作

schedule簡介

先來看下做做提供的一個例子

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

注解

  • 每隔10分鍾執行一次任務
  • 每隔一個小時執行一次任務
  • 每天10:30執行一次任務
  • 每周一執行一次任務
  • 每周三13:15執行一次任務
  • 每小時的第17分鍾時執行一次任務

schedule源碼學習

可看到有三個類CancelJob、Scheduler、Job,對源碼的分析也將圍繞這三個類展開

CancelJob

class CancelJob(object):
    pass

可以看到這是一個空類,這個類的作用是當job執行函數返回一個CancelJob類型的對象時,執行完之后就會被Schedule移除,簡單說就是只會執行一次

Scheduler

Scheduler類源碼
這里為使代碼精簡、緊湊,刪除了注釋

class Scheduler(object):
    def __init__(self):
        self.jobs = []
    def run_pending(self):
        runnable_jobs = (job for job in self.jobs if job.should_run)
        for job in sorted(runnable_jobs):
            self._run_job(job)
    def run_all(self, delay_seconds=0):
        for job in self.jobs:
            self._run_job(job)
            time.sleep(delay_seconds)
    def clear(self):
        del self.jobs[:]
    def cancel_job(self, job):
        try:
            self.jobs.remove(job)
        except ValueError:
            pass
    def every(self, interval=1):
        job = Job(interval)
        self.jobs.append(job)
        return job
    def _run_job(self, job):
        ret = job.run()
        if isinstance(ret, CancelJob) or ret is CancelJob:
            self.cancel_job(job)
    @property
    def next_run(self):
        if not self.jobs:
            return None
        return min(self.jobs).next_run
    @property
    def idle_seconds(self):
        return (self.next_run - datetime.datetime.now()).total_seconds()

Scheduler作用

就是在job執行的時候執行它
函數解釋

  • run_pending:運行所有可以運行的任務
  • run_all:運行所有任務,不管是否應該運行
  • clear:刪除所有調度的任務
  • cancle_job:刪除一個任務
  • every:創建一個調度任務,返回一個job
  • _run_joib:運行一個job
  • next_run:獲取下一個要運行任務的時間,這里使用的min去得到最近將執行的job,之所以這樣使用,是Job重載了__lt__方法,這樣寫起來很簡潔
  • idle_second:還有多少秒即將開始運行任務

Job

Job是整個定時任務的核心. 主要功能就是根據創建Job時的參數,得到下一次運行的時間. 代碼如下,稍微有點長(會省略部分代碼,可以看源碼):

class Job(object):
    def __init__(self, interval):
        self.interval = interval  # pause interval * unit between runs
        self.job_func = None  # the job job_func to run
        self.unit = None  # time units, e.g. 'minutes', 'hours', ...
        self.at_time = None  # optional time at which this job runs
        self.last_run = None  # datetime of the last run
        self.next_run = None  # datetime of the next run
        self.period = None  # timedelta between runs, only valid for
        self.start_day = None  # Specific day of the week to start on
    def __lt__(self, other):
        return self.next_run < other.next_run
    def minute(self):
        assert self.interval == 1, 'Use minutes instead of minute'
        return self.minutes
    @property
    def minutes(self):
        self.unit = 'minutes'
        return self
    @property
    def hour(self):
        assert self.interval == 1, 'Use hours instead of hour'
        return self.hours
    @property
    def hours(self):
        self.unit = 'hours'
        return self
    @property
    def day(self):
        assert self.interval == 1, 'Use days instead of day'
        return self.days
    @property
    def days(self):
        self.unit = 'days'
        return self
    @property
    def week(self):
        assert self.interval == 1, 'Use weeks instead of week'
        return self.weeks
    @property
    def weeks(self):
        self.unit = 'weeks'
        return self
    @property
    def monday(self):
        assert self.interval == 1, 'Use mondays instead of monday'
        self.start_day = 'monday'
        return self.weeks
    def at(self, time_str):
        assert self.unit in ('days', 'hours') or self.start_day
        hour, minute = time_str.split(':')
        minute = int(minute)
        if self.unit == 'days' or self.start_day:
            hour = int(hour)
            assert 0 <= hour <= 23
        elif self.unit == 'hours':
            hour = 0
        assert 0 <= minute <= 59
        self.at_time = datetime.time(hour, minute)
        return self
    def do(self, job_func, *args, **kwargs):
        self.job_func = functools.partial(job_func, *args, **kwargs)
        try:
            functools.update_wrapper(self.job_func, job_func)
        except AttributeError:
            # job_funcs already wrapped by functools.partial won't have
            # __name__, __module__ or __doc__ and the update_wrapper()
            # call will fail.
            pass
        self._schedule_next_run()
        return self
    @property
    def should_run(self):
        return datetime.datetime.now() >= self.next_run
    def run(self):
        logger.info('Running job %s', self)
        ret = self.job_func()
        self.last_run = datetime.datetime.now()
        self._schedule_next_run()
        return ret
    def _schedule_next_run(self):
        assert self.unit in ('seconds', 'minutes', 'hours', 'days', 'weeks')
        self.period = datetime.timedelta(**{self.unit: self.interval})
        self.next_run = datetime.datetime.now() + self.period
        #還有很多....

參數的含義:

  • interval:間隔多久,每interval秒或分等
  • job_func:job執行函數
  • unit:間隔單元,比如minutes,hours
  • at_time:job具體執行時間點,比如10:30等
  • last_run:job上次執行時間
  • next_run:job下一次即將執行時間
  • period:距離下次運行間隔時間
  • start_day:周的特殊天,也就是monday等的含義
    各種方法
  • __lt__:比較哪個job最先即將執行, Scheduler中next_run方法里使用min會用到, 有時合適的使用python這些特殊方法可以簡化代碼,看起來更pythonic.
  • second、seconds的區別就是second時默認interval ==1,即schedule.every().second和schedule.every(1).seconds是等價的,作用就是設置unit為seconds. minute和minutes、hour和hours、day和days、week和weeks也類似.
  • monday: 設置start_day 為monday, unit 為weeks,interval為1. 含義就是每周一執行job. 類似 tuesday、wednesday、thursday、friday、saturday、sunday一樣.
  • at: 表示某天的某個時間點,所以不適合minutes、weeks且start_day 為空(即單純的周)這些unit. 對於unit為hours時,time_str中小時部分為0.
  • do: 設置job對應的函數以及參數, 這里使用functools.update_wrapper去更新函數名等信息.主要是functools.partial返回的函數和原函數名稱不一樣.具體可以看看官網文檔. 然后調用_schedule_next_run去計算job下一次執行時間.
  • should_run: 判斷job是否可以運行了.依據是當前時間點大於等於job的next_run
  • _schedule_next_run: 這是整個job的定時的邏輯部分是計算job下次運行的時間點的.描述一下流程:
  • 計算下一次執行時間:

這里根據unit和interval計算出下一次運行時間. 舉個例子,比如schedule.every().hour.do(job, message='things')下一次運行時間就是當前時間加上一小時的間隔.
  • 但是當start_day不為空時,即表示某個星期. 這時period就不能直接加在當前時間了. 看代碼:

其中days_ahead表示job表示的星期幾與當表示的星期幾差幾天. 比如今天是星期三,job表示的是星期五,那么days_ahead就為2,最終self.next_run效果就是在now基礎上加了2天.
  • 當at_time不為空時, 需要更新執行的時間點,具體就是計算時、分、秒然后調用replace進行更新. 這里對unit為days或hours進行特殊處理:

當已經過了執行時間的話的話,unit為days的話減去一天, unit為hours的話減去一小時. 這樣可以保證任務今天運行.
  • 后面還有一句代碼:

這句的含義時對於像monday這些定時任務特殊情況的處理. 舉個例子, 今天是星期四12:00,創建的job是星期四13:00, days_ahead <=7 這個條件滿足,最終next_run實際加了7,這樣的話這個任務就不會運行了. 所以這一步實際就是把7減掉. 看上去有點繞, 實際只要把days_ahead <= 0改為days_ahead < 0這句代碼就不用了.

學習總結

通過學習schedule,可以看到實現一個基礎的任務定時調度就是根據job的配置計算執行時間和執行job. 代碼里我認為比較好的地方有:

  • __lt__的使用,這樣min函數直接應用在job上.
  • @property是代碼更簡潔
  • 返回self支持連綴操作,像schedule.every(10).minutes.do(job)看起來很直接.
  • 時間部分完全是根據datetime實現的,有很多很好用的函數.

的確需要好好學習下
轉載:https://zhuanlan.zhihu.com/p/23086148


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM