APScheduler
APScheduler是基於Quartz的一個Python定時任務框架,實現了Quartz的所有功能,使用起來十分方便。提供了基於日期、固定時間間隔以及crontab類型的任務,並且可以持久化任務。
APScheduler提供了多種不同的調度器,方便開發者根據自己的實際需要進行使用;同時也提供了不同的存儲機制,可以方便與Redis,數據庫等第三方的外部持久化機制進行協同工作,總之功能非常強大和易用。
安裝
使用 pip 包管理工具安裝 APScheduler 是最方便快捷的。
APScheduler的主要的調度類
在APScheduler中有以下幾個非常重要的概念,需要大家理解:
- 觸發器(trigger)
包含調度邏輯,每一個作業有它自己的觸發器,用於決定接下來哪一個作業會運行,根據trigger中定義的時間點,頻率,時間區間等等參數設置。除了他們自己初始配置以外,觸發器完全是無狀態的。
- 作業存儲(job store)
存儲被調度的作業,默認的作業存儲是簡單地把作業保存在內存中,其他的作業存儲是將作業保存在數據庫中。一個作業的數據講在保存在持久化作業存儲時被序列化,並在加載時被反序列化。調度器不能分享同一個作業存儲。job store支持主流的存儲機制:redis, mongodb, 關系型數據庫, 內存等等
- 執行器(executor)
處理作業的運行,他們通常通過在作業中提交制定的可調用對象到一個線程或者進城池來進行。當作業完成時,執行器將會通知調度器。基於池化的操作,可以針對不同類型的作業任務,更為高效地使用cpu的計算資源。
- 調度器(scheduler)
通常在應用只有一個調度器,調度器提供了處理這些的合適的接口。配置作業存儲和執行器可以在調度器中完成,例如添加、修改和移除作業。
這里簡單列一下常用的若干調度器:
- BlockingScheduler:僅可用在當前你的進程之內,與當前的進行共享計算資源
- BackgroundScheduler: 在后台運行調度,不影響當前的系統計算運行
- AsyncIOScheduler: 如果當前系統中使用了async module,則需要使用異步的調度器
- GeventScheduler: 如果使用了gevent,則需要使用該調度
- TornadoScheduler: 如果使用了Tornado, 則使用當前的調度器
- TwistedScheduler:Twister應用的調度器
- QtScheduler: Qt的調度器
APScheduler提供的多種調度器,可以根據具體需求來選擇合適的調度器:
BlockingScheduler:適合於只在進程中運行單個任務的情況,通常在調度器是你唯一要運行的東西時使用。
BackgroundScheduler: 適合於要求任何在程序后台運行的情況,當希望調度器在應用后台執行時使用。
AsyncIOScheduler:適合於使用asyncio框架的情況
GeventScheduler: 適合於使用gevent框架的情況
TornadoScheduler: 適合於使用Tornado框架的應用
TwistedScheduler: 適合使用Twisted框架的應用
QtScheduler: 適合使用QT的情況
觸發器組件(trigger)
調用方式
add_job() 中 trigger 參數為調用方式,有 interval, day, cron 三種值
- date 日期:觸發任務運行的具體日期
- interval 間隔:觸發任務運行的時間間隔
- cron 周期:觸發任務運行的周期
觸發器date
特定的時間點觸發,只執行一次。參數如下:
使用例子:
from datetime import datetime from datetime import date from apscheduler.schedulers.blocking import BlockingScheduler def job(text): print(text) scheduler = BlockingScheduler() # 在 2019-8-30 運行一次 job 方法 scheduler.add_job(job, 'date', run_date=date(2019, 8, 30), args=['text1']) # 在 2019-8-30 01:00:00 運行一次 job 方法 scheduler.add_job(job, 'date', run_date=datetime(2019, 8, 30, 1, 0, 0), args=['text2']) # 在 2019-8-30 01:00:01 運行一次 job 方法 scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=['text3']) scheduler.start()
觸發器interval
固定時間間隔觸發。參數如下:
使用例子:
import time from apscheduler.schedulers.blocking import BlockingScheduler def job(text): t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) print('{} --- {}'.format(text, t)) scheduler = BlockingScheduler() # 每隔 1分鍾 運行一次 job 方法 scheduler.add_job(job, 'interval', minutes=1, args=['job1']) # 在 2019-08-29 22:15:00至2019-08-29 22:17:00期間,每隔1分30秒 運行一次 job 方法 scheduler.add_job(job, 'interval', minutes=1, seconds = 30, start_date='2019-08-29 22:15:00', end_date='2019-08-29 22:17:00', args=['job2']) scheduler.start() ''' 運行結果: job2 --- 2019-08-29 22:15:00 job1 --- 2019-08-29 22:15:46 job2 --- 2019-08-29 22:16:30 job1 --- 2019-08-29 22:16:46 job1 --- 2019-08-29 22:17:46 ...余下省略... '''
觸發器cron
在特定時間周期性地觸發。參數如下:
(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)
除了week和 day_of_week,它們的默認值是*
例如day=1, minute=20
,這就等於year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0
,工作將在每個月的第一天以每小時20分鍾的時間執行
表達式類型
注!month和day_of_week參數分別接受的是英語縮寫jan– dec 和 mon – sun
當設置的時間間隔小於,任務的執行時間,線程會阻塞住,等待執行完了才能執行下一個任務,可以設置max_instance
指定一個任務同一時刻有多少個實例在運行,默認為1
使用例子:
import time from apscheduler.schedulers.blocking import BlockingScheduler def job(text): t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) print('{} --- {}'.format(text, t)) scheduler = BlockingScheduler() # 在每天22點,每隔 1分鍾 運行一次 job 方法 scheduler.add_job(job, 'cron', hour=22, minute='*/1', args=['job1']) # 在每天22和23點的25分,運行一次 job 方法 scheduler.add_job(job, 'cron', hour='22-23', minute='25', args=['job2']) scheduler.start() ''' 運行結果: job1 --- 2019-08-29 22:25:00 job2 --- 2019-08-29 22:25:00 job1 --- 2019-08-29 22:26:00 job1 --- 2019-08-29 22:27:00 ...余下省略... '''
添加任務
添加任務的方法有兩種:
- 通過調用add_job()
- 通過裝飾器scheduled_job()
第一種方法是最常用的方法。第二種方法主要是方便地聲明在應用程序運行時不會更改的任務。該 add_job()方法返回一個apscheduler.job.Job實例,可以使用該實例稍后修改或刪除該任務。
import time from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler() @scheduler.scheduled_job('interval', seconds=5) def job1(): t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) print('job1 --- {}'.format(t)) @scheduler.scheduled_job('cron', second='*/7') def job2(): t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) print('job2 --- {}'.format(t)) scheduler.start() ''' 運行結果: job2 --- 2019-08-29 22:36:35 job1 --- 2019-08-29 22:36:37 job2 --- 2019-08-29 22:36:42 job1 --- 2019-08-29 22:36:42 job1 --- 2019-08-29 22:36:47 job2 --- 2019-08-29 22:36:49 ...余下省略... '''
移除作業
移除 job 也有兩種方法:remove_job()
和 job.remove()
。
remove_job() 是根據 job 的 id 來移除,所以要在 job 創建的時候指定一個 id。
job.remove() 則是對 job 執行 remove 方法即可
scheduler.add_job(job_func, 'interval', minutes=2, id='job_one') scheduler.remove_job(job_one) job = add_job(job_func, 'interval', minutes=2, id='job_one') job.remvoe()
獲取 job 列表
通過 scheduler.get_jobs()
方法能夠獲取當前調度器中的所有 job 的列表
暫停作業:
– apscheduler.job.Job.pause()
– apscheduler.schedulers.base.BaseScheduler.pause_job()
恢復作業:
– apscheduler.job.Job.resume()
– apscheduler.schedulers.base.BaseScheduler.resume_job()
修改作業 job
可以通過apscheduler.job.Job.modify() or modify_job()來動態修改job的屬性信息,除了job id無法修改之外,都是可以修改的。
job.modify(max_instances=6, name='Alternate name')
另外我們也可以通過apscheduler.job.Job.reschedule() or reschedule_job()動態重新設置trigger,示例如下:
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
關閉調度器
默認情況下調度器會等待所有正在運行的作業完成后,關閉所有的調度器和作業存儲。如果你不想等待,可以將wait選項設置為False。
scheduler.shutdown()
scheduler.shutdown(wait=False)