一.安裝與簡介
1.安裝
pip install apscheduler
官方文檔:https://apscheduler.readthedocs.io/en/latest/#
2.簡介
APScheduler基於Quartz的一個Python定時任務框架,實現了Quartz的所有功能,使用起來十分方便。提供了基於日期、固定時間間隔以及crontab類型的任務,
並且可以持久化任務。基於這些功能,我們可以很方便的實現一個python定時任務系統。
觸發器(trigger)包含調度邏輯,每一個作業有它自己的觸發器,用於決定接下來哪一個作業會運行。除了他們自己初始配置意外,觸發器完全是無狀態的。
作業存儲(job store)存儲被調度的作業,默認的作業存儲是簡單地把作業保存在內存中,其他的作業存儲是將作業保存在數據庫中。一個作業的數據講在保存在持久化作業存儲時被序列化,
並在加載時被反序列化。調度器不能分享同一個作業存儲。
執行器(executor)處理作業的運行,他們通常通過在作業中提交制定的可調用對象到一個線程或者進城池來進行。當作業完成時,執行器將會通知調度器。
調度器(scheduler)是其他的組成部分。你通常在應用只有一個調度器,應用的開發者通常不會直接處理作業存儲、調度器和觸發器,相反,調度器提供了處理這些的合適的接口。
配置作業存儲和執行器可以在調度器中完成,例如添加、修改和移除作業。
調度器工作流程:
二.案例
1.hello world
from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime sched = BlockingScheduler() #構造定時器對象 def my_job(): print(f'{datetime.now():%H:%M:%S} Hello World ') sched.add_job(my_job, 'interval', seconds=5) #給定時器添加任務,觸發條件,以及間隔時間 sched.start() #開啟定時器
解釋:
導入調度器模塊 BlockingScheduler,這是最簡單的調度器,調用 start 方阻塞當前進程,如果你的程序只用於調度,
除了調度進程外沒有其他后台進程,那么請用 BlockingScheduler 非常有用,此時調度進程相當於守護進程。 定義一個函數my_job代表我們要調度的作業程序。
實例化一個 BlockingScheduler 類,不帶參數表明使用默認的作業存儲器-內存,默認的執行器是線程池執行器,最大並發線程數默認為 10 個(另一個是進程池執行器)。
結果:
除了上述添加作業的方法,還可以使用裝飾器
@sched.scheduled_job('interval', seconds=5) def my_job(): print(f'{datetime.now():%H:%M:%S} Hello World ')
如果同一個方法被添加到多個任務重,則需要指定任務 id
@sched.scheduled_job('interval', id='my_job', seconds=5) @sched.scheduled_job('interval', id='my_job1', seconds=3) def my_job(): print(f'{datetime.now():%H:%M:%S} Hello World ')
2. 帶參數的
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('你好',), trigger='cron', second='*/5') scheduler.start()
結果:
三.解析
BlockingScheduler
調度器中的一種,該種表示在進程中只運行調度程序時使用。sched.add_job()
添加作業,並指定調度方式為interval
,時間間隔為 5 秒sched.start()
開始任務
apscheduler分為4個模塊,分別是Triggers,Job stores,Executors,Schedulers.從上面的例子我們就可以看出來了,triggers就是觸發器,上面的代碼中,用了cron,其實還有其他觸發器,看看它的源碼解釋。
The ``trigger`` argument can either be: #. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case any extra keyword arguments to this method are passed on to the trigger's constructor #. an instance of a trigger class
源碼中解釋說,有date, interval, cron可供選擇,date表示具體的一次性任務,interval表示循環任務,cron表示定時任務。
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5') scheduler.add_job(func=aps_test, args=('一次性任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12)) scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3) scheduler.start()
結果:
結果非常清晰。除了一次性任務,trigger是不要寫的,直接定義next_run_time就可以了,關於date這部分,官網沒有解釋,但是去看看源碼吧,看這行代碼
def _create_trigger(self, trigger, trigger_args): if isinstance(trigger, BaseTrigger): return trigger elif trigger is None: trigger = 'date' elif not isinstance(trigger, six.string_types): raise TypeError('Expected a trigger instance or string, got %s instead' % trigger.__class__.__name__) # Use the scheduler's time zone if nothing else is specified trigger_args.setdefault('timezone', self.timezone) # Instantiate the trigger class return self._create_plugin_instance('trigger', trigger, trigger_args)
第4行,如果trigger為None,直接定義trigger為'date'類型。其實弄到這里,大家應該自己拓展一下,如果實現web的異步任務。假設接到一個移動端任務,任務完成后,發送一個推送到移動端,用date類型的trigger完成可以做的很好。
四.日志
如果代碼有意外咋辦?會阻斷整個任務嗎?如果我要計算密集型的任務咋辦?下面有個代碼,我們看看會發生什么情況。
那就這樣一直循環報錯????
加上日志我們在看看:
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='log1.txt', filemode='a') def aps_test(x): print 1/0 print datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5') scheduler._logger = logging scheduler.start()
一樣的結果,只不過記錄到日志了:
四.刪除任務
假設我們有個奇葩任務,要求執行一定階段任務以后,刪除某一個循環任務,其他任務照常進行。有如下代碼:
#!/usr/bin/env python # -*- coding: utf-8 -*- #author tom # coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='log1.txt', filemode='a') def aps_test(x): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) def aps_date(x): scheduler.remove_job('interval_task') print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5', id='cron_task') scheduler.add_job(func=aps_date, args=('一次性任務,刪除循環任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='date_task') scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task') scheduler._logger = logging scheduler.start()
在運行過程中,成功刪除某一個任務,其實就是為每個任務定義一個id,然后remove_job這個id,是不是超級簡單,直觀?那還有什么呢?
五.停止任務,恢復任務
看看官方文檔,還有pause_job, resume_job,用法跟remove_job一樣,這邊就不詳細介紹了,就寫個代碼。
#!/usr/bin/env python # -*- coding: utf-8 -*- #author tom # coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='log1.txt', filemode='a') def aps_test(x): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) def aps_pause(x): scheduler.pause_job('interval_task') print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) def aps_resume(x): scheduler.resume_job('interval_task') print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5', id='cron_task') scheduler.add_job(func=aps_pause, args=('一次性任務,停止循環任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12), id='pause_task') scheduler.add_job(func=aps_resume, args=('一次性任務,恢復循環任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=24), id='resume_task') scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task') scheduler._logger = logging scheduler.start()
六.意外
任何代碼都可能發生意外,關鍵是,發生意外了,如何第一時間知道,這才是公司最關心的,apscheduler已經為我們想到了這些。
#!/usr/bin/env python # -*- coding: utf-8 -*- #author tom # coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR import datetime import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='log1.txt', filemode='a') def aps_test(x): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) def date_test(x): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) print (1/0) def my_listener(event): if event.exception: print ('任務出錯了!!!!!!') else: print ('任務照常運行...') scheduler = BlockingScheduler() scheduler.add_job(func=date_test, args=('一定性任務,會出錯',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task') scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task') scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) scheduler._logger = logging scheduler.start()
是不是很直觀,在生產環境中,你可以把出錯信息換成發送一封郵件或者發送一個短信,這樣定時任務出錯就可以立馬就知道了。
七.關於調度器的配置(進階)
關於調度器的知識:
調度器的主循環其實就是反復檢查是不是有到時需要執行的任務,分以下幾步進行: 詢問自己的每一個作業存儲器,有沒有到期需要執行的任務,如果有,計算這些作業中每個作業需要運行的時間點,如果時間點有多個,做 coalesce 檢查。 提交給執行器按時間點運行。 在配置調度器前,我們首先要選取適合我們應用環境場景的調度器,存儲器和執行器。下面是各調度器的適用場景: BlockingScheduler:適用於調度程序是進程中唯一運行的進程,調用start函數會阻塞當前線程,不能立即返回。
BackgroundScheduler:適用於調度程序在應用程序的后台運行,調用start后主線程不會阻塞。
AsyncIOScheduler:適用於使用了asyncio模塊的應用程序。
GeventScheduler:適用於使用gevent模塊的應用程序。
TwistedScheduler:適用於構建Twisted的應用程序。
QtScheduler:適用於構建Qt的應用程序。 上述調度器可以滿足我們絕大多數的應用環境,本文以兩種調度器為例說明如何進行調度器配置。
關於作業存儲器的知識:
作業存儲器的選擇有兩種:
一是內存,也是默認的配置;
二是數據庫。具體選哪一種看我們的應用程序在崩潰時是否重啟整個應用程序,如果重啟整個應用程序,那么作業會被重新添加到調度器中,此時簡單的選取內存作為作業存儲器即簡單又高效。
但是,當調度器重啟或應用程序崩潰時您需要您的作業從中斷時恢復正常運行,那么通常我們選擇將作業存儲在數據庫中,使用哪種數據庫通常取決於為在您的編程環境中使用了什么數據庫。
我們可以自由選擇,PostgreSQL 是推薦的選擇,因為它具有強大的數據完整性保護。
關於執行器的知識:
同樣的,執行器的選擇也取決於應用場景。通常默認的 ThreadPoolExecutor 已經足夠好。如果作業負載涉及CPU 密集型操作,
那么應該考慮使用 ProcessPoolExecutor,甚至可以同時使用這兩種執行器,將ProcessPoolExecutor 行器添加為二級執行器。
apscheduler 提供了許多不同的方法來配置調度器。可以使用字典,也可以使用關鍵字參數傳遞。首先實例化調度程序,添加作業,然后配置調度器,獲得最大的靈活性。
如果調度程序在應用程序的后台運行,選擇 BackgroundScheduler,並使用默認的 jobstore 和默認的executor,則以下配置即可:
from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler()
假如我們想配置更多信息:設置兩個執行器、兩個作業存儲器、調整新作業的默認值,並設置不同的時區。下述三個方法是完全等同的。
配置需求:
配置名為“mongo”的MongoDBJobStore作業存儲器 配置名為“default”的SQLAlchemyJobStore(使用SQLite) 配置名為“default”的ThreadPoolExecutor,最大線程數為20 配置名為“processpool”的ProcessPoolExecutor,最大進程數為5 UTC作為調度器的時區 coalesce默認情況下關閉 作業的默認最大運行實例限制為3 misfire_grace_time 配置超時,如果設置為 10 秒,則表示誤差在 10 秒內仍允許運行,否則會報 job was missed by ..。
調度器的配置
方法一:
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExec utor jobstores = { 'mongo': MongoDBJobStore(), 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5) } job_defaults = { 'coalesce': False, 'max_instances': 3, 'misfire_grace_time':10 #10秒的任務超時容錯 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
方法二:
from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler({ 'apscheduler.jobstores.mongo': { 'type': 'mongodb' }, 'apscheduler.jobstores.default': { 'type': 'sqlalchemy', 'url': 'sqlite:///jobs.sqlite' }, 'apscheduler.executors.default': { 'class': 'apscheduler.executors.pool:ThreadPoolExecutor', 'max_workers': '20' }, 'apscheduler.executors.processpool': { 'type': 'processpool', 'max_workers': '5' }, 'apscheduler.job_defaults.coalesce': 'false', 'apscheduler.job_defaults.max_instances': '3', 'apscheduler.timezone': 'UTC', })
方法三:
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ProcessPoolExecutor jobstores = { 'mongo': { 'type': 'mongodb' }, 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': { 'type': 'threadpool', 'max_workers': 20 }, 'processpool': ProcessPoolExecutor(max_workers=5) } job_defaults = { 'coalesce': False, 'max_instances': 3, 'misfire_grace_time': 10 # 10秒的任務超時容錯 } scheduler = BackgroundScheduler() # .. do something else here, maybe add jobs etc.
以上涵蓋了大多數情況的調度器配置,在實際運行時可以試試不同的配置會有怎樣不同的效果。
啟動調度器
啟動調度器前需要先添加作業,有兩種方法向調度器添加作業:
一是通過接口add_job(),
二是通過使用函數裝飾器,其中 add_job() 返回一個apscheduler.job.Job類的實例,
用於后續修改或刪除作業。
我們可以隨時在調度器上調度作業。如果在添加作業時,調度器還沒有啟動,那么任務將不會運行,並且第一次運行時間在調度器啟動時計算。
注意:如果使用的是序列化作業的執行器或作業存儲器,那么要求被調用的作業(函數)必須是全局可訪問的,被調用的作業的參數是可序列化的,
作業存儲器中,只有 MemoryJobStore 不會序列化作業。執行器中,只有ProcessPoolExecutor 將序列化作業。
啟用調度器只需要調用調度器的 start() 方法,下面分別使用不同的作業存儲器來舉例說明:
方法一:使用默認的作業存儲器:
#!/usr/bin/env python # -*- coding: utf-8 -*- #author tom from apscheduler.schedulers.blocking import BlockingScheduler import datetime from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor def my_job(id='my_job'): print(id, '-->', datetime.datetime.now()) jobstores = { 'default': MemoryJobStore() } executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(10) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) scheduler.add_job(my_job, args=['job_interval', ], id='job_interval', trigger='interval', seconds=5, replace_existing=True) scheduler.add_job(my_job, args=['job_cron', ], id='job_cron', trigger='cron', month='4-8,11-12', hour='7-23', second='*/10', \ end_date = '2019-11-30') scheduler.add_job(my_job, args=['job_once_now', ], id='job_once_now') scheduler.add_job(my_job, args=['job_date_once', ], id='job_date_once', trigger='date', run_date='2019-08-13 23:28:05') try: scheduler.start() except SystemExit: print('exit') exit()
運行結果:
job_once_now --> 2019-08-13 23:26:24.935198 job_interval --> 2019-08-13 23:26:29.928484 job_cron --> 2019-08-13 23:26:30.000488 job_interval --> 2019-08-13 23:26:34.928770 job_interval --> 2019-08-13 23:26:39.935056 job_cron --> 2019-08-13 23:26:40.012061 job_interval --> 2019-08-13 23:26:44.941342 job_interval --> 2019-08-13 23:26:49.936628 job_cron --> 2019-08-13 23:26:50.009632 job_interval --> 2019-08-13 23:26:54.940914
上述代碼使用內存作為作業存儲器,操作比較簡單,重啟程序相當於第一次運行。
方法二:使用數據庫作為存儲器:
from apscheduler.schedulers.blocking import BlockingScheduler import datetime from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore def my_job(id='my_job'): print(id, '-->', datetime.datetime.now()) jobstores = { 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(10) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) scheduler.add_job(my_job, args=['job_interval', ], id='job_interval', trigger='interval', seconds=5, replace_existing=True) scheduler.add_job(my_job, args=['job_cron', ], id='job_cron', trigger='cron', month='4-8,11-12', hour='7-23', second='*/10', \ end_date = '2019-11-30') scheduler.add_job(my_job, args=['job_once_now', ], id='job_once_now') scheduler.add_job(my_job, args=['job_date_once', ], id='job_date_once', trigger='date', run_date='2019-08-13 23:30:05') try: scheduler.start() except SystemExit: print('exit') exit()
說明,紅色代碼修改為數據庫作為作業存儲器
運行結果如下:
Run time of job "my_job (trigger: date[2019-08-13 23:35:05 CST], next run at: 2019-08-13 23:35:05 CST)" was missed by 0:06:38.154717 job_once_now --> 2019-08-13 23:41:43.172718 job_interval --> 2019-08-13 23:41:48.067998 job_cron --> 2019-08-13 23:41:50.028110 job_interval --> 2019-08-13 23:41:53.105287 job_interval --> 2019-08-13 23:41:58.046569 job_cron --> 2019-08-13 23:42:00.031683 job_interval --> 2019-08-13 23:42:03.102858 job_interval --> 2019-08-13 23:42:08.055142 job_cron --> 2019-08-13 23:42:10.046255 job_interval --> 2019-08-13 23:42:13.102430 job_interval --> 2019-08-13 23:42:18.071715 job_cron --> 2019-08-13 23:42:20.084830 job_interval --> 2019-08-13 23:42:23.108003
提示:我們有作業本應在2019-08-13 23:30:05運行的作業沒有運行,因為現在的時間為2019-08-13 23:41,錯過了 23:30:05 的時間。
如果將上術代碼第 21-25 行注釋掉,重新運行本程序,則四種類型的作業仍會運行,結果如下:
運行結果:
Run time of job "my_job (trigger: cron[month='4-8,11-12', hour='7-23', second='*/10'], next run at: 2019-08-13 23:45:50 CST)" was missed by 0:00:18.783910
Run time of job "my_job (trigger: cron[month='4-8,11-12', hour='7-23', second='*/10'], next run at: 2019-08-13 23:46:10 CST)" was missed by 0:00:08.784911
Run time of job "my_job (trigger: interval[0:00:05], next run at: 2019-08-13 23:45:53 CST)" was missed by 0:00:15.774201
Run time of job "my_job (trigger: interval[0:00:05], next run at: 2019-08-13 23:45:53 CST)" was missed by 0:00:10.774201
Run time of job "my_job (trigger: interval[0:00:05], next run at: 2019-08-13 23:45:53 CST)" was missed by 0:00:05.774201
job_interval --> 2019-08-13 23:46:08.798911
job_cron --> 2019-08-13 23:46:10.056983
job_interval --> 2019-08-13 23:46:13.187162
job_interval --> 2019-08-13 23:46:18.102443
job_cron --> 2019-08-13 23:46:20.067556
job_interval --> 2019-08-13 23:46:23.097729
job_interval --> 2019-08-13 23:46:28.094015
job_cron --> 2019-08-13 23:46:30.068128
job_interval --> 2019-08-13 23:46:33.101301
作業仍會運行,說明作業被添加到數據庫中,程序中斷后重新運行時會自動從數據庫讀取作業信息,而不需要重新再添加到調度器中,如果不注釋 紅色添加作業的代碼,則作業會重新添加到數據庫中,這樣就有了兩個同樣的作業,避免出現這種情況可以在 add_job 的參數中增加 replace_existing=True,如
scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval',seconds=3,replace_existing=True)
如果我們想運行錯過運行的作業,使用 misfire_grace_time,如
scheduler.add_job(my_job,args = ['job_cron',] ,id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11',second='*/15',coalesce=True,\
misfire_grace_time=30,replace_existing=True,end_date='2018-05-30')
說明:misfire_grace_time,假如一個作業本來 08:00 有一次執行,但是由於某種原因沒有被調度上,現在 08:01 了,這個 08:00 的運行實例被提交時,會檢查它預訂運行的時間和當下時間的差值(這里是1分鍾),大於我們設置的 30 秒限制,那么這個運行實例不會被執行。最常見的情形是 scheduler 被 shutdown 后重啟,某個任務會積攢了好幾次沒執行如 5 次,下次這個作業被提交給執行器時,執行 5 次。設置 coalesce=True 后,只會執行一次。
其他操作如下:
scheduler.remove_job(job_id,jobstore=None)#刪除作業 scheduler.remove_all_jobs(jobstore=None)#刪除所有作業 scheduler.pause_job(job_id,jobstore=None)#暫停作業 scheduler.resume_job(job_id,jobstore=None)#恢復作業 scheduler.modify_job(job_id, jobstore=None, **changes)#修改單個作業屬性信息 scheduler.reschedule_job(job_id, jobstore=None, trigger=None,**trigger_args)#修改單個作業的觸發器並更新下次運行時間 scheduler.print_jobs(jobstore=None, out=sys.stdout)#輸出作業信息
調度器事件監聽:
scheduler 的基本應用,在前面已經介紹過了,但仔細思考一下:如果程序有異常拋出會影響整個調度任務嗎?請看下面的代碼,運行一下看看會發生什么情況:
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print (1/0) print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5') scheduler.start()
運行結果如下:
Job "aps_test (trigger: cron[second='*/5'], next run at: 2019-08-13 23:55:45 CST)" raised an exception Traceback (most recent call last): File "C:\Users\Administrator\Envs\mmachine_learn\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job retval = job.func(*job.args, **job.kwargs) File "F:/machine learnning/machine_learning/myCeleryProj/apschedule_test.py", line 84, in aps_test print (1/0) ZeroDivisionError: division by zero Job "aps_test (trigger: cron[second='*/5'], next run at: 2019-08-13 23:55:50 CST)" raised an exception Traceback (most recent call last):
可能看出每 5 秒拋出一次報錯信息。任何代碼都可能拋出異常,關鍵是,發生導常事件,如何第一時間知道,這才是我們最關心的,apscheduler 已經為我們想到了這些,提供了事件監聽來解決這一問題。
將上述代碼稍做調整,加入日志記錄和事件監聽,如下所示。
from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR import datetime import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename='log1.txt', filemode='a') def aps_test(x): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) def date_test(x): print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) print (1/0) def my_listener(event): if event.exception: print ('任務出錯了!!!!!!') else: print ('任務照常運行...') scheduler = BlockingScheduler() scheduler.add_job(func=date_test, args=('一定性任務,會出錯',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task') scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3, id='interval_task') scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) scheduler._logger = logging scheduler.start()