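Preface
APScheduler, short for Advanced Python Scheduler, is a lightweight task scheduling framework for Python. It lets you schedule periodically executed tasks much as you would with Crontab on Linux, and it can schedule Python functions or any other callable object.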
Installation

    (ENV1) [eason@localhost]$ pip install apscheduler
    Collecting apscheduler
      Downloading APScheduler-3.3.0-py2.py3-none-any.whl (57kB)
        100% |████████████████████████████████| 61kB 81kB/s
    Collecting pytz (from apscheduler)
      Downloading pytz-2016.10-py2.py3-none-any.whl (483kB)
        100% |████████████████████████████████| 491kB 52kB/s
    Collecting funcsigs; python_version == "2.7" (from apscheduler)
      Downloading funcsigs-1.0.2-py2.py3-none-any.whl
    Requirement already satisfied: six>=1.4.0 in /home/eason/ENV1/lib/python2.7/site-packages (from apscheduler)
    Collecting tzlocal>=1.2 (from apscheduler)
      Downloading tzlocal-1.3.tar.gz
    Requirement already satisfied: setuptools>=0.7 in /home/eason/ENV1/lib/python2.7/site-packages (from apscheduler)
    Collecting futures; python_version == "2.7" (from apscheduler)
      Downloading futures-3.0.5-py2-none-any.whl
    Building wheels for collected packages: tzlocal
      Running setup.py bdist_wheel for tzlocal ... done
      Stored in directory: /home/eason/.cache/pip/wheels/80/19/a8/635ad9f4ad8a63b49d073c55cbca31fb5898ce2560ed145a69
    Successfully built tzlocal
    Installing collected packages: pytz, funcsigs, tzlocal, futures, apscheduler
    Successfully installed apscheduler-3.3.0 funcsigs-1.0.2 futures-3.0.5 pytz-2016.10 tzlocal-1.3
    (ENV1) [eason@localhost]$

Basic concepts
APScheduler has four kinds of components:
- triggers
- job stores
- executors
- schedulers
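triggers contain the scheduling logic. Each job has its own trigger, which determines when the job runs next. Apart from their initial configuration, triggers are completely stateless.
job stores hold the scheduled jobs. The default job store simply keeps jobs in memory; the others save them to a database. When a job is saved to a persistent job store its data is serialized, and it is deserialized again when the job is loaded. A job store must never be shared between schedulers.
executors handle the running of jobs. They typically do this by submitting the callable named in the job to a thread pool or process pool. When the job finishes, the executor notifies the scheduler.
schedulers are the interface to the other components. Job stores and executors are configured through the scheduler, and jobs are added, modified and removed through it as well. Pick the scheduler that fits your application; seven are available: BlockingScheduler, BackgroundScheduler, AsyncIOScheduler, GeventScheduler, TornadoScheduler, TwistedScheduler and QtScheduler.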
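Choosing the right scheduler
- BlockingScheduler : use when the scheduler is the only thing running in your process.
- BackgroundScheduler : use when you are not running any of the frameworks below and want the scheduler to run in the background inside your application.
- AsyncIOScheduler : use when your program uses asyncio (an asynchronous framework).
- GeventScheduler : use when your program uses gevent (a high-performance Python concurrency framework).
- TornadoScheduler : use when your program is built on Tornado (a web framework).
- TwistedScheduler : use when your program uses Twisted (an asynchronous framework).
- QtScheduler : use when your application is a Qt application.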
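Choosing the right job store
If your application recreates its jobs on every start, the default job store (MemoryJobStore) is enough. If you need jobs to survive a scheduler restart or an application crash, choose a persistent job store that fits your environment, for example the MongoDB job store or the SQLAlchemy job store (which supports most RDBMSs).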
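About executors
Your choice of executor usually follows from which of the frameworks above you use. In most cases the default ThreadPoolExecutor is sufficient. If your jobs are CPU-intensive, consider ProcessPoolExecutor to make use of more CPU cores. You can also use both at once, adding ProcessPoolExecutor as a secondary executor.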
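About triggers
When you schedule a job you must choose a trigger for it, which describes when the job fires. APScheduler has three built-in trigger types:
- date : run once at a specific date and time
- interval : run at fixed intervals, optionally limited to a window between a start date and an end date
- cron : cron-style scheduling modeled on the Linux crontab; the most powerful of the three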
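date: the most basic kind of scheduling; the job runs exactly once. Its parameters are:
- run_date (datetime|str) – the date or time at which to run the job
- timezone (datetime.tzinfo|str) – the time zone to use
For example: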
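
    from datetime import date, datetime

    # sched is an already created scheduler; job_function is your own callable
    # Run job_function once on 2016-12-12
    sched.add_job(job_function, 'date', run_date=date(2016, 12, 12), args=['text'])
    # Run job_function once at 2016-12-12 12:00:00
    sched.add_job(job_function, 'date', run_date=datetime(2016, 12, 12, 12, 0, 0), args=['text'])
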
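interval: run at fixed intervals. Its parameters are:
- weeks (int) – number of weeks between runs
- days (int) – number of days between runs
- hours (int) – number of hours between runs
- minutes (int) – number of minutes between runs
- seconds (int) – number of seconds between runs
- start_date (datetime|str) – start date
- end_date (datetime|str) – end date
- timezone (datetime.tzinfo|str) – time zone
For example: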

    # Call job_function every two hours
    sched.add_job(job_function, 'interval', hours=2)

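cron: cron-style scheduling. Its parameters are:
- year (int|str) – 4-digit year
- month (int|str) – month (1-12)
- day (int|str) – day of the month (1-31)
- week (int|str) – ISO week (1-53)
- day_of_week (int|str) – number or name of the weekday (0-6 or mon,tue,wed,thu,fri,sat,sun; 0 is Monday)
- hour (int|str) – hour (0-23)
- minute (int|str) – minute (0-59)
- second (int|str) – second (0-59)
- start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
- end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
- timezone (datetime.tzinfo|str) – the time zone to use
Expression types: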
Expression | Field | Description |
---|---|---|
* | any | Fire on every value |
*/a | any | Fire every a values, starting from the minimum |
a-b | any | Fire on any value within the a-b range (a must be smaller than b) |
a-b/c | any | Fire every c values within the a-b range |
xth y | day | Fire on the x -th occurrence of weekday y within the month |
last x | day | Fire on the last occurrence of weekday x within the month |
last | day | Fire on the last day within the month |
x,y,z | any | Fire on any matching expression; can combine any number of any of the above expressions |
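The numeric expression forms in the table can be read as sets of matching values. The helper below is a plain-Python illustration of that reading; it is not APScheduler's own parser, `expand_field` is a made-up name, and it deliberately ignores the `xth y`, `last x` and `last` forms.

```python
def expand_field(expr, minimum, maximum):
    """Expand a cron-style field expression into the sorted values it matches.

    Handles only the numeric forms: *, */a, a, a-b, a-b/c and comma-separated
    combinations of them. Illustrative only; not APScheduler's parser.
    """
    values = set()
    for part in expr.split(","):
        rng, _, step = part.partition("/")
        step = int(step) if step else 1
        if rng == "*":
            first, last = minimum, maximum
        elif "-" in rng:
            first, last = (int(x) for x in rng.split("-"))
        else:
            first = last = int(rng)
        if not (minimum <= first <= last <= maximum):
            raise ValueError("invalid expression: %r" % part)
        # Steps are counted from the start of the range (or the field minimum).
        values.update(range(first, last + 1, step))
    return sorted(values)


if __name__ == "__main__":
    print(expand_field("*/15", 0, 59))      # minute field: [0, 15, 30, 45]
    print(expand_field("0-3", 0, 23))       # hour field: [0, 1, 2, 3]
    print(expand_field("10-30/10", 0, 59))  # [10, 20, 30]
    print(expand_field("6-8,11-12", 1, 12)) # month field: [6, 7, 8, 11, 12]
    print(expand_field("*/10", 1, 31))      # day field: [1, 11, 21, 31]
```

The last call shows what "starting from the minimum" means for a field whose minimum is 1: the steps are counted from 1, not from 0.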
For example:

    # job_function runs at 00:00, 01:00, 02:00 and 03:00 on the third Friday of June, July, August, November and December
    sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
    # job_function runs Monday to Friday at 5:30 am until 2016-12-31 00:00:00
    sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2016-12-31')

In practice
There are two ways to add a job: call the add_job() function, or use the scheduled_job() decorator.
Using add_job()

    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime


    def my_job1():
        print('my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


    def my_job2():
        print('my_job2 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


    sched = BlockingScheduler()
    # Run my_job1 every 5 seconds, counted from when the scheduler starts
    sched.add_job(my_job1, 'interval', seconds=5, id='my_job1')
    # Run my_job2 whenever the seconds value is a multiple of 5
    sched.add_job(my_job2, 'cron', second='*/5', id='my_job2')
    # Blocks here until the scheduler is shut down
    sched.start()

Output:

    my_job2 is running, Now is 2016-12-13 14:41:10
    my_job1 is running, Now is 2016-12-13 14:41:12
    my_job2 is running, Now is 2016-12-13 14:41:15
    my_job1 is running, Now is 2016-12-13 14:41:17
    my_job2 is running, Now is 2016-12-13 14:41:20
    my_job1 is running, Now is 2016-12-13 14:41:22
    my_job2 is running, Now is 2016-12-13 14:41:25
    my_job1 is running, Now is 2016-12-13 14:41:27

Using the scheduled_job() decorator

    from apscheduler.schedulers.blocking import BlockingScheduler
    import datetime

    sched = BlockingScheduler()


    # Run my_job1 every 5 seconds, counted from when the scheduler starts
    @sched.scheduled_job('interval', seconds=5, id='my_job1')
    def my_job1():
        print('my_job1 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


    # Run my_job2 whenever the seconds value is a multiple of 5
    @sched.scheduled_job('cron', second='*/5', id='my_job2')
    def my_job2():
        print('my_job2 is running, Now is %s' % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


    # Blocks here until the scheduler is shut down
    sched.start()

Output:

    my_job2 is running, Now is 2016-12-13 15:09:00
    my_job1 is running, Now is 2016-12-13 15:09:03
    my_job2 is running, Now is 2016-12-13 15:09:05
    my_job1 is running, Now is 2016-12-13 15:09:08
    my_job2 is running, Now is 2016-12-13 15:09:10
    my_job1 is running, Now is 2016-12-13 15:09:13
    my_job2 is running, Now is 2016-12-13 15:09:15
    my_job1 is running, Now is 2016-12-13 15:09:18

Storing jobs in the SQLAlchemy job store

    from apscheduler.schedulers.blocking import BlockingScheduler
    from datetime import datetime
    import logging

    sched = BlockingScheduler()


    def my_job():
        print('my_job is running, Now is %s' % datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


    # Use the SQLAlchemy job store
    url = 'mysql+mysqldb://root:123456@localhost:3306/scrapy?charset=utf8'
    sched.add_jobstore('sqlalchemy', url=url)
    # Add the job
    sched.add_job(my_job, 'interval', id='myjob', seconds=5)

    # Log executor activity at INFO level (use logging.DEBUG for more detail)
    log = logging.getLogger('apscheduler.executors.default')
    log.setLevel(logging.INFO)

    # Set the log format
    fmt = logging.Formatter('%(levelname)s:%(name)s:%(message)s')
    h = logging.StreamHandler()
    h.setFormatter(fmt)
    log.addHandler(h)

    sched.start()

Output:

    $ python scheduler.py
    INFO:apscheduler.executors.default:Running job "my_job (trigger: interval[0:00:05], next run at: 2016-12-13 21:26:45 CST)" (scheduled at 2016-12-13 21:26:45.067157+08:00)
    my_job is running, Now is 2016-12-13 21:26:45
    INFO:apscheduler.executors.default:Job "my_job (trigger: interval[0:00:05], next run at: 2016-12-13 21:26:50 CST)" executed successfully
    INFO:apscheduler.executors.default:Running job "my_job (trigger: interval[0:00:05], next run at: 2016-12-13 21:26:50 CST)" (scheduled at 2016-12-13 21:26:50.067157+08:00)
    my_job is running, Now is 2016-12-13 21:26:50
    INFO:apscheduler.executors.default:Job "my_job (trigger: interval[0:00:05], next run at: 2016-12-13 21:26:50 CST)" executed successfully
    INFO:apscheduler.executors.default:Running job "my_job (trigger: interval[0:00:05], next run at: 2016-12-13 21:26:55 CST)" (scheduled at 2016-12-13 21:26:55.067157+08:00)
    my_job is running, Now is 2016-12-13 21:26:55
    INFO:apscheduler.executors.default:Job "my_job (trigger: interval[0:00:05], next run at: 2016-12-13 21:26:55 CST)" executed successfully
    INFO:apscheduler.executors.default:Running job "my_job (trigger: interval[0:00:05], next run at: 2016-12-13 21:27:00 CST)" (scheduled at 2016-12-13 21:27:00.067157+08:00)
    my_job is running, Now is 2016-12-13 21:27:00
    INFO:apscheduler.executors.default:Job "my_job (trigger: interval[0:00:05], next run at: 2016-12-13 21:27:05 CST)" executed successfully
    INFO:apscheduler.executors.default:Running job "my_job (trigger: interval[0:00:05], next run at: 2016-12-13 21:27:05 CST)" (scheduled at 2016-12-13 21:27:05.067157+08:00)
    my_job is running, Now is 2016-12-13 21:27:05
    INFO:apscheduler.executors.default:Job "my_job (trigger: interval[0:00:05], next run at: 2016-12-13 21:27:05 CST)" executed successfully
