定時框架APScheduler
APSScheduler是python的一個定時任務框架,它提供了基於日期date、固定時間間隔interval、以及linux上的crontab類型的定時任務。該框架不僅可以添加、刪除定時任務,還可以將任務存儲到數據庫中、實現任務的持久化。
APScheduler有四種組件
-
triggers(觸發器):觸發器包含調度邏輯,每一個作業有它自己的觸發器,用於決定接下來哪一個作業會運行,除了他們自己初始化配置外,觸發器完全是無狀態的。
-
job stores(作業存儲):用來存儲被調度的作業,默認的作業存儲器是簡單地把作業任務保存在內存中,其它作業存儲器可以將任務作業保存到各種數據庫中,支持MongoDB、Redis、SQLAlchemy存儲方式。當對作業任務進行持久化存儲的時候,作業的數據將被序列化,重新讀取作業時在反序列化。
-
executors(執行器):執行器用來執行定時任務,只是將需要執行的任務放在新的線程或者線程池中運行。當作業任務完成時,執行器將會通知調度器。對於執行器,默認情況下選擇ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任務如比較消耗CPU的任務則可以選擇ProcessPoolExecutor,當然根據根據實際需求可以同時使用兩種執行器。
-
schedulers(調度器):調度器是將其它部分聯系在一起,一般在應用程序中只有一個調度器,應用開發者不會直接操作觸發器、任務存儲以及執行器,相反調度器提供了處理的接口。通過調度器完成任務的存儲以及執行器的配置操作,如可以添加。修改、移除任務作業
APScheduler提供了七種調度器
- BlockingScheduler:適合於只在進程中運行單個任務的情況,通常在調度器是你唯一要運行的東西時使用。
- BackgroundScheduler: 適合於要求任何在程序后台運行的情況,當希望調度器在應用后台執行時使用。
- AsyncIOScheduler:適合於使用asyncio異步框架的情況
- GeventScheduler: 適合於使用gevent框架的情況
- TornadoScheduler: 適合於使用Tornado框架的應用
- TwistedScheduler: 適合使用Twisted框架的應用
- QtScheduler: 適合使用QT的情況
APScheduler提供了四種存儲方式
- MemoryJobStore
- sqlalchemy
- mongodb
- redis
APScheduler提供了三種任務觸發器
- data:固定日期觸發器:任務只運行一次,運行完畢自動清除;若錯過指定運行時間,任務不會被創建
- interval:時間間隔觸發器
- cron:cron風格的任務觸發
示例
示例1 BlockingScheduler
- BlockingScheduler:在進程中運行單個任務,調度器是唯一運行的東西
- 該示例代碼生成了一個BlockingScheduler調度器,使用了默認的任務存儲MemoryJobStore,以及默認的執行器ThreadPoolExecutor,並且最大線程數為10。
定義job
def jod1():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ' #####job1 ' + str(random.randint(0, 10)))
定義run方法
def run1():
scheduler = BlockingScheduler()
scheduler.add_job(jod1, 'interval', seconds=5)
scheduler.start()
完整代碼
import time
import random
from apscheduler.schedulers.blocking import BlockingScheduler
def jod1():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ' #####job1 ' + str(random.randint(0, 10)))
def run1():
scheduler = BlockingScheduler()
scheduler.add_job(jod1, 'interval', seconds=5)
scheduler.start()
if __name__ == '__main__':
run1()
運行結果
2019-05-16 11:09:05 #####job1 2
2019-05-16 11:09:10 #####job1 5
2019-05-16 11:09:15 #####job1 1
2019-05-16 11:09:20 #####job1 0
2019-05-16 11:09:25 #####job1 8
2019-05-16 11:09:30 #####job1 0
2019-05-16 11:09:35 #####job1 9
2019-05-16 11:09:40 #####job1 1
2019-05-16 11:09:45 #####job1 4
......
測試1
我們在示例1的基礎上再加一個job2,看看什么情況
job2示例代碼
def jod2():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ' #####job2 ' + str(random.randint(0, 10)))
run2示例代碼
def run2():
scheduler = BlockingScheduler()
scheduler.add_job(jod1, 'interval', seconds=5)
scheduler.add_job(jod2, 'interval', seconds=5)
scheduler.start()
實行結果:
2019-05-16 11:10:46 #####job2 9
2019-05-16 11:10:46 #####job1 2
2019-05-16 11:10:51 #####job2 3
2019-05-16 11:10:51 #####job1 2
2019-05-16 11:10:56 #####job2 9
2019-05-16 11:10:56 #####job1 1
2019-05-16 11:11:01 #####job2 9
2019-05-16 11:11:01 #####job1 1
2019-05-16 11:11:06 #####job2 2
2019-05-16 11:11:06 #####job1 5
2019-05-16 11:11:11 #####job2 6
2019-05-16 11:11:11 #####job1 7
2019-05-16 11:11:16 #####job2 1
2019-05-16 11:11:16 #####job1 6
......
從執行結果來看,在同一個調度器中添加2個job,這兩個job會在同一時刻同時執行
測試2
我們將job的運行時間強制改為6秒,看看調度器會怎么處理
修改后的job1
def jod1():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ' #####job1 ' + str(random.randint(0, 10)))
time.sleep(6)
run1 不變,運行結果
2019-05-16 11:18:11 #####job1 1
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 11:18:16 CST)" skipped: maximum number of running instances reached (1)
2019-05-16 11:18:21 #####job1 9
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 11:18:26 CST)" skipped: maximum number of running instances reached (1)
2019-05-16 11:18:31 #####job1 5
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 11:18:36 CST)" skipped: maximum number of running instances reached (1)
2019-05-16 11:18:41 #####job1 0
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 11:18:46 CST)" skipped: maximum number of running instances reached (1)
從執行結果來看,由於job的運行時間超過了執行器需要執行時的時間,所以本次任務跳過,就變成了10秒執行一次的任務了。
示例2 BackgroundScheduler
完整示例:
import time
import random
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
def jod1():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ' #####job1 ' + str(random.randint(0, 10)))
time.sleep(6)
def run3():
scheduler = BackgroundScheduler()
scheduler.add_job(jod1, 'interval', seconds=5)
scheduler.start()
while True:
print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
time.sleep(2)
print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
if __name__ == '__main__':
# run1()
# run2()
run3()
測試1
開啟一個任務,然后另外開啟一個線程
def run3():
scheduler = BackgroundScheduler()
scheduler.add_job(jod1, 'interval', seconds=5)
scheduler.start()
while True:
print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
time.sleep(2)
print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
運行結果
main-start: 2019-05-16 18:42:13
main-end: 2019-05-16 18:42:15
main-start: 2019-05-16 18:42:15
main-end: 2019-05-16 18:42:17
main-start: 2019-05-16 18:42:17
2019-05-16 18:42:18 #####job1 6
main-end: 2019-05-16 18:42:19
main-start: 2019-05-16 18:42:19
main-end: 2019-05-16 18:42:21
main-start: 2019-05-16 18:42:21
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 18:42:23 CST)" skipped: maximum number of running instances reached (1)
main-end: 2019-05-16 18:42:23
main-start: 2019-05-16 18:42:23
main-end: 2019-05-16 18:42:25
main-start: 2019-05-16 18:42:25
main-end: 2019-05-16 18:42:27
main-start: 2019-05-16 18:42:27
2019-05-16 18:42:28 #####job1 5
main-end: 2019-05-16 18:42:29
main-start: 2019-05-16 18:42:29
main-end: 2019-05-16 18:42:31
main-start: 2019-05-16 18:42:31
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 18:42:33 CST)" skipped: maximum number of running instances reached (1)
main-end: 2019-05-16 18:42:33
main-start: 2019-05-16 18:42:33
main-end: 2019-05-16 18:42:35
main-start: 2019-05-16 18:42:35
main-end: 2019-05-16 18:42:37
main-start: 2019-05-16 18:42:37
2019-05-16 18:42:38 #####job1 1
......
示例3 采用cron的方式
采用cron的方式來調度任務
def run4():
scheduler = BackgroundScheduler()
scheduler.add_job(jod1, 'cron', day_of_week='fri', second='*/5')
scheduler.start()
while True:
print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
time.sleep(2)
print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))