簡介
APScheduler
(以下簡稱APS)框架可以讓用戶定時執行或者周期性執行Python任務。既可以添加任務也可以刪除任務,還可以將任務存儲在數據庫中。當APS重啟之后,還會繼續執行之前設置的任務。
APS是跨平台的,注意APS既不是守護進程也不是服務,更不是命令行程序。APS是進程內的調度器,也就是說它的實現原理是在進程內產生內置的阻塞來創建定時服務,以便在預定的時間內執行某個任務。
APS支持以下三種定時任務:
crontab
類型任務- 固定時間間隔任務
- 基於日期時間的一次性任務
你可以使用上面任意一種定時任務或者組合任務。
APS支持以下面幾種方式保存:
- 內存
- SQLAlchemy
- MongoDB
- Redis
APS也可以整合進其他幾個Python框架:
- asyncio
- gevent
- Tornado
- Twisted
- Qt
安裝
使用pip進行安裝:
$ pip install apscheduler
|
如果你沒有安裝pip,可以通過下面鏈接進行安裝get-pip.py
如果不能通過pip下載,也可以直接下載APS安裝包,解壓再進行安裝
$ python setup.py install
|
基礎概念
APS由以下幾部分組成:
- 觸發器(triggers)
- 任務倉庫(job stores)
- 執行器(executors)
- 調度器(schedulers)
觸發器包含了所有定時任務邏輯,每個任務都有一個對應的觸發器,觸發器決定任何的何時執行,初始配置情況下,觸發器是無狀態的。
任務倉庫保存要執行的任務,其中一個默認的任務倉庫將任務保存在內存中,而另外幾個任務倉庫將任務保存在數據庫中。在將任務保存到任務倉庫前,會對任務執行序列化操作,當重新讀取任務時,再執行反序列化操作。除了默認的任務倉庫,其他任務倉庫都不會在內存中保存任務,而是作為任務保存、加載、更新以及搜索的一個中間件。任務倉庫在定時器之間不能共享。
執行器用來執行定時任務,它只是將要執行的任務放在新的線程或者線程池中運行。執行完畢之后,再通知定時器。
調度器將其它幾個組件聯系在一起,一般在應用中只有一個調度器,程序開發者不會直接操作觸發器、任務倉庫或執行器,相反,調度器提供了這個接口。任務倉庫以及執行器的配置都是通過調度器來實現的。
選擇合適的調度器、執行器以及任務倉庫
選擇調度器是根據我們的開發環境與實際應用來決定的,下面是一些常用調度器:
- BlockingScheduler:適合於只在進程中運行單個任務的情況
- BackgroundScheduler: 適合於要求任何在程序后台運行的情況
AsyncIOScheduler
:適合於使用asyncio框架的情況GeventScheduler
: 適合於使用gevent框架的情況- TornadoScheduler: 適合於使用Tornado框架的應用
TwistedScheduler
: 適合使用Twisted框架的應用QtScheduler
: 適合使用QT的情況
而任務倉庫,如果是非持久任務,使用MemoryStore
就可以了,如果是持久性任務,那么久需要根據編程環境進行選擇了。
大多數情況下,執行器選擇ThreadPoolExecutor
就可以了,但是如果涉及到比較耗CPU的任務,就可以選擇ProcessPoolExecutor
,以充分利用多核CPU。,當然也可以同時使用兩個執行器。
可以在相應的API中找到對應的任務倉庫以及執行器。
配置任務調度器
APS提供了多種調度器配置方法,既可以使用配置字典,也可以直接傳遞配置參數給調度器,還可以先初始化調度器,添加完任務之后,最后再來配置調度器。
相關的配置參數可以參考API文檔。
舉個簡單例子:
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
# Initialize the rest of the application here, or before the scheduler initialization
|
上面的代碼生成一個后台調度器,使用默認名為default
的MemoryJobStore
,以及默認名為default
的ThreadPoolExecutor
,最大線程數為10。
現在假設你想使用兩個任務倉庫以及兩個執行器,並且還想調整下任務的默認參數。可以使用下面三種方法,所完成的功能是一樣的。
方法1:
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
|
方法2:
from apscheduler.schedulers.background import BackgroundScheduler
# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
'apscheduler.jobstores.mongo': {
'type': 'mongodb'
},
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs.sqlite'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '5'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '3',
'apscheduler.timezone': 'UTC',
})
|
方法3:
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
jobstores = {
'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler()
# .. do something else here, maybe add jobs etc.
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
|
啟動調度器
使用start()
方法啟動調度器,BlockingScheduler
需要在初始化之后才能執行start()
,對於其他的Scheduler,調用start()
方法都會直接返回,然后可以繼續執行后面的初始化操作。
調度器啟動之后,就不能更改它的配置了。
添加任務
有兩種添加任務的辦法:
- 調用
add_job()
- 使用
scheduled_job()
修飾器
第一個方法是使用最多的,因為調用它會返回一個apscheduler.job.Job
實例,后續可以對它進行修改或者刪除,而使用修飾器添加的任務添加之后就不能進行修改。
在調度器中設置定時任務,如果任務添加的時候,調度器還沒有啟動,那么任務只是暫時放到調度器中,當調度器啟動之后重新計算第一次執行時間。
需要注意的是,可以使用執行器或者任務倉庫來序列化任務,但是這個任務必須滿足兩個條件:
- 調用對象必須全局可訪問
- 調用對象的參數必須也可以序列化。
在所有內置的任務倉庫中,只有MemoryJobStore
不能序列化對象;在所有內置的執行器中,只有ProcessPoolExecutors
會序列化任務。
如果你需要在應用中使用持久性任務,那就必須給任務定義一個ID,並設置replace_existing=True
,否則每次重啟應用都會返回一個新的任務。
如果要立即執行任務,只需要在添加任務的時候省略trigger
參數。
刪除任務
當從調度器中刪除任務的時候,就會從相關聯的任務倉庫中刪除任務,后面就不會再執行了,有兩種方法來刪除任務。
- 調用
remove_job()
,參數為job ID以及任務倉庫名 - 調用
remove()
第二種方法用起來更加方便,但是需要先保存添加任務時返回的實例對象;而通過scheduled_job()
添加的任務,只能使用第一種方法進行刪除。
如果任務執行完畢,它會自動被刪除。
例如:
job = scheduler.add_job(myfunc,
'interval', minutes=2)
job.remove()
|
同樣的,如果顯式使用任務ID,則使用下面的方法:
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
|
暫停與恢復任務
暫停與恢復任務也很簡單,可以直接操作任務實例或者調度器來實現,當任務暫停時,它的運行時間會被重置,暫停期間不會計算進去,重啟之后又算進去。
暫停任務可以使用以下兩種方法:
apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()
恢復任務可以使用以下兩種方法:
apscheduler.job.Job.resume()
apscheduler.schedulers.BaseScheduler.resume_job()
獲取任務列表
可以使用get_jobs()
方法來獲取當前正在處理的任務列表,如果只是想獲取某個任務倉庫中的任務列表,可以使用任務倉庫名作為參數傳入。
APS還提供了一個print_jobs()
方法來打印格式化的任務列表。
修改任務
使用apscheduler.job.Job.modify()
或者modify_job()
方法可以修改任務的屬性,你可以修改任務的任意屬性,除了id
。
例如:
job.modify(max_instances=
6, name='Alternate name')
|
如果你想重新調度某個任務,例如改變它的觸發器,則可以使用apscheduler.job.Job.reschedule()
或者reschedule_job()
方法,這個兩個方法都可以為任務重新創建一個觸發器,並重新計算任務的運行時間。
例如:
scheduler.reschedule_job(
'my_job_id', trigger='cron', minute='*/5')
|
關閉調度器
使用下面的方法關閉調度器:
scheduler.shutdown()
|
默認情況下,scheduler會關閉它的任務倉庫以及執行器,並等待所有正在執行的任務執行完。如果你不想等待,可以這樣操作:
scheduler.shutdown(wait=
False)
|
限制任務實例並發執行的個數
默認情況下,只允許同時執行一個任務實例,通過max_instances
參數來設置允許並發執行任務的個數。
錯失任務的合並
有時候調度器可能無法按時執行某個任務,最常見的情況就是當某個持久性任務保存在任務倉庫中時,調度器關閉之后再重啟,但是任務需要在重啟之前被執行,這時這個任務就被錯過了。調度器會根據任務misfire_grace_time
參數的配置來決定錯過的任務還要不要繼續執行。這樣做的話可能導致重啟調度器之后會連續執行好幾個任務。
調度器事件
可以給調度器添加事件監聽器,調度器事件只有在某些情況下才會被觸發,並且可以攜帶某些有用的信息。通過給add_listener()
傳遞合適的mask
參數,可以只監聽幾種特定的事件類型。
例如:
def my_listener(event):
if event.exception:
print(
'The job crashed :(')
else:
print(
'The job worked :)')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
|