一. 配置調度器
APScheduler 有多種不同的配置方法,你可以選擇直接傳字典或傳參的方式創建調度器;
也可以先實例一個調度器對象,再添加配置信息。靈活的配置方式可以滿足各種應用場景的需要。
整套的配置選項可以參考API文檔BaseScheduler
類。
一些調度器子類可能有它們自己特有的配置選項,以及獨立的任務儲存器和執行器也可能有自己特有的配置選項,可以查閱API文檔了解。
1.1 舉一個例子,創建一個使用默認任務儲存器和執行器的BackgroundScheduler
:
from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler() # 因為是非阻塞的后台調度器,所以程序會繼續向下執行
這樣就可以創建了一個后台調度器。這個調度器有一個名稱為default
的MemoryJobStore
(內存任務儲存器)和一個名稱是default
且最大線程是10的ThreadPoolExecutor
(線程池執行器)。
假如你現在有這樣的需求,兩個任務儲存器分別搭配兩個執行器;同時,還要修改任務的默認參數;最后還要改時區。可以參考下面例子,它們是完全等價的。
- 名稱為“mongo”的
MongoDBJobStore
- 名稱為“default”的
SQLAlchemyJobStore
- 名稱為“default”的
ThreadPoolExecutor
,最大線程20個 - 名稱“processpool”的
ProcessPoolExecutor
,最大進程5個 - UTC時間作為調度器的時區
- 默認為新任務關閉合並模式()
- 設置新任務的默認最大實例數為3
#-*- coding:utf-8 -*- from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstores = { 'mongo': MongoDBJobStore(), #調度器名稱為mongo的MemoryJobStore(內存任務儲存器) 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')#調度器名稱為“default”的SQLAlchemyJobStore } executors = { 'default': ThreadPoolExecutor(20),#名稱為“default ”的ThreadPoolExecutor,最大線程20個 'processpool': ProcessPoolExecutor(5)#名稱“processpool”的ProcessPoolExecutor,最大進程5個 } job_defaults = { 'coalesce': False, #默認為新任務關閉合並模式() 'max_instances': 3#設置新任務的默認最大實例數為3 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
方法二:
#-*- coding:utf-8 -*- from apscheduler.schedulers.background import BackgroundScheduler # The "apscheduler." prefix is hard coded scheduler = BackgroundScheduler({ 'apscheduler.jobstores.mongo': { 'type': 'mongodb' }, 'apscheduler.jobstores.default': { 'type': 'sqlalchemy', 'url': 'sqlite:///jobs.sqlite' }, 'apscheduler.executors.default': { 'class': 'apscheduler.executors.pool:ThreadPoolExecutor', 'max_workers': '20' }, 'apscheduler.executors.processpool': { 'type': 'processpool', 'max_workers': '5' }, 'apscheduler.job_defaults.coalesce': 'false', 'apscheduler.job_defaults.max_instances': '3', 'apscheduler.timezone': 'UTC', })
方法三:
#-*- coding:utf-8 -*- from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ProcessPoolExecutor jobstores = { 'mongo': {'type': 'mongodb'}, 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': {'type': 'threadpool', 'max_workers': 20}, 'processpool': ProcessPoolExecutor(max_workers=5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler() # ..這里可以添加任務 scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
1.2 啟動調度器
啟動調度器是只需調用start()
即可。除了BlockingScheduler
,非阻塞調度器都會立即返回,可以繼續運行之后的代碼,比如添加任務等。
對於BlockingScheduler
,程序則會阻塞在start()
位置,所以,要運行的代碼必須寫在start()
之前。
注!調度器啟動后,就不能修改配置了。
1.3 添加任務
添加任務的方法有兩種:
- 通過調用
add_job()
- 通過裝飾器
scheduled_job()
第一種方法是最常用的;
第二種方法是最方便的,但缺點就是運行時,不能修改任務。
第一種add_job()
方法會返回一個apscheduler.job.Job
實例,這樣就可以在運行時,修改或刪除任務。在任何時候你都能配置任務。但是如果調度器還沒有啟動,此時添加任務,那么任務就處於一個暫存的狀態。只有當調度器啟動時,才會開始計算下次運行時間。
還有一點要注意,如果你的執行器或任務儲存器是會序列化任務的,那么這些任務就必須符合:
- 回調函數必須全局可用
- 回調函數參數必須也是可以被序列化的
內置任務儲存器中,只有MemoryJobStore
不會序列化任務;內置執行器中,只有ProcessPoolExecutor
會序列化任務。
重要提醒!
如果在程序初始化時,是從數據庫讀取任務的,那么必須為每個任務定義一個明確的ID,並且使用replace_existing=True
,否則每次重啟程序,你都會得到一份新的任務拷貝,也就意味着任務的狀態不會保存。
建議
如果想要立刻運行任務,可以在添加任務時省略trigger
參數
1.4 移除任務
如果想從調度器移除一個任務,那么你就要從相應的任務儲存器中移除它,這樣才算移除了。有兩種方式:
- 調用
remove_job()
,參數為:任務ID,任務儲存器名稱 - 在通過
add_job()
創建的任務實例上調用remove()
方法
第二種方式更方便,但前提必須在創建任務實例時,實例被保存在變量中。對於通過scheduled_job()
創建的任務,只能選擇第一種方式。
當任務調度結束時(比如,某個任務的觸發器不再產生下次運行的時間),任務就會自動移除。
job = scheduler.add_job(myfunc, 'interval', minutes=2) job.remove()
同樣,通過任務的具體ID:
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.remove_job('my_job_id')
1.5 暫停和恢復任務
通過任務實例或調度器,就能暫停和恢復任務。如果一個任務被暫停了,那么該任務的下一次運行時間就會被移除。在恢復任務前,運行次數計數也不會被統計。
暫停任務,有以下兩個方法:
apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()
恢復任務,
apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()
1.6 獲取任務列表
通過get_jobs()
就可以獲得一個可修改的任務列表。get_jobs()
第二個參數可以指定任務儲存器名稱,那么就會獲得對應任務儲存器的任務列表。
print_jobs()
可以快速打印格式化的任務列表,包含觸發器,下次運行時間等信息。
1.7 修改任務
通過apscheduler.job.Job.modify()
或modify_job()
,你可以修改任務當中除了id
的任何屬性。
比如:
job.modify(max_instances=6, name='Alternate name')
如果想要重新調度任務(就是改變觸發器),你能通過apscheduler.job.Job.reschedule()
或reschedule_job()
來實現。這些方法會重新創建觸發器,並重新計算下次運行時間。
比如:
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
1.8 關閉調度器
關閉方法如下:
scheduler.shutdown()
默認情況下,調度器會先把正在執行的任務處理完,再關閉任務儲存器和執行器。但是,如果你就直接關閉,你可以添加參數:
scheduler.shutdown(wait=False)
上述方法不管有沒有任務在執行,會強制關閉調度器。
1.9 暫停、恢復任務進程
調度器可以暫停正在執行的任務:
scheduler.pause()
也可以恢復任務:
scheduler.resume()
同時,也可以在調度器啟動時,默認所有任務設為暫停狀態。
scheduler.start(paused=True)
2.0 限制任務執行的實例並行數
默認情況下,在同一時間,一個任務只允許一個執行中的實例在運行。
比如說,一個任務是每5秒執行一次,但是這個任務在第一次執行的時候花了6秒,也就是說前一次任務還沒執行完,
后一次任務又觸發了,由於默認一次只允許一個實例執行,所以第二次就丟失了。為了杜絕這種情況,可以在添加任務時,
設置max_instances
參數,為指定任務設置最大實例並行數。
2.1 丟失任務的執行與合並
有時,任務會由於一些問題沒有被執行。
最常見的情況就是,在數據庫里的任務到了該執行的時間,但調度器被關閉了,那么這個任務就成了“啞彈任務”。
錯過執行時間后,調度器才打開了。這時,調度器會檢查每個任務的misfire_grace_time
參數int
值,即啞彈上限,來確定是否還執行啞彈任務(這個參數可以全局設定的或者是為每個任務單獨設定)。
此時,一個啞彈任務,就可能會被連續執行多次。
但這就可能導致一個問題,有些啞彈任務實際上並不需要被執行多次。coalescing
合並參數就能把一個多次的啞彈任務揉成一個一次的啞彈任務。
也就是說,coalescing
為True
能把多個排隊執行的同一個啞彈任務,變成一個,而不會觸發啞彈事件。
注!如果是由於線程池/進程池滿了導致的任務延遲,執行器就會跳過執行。要避免這個問題,可以添加進程或線程數來實現或把
misfire_grace_time
值調高。
2.2 調度器事件
調度器允許添加事件偵聽器。部分事件會有特有的信息,比如當前運行次數等。add_listener(callback,mask)
中,第一個參數是回調對象,mask
是指定偵聽事件類型,mask
參數也可以是邏輯組合。回調對象會有一個參數就是觸發的事件。
具體可以查看文檔中events
模塊,里面有關於事件類型以及事件參數的詳細說明。
def my_listener(event): if event.exception: print('The job crashed :(') else: print('The job worked :)') # 當任務執行完或任務出錯時,調用my_listener scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
事件類型
Constant | Description | Event class |
---|---|---|
EVENT_SCHEDULER_STARTED | 計划程序已啟動 |
SchedulerEvent |
EVENT_SCHEDULER_SHUTDOWN | 調度程序已關閉 | SchedulerEvent |
EVENT_SCHEDULER_PAUSED | 計划程序中的作業處理已暫停 | SchedulerEvent |
EVENT_SCHEDULER_RESUMED | 計划程序中的作業處理已恢復 | SchedulerEvent |
EVENT_EXECUTOR_ADDED | 已將執行器添加到計划程序中 | SchedulerEvent |
EVENT_EXECUTOR_REMOVED | 一個執行器被移到調度程序中 | SchedulerEvent |
EVENT_JOBSTORE_ADDED | 已將作業存儲添加到計划程序 | SchedulerEvent |
EVENT_JOBSTORE_REMOVED | 已從計划程序中刪除作業存儲 |
SchedulerEvent |
EVENT_ALL_JOBS_REMOVED | 所有作業都已從所有作業存儲或一個特定作業存儲中刪除 | SchedulerEvent |
EVENT_JOB_ADDED | 作業已添加到作業存儲中 |
JobEvent |
EVENT_JOB_REMOVED | 已從作業存儲中刪除作業 | JobEvent |
EVENT_JOB_MODIFIED | 已從計划程序外部修改作業 | JobEvent |
EVENT_JOB_SUBMITTED | 作業已提交給其執行者以運行 |
JobSubmissionEvent |
EVENT_JOB_MAX_INSTANCES | 提交給其執行者的作業未被執行者接受,因為該作業已達到其最大並發執行實例數 |
JobSubmissionEvent |
EVENT_JOB_EXECUTED | 作業已成功執行 |
JobExecutionEvent |
EVENT_JOB_ERROR | 作業在執行期間引發異常 |
JobExecutionEvent |
EVENT_JOB_MISSED | 錯失了一份工作 |
JobExecutionEvent |
EVENT_ALL | 包含所有事件類型的全部捕獲掩碼 |
N/A |
2.3 異常捕獲
通過logging模塊,可以添加apscheduler
日志至DEBUG
級別,這樣就能捕獲異常信息。
關於logging
初始化的方式如下:
import logging logging.basicConfig() logging.getLogger('apscheduler').setLevel(logging.DEBUG)
日志會提供很多調度器的內部運行信息。
文章參考鏈接:https://apscheduler.readthedocs.io/en/latest/index.html