下載
pip install apscheduler
Hello World
#!/usr/bin/env python from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime sched = BlockingScheduler() def my_job(): print(f'{datetime.now():%H:%M:%S} Hello World ') sched.add_job(my_job, 'interval', seconds=5) # 每5秒執行一次 sched.add_job(my_job, 'cron', hour = '20',minute ='30' ,second = '00') # 每天的20:30:00執行一次 sched.start()
17:17:05 Hello World 17:17:10 Hello World 17:17:15 Hello World 17:17:20 Hello World 17:17:25 Hello World 17:17:30 Hello World
APScheduler 使用起來非常簡單,上面的代碼完成了每個五秒輸出一次信息的功能,它通過如下幾個步驟是實現
* BlockingScheduler 調度器中的一種,該種表示在進程中只運行調度程序時使用。
* sched.add_job() 添加作業,並指定調度方式為 interval,時間間隔為 5 秒
* sched.start() 開始任務
除了上述添加作業的方法,還可以使用裝飾器
@sched.scheduled_job('interval', seconds=5) def my_job(): print(f'{datetime.now():%H:%M:%S} Hello World ')
如果同一個方法被添加到多個任務重,則需要指定任務 id
@sched.scheduled_job('interval', id='my_job', seconds=5)
@sched.scheduled_job('interval', id='my_job1', seconds=3)
def my_job():
print(f'{datetime.now():%H:%M:%S} Hello World ')
基本概念
APScheduler四大組件:
* 觸發器 triggers :用於設定觸發任務的條件
* 任務儲存器 job stores:用於存放任務,把任務存放在內存或數據庫中
* 執行器 executors: 用於執行任務,可以設定執行模式為單線程或線程池
* 調度器 schedulers: 把上方三個組件作為參數,通過創建調度器實例來運行
觸發器
每一個任務都有自己的觸發器,觸發器用於決定任務下次運行的時間。
任務儲存器
默認情況下,任務存放在內存中。也可以配置存放在不同類型的數據庫中。如果任務存放在數據庫中,那么任務的存取有一個序列化和反序列化的過程,同時修改和搜索任務的功能也是由任務儲存器實現。
注!一個任務儲存器不要共享給多個調度器,否則會導致狀態混亂
執行器
任務會被執行器放入線程池或進程池去執行,執行完畢后,執行器會通知調度器。
調度器
一個調度器由上方三個組件構成,一般來說,一個程序只要有一個調度器就可以了。開發者也不必直接操作任務儲存器、執行器以及觸發器,因為調度器提供了統一的接口,通過調度器就可以操作組件,比如任務的增刪改查。
除了剛才用到的調度器,總共有如下幾種
BlockingScheduler 阻塞式調度器:進程中只運行調度程序時使用。
BackgroundScheduler 后台調度器: 當沒有使用任何框架時使用,並希望調度程序在應用程序的后台運行。
AsyncIOScheduler AsyncIO調度器:當應用程序使用 asyncio 模塊時使用
GeventScheduler Gevent調度器:當應用程序使用 gevent 時使用
TornadoScheduler Tornado調度器:當創建 Tornado 應用時使用
TwistedScheduler Twisted調度器:當創建 Twisted 應用時使用
QtScheduler Qt調度器:當創建 Qt 應用時使用
比較常用的是前兩個
任務儲存器的選擇,要看任務是否需要持久化。如果你運行的任務是無狀態的,選擇默認任務儲存器MemoryJobStore就可以應付。但是,如果你需要在程序關閉或重啟時,保存任務的狀態,那么就要選擇持久化的任務儲存器。SQLAlchemyJobStore並搭配PostgreSQL作為后台數據庫。這個方案可以提供強大的數據整合與保護功能。
執行器的選擇,同樣要看你的實際需求。默認的ThreadPoolExecutor線程池執行器方案可以滿足大部分需求。如果,你的程序是計算密集型的,那么最好用ProcessPoolExecutor進程池執行器方案來充分利用多核算力。也可以將ProcessPoolExecutor作為第二執行器,混合使用兩種不同的執行器。
調用方式
add_job() 中 trigger 參數為調用方式,有 interval, day, cron 三種值
date 日期:觸發任務運行的具體日期
interval 間隔:觸發任務運行的時間間隔
cron 周期:觸發任務運行的周期
一個任務也可以設定多種觸發器,比如,可以設定同時滿足所有觸發器條件而觸發,或者滿足一項即觸發。復合觸發器,請查閱一下文檔:鏈接
cron
cron
指定時間調度,參數如下 year (int|str) – 4-digit year month (int|str) – month (1-12) day (int|str) – day of the (1-31) week (int|str) – ISO week (1-53) day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) hour (int|str) – hour (0-23) minute (int|str) – minute (0-59) second (int|str) – second (0-59) start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) end_date (datetime|str) – latest possible date/time to trigger on (inclusive) timezone (datetime.tzinfo|str) – time zone to use forthe date/time calculations (defaults to scheduler timezone) jitter (int|None) – advance or delay the job execution by jitter seconds at most.
注!month和day_of_week參數分別接受的是英語縮寫jan– dec 和 mon – sun
當參數指定字符串時有許多種用法,比如:
# 當前任務會在 6、7、8、11、12 月的第三個周五的 0、1、2、3 點執行 sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
當省略時間參數時,在顯式指定參數之前的參數會被設定為*,之后的參數會被設定為最小值,week 和day_of_week的最小值為*。比如,設定day=1, minute=20等同於設定year=’’, month=’’, day=1, week=’’, day_of_week=’’, hour=’*’, minute=20, second=0,即每個月的第一天,且當分鍾到達20時就觸發。
start_date 和 end_date 可以用來適用時間范圍
# 在2014-05-30 00:00:00前,每周一到每周五 5:30運行 sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
通過 scheduled_job() 裝飾器實現:
@sched.scheduled_job('cron', id='my_job_id', day='last sun') def some_decorated_task(): print("I am printed at 00:00:00 on the last Sunday of every month!")
使用標准crontab表達式:
sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))
也可以添加jitter振動參數
# 每小時上下浮動120秒觸發 sched.add_job(job_function, 'cron', hour='*', jitter=120)
interval
時間間隔調用,參數如下
weeks (int) – number of weeks to wait days (int) – number of days to wait hours (int) – number of hours to wait minutes (int) – number of minutes to wait seconds (int) – number of seconds to wait start_date (datetime|str) – starting point for(int i = 0; i < the interval calculation end_date (datetime|str) – latest possible date/time to trigger on timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations jitter (int|None) – advance or delay the job execution by jitter seconds at most.
假如你要做一個活動,定時任務需要指定一個時間區間,就不需要手動去開啟或停止了,只需要像下邊這樣設置 start_date, end_date 即可
sched.add_job(job_function, 'interval', hours=2, start_date='2019-03-23 09:30:00', end_date='2019-04-15 11:00:00')
你可以框定周期開始時間start_date和結束時間end_date。
還可以每2小時觸發
# 每2小時觸發 sched.add_job(job_function, 'interval', hours=2)
jitter振動參數,給每次觸發添加一個隨機浮動秒數,一般適用於多服務器,避免同時運行造成服務擁堵。
# 每小時(上下浮動120秒區間內)運行`job_function` sched.add_job(job_function, 'interval', hours=1, jitter=120)
date
指定時間,但只會執行一次
run_date (datetime|str) – the date/time to run the job at timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
例子
@sched.scheduled_job('date', id='my_job', run_date='2019-03-23 18:25:30')
其中run_date參數可以是date類型、datetime類型或文本類型。
datetime類型(用於精確時間)
在2009年11月6日 16:30:05執行 sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
文本類型
sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['text'])
未指定時間,則會立即執行
# 未顯式指定,那么則立即執行 sched.add_job(my_job, args=['text'])
夏令時問題
有些timezone時區可能會有夏令時的問題。這個可能導致令時切換時,任務不執行或任務執行兩次。避免這個問題,可以使用UTC時間,或提前預知並規划好執行的問題。
APScheduler 有多種不同的配置方法,你可以選擇直接傳字典或傳參的方式創建調度器;也可以先實例一個調度器對象,再添加配置信息。靈活的配置方式可以滿足各種應用場景的需要。
整套的配置選項可以參考API文檔BaseScheduler類。一些調度器子類可能有它們自己特有的配置選項,以及獨立的任務儲存器和執行器也可能有自己特有的配置選項,可以查閱API文檔了解。
下面舉一個例子,創建一個使用默認任務儲存器和執行器的BackgroundScheduler:
from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler() # 因為是非阻塞的后台調度器,所以程序會繼續向下執行
這樣就可以創建了一個后台調度器。這個調度器有一個名稱為default的MemoryJobStore(內存任務儲存器)和一個名稱是default且最大線程是10的ThreadPoolExecutor(線程池執行器)。
假如你現在有這樣的需求,兩個任務儲存器分別搭配兩個執行器;同時,還要修改任務的默認參數;最后還要改時區。可以參考下面例子,它們是完全等價的。
名稱為“mongo”的MongoDBJobStore
名稱為“default”的SQLAlchemyJobStore
名稱為“ThreadPoolExecutor ”的ThreadPoolExecutor,最大線程20個
名稱“processpool”的ProcessPoolExecutor,最大進程5個
UTC時間作為調度器的時區
默認為新任務關閉合並模式()
設置新任務的默認最大實例數為3
方法一:
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstores = { 'mongo': MongoDBJobStore(), 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
方法二:
from apscheduler.schedulers.background import BackgroundScheduler # The "apscheduler." prefix is hard coded scheduler = BackgroundScheduler({ 'apscheduler.jobstores.mongo': { 'type': 'mongodb' }, 'apscheduler.jobstores.default': { 'type': 'sqlalchemy', 'url': 'sqlite:///jobs.sqlite' }, 'apscheduler.executors.default': { 'class': 'apscheduler.executors.pool:ThreadPoolExecutor', 'max_workers': '20' }, 'apscheduler.executors.processpool': { 'type': 'processpool', 'max_workers': '5' }, 'apscheduler.job_defaults.coalesce': 'false', 'apscheduler.job_defaults.max_instances': '3', 'apscheduler.timezone': 'UTC', })
方法三:
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ProcessPoolExecutor jobstores = { 'mongo': {'type': 'mongodb'}, 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': {'type': 'threadpool', 'max_workers': 20}, 'processpool': ProcessPoolExecutor(max_workers=5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler() # ..這里可以添加任務 scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
啟動調度器
啟動調度器是只需調用start()即可。除了BlockingScheduler,非阻塞調度器都會立即返回,可以繼續運行之后的代碼,比如添加任務等。
對於BlockingScheduler,程序則會阻塞在start()位置,所以,要運行的代碼必須寫在start()之前。
注!調度器啟動后,就不能修改配置了。
添加任務
添加任務的方法有兩種:
通過調用add_job()
通過裝飾器scheduled_job()
第一種方法是最常用的;第二種方法是最方便的,但缺點就是運行時,不能修改任務。第一種add_job()方法會返回一個apscheduler.job.Job實例,這樣就可以在運行時,修改或刪除任務。
在任何時候你都能配置任務。但是如果調度器還沒有啟動,此時添加任務,那么任務就處於一個暫存的狀態。只有當調度器啟動時,才會開始計算下次運行時間。
還有一點要注意,如果你的執行器或任務儲存器是會序列化任務的,那么這些任務就必須符合:
回調函數必須全局可用
回調函數參數必須也是可以被序列化的
內置任務儲存器中,只有MemoryJobStore不會序列化任務;內置執行器中,只有ProcessPoolExecutor會序列化任務。
重要提醒! 如果在程序初始化時,是從數據庫讀取任務的,那么必須為每個任務定義一個明確的ID,並且使用replace_existing=True,否則每次重啟程序,你都會得到一份新的任務拷貝,也就意味着任務的狀態不會保存。 建議 如果想要立刻運行任務,可以在添加任務時省略trigger參數
移除任務
如果想從調度器移除一個任務,那么你就要從相應的任務儲存器中移除它,這樣才算移除了。有兩種方式:
調用remove_job(),參數為:任務ID,任務儲存器名稱
在通過add_job()創建的任務實例上調用remove()方法
第二種方式更方便,但前提必須在創建任務實例時,實例被保存在變量中。對於通過scheduled_job()創建的任務,只能選擇第一種方式。
當任務調度結束時(比如,某個任務的觸發器不再產生下次運行的時間),任務就會自動移除
job = scheduler.add_job(myfunc, 'interval', minutes=2) job.remove()
同樣,通過任務的具體ID:
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.remove_job('my_job_id')
暫停和恢復任務
通過任務實例或調度器,就能暫停和恢復任務。如果一個任務被暫停了,那么該任務的下一次運行時間就會被移除。在恢復任務前,運行次數計數也不會被統計。
暫停任務,有以下兩個方法:
apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()
恢復任務,
apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()
獲取任務列表
通過get_jobs()就可以獲得一個可修改的任務列表。get_jobs()第二個參數可以指定任務儲存器名稱,那么就會獲得對應任務儲存器的任務列表。
print_jobs()可以快速打印格式化的任務列表,包含觸發器,下次運行時間等信息。
修改任務 通過apscheduler.job.Job.modify()或modify_job(),你可以修改任務當中除了id的任何屬性。 比如: job.modify(max_instances=6, name='Alternate name')
1
如果想要重新調度任務(就是改變觸發器),你能通過apscheduler.job.Job.reschedule()或reschedule_job()來實現。這些方法會重新創建觸發器,並重新計算下次運行時間。
比如: scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
關閉調度器
關閉方法如下:
scheduler.shutdown()
默認情況下,調度器會先把正在執行的任務處理完,再關閉任務儲存器和執行器。但是,如果你就直接關閉,你可以添加參數:
scheduler.shutdown(wait=False)
上述方法不管有沒有任務在執行,會強制關閉調度器。
暫停、恢復任務進程
調度器可以暫停正在執行的任務:
scheduler.pause()
也可以恢復任務:
scheduler.resume()
同時,也可以在調度器啟動時,默認所有任務設為暫停狀態。
scheduler.start(paused=True)
限制任務執行的實例並行數
默認情況下,在同一時間,一個任務只允許一個執行中的實例在運行。比如說,一個任務是每5秒執行一次,但是這個任務在第一次執行的時候花了6秒,也就是說前一次任務還沒執行完,后一次任務又觸發了,由於默認一次只允許一個實例執行,所以第二次就丟失了。為了杜絕這種情況,可以在添加任務時,設置max_instances參數,為指定任務設置最大實例並行數。
丟失任務的執行與合並
有時,任務會由於一些問題沒有被執行。最常見的情況就是,在數據庫里的任務到了該執行的時間,但調度器被關閉了,那么這個任務就成了“啞彈任務”。錯過執行時間后,調度器才打開了。這時,調度器會檢查每個任務的misfire_grace_time參數int值,即啞彈上限,來確定是否還執行啞彈任務(這個參數可以全局設定的或者是為每個任務單獨設定)。此時,一個啞彈任務,就可能會被連續執行多次。
但這就可能導致一個問題,有些啞彈任務實際上並不需要被執行多次。coalescing合並參數就能把一個多次的啞彈任務揉成一個一次的啞彈任務。也就是說,coalescing為True能把多個排隊執行的同一個啞彈任務,變成一個,而不會觸發啞彈事件。
注!如果是由於線程池/進程池滿了導致的任務延遲,執行器就會跳過執行。要避免這個問題,可以添加進程或線程數來實現或把
misfire_grace_time值調高。
調度器事件
調度器允許添加事件偵聽器。部分事件會有特有的信息,比如當前運行次數等。add_listener(callback,mask)中,第一個參數是回調對象,mask是指定偵聽事件類型,mask參數也可以是邏輯組合。回調對象會有一個參數就是觸發的事件。
具體可以查看文檔中events模塊,里面有關於事件類型以及事件參數的詳細說明。
def my_listener(event): if event.exception: print('The job crashed :(') else: print('The job worked :)') # 當任務執行完或任務出錯時,調用my_listener scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
異常捕獲
通過logging模塊,可以添加apscheduler日志至DEBUG級別,這樣就能捕獲異常信息。
關於logging初始化的方式如下:
import logging logging.basicConfig() logging.getLogger('apscheduler').setLevel(logging.DEBUG)
日志會提供很多調度器的內部運行信息。
————————————————
版權聲明:本文為CSDN博主「abc_soul」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/abc_soul/article/details/88875643