本文通過開源項目schedule來學習定時任務如何工作
schedule簡介
先來看下做做提供的一個例子
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)
while True:
schedule.run_pending()
time.sleep(1)
注解
- 每隔10分鍾執行一次任務
- 每隔一個小時執行一次任務
- 每天10:30執行一次任務
- 每周一執行一次任務
- 每周三13:15執行一次任務
- 每小時的第17分鍾時執行一次任務
schedule源碼學習
可看到有三個類CancelJob、Scheduler、Job,對源碼的分析也將圍繞這三個類展開
CancelJob
class CancelJob(object):
pass
可以看到這是一個空類,這個類的作用是當job執行函數返回一個CancelJob類型的對象時,執行完之后就會被Schedule移除,簡單說就是只會執行一次
Scheduler
Scheduler類源碼
這里為使代碼精簡、緊湊,刪除了注釋
class Scheduler(object):
def __init__(self):
self.jobs = []
def run_pending(self):
runnable_jobs = (job for job in self.jobs if job.should_run)
for job in sorted(runnable_jobs):
self._run_job(job)
def run_all(self, delay_seconds=0):
for job in self.jobs:
self._run_job(job)
time.sleep(delay_seconds)
def clear(self):
del self.jobs[:]
def cancel_job(self, job):
try:
self.jobs.remove(job)
except ValueError:
pass
def every(self, interval=1):
job = Job(interval)
self.jobs.append(job)
return job
def _run_job(self, job):
ret = job.run()
if isinstance(ret, CancelJob) or ret is CancelJob:
self.cancel_job(job)
@property
def next_run(self):
if not self.jobs:
return None
return min(self.jobs).next_run
@property
def idle_seconds(self):
return (self.next_run - datetime.datetime.now()).total_seconds()
Scheduler作用
就是在job執行的時候執行它
函數解釋
- run_pending:運行所有可以運行的任務
- run_all:運行所有任務,不管是否應該運行
- clear:刪除所有調度的任務
- cancle_job:刪除一個任務
- every:創建一個調度任務,返回一個job
- _run_joib:運行一個job
- next_run:獲取下一個要運行任務的時間,這里使用的min去得到最近將執行的job,之所以這樣使用,是Job重載了__lt__方法,這樣寫起來很簡潔
- idle_second:還有多少秒即將開始運行任務
Job
Job是整個定時任務的核心. 主要功能就是根據創建Job時的參數,得到下一次運行的時間. 代碼如下,稍微有點長(會省略部分代碼,可以看源碼):
class Job(object):
def __init__(self, interval):
self.interval = interval # pause interval * unit between runs
self.job_func = None # the job job_func to run
self.unit = None # time units, e.g. 'minutes', 'hours', ...
self.at_time = None # optional time at which this job runs
self.last_run = None # datetime of the last run
self.next_run = None # datetime of the next run
self.period = None # timedelta between runs, only valid for
self.start_day = None # Specific day of the week to start on
def __lt__(self, other):
return self.next_run < other.next_run
def minute(self):
assert self.interval == 1, 'Use minutes instead of minute'
return self.minutes
@property
def minutes(self):
self.unit = 'minutes'
return self
@property
def hour(self):
assert self.interval == 1, 'Use hours instead of hour'
return self.hours
@property
def hours(self):
self.unit = 'hours'
return self
@property
def day(self):
assert self.interval == 1, 'Use days instead of day'
return self.days
@property
def days(self):
self.unit = 'days'
return self
@property
def week(self):
assert self.interval == 1, 'Use weeks instead of week'
return self.weeks
@property
def weeks(self):
self.unit = 'weeks'
return self
@property
def monday(self):
assert self.interval == 1, 'Use mondays instead of monday'
self.start_day = 'monday'
return self.weeks
def at(self, time_str):
assert self.unit in ('days', 'hours') or self.start_day
hour, minute = time_str.split(':')
minute = int(minute)
if self.unit == 'days' or self.start_day:
hour = int(hour)
assert 0 <= hour <= 23
elif self.unit == 'hours':
hour = 0
assert 0 <= minute <= 59
self.at_time = datetime.time(hour, minute)
return self
def do(self, job_func, *args, **kwargs):
self.job_func = functools.partial(job_func, *args, **kwargs)
try:
functools.update_wrapper(self.job_func, job_func)
except AttributeError:
# job_funcs already wrapped by functools.partial won't have
# __name__, __module__ or __doc__ and the update_wrapper()
# call will fail.
pass
self._schedule_next_run()
return self
@property
def should_run(self):
return datetime.datetime.now() >= self.next_run
def run(self):
logger.info('Running job %s', self)
ret = self.job_func()
self.last_run = datetime.datetime.now()
self._schedule_next_run()
return ret
def _schedule_next_run(self):
assert self.unit in ('seconds', 'minutes', 'hours', 'days', 'weeks')
self.period = datetime.timedelta(**{self.unit: self.interval})
self.next_run = datetime.datetime.now() + self.period
#還有很多....
參數的含義:
- interval:間隔多久,每interval秒或分等
- job_func:job執行函數
- unit:間隔單元,比如minutes,hours
- at_time:job具體執行時間點,比如10:30等
- last_run:job上次執行時間
- next_run:job下一次即將執行時間
- period:距離下次運行間隔時間
- start_day:周的特殊天,也就是monday等的含義
各種方法- __lt__:比較哪個job最先即將執行, Scheduler中next_run方法里使用min會用到, 有時合適的使用python這些特殊方法可以簡化代碼,看起來更pythonic.
- second、seconds的區別就是second時默認interval ==1,即schedule.every().second和schedule.every(1).seconds是等價的,作用就是設置unit為seconds. minute和minutes、hour和hours、day和days、week和weeks也類似.
- monday: 設置start_day 為monday, unit 為weeks,interval為1. 含義就是每周一執行job. 類似 tuesday、wednesday、thursday、friday、saturday、sunday一樣.
- at: 表示某天的某個時間點,所以不適合minutes、weeks且start_day 為空(即單純的周)這些unit. 對於unit為hours時,time_str中小時部分為0.
- do: 設置job對應的函數以及參數, 這里使用functools.update_wrapper去更新函數名等信息.主要是functools.partial返回的函數和原函數名稱不一樣.具體可以看看官網文檔. 然后調用_schedule_next_run去計算job下一次執行時間.
- should_run: 判斷job是否可以運行了.依據是當前時間點大於等於job的next_run
- _schedule_next_run: 這是整個job的定時的邏輯部分是計算job下次運行的時間點的.描述一下流程:
- 計算下一次執行時間:
這里根據unit和interval計算出下一次運行時間. 舉個例子,比如schedule.every().hour.do(job, message='things')下一次運行時間就是當前時間加上一小時的間隔.
- 但是當start_day不為空時,即表示某個星期. 這時period就不能直接加在當前時間了. 看代碼:
其中days_ahead表示job表示的星期幾與當表示的星期幾差幾天. 比如今天是星期三,job表示的是星期五,那么days_ahead就為2,最終self.next_run效果就是在now基礎上加了2天.
- 當at_time不為空時, 需要更新執行的時間點,具體就是計算時、分、秒然后調用replace進行更新. 這里對unit為days或hours進行特殊處理:
當已經過了執行時間的話的話,unit為days的話減去一天, unit為hours的話減去一小時. 這樣可以保證任務今天運行.
- 后面還有一句代碼:
這句的含義時對於像monday這些定時任務特殊情況的處理. 舉個例子, 今天是星期四12:00,創建的job是星期四13:00, days_ahead <=7 這個條件滿足,最終next_run實際加了7,這樣的話這個任務就不會運行了. 所以這一步實際就是把7減掉. 看上去有點繞, 實際只要把days_ahead <= 0改為days_ahead < 0這句代碼就不用了.
學習總結
通過學習schedule,可以看到實現一個基礎的任務定時調度就是根據job的配置計算執行時間和執行job. 代碼里我認為比較好的地方有:
- __lt__的使用,這樣min函數直接應用在job上.
- @property是代碼更簡潔
- 返回self支持連綴操作,像schedule.every(10).minutes.do(job)看起來很直接.
- 時間部分完全是根據datetime實現的,有很多很好用的函數.
的確需要好好學習下
轉載:https://zhuanlan.zhihu.com/p/23086148