python任務調度框架apscheduler【轉】


簡介


APScheduler(以下簡稱APS)框架可以讓用戶定時執行或者周期性執行Python任務。既可以添加任務也可以刪除任務,還可以將任務存儲在數據庫中。當APS重啟之后,還會繼續執行之前設置的任務。
APS是跨平台的,注意APS既不是守護進程也不是服務,更不是命令行程序。APS是進程內的調度器,也就是說它的實現原理是在進程內產生內置的阻塞來創建定時服務,以便在預定的時間內執行某個任務。
APS支持以下三種定時任務:

  • crontab類型任務
  • 固定時間間隔任務
  • 基於日期時間的一次性任務

你可以使用上面任意一種定時任務或者組合任務。
APS支持以下面幾種方式保存:

  • 內存
  • SQLAlchemy
  • MongoDB
  • Redis

APS也可以整合進其他幾個Python框架:

  • asyncio
  • gevent
  • Tornado
  • Twisted
  • Qt

安裝


使用pip進行安裝:

$ pip install apscheduler

 

如果你沒有安裝pip,可以通過下面鏈接進行安裝get-pip.py
如果不能通過pip下載,也可以直接下載APS安裝包,解壓再進行安裝

$ python setup.py install

 

基礎概念

APS由以下幾部分組成:

  • 觸發器(triggers)
  • 任務倉庫(job stores)
  • 執行器(executors)
  • 調度器(schedulers)

觸發器包含了所有定時任務邏輯,每個任務都有一個對應的觸發器,觸發器決定任何的何時執行,初始配置情況下,觸發器是無狀態的。

任務倉庫保存要執行的任務,其中一個默認的任務倉庫將任務保存在內存中,而另外幾個任務倉庫將任務保存在數據庫中。在將任務保存到任務倉庫前,會對任務執行序列化操作,當重新讀取任務時,再執行反序列化操作。除了默認的任務倉庫,其他任務倉庫都不會在內存中保存任務,而是作為任務保存、加載、更新以及搜索的一個中間件。任務倉庫在定時器之間不能共享。

執行器用來執行定時任務,它只是將要執行的任務放在新的線程或者線程池中運行。執行完畢之后,再通知定時器。

調度器將其它幾個組件聯系在一起,一般在應用中只有一個調度器,程序開發者不會直接操作觸發器、任務倉庫或執行器,相反,調度器提供了這個接口。任務倉庫以及執行器的配置都是通過調度器來實現的。

選擇合適的調度器、執行器以及任務倉庫

選擇調度器是根據我們的開發環境與實際應用來決定的,下面是一些常用調度器:

  • BlockingScheduler:適合於只在進程中運行單個任務的情況
  • BackgroundScheduler: 適合於要求任何在程序后台運行的情況
  • AsyncIOScheduler:適合於使用asyncio框架的情況
  • GeventScheduler: 適合於使用gevent框架的情況
  • TornadoScheduler: 適合於使用Tornado框架的應用
  • TwistedScheduler: 適合使用Twisted框架的應用
  • QtScheduler: 適合使用QT的情況

而任務倉庫,如果是非持久任務,使用MemoryStore就可以了,如果是持久性任務,那么久需要根據編程環境進行選擇了。

大多數情況下,執行器選擇ThreadPoolExecutor就可以了,但是如果涉及到比較耗CPU的任務,就可以選擇ProcessPoolExecutor,以充分利用多核CPU。,當然也可以同時使用兩個執行器。

可以在相應的API中找到對應的任務倉庫以及執行器。

配置任務調度器

APS提供了多種調度器配置方法,既可以使用配置字典,也可以直接傳遞配置參數給調度器,還可以先初始化調度器,添加完任務之后,最后再來配置調度器。
相關的配置參數可以參考API文檔。
舉個簡單例子:

from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
# Initialize the rest of the application here, or before the scheduler initialization

上面的代碼生成一個后台調度器,使用默認名為defaultMemoryJobStore,以及默認名為defaultThreadPoolExecutor,最大線程數為10。

現在假設你想使用兩個任務倉庫以及兩個執行器,並且還想調整下任務的默認參數。可以使用下面三種方法,所完成的功能是一樣的。

方法1:

from pytz import utc
 
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
 
 
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

方法2:

from apscheduler.schedulers.background import BackgroundScheduler
 
 
# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
'apscheduler.jobstores.mongo': {
'type': 'mongodb'
},
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs.sqlite'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '5'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '3',
'apscheduler.timezone': 'UTC',
})

方法3:

from pytz import utc
 
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
 
 
jobstores = {
'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler()
 
# .. do something else here, maybe add jobs etc.
 
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

啟動調度器

使用start()方法啟動調度器,BlockingScheduler需要在初始化之后才能執行start(),對於其他的Scheduler,調用start()方法都會直接返回,然后可以繼續執行后面的初始化操作。

調度器啟動之后,就不能更改它的配置了。

添加任務

有兩種添加任務的辦法:

  • 調用add_job()
  • 使用scheduled_job()修飾器

第一個方法是使用最多的,因為調用它會返回一個apscheduler.job.Job實例,后續可以對它進行修改或者刪除,而使用修飾器添加的任務添加之后就不能進行修改。

在調度器中設置定時任務,如果任務添加的時候,調度器還沒有啟動,那么任務只是暫時放到調度器中,當調度器啟動之后重新計算第一次執行時間。

需要注意的是,可以使用執行器或者任務倉庫來序列化任務,但是這個任務必須滿足兩個條件:

  1. 調用對象必須全局可訪問
  2. 調用對象的參數必須也可以序列化。

在所有內置的任務倉庫中,只有MemoryJobStore不能序列化對象;在所有內置的執行器中,只有ProcessPoolExecutors會序列化任務。

如果你需要在應用中使用持久性任務,那就必須給任務定義一個ID,並設置replace_existing=True,否則每次重啟應用都會返回一個新的任務。

如果要立即執行任務,只需要在添加任務的時候省略trigger參數。

刪除任務

當從調度器中刪除任務的時候,就會從相關聯的任務倉庫中刪除任務,后面就不會再執行了,有兩種方法來刪除任務。

  1. 調用remove_job(),參數為job ID以及任務倉庫名
  2. 調用remove()

第二種方法用起來更加方便,但是需要先保存添加任務時返回的實例對象;而通過scheduled_job()添加的任務,只能使用第一種方法進行刪除。

如果任務執行完畢,它會自動被刪除。

例如:

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

同樣的,如果顯式使用任務ID,則使用下面的方法:

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

暫停與恢復任務

暫停與恢復任務也很簡單,可以直接操作任務實例或者調度器來實現,當任務暫停時,它的運行時間會被重置,暫停期間不會計算進去,重啟之后又算進去。

暫停任務可以使用以下兩種方法:

  • apscheduler.job.Job.pause()
  • apscheduler.schedulers.base.BaseScheduler.pause_job()

恢復任務可以使用以下兩種方法:

  • apscheduler.job.Job.resume()
  • apscheduler.schedulers.BaseScheduler.resume_job()

獲取任務列表

可以使用get_jobs()方法來獲取當前正在處理的任務列表,如果只是想獲取某個任務倉庫中的任務列表,可以使用任務倉庫名作為參數傳入。

APS還提供了一個print_jobs()方法來打印格式化的任務列表。

修改任務

使用apscheduler.job.Job.modify()或者modify_job()方法可以修改任務的屬性,你可以修改任務的任意屬性,除了id

例如:

job.modify(max_instances= 6, name='Alternate name')

如果你想重新調度某個任務,例如改變它的觸發器,則可以使用apscheduler.job.Job.reschedule()或者reschedule_job()方法,這個兩個方法都可以為任務重新創建一個觸發器,並重新計算任務的運行時間。

例如:

scheduler.reschedule_job( 'my_job_id', trigger='cron', minute='*/5')

關閉調度器

使用下面的方法關閉調度器:

scheduler.shutdown()

默認情況下,scheduler會關閉它的任務倉庫以及執行器,並等待所有正在執行的任務執行完。如果你不想等待,可以這樣操作:

scheduler.shutdown(wait= False)

限制任務實例並發執行的個數

默認情況下,只允許同時執行一個任務實例,通過max_instances參數來設置允許並發執行任務的個數。

錯失任務的合並

有時候調度器可能無法按時執行某個任務,最常見的情況就是當某個持久性任務保存在任務倉庫中時,調度器關閉之后再重啟,但是任務需要在重啟之前被執行,這時這個任務就被錯過了。調度器會根據任務misfire_grace_time參數的配置來決定錯過的任務還要不要繼續執行。這樣做的話可能導致重啟調度器之后會連續執行好幾個任務。

調度器事件

可以給調度器添加事件監聽器,調度器事件只有在某些情況下才會被觸發,並且可以攜帶某些有用的信息。通過給add_listener()傳遞合適的mask參數,可以只監聽幾種特定的事件類型。

例如:

def my_listener(event):
if event.exception:
print( 'The job crashed :(')
else:
print( 'The job worked :)')
 
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM