簡介
APScheduler基於Quartz的一個Python定時任務框架,實現了Quartz的所有功能,使用起來十分方便。提供了基於日期、固定時間間隔以及crontab類型的任務,並且可以持久化任務。基於這些功能,我們可以很方便的實現一個python定時任務系統。
github:https://github.com/agronholm/apscheduler
官網文檔:https://apscheduler.readthedocs.io/en/latest/
安裝
1、pip安裝
pip install apscheduler
2、源碼安裝
下載地址:https://pypi.python.org/pypi/APScheduler/
python setup.py install
組成
APScheduler整個系統可以說由這五個概念組成:
- 觸發器(trigger)包含調度邏輯,每一個作業有它自己的觸發器,用於決定接下來哪一個作業會運行。除了他們自己初始配置意外,觸發器完全是無狀態的。
- 作業存儲(job store)存儲被調度的作業,默認的作業存儲是簡單地把作業保存在內存中,其他的作業存儲是將作業保存在數據庫中。一個作業的數據講在保存在持久化作業存儲時被序列化,並在加載時被反序列化。調度器不能分享同一個作業存儲。
- 執行器(executor)處理作業的運行,他們通常通過在作業中提交制定的可調用對象到一個線程或者進城池來進行。當作業完成時,執行器將會通知調度器。
- 調度器(scheduler)是其他的組成部分。你通常在應用只有一個調度器,應用的開發者通常不會直接處理作業存儲、調度器和觸發器,相反,調度器提供了處理這些的合適的接口。配置作業存儲和執行器可以在調度器中完成,例如添加、修改和移除作業。
- 任務(job):描述一個任務本身。
調度器(scheduler)的IO模型
scheduler由於IO模型的不同,可以有多種實現:
- BlockingScheduler:main_loop就在當前進程的主線程內運行,所以調用start函數后會阻塞當前線程。通過一個threading.Event條件變量對象完成scheduler的定時喚醒。
- BackgroundScheduler:和BlockingScheduler基本一樣,除了main_loop放在了單獨線程里,所以調用start后主線程不會阻塞
- AsyncIOScheduler:使用asyncio作為IO模型的scheduler,和AsyncIOExecutor配合使用,用asynio中event_loop的call_later完成定時喚醒
- GeventScheduler:和BlockingScheduler基本一樣,使用gevent作為IO模型,和GeventExecutor配合使用
- QtScheduler:使用QTimer完成定時喚醒
- TornadoScheduler:使用tornado的IO模型,用ioloop.add_timeout完成定時喚醒
- TwistedScheduler:配合TwistedExecutor,用reactor.callLater完成定時喚醒
1、BlockingScheduler簡單應用:
from apscheduler.schedulers.blocking import BlockingScheduler def my_job(): print 'hello world' scheduler = BlockingScheduler() scheduler.add_job(my_job, 'interval', seconds=5) scheduler.start()
上面的例子表示每隔5s執行一次my_job函數,輸出hello world
2、BackgroundScheduler簡單應用
from apscheduler.schedulers.background import BackgroundScheduler def job3(): print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))) print("I'm working job_3") scheduler = BackgroundScheduler() scheduler.add_job(job3, 'interval', seconds=3) #間隔3秒鍾執行一次 scheduler.start()
配置
import time import redis from pytz import utc from pymongo import MongoClient from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.jobstores.redis import RedisJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor from apscheduler.events import EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_MISSED client = MongoClient(host='127.0.0.1', port=27017) pool = redis.ConnectionPool(host='127.0.0.1', port=16379) jobstores = { 'mongo': MongoDBJobStore(collection='job', database='test', client=client), 'redis': RedisJobStore(connection_pool=pool), 'default': MemoryJobStore(), 'default_test': MemoryJobStore() } executors = { 'default': ThreadPoolExecutor(200), 'processpool': ProcessPoolExecutor(10) } job_defaults = { 'coalesce': True, 'max_instances': 1, 'misfire_grace_time': 60 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
- coalesce:當由於某種原因導致某個job積攢了好幾次沒有實際運行(比如說系統掛了5分鍾后恢復,有一個任務是每分鍾跑一次的,按道理說這5分鍾內本來是“計划”運行5次的,但實際沒有執行),如果coalesce為True,下次這個job被submit給executor時,只會執行1次,也就是最后這次,如果為False,那么會執行5次(不一定,因為還有其他條件,看后面misfire_grace_time的解釋)
- max_instance: 就是說同一個job同一時間最多有幾個實例再跑,比如一個耗時10分鍾的job,被指定每分鍾運行1次,如果我們max_instance值為5,那么在第6~10分鍾上,新的運行實例不會被執行,因為已經有5個實例在跑了。默認情況下同一個job,只允許一個job實例運行。這在某個job在下次運行時間到達之后仍未執行完畢時,能達到控制的目的。你也可以該變這一行為,在你調用add_job時可以傳遞max_instances=5來運行同時運行同一個job的5個job實例。
- misfire_grace_time:設想和上述coalesce類似的場景,如果一個job本來14:00有一次執行,但是由於某種原因沒有被調度上,現在14:01了,這個14:00的運行實例被提交時,會檢查它預訂運行的時間和當下時間的差值(這里是1分鍾),大於我們設置的30秒限制,那么這個運行實例不會被執行。
- executor的選擇:線程池和進程池。默認default是線程池方式。這個數是執行任務的實際並發數,如果你設置的小了而job添加的比較多,可能出現丟失調度的情況。同時對於python多線程場景,如果是計算密集型任務,實際的並發度達不到配置的數量。所以這個數字要根據具體的要求設置。
作業存儲(job store)backend
jobstore提供給scheduler一個序列化jobs的統一抽象,提供對scheduler中job的增刪改查接口,根據存儲backend的不同,分以下幾種
- MemoryJobStore:沒有序列化,jobs就存在內存里,增刪改查也都是在內存中操作
- SQLAlchemyJobStore:所有sqlalchemy支持的數據庫都可以做為backend,增刪改查操作轉化為對應backend的sql語句
- MongoDBJobStore:用mongodb作backend
- RedisJobStore: 用redis作backend
- ZooKeeperJobStore:用ZooKeeper做backend
觸發器(trigger)控制
1、interval 間隔調度(每隔多久執行)
weeks (int) – number of weeks to wait days (int) – number of days to wait hours (int) – number of hours to wait minutes (int) – number of minutes to wait seconds (int) – number of seconds to wait start_date (datetime|str) – starting point for the interval calculation end_date (datetime|str) – latest possible date/time to trigger on timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
例子:
#表示每隔3天17時19分07秒執行一次任務 sched.add_job(my_job, 'interval', days=03, hours=17, minutes=19, seconds=07) #間隔3秒鍾執行一次 scheduler.add_job(job3, 'interval', seconds=3)
2、date 定時調度(作業只會執行一次)
run_date (datetime|str) – the date/time to run the job at -(任務開始的時間) timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
例子:
#在指定的時間,只執行一次 scheduler.add_job(tick, 'date', run_date='2016-02-14 15:01:05') # The job will be executed on November 6th, 2009 sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text']) # The job will be executed on November 6th, 2009 at 16:30:05 sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
3、cron定時調度(某一定時時刻執行)
(int|str) 表示參數既可以是int類型,也可以是str類型 (datetime | str) 表示參數既可以是datetime類型,也可以是str類型 year (int|str) – 4-digit year -(表示四位數的年份,如2008年) month (int|str) – month (1-12) -(表示取值范圍為1-12月) day (int|str) – day of the (1-31) -(表示取值范圍為1-31日) week (int|str) – ISO week (1-53) -(格里歷2006年12月31日可以寫成2006年-W52-7(擴展形式)或2006W527(緊湊形式)) day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) - (表示一周中的第幾天,既可以用0-6表示也可以用其英語縮寫表示) hour (int|str) – hour (0-23) - (表示取值范圍為0-23時) minute (int|str) – minute (0-59) - (表示取值范圍為0-59分) second (int|str) – second (0-59) - (表示取值范圍為0-59秒) start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) - (表示開始時間) end_date (datetime|str) – latest possible date/time to trigger on (inclusive) - (表示結束時間) timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示時區取值)
參數的取值格式:
例子:
#表示2017年3月22日17時19分07秒執行該程序 sched.add_job(my_job, 'cron', year=2017,month = 03,day = 22,hour = 17,minute = 19,second = 07) #表示任務在6,7,8,11,12月份的第三個星期五的00:00,01:00,02:00,03:00 執行該程序 sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') #表示從星期一到星期五5:30(AM)直到2014-05-30 00:00:00 sched.add_job(my_job(), 'cron', day_of_week='mon-fri', hour=5, minute=30,end_date='2014-05-30') #表示每5秒執行該程序一次,相當於interval 間隔調度中seconds = 5 sched.add_job(my_job, 'cron',second = '*/5')
操作