最近一個程序要用到后台定時任務,看了看python后台任務,一般2個選擇,一個是apscheduler,一個celery。apscheduler比較直觀簡單一點,就選說說這個庫吧。網上一搜索,暈死,好多寫apscheduler的都是超級老的版本,而且博客之間相互亂抄,錯誤一大堆。還是自己讀官方文檔,為大家理一遍吧。
先安裝一下吧,最新版本的apscheduler是3.0.5版
- 安裝
pip install apscheduler
安裝完畢
2. 簡單任務
首先,來個最簡單的例子,看看它的威力。
1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 import datetime 4 5 6 def aps_test(): 7 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '你好' 8 9 10 scheduler = BlockingScheduler() 11 scheduler.add_job(func=aps_test, trigger='cron', second='*/5') 12 scheduler.start()
看代碼,定義一個函數,然后定義一個scheduler類型,添加一個job,然后執行,就可以了,代碼是不是超級簡單,而且非常清晰。看看結果吧。
5秒整倍數,就執行這個函數,是不是超級超級簡單?對了,apscheduler就是通俗易懂。
再寫一個帶參數的。
1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 import datetime 4 5 6 def aps_test(x): 7 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 8 9 scheduler = BlockingScheduler() 10 scheduler.add_job(func=aps_test, args=('你好',), trigger='cron', second='*/5') 11 scheduler.start()
結果跟上面一樣的。
好了,上面只是給大家看的小例子,我們先從頭到位梳理一遍吧。apscheduler分為4個模塊,分別是Triggers,Jobstores,Executors,Schedulers.從上面的例子我們就可以看出來了,triggers就是觸發器,上面的代碼中,用了cron,其實還有其他觸發器,看看它的源碼解釋。
The ``trigger`` argument can either be: #. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case any extra keyword arguments to this method are passed on to the trigger's constructor #. an instance of a trigger class
看見沒有,源碼中解釋說,有date, interval, cron可供選擇,其實看字面意思也可以知道,date表示具體的一次性任務,interval表示循環任務,cron表示定時任務,好了,分別寫個代碼看看效果最明顯。
1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 import datetime 4 5 6 def aps_test(x): 7 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 8 9 scheduler = BlockingScheduler() 10 scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5') 11 scheduler.add_job(func=aps_test, args=('一次性任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12)) 12 scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3) 13 14 scheduler.start()
看看結果
其實應該不用我解釋代碼,大家也可以看出結果了,非常清晰。除了一次性任務,trigger是不要寫的,直接定義next_run_time就可以了,關於date這部分,官網沒有解釋,但是去看看源碼吧,看這行代碼。
1 def _create_trigger(self, trigger, trigger_args): 2 if isinstance(trigger, BaseTrigger): 3 return trigger 4 elif trigger is None: 5 trigger = 'date' 6 elif not isinstance(trigger, six.string_types): 7 raise TypeError('Expected a trigger instance or string, got %s instead' % trigger.__class__.__name__) 8 9 # Use the scheduler's time zone if nothing else is specified 10 trigger_args.setdefault('timezone', self.timezone) 11 12 # Instantiate the trigger class 13 return self._create_plugin_instance('trigger', trigger, trigger_args)
第4行,如果trigger為None,直接定義trigger為'date'類型。其實弄到這里,大家應該自己拓展一下,如果實現web的異步任務。假設接到一個移動端任務,任務完成后,發送一個推送到移動端,用date類型的trigger完成可以做的很好。
3.日志
好了,scheduler的基本應用,我想大家已經會了,但這僅僅只是開始。如果代碼有意外咋辦?會阻斷整個任務嗎?如果我要計算密集型的任務咋辦?下面有個代碼,我們看看會發生什么情況。
1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 import datetime 4 5 6 def aps_test(x): 7 print 1/0 8 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 9 10 scheduler = BlockingScheduler() 11 scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5') 12 13 scheduler.start()
還是上面代碼,但我們中間故意加了個錯誤,看看會發生什么情況。
說我們沒有log文件,好吧,我們添加一個log文件,看看寫的什么。

1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 import datetime 4 import logging 5 6 logging.basicConfig(level=logging.INFO, 7 format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', 8 datefmt='%Y-%m-%d %H:%M:%S', 9 filename='log1.txt', 10 filemode='a') 11 12 13 def aps_test(x): 14 print 1/0 15 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 16 17 scheduler = BlockingScheduler() 18 scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5') 19 scheduler._logger = logging 20 scheduler.start()
終於可以看到了,這時候才看到錯誤,這個是一定要注意的。
其實,到這里,完全可以執行大多數任務了,但我們為了效率,安全性,再往下面看看,還有什么。
4.刪除任務
假設我們有個奇葩任務,要求執行一定階段任務以后,刪除某一個循環任務,其他任務照常進行。有如下代碼:

1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 import datetime 4 import logging 5 6 logging.basicConfig(level=logging.INFO, 7 format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', 8 datefmt='%Y-%m-%d %H:%M:%S', 9 filename='log1.txt', 10 filemode='a') 11 12 13 def aps_test(x): 14 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 15 16 17 def aps_date(x): 18 scheduler.remove_job('interval_task') 19 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 20 21 22 scheduler = BlockingScheduler() 23 scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5', id='cron_task') 24 scheduler.add_job(func=aps_date, args=('一次性任務,刪除循環任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='date_task') 25 scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task') 26 scheduler._logger = logging 27 28 scheduler.start()
看看結果,
在運行過程中,成功刪除某一個任務,其實就是為每個任務定義一個id,然后remove_job這個id,是不是超級簡單,直觀?那還有什么呢?
5.停止任務,恢復任務
看看官方文檔,還有pause_job, resume_job,用法跟remove_job一樣,這邊就不詳細介紹了,就寫個代碼。

1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 import datetime 4 import logging 5 6 logging.basicConfig(level=logging.INFO, 7 format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', 8 datefmt='%Y-%m-%d %H:%M:%S', 9 filename='log1.txt', 10 filemode='a') 11 12 13 def aps_test(x): 14 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 15 16 17 def aps_pause(x): 18 scheduler.pause_job('interval_task') 19 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 20 21 22 def aps_resume(x): 23 scheduler.resume_job('interval_task') 24 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 25 26 scheduler = BlockingScheduler() 27 scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5', id='cron_task') 28 scheduler.add_job(func=aps_pause, args=('一次性任務,停止循環任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='pause_task') 29 scheduler.add_job(func=aps_resume, args=('一次性任務,恢復循環任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=24), id='resume_task') 30 scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task') 31 scheduler._logger = logging 32 33 scheduler.start()
看看結果
是不是很容易?好了,刪除任務,停止任務,恢復任務就介紹到這,下面我們看看監聽任務。
6.意外
任何代碼都可能發生意外,關鍵是,發生意外了,如何第一時間知道,這才是公司最關心的,apscheduler已經為我們想到了這些。
看下面的代碼,

1 # coding:utf-8 2 from apscheduler.schedulers.blocking import BlockingScheduler 3 from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR 4 import datetime 5 import logging 6 7 logging.basicConfig(level=logging.INFO, 8 format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', 9 datefmt='%Y-%m-%d %H:%M:%S', 10 filename='log1.txt', 11 filemode='a') 12 13 14 def aps_test(x): 15 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 16 17 18 def date_test(x): 19 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x 20 print 1/0 21 22 23 def my_listener(event): 24 if event.exception: 25 print '任務出錯了!!!!!!' 26 else: 27 print '任務照常運行...' 28 29 scheduler = BlockingScheduler() 30 scheduler.add_job(func=date_test, args=('一定性任務,會出錯',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task') 31 scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task') 32 scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) 33 scheduler._logger = logging 34 35 scheduler.start()
看看結果
是不是很直觀,在生產環境中,你可以把出錯信息換成發送一封郵件或者發送一個短信,這樣定時任務出錯就可以立馬就知道了。
好了,今天就講到這,以后我們有機會再來拓展這個apscheduler,這個非常強大而且直觀的后台任務庫
異常處理
當job拋出異常時,APScheduler會默默的把他吞掉,不提供任何提示,這不是一種好的實踐,我們必須知曉程序的任何差錯。APScheduler提供注冊listener,可以監聽一些事件,包括:job拋出異常、job沒有來得及執行等。
Constant | Event class | Triggered when... |
---|---|---|
EVENT_SCHEDULER_START | SchedulerEvent | The scheduler is started |
EVENT_SCHEDULER_SHUTDOWN | SchedulerEvent | The scheduler is shut down |
EVENT_JOBSTORE_ADDED | JobStoreEvent | A job store is added to the scheduler |
EVENT_JOBSTORE_REMOVED | JobStoreEvent | A job store is removed from the scheduler |
EVENT_JOBSTORE_JOB_ADDED | JobStoreEvent | A job is added to a job store |
EVENT_JOBSTORE_JOB_REMOVED | JobStoreEvent | A job is removed from a job store |
EVENT_JOB_EXECUTED | JobEvent | A job is executed successfully |
EVENT_JOB_ERROR | JobEvent | A job raised an exception during execution |
EVENT_JOB_MISSED | JobEvent | A job’s execution time is missed |
看下面的例子,監聽異常和miss事件,這里用logging模塊打印日志,logger.exception()可以打印出異常堆棧信息。
- def err_listener(ev):
- err_logger = logging.getLogger('schedErrJob')
- if ev.exception:
- err_logger.exception('%s error.', str(ev.job))
- else:
- err_logger.info('%s miss', str(ev.job))
- schedudler.add_listener(err_listener, apscheduler.events.EVENT_JOB_ERROR | apscheduler.events.EVENT_JOB_MISSED)
事件的屬性包括:
|
最后,需要注意一點當job不以daemon模式運行時,並且APScheduler也不是daemon的,那么在關閉腳本時,Ctrl + C是不奏效的,必須kill才可以。可以通過命令實現關閉腳本:
- ps axu | grep {腳本名} | grep -v grep | awk '{print $2;}' | xargs kill