1 簡介
APScheduler的全稱是Advanced Python Scheduler。它是一個輕量級的 Python 定時任務調度框架。同時,它還支持異步執行、后台執行調度任務。本人小小的建議是一般項目用APScheduler,因為不用像Celery那樣再單獨啟動worker、beat進程,而且API也很簡潔。
Apscheduler基於Quartz的一個python定時任務框架,實現Quart的所有功能,相關的接口調用起來比較方便,目前其提供了基於日期、固定時間間隔以及corntab類型的任務,並且同時可進行持久化任務;同時它提供了多種不同的調用器,方便開發者根據自己的需求進行使用,也方便與數據庫等第三方的外部持久化儲存機制進行協同工作,非常強大。
基本原理
總的來說,主要是利用python threading Event和Lock鎖來寫的。scheduler在主循環(main_loop)中,反復檢查是否有需要執行的任務,完成任務的檢查函數為 _process_jobs,主要有那個幾個步驟:
1、詢問儲存的每個 jobStore,是否有到期要執行的任務。
@abstractmethod def get_due_jobs(self, now): """ Returns the list of jobs that have ``next_run_time`` earlier or equal to ``now``. The returned jobs must be sorted by next run time (ascending). :param datetime.datetime now: the current (timezone aware) datetime :rtype: list[Job] """

2、due_jobs不為空,則計算這些jobs中每個job需要運行的時間點,時間一到就提交給submit作任務調度。

3、在主循環中,如果不間斷地調用,而實際上沒有要執行的job,這會造成資源浪費。因此在程序中,如果每次掉用 _process_jobs后,進行了預先判斷,判斷下一次要執行的job(離現在最近的)還要多長時間,作為返回值告訴main_loop, 這時主循環就可以去睡一覺,等大約這么長時間后再喚醒,執行下一次 _process_jobs。
2 使用步驟
APScheduler 使用起來還算是比較簡單。運行一個調度任務只需要以下三部曲。
- 新建一個 schedulers (調度器) 。
- 添加一個調度任務(job stores)。
- 運行調度任務。
下面是執行每 2 秒報時的簡單示例代碼:
import datetime import time from apscheduler.schedulers.background import BackgroundScheduler def timedTask(): print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) if __name__ == '__main__': # 創建后台執行的 schedulers scheduler = BackgroundScheduler() # 添加調度任務 # 調度方法為 timedTask,觸發器選擇 interval(間隔性),間隔時長為 2 秒 scheduler.add_job(timedTask, 'interval', seconds=2) # 啟動調度任務 scheduler.start() while True: print(time.time()) time.sleep(5)
傳遞參數的方式有元組(tuple)、列表(list)、字典(dict)
注意:不過需要注意采用元組傳遞參數時后邊需要多加一個逗號
#基於list
scheduler.add_job(job2, 'interval', seconds=5, args=['a','b','list'], id='test_job4')
#基於tuple
scheduler.add_job(job2, 'interval', seconds=5, args=('a','b','tuple',), id='test_job5')
#基於dict
scheduler.add_job(job3, 'interval', seconds=5, kwargs={'f':'dict', 'a':1,'b':2}, id='test_job6)
3 基礎組件
APScheduler 有四種組件,分別是:調度器(scheduler),作業存儲(job store),觸發器(trigger),執行器(executor)。
-
schedulers(調度器)
它是任務調度器,屬於控制器角色。它配置作業存儲器和執行器可以在調度器中完成,例如添加、修改和移除作業。把下面三個組件作為參數,通過創建調度器實例來運行 -
triggers(觸發器)
描述調度任務被觸發的條件。不過觸發器完全是無狀態的。 -
job stores(作業存儲器)
任務持久化倉庫,默認保存任務在內存中,也可將任務保存都各種數據庫中,任務中的數據序列化后保存到持久化數據庫,從數據庫加載后又反序列化。 -
executors(執行器)
負責處理作業的運行,它們通常通過在作業中提交指定的可調用對象到一個線程或者進城池來進行。當作業完成時,執行器將會通知調度器。
3.1 schedulers(調度器)
我個人覺得 APScheduler 非常好用的原因。它提供 7 種調度器,能夠滿足我們各種場景的需要。例如:后台執行某個操作,異步執行操作等。調度器分別是:
- BlockingScheduler : 調度器在當前進程的主線程中運行,也就是會阻塞當前線程。
- BackgroundScheduler : 調度器在后台線程中運行,不會阻塞當前線程。
- AsyncIOScheduler : 結合
asyncio模塊(一個異步框架)一起使用。 - GeventScheduler : 程序中使用
gevent(高性能的Python並發框架)作為IO模型,和GeventExecutor配合使用。 - TornadoScheduler : 程序中使用
Tornado(一個web框架)的IO模型,用ioloop.add_timeout完成定時喚醒。 - TwistedScheduler : 配合
TwistedExecutor,用reactor.callLater完成定時喚醒。 - QtScheduler : 你的應用是一個 Qt 應用,需使用QTimer完成定時喚醒。
負責將上面幾個組件聯系在一起,一般在應用中只有一個調度器,程序開發者不會直接操作觸發器、作業存儲或執行器,而是利用調度器提供了處理這些合適的接口,作業存儲和執行器的配置都是通過在調度器中完成的。
開發者也不必直接操作任務儲存器、執行器以及觸發器,因為調度器提供了統一的接口,通過調度器就可以操作組件,比如任務的增刪改查。
調度器工作流程:

3.2 triggers(觸發器)
某一個工作到來時引發的事件,包含調度的邏輯,每一個作業都有它自己的觸發器,用於決定哪個作業任務會執行,除了它們初始化配置之外,其完全是無狀態的。總的來說就是一個任務應該在什么時候執行
APScheduler 有三種內建的 trigger:
1)date 觸發器
date 是最基本的一種調度,作業任務只會執行一次。它表示特定的時間點觸發。它的參數如下:
| 參數 | 說明 |
|---|---|
| run_date (datetime 或 str) | 作業的運行日期或時間 |
| timezone (datetime.tzinfo 或 str) | 指定時區 |
date 觸發器使用示例如下:
from datetime import datetime from datetime import date from apscheduler.schedulers.background import BackgroundScheduler def job_func(text): print(text) scheduler = BackgroundScheduler() # 在 2017-12-13 時刻運行一次 job_func 方法 scheduler .add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text']) # 在 2017-12-13 14:00:00 時刻運行一次 job_func 方法 scheduler .add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text']) # 在 2017-12-13 14:00:01 時刻運行一次 job_func 方法 scheduler .add_job(job_func, 'date', run_date='2017-12-13 14:00:01', args=['text']) scheduler.start()
2)interval 觸發器
固定時間間隔觸發。即每隔多久執行一次,interval 間隔調度,參數如下:
| 參數 | 說明 |
|---|---|
| weeks (int) | 間隔幾周 |
| days (int) | 間隔幾天 |
| hours (int) | 間隔幾小時 |
| minutes (int) | 間隔幾分鍾 |
| seconds (int) | 間隔多少秒 |
| start_date (datetime 或 str) | 開始日期 |
| end_date (datetime 或 str) | 結束日期 |
| timezone (datetime.tzinfo 或str) | 時區 |
interval 觸發器使用示例如下:
import datetime from apscheduler.schedulers.background import BackgroundScheduler def job_func(): print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) scheduler = BackgroundScheduler() # 每隔兩分鍾執行一次 job_func 方法 scheduler .add_job(job_func, 'interval', seconds=2) # 在 2017-12-13 14:00:01 ~ 2017-12-13 14:00:10 之間, 每隔兩分鍾執行一次 job_func 方法 scheduler .add_job(job_func, 'interval', seconds=2, start_date='2017-12-13 14:00:01' , end_date='2017-12-13 14:00:10') scheduler.start()
3)cron 觸發器
在特定時間周期性地觸發,定時任務(即規定在某一時刻執行)和Linux crontab格式兼容。它是功能最強大的觸發器。
我們先了解 cron 參數:
| 參數 | 說明 |
|---|---|
| year (int 或 str) | 年,4位數字 |
| month (int 或 str) | 月 (范圍1-12) |
| day (int 或 str) | 日 (范圍1-31 |
| week (int 或 str) | 周 (范圍1-53) |
| day_of_week (int 或 str) | 周內第幾天或者星期幾 (范圍0-6 或者 mon,tue,wed,thu,fri,sat,sun) |
| hour (int 或 str) | 時 (范圍0-23) |
| minute (int 或 str) | 分 (范圍0-59) |
| second (int 或 str) | 秒 (范圍0-59) |
| start_date (datetime 或 str) | 最早開始日期(包含) |
| end_date (datetime 或 str) | 最晚結束時間(包含) |
| timezone (datetime.tzinfo 或str) | 指定時區 |
這些參數是支持算數表達式,取值格式有如下:
cron 觸發器使用示例如下:
# job_function將會在6,7,8,11,12月的第3個周五的1,2,3點運行 sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # 截止到2016-12-30 00:00:00,每周一到周五早上五點半運行job_function sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2016-12-31')
import datetime from apscheduler.schedulers.background import BackgroundScheduler def job_func(): print("當前時間:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) scheduler = BackgroundScheduler() # 在每年 1-3、7-9 月份中的每個星期一、二中的 00:00, 01:00, 02:00 和 03:00 執行 job_func 任務 scheduler .add_job(job_func, 'cron', month='1-3,7-9',day='1-31', hour='0-23',minute= '1-59',second = '1-59') scheduler.start() while True: pass
3.3 作業存儲(job store)
該組件是對調度任務的管理。保存要調度的任務,其中除了默認的作業存儲是把作業保存在內存中,其他的作業存儲是將作業保存在數據庫中。一個作業的數據將在保存在持久化的作業存儲之前,會對作業執行序列化操作,當重新讀取作業時,再執行反序列化操作。同時,調度器不能分享同一個作業存儲。作業存儲支持主流的存儲機制:如redis,mongodb,關系型數據庫,內存等等。
1)添加 job
有兩種添加方法,其中一種上述代碼用到的 add_job(), 另一種則是scheduled_job()修飾器來修飾函數。
這個兩種辦法的區別是:第一種方法返回一個 apscheduler.job.Job 的實例,可以用來改變或者移除 job。第二種方法只適用於應用運行期間不會改變的 job。
第二種添加任務方式的例子:
import datetime from apscheduler.schedulers.background import BlockingScheduler scheduler = BlockingScheduler() @scheduler.scheduled_job( 'interval', seconds=2) def job_func(): print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) scheduler.start()
import datetime from apscheduler.schedulers.blocking import BlockingScheduler from app.untils.log_builder import sys_logging scheduler = BlockingScheduler() # 后台運行 # 設置為每日凌晨00:30:30時執行一次調度程序 @scheduler.scheduled_job("cron", day_of_week='*', hour='1', minute='30', second='30') def rebate(): print "schedule execute" sys_logging.debug("statistic scheduler execute success" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) if __name__ == '__main__': try: scheduler.start() sys_logging.debug("statistic scheduler start success") except (KeyboardInterrupt, SystemExit): scheduler.shutdown() sys_logging.debug("statistic scheduler start-up fail")
移除 job 也有兩種方法:
remove_job() 和
job.remove()。
remove_job() 是根據 job 的 id 來移除,所以要在 job 創建的時候指定一個 id。
job.remove() 則是對 job 執行 remove 方法即可
import datetime from apscheduler.schedulers.background import BlockingScheduler scheduler = BlockingScheduler() def job_func(): print("當前時間:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) # scheduler.add_job(job_func, 'interval', seconds=2, id='job_one') # scheduler.remove_job('job_one') job = scheduler.add_job(job_func, 'interval', seconds=2, id='job_one',name='test_job') job.remove() scheduler.start()
3).獲得任務列表:
可以通過get_jobs方法來獲取當前的任務列表,也可以通過get_job()來根據job_id來獲得某個任務的信息。並且apscheduler還提供了一個print_jobs()方法來打印格式化的任務列表。
import datetime from apscheduler.schedulers.background import BlockingScheduler scheduler = BlockingScheduler() def job_func(): print("當前時間:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) job = scheduler.add_job(job_func, 'interval', seconds=2, id='job_one') job = scheduler.add_job(job_func, 'interval', seconds=2, id='job_two') print scheduler.get_job('job_one') print scheduler.get_jobs() scheduler.start()
4).修改 job:
如果你因計划改變要對 job 進行修改,可以使用Job.modify() 或者 modify_job()方法來修改除id以外的任何作業屬性。
job.modify(max_instances=6, name='Alternate name')
import datetime from apscheduler.schedulers.background import BlockingScheduler scheduler = BlockingScheduler() def job_func(): print("當前時間:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) scheduler.add_job(job_func, 'interval', seconds=2, id='job_one',name= 'ljt') temp_dict = {"seconds":3} temp_trigger = scheduler._create_trigger(trigger='interval',trigger_args=temp_dict) result = scheduler.modify_job(job_id='job_one',trigger=temp_trigger) scheduler.start()
5).暫停與恢復任務:
暫停與恢復任務可以直接操作任務實例或者調度器來實現。當任務暫停時,它的運行時間會被重置,暫停期間不會計算時間。
import datetime import time from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler()#非阻塞,后台運行 def job_func(): print("當前時間:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) job = scheduler.add_job(job_func, 'interval', seconds=1, id='job_one',name= 'ljt') scheduler.start() while True: time.sleep(3) scheduler.pause_job('job_one') time.sleep(1) job.resume()#或者下面,等價 scheduler.resume_job('job_one')
6).啟動調度器
可以使用start()方法啟動調度器,BlockingScheduler需要在初始化之后才能執行start(),對於其他的Scheduler,調用start()方法都會直接返回,然后可以繼續執行后面的初始化操作。
例如:
from apscheduler.schedulers.blocking import BlockingScheduler def my_job(): print "Hello world!" scheduler = BlockingScheduler() scheduler.add_job(my_job, 'interval', seconds=1) scheduler.start()
7).關閉調度器:
使用下邊方法關閉調度器:
scheduler.shutdown()
默認情況下調度器會關閉它的任務存儲和執行器,並等待所有正在執行的任務完成,如果不想等待,可以進行如下操作:
scheduler.shutdown(wait=False)
3.4 執行器(executor)
執行器顧名思義是執行調度任務的模塊。最常用的 executor 有兩種:ProcessPoolExecutor 和 ThreadPoolExecutor
主要是處理作業的運行,它將要執行的作業放在新的線程或者線程池中運行。執行完畢之后,再通知調度器。基於線程池的操作,可以針對不同類型的作業任務,更為高效的使用CPU的計算資源。
大多數情況下,執行器選擇ThreadPoolExecutor就夠用了,但如果涉及到比較消耗CPU的作業,就可以選擇ProcessPoolExecutor* ,以充分利用多核CPU。
當然也可以同時配置使用兩個執行器,將進程池ProcessPoolExecutor調度器作為你的第二個執行器。
下面是顯式設置 job store(使用mongo存儲)和 executor 的代碼的示例。
第一種方式:
from pymongo import MongoClient from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor import time def my_job(): print int(time.time()) host = '127.0.0.1' port = 27017 client = MongoClient(host, port) jobstores = { 'mongo': MongoDBJobStore(collection='job', database='test', client=client), 'default': MemoryJobStore() } executors = { 'default': ThreadPoolExecutor(10), 'processpool': ProcessPoolExecutor(3) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) scheduler.add_job(my_job, 'interval', seconds=1) try: scheduler.start() except SystemExit: client.close()
- coalesce:當由於某種原因導致某個job積攢了好幾次沒有實際運行(比如說系統掛了5分鍾后恢復,有一個任務是每分鍾跑一次的,按道理說這5分鍾內本來是“計划”運行5次的,但實際沒有執行),如果coalesce為True,下次這個job被submit給executor時,只會執行1次,也就是最后這次,如果為False,那么會執行5次(不一定,因為還有其他條件,看后面misfiregracetime的解釋)。
- max_instance:每個job在同一時刻能夠運行的最大實例數,默認情況下為1個,可以指定為更大值,這樣即使上個job還沒運行完同一個job又被調度的話也能夠再開一個線程執行。(限制同一個job實例的並發執行數)
- misfire_grace_time:單位為秒,假設有這么一種情況,當某一job被調度時剛好線程池都被占滿,調度器會選擇將該job排隊不運行,misfiregracetime參數則是在線程池有可用線程時會比對該job的應調度時間跟當前時間的差值,如果差值<misfiregracetime時,調度器會再次調度該job.反之該job的執行狀態為EVENTJOBMISSED了,即錯過運行.</misfire。
-
作業被添加到數據庫中,程序中斷后重新運行時會自動從數據庫讀取作業信息,而不需要重新再添加到調度器中,如果不注釋 21-25 行添加作業的代碼,則作業會重新添加到數據庫中,這樣就有了兩個同樣的作業,避免出現這種情況可以在 add_job 的參數中增加 replace_existing=True,如scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval',seconds=3,replace_existing=True)
-
還有一點要注意,如果你的執行器或任務儲存器是會序列化任務的,那么這些任務就必須符合:
-
回調函數必須全局可用
-
回調函數參數必須也是可以被序列化的
重要提醒!
如果在程序初始化時,是從數據庫讀取任務的,那么必須為每個任務定義一個明確的ID,並且使用replace_existing=True,否則每次重啟程序,你都會得到一份新的任務拷貝,也就意味着任務的狀態不會保存。
內置任務儲存器中,只有MemoryJobStore不會序列化任務;內置執行器中,只有ProcessPoolExecutor會序列化任務。
建議:如果想要立刻運行任務,可以在添加任務時省略trigger參數
-
#持久化默認在內存中,monogo數據庫中只會生成一個test的數據庫job(collection)的值為空,程序最終只用了default的值。
如果需要持久化在monogo中jobstores改為'default': MongoDBJobStore(collection = 'job', database = 'test', client = client)。
再次重啟程序時,程序會從數據庫中直接讀取任務,可以不用重復添加任務。
第二種方式:
from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler({ 'apscheduler.jobstores.mongo': { 'type': 'mongodb' }, 'apscheduler.jobstores.default': { 'type': 'sqlalchemy', 'url': 'sqlite:///jobs.sqlite' }, 'apscheduler.executors.default': { 'class': 'apscheduler.executors.pool:ThreadPoolExecutor', 'max_workers': '20' }, 'apscheduler.executors.processpool': { 'type': 'processpool', 'max_workers': '5' }, 'apscheduler.job_defaults.coalesce': 'false', 'apscheduler.job_defaults.max_instances': '3', 'apscheduler.timezone': 'UTC', })
第三種方式:
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ProcessPoolExecutor jobstores = { 'mongo': {'type': 'mongodb'}, 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': {'type': 'threadpool', 'max_workers': 20}, 'processpool': ProcessPoolExecutor(max_workers=5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler() scheduler.configure(jobstores=jobstores, executors=executors,job_defaults=job_defaults, timezone=utc)
調度器監聽事件
可以給調度器添加事件監聽器,調度器事件只有在某些情況下才會被觸發,並且可以攜帶某些有用的信息。通過給 add_listener()傳遞合適的 mask參數,可以只監聽幾種特定的事件類型,具體類型可看源碼中的 event.exception或者 event.code值來做識別判斷。
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR,EVENT_JOB_ADDED import time def task_listener(event): if event.code == EVENT_JOB_EXECUTED: print '任務執行成功' if event.code == EVENT_JOB_ADDED: print '任務添加成功' if event.code == EVENT_JOB_ERROR: print '任務執行錯誤' def my_job(): print "Hello world!" print 1/0 scheduler = BackgroundScheduler() scheduler.add_listener(task_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_ADDED) job = scheduler.add_job(my_job, 'interval', seconds=1) scheduler.start() while True: time.sleep(3) job = scheduler.add_job(my_job, 'interval', seconds=1)
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR import datetime import logging import time logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='log1.txt', filemode='a') def aps_test(x): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) def date_test(x): print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) print (1 / 0) def my_listener(event): #定義一個事件監聽,出現意外情況打印相關信息報警。 if event.exception: print ('任務出錯了!!!!!!') else: print ('任務照常運行...') scheduler = BackgroundScheduler() scheduler.add_job(func=date_test, args=('一次性任務,會出錯',),next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task') scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task') scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) scheduler._logger = logging #啟用 scheduler 模塊的日記記錄 scheduler.start() while True: time.sleep(3)
在生產環境中,可以把出錯信息換成發送一封郵件或者發送一個短信,這樣定時任務出錯就可以立馬就知道。
第一次使用此定時器時總會執行兩次,一直不知道為什么,后來發現,python 的flask框架在debug模式下會多開一個線程監測項目變化,所以每次會跑兩遍,可以將debug選項改為False
注意:
當出現No handlers could be found for logger “apscheduler.scheduler”次錯誤信息時,說明沒有 logging模塊的logger存在,所以需要添加上,對應新增內容如下所示(僅供參):
import logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datafmt='%a, %d %b %Y %H:%M:%S',
filename='/var/log/aaa.txt',
filemode='a'
)
總結:
APScheduler在實際使用過程中擁有最大的靈活性,可以滿足我們的大部分定時任務的相關需求;Celery比較重量級,通常如果項目中已有Celery在使用,而且不需要動態添加定時任務時可以考慮使用;schedule非常輕量級,使用簡單,但是不支持任務的持久化,也無法動態添加刪除任務,所以主要用於簡單的小型應用。
https://www.jianshu.com/p/ad2c42245906
