python定時任務最強框架APScheduler詳細教程
APScheduler定時任務
上次測試女神聽了我的建議,已經做好了要給項目添加定時任務的決定了。但是之前提供的四種方式中,她不知道具體選擇哪一個。為了和女神更近一步,我把我入行近10年收藏的干貨免費拿出來分享給女神,希望女神凌晨2點再找我的時候,不再是因為要給他調程序了。
Python中定時任務的解決方案,總體來說有四種,分別是: crontab、 scheduler、 Celery、 APScheduler,其中 crontab不適合多台服務器的配置、 scheduler太過於簡單、 Celery依賴的軟件比較多,比較耗資源。最好的解決方案就是 APScheduler。
APScheduler使用起來十分方便。提供了基於日期、固定時間間隔以及 crontab類型的任務。還可以在程序運行過程中動態的新增任務和刪除任務。在任務運行過程中,還可以把任務存儲起來,下次啟動運行依然保留之前的狀態。另外最重要的一個特點是,因為他是基於 Python語言的庫,所以是可以跨平台的,一段代碼,處處運行!
在這里我來給大家詳細介紹一下具體的用法。
一、安裝:
安裝非常簡單,通過 pip install apscheduler即可。
二、基本使用:
先來看一段代碼,然后再來給大家詳細講解其中的細節:
- from apscheduler . schedulers . blocking import BlockingScheduler
- from datetime import datetime
- def my_clock :
- print ( "Hello! The time is:%s" % datetime . now )
- if __name__ == '__main__' :
- scheduler = BlockingScheduler
- scheduler . add_job ( my_clock , "interval" , seconds = 3 )
- scheduler . start
其中 BlockingScheduler是阻塞性的調度器,是最基本的調度器,下面調用 start方法就會阻塞當前進程,所以如果你的程序除了調度進程沒有其他后台進程,那么是可以是否的,否則這個調度器會阻塞你程序的正常執行。
接下來就是定義一個 my_clock函數,這個函數就是需要定時調度的任務代碼。
然后就是實例化一個 BlockingScheduler對象,並把 my_clock添加到任務調度中。然后看 interval參數,這里用的是間隔的方式來調度,調度頻率是 seconds=3,也就是每3秒執行一次。
執行結果如下:

可以看到每隔3秒鍾的時間會執行一次。說明定時任務已經成功執行了!
在了解了 APScheduler的基本使用后,再來對 APScheduler的四個基本對象做個了解,這樣才能從全局掌握 APScheduler。
三、四個基本對象: 1. 觸發器(triggers):
觸發器就是根據你指定的觸發方式,比如是按照時間間隔,還是按照 crontab觸發,觸發條件是什么等。每個任務都有自己的觸發器。
2. 任務存儲器(job stores):
任務存儲器是可以存儲任務的地方,默認情況下任務保存在內存,也可將任務保存在各種數據庫中。任務存儲進去后,會進行序列化,然后也可以反序列化提取出來,繼續執行。
3. 執行器(executors):
執行器的目的是安排任務到線程池或者進程池中運行的。
4. 調度器(schedulers):
任務調度器是屬於整個調度的總指揮官。他會合理安排作業存儲器、執行器、觸發器進行工作,並進行添加和刪除任務等。調度器通常是只有一個的。開發人員很少直接操作觸發器、存儲器、執行器等。因為這些都由調度器自動來實現了。

四、觸發器:
觸發器有兩種,第一種是 interval,第二種是 crontab。 interval可以具體指定多少時間間隔執行一次。 crontab可以指定執行的日期策略。以下分別進行講解。
1. date觸發器:
在某個日期時間只觸發一次事件。示例代碼如下:
- from datetime import date
- from apscheduler . schedulers . blocking import BlockingScheduler
- sched = BlockingScheduler
- def my_job ( text ):
- print ( text )
- sched . add_job ( my_job , 'date' , run_date = date ( 2020 , 5 , 22 ), args =[ 'text' ])
- sched . start
更多請參考:https://apscheduler.readthedocs.io/en/stable/modules/triggers/date.html
2. interval觸發器:
想要在固定的時間間隔觸發事件。 interval的觸發器可以設置以下的觸發參數:
- weeks :周。整形。
- days :一個月中的第幾天。整形。
- hours :小時。整形。
- minutes :分鍾。整形。
- seconds :秒。整形。
- start_date :間隔觸發的起始時間。
- end_date :間隔觸發的結束時間。
- jitter :觸發的時間誤差。
- def cron_task :
- scheduler = BlockingScheduler
- scheduler . add_job ( tick , "cron" , hour = 11 , minute = 24 )
- scheduler . start
在每天的11點24分觸發事件。更多請參考:https://apscheduler.readthedocs.io/en/stable/modules/triggers/interval.html
3. crontab觸發器:
在某個確切的時間周期性的觸發事件。可以使用的參數如下:
- year :4位數字的年份。
- month :1-12月份。
- day :1-31日。
- week :1-53周。
- day_of_week :一個禮拜中的第幾天( 0 - 6 或者 mon 、 tue 、 wed 、 thu 、 fri 、 sat 、 sun )。
- hour : 0 - 23 小時。
- minute : 0 - 59 分鍾。
- second : 0 - 59 秒。
- start_date : datetime 類型或者字符串類型,起始時間。
- end_date : datetime 類型或者字符串類型,結束時間。
- timezone :時區。
- jitter :任務觸發的誤差時間。
也可以用表達式類型,可以用以下方式:
| 表達式 | 字段 | 描述 |
|---|---|---|
| * | 任何 | 在每個值都觸發 |
| */a | 任何 | 每隔 a觸發一次 |
| a-b | 任何 | 在 a-b區間內任何一個時間觸發( a必須小於 b) |
| a-b/c | 任何 | 在 a-b區間內每隔 c觸發一次 |
| xth y | day | 第 x個星期 y觸發 |
| lastx | day | 最后一個星期 x觸發 |
| last | day | 一個月中的最后一天觸發 |
| x,y,z | 任何 | 可以把上面的表達式進行組合 |
示例如下:
- def cron_task :
- scheduler = BlockingScheduler
- scheduler . add_job ( tick , "cron" , day = "4th sun" , hour = 20 , minute = 1 )
- scheduler . start
- BlockingScheduler :適用於調度程序是進程中唯一運行的進程,調用 start 函數會阻塞當前線程,不能立即返回。
- BackgroundScheduler :適用於調度程序在應用程序的后台運行,調用 start 后主線程不會阻塞。
- AsyncIOScheduler :適用於使用了 asyncio 模塊的應用程序。
- GeventScheduler :適用於使用 gevent 模塊的應用程序。
- TwistedScheduler :適用於構建 Twisted 的應用程序。
- QtScheduler :適用於構建 Qt 的應用程序。
任務存儲器的選擇有兩種。一是內存,也是默認的配置。二是數據庫。使用內存的方式是簡單高效,但是不好的是,一旦程序出現問題,重新運行的話,會把之前已經執行了的任務重新執行一遍。數據庫則可以在程序崩潰后,重新運行可以從之前中斷的地方恢復正常運行。有以下幾種選擇:
- MemoryJobStore :沒有序列化,任務存儲在內存中,增刪改查都是在內存中完成。
- SQLAlchemyJobStore :使用 SQLAlchemy 這個 ORM 框架作為存儲方式。
- MongoDBJobStore :使用 mongodb 作為存儲器。
- RedisJobStore :使用 redis 作為存儲器。
執行器的選擇取決於應用場景。通常默認的 ThreadPoolExecutor已經在大部分情況下是可以滿足我們需求的。如果我們的任務涉及到一些 CPU密集計算的操作。那么應該考慮 ProcessPoolExecutor。然后針對每種程序, apscheduler也設置了不同的 executor:
- ThreadPoolExecutor :線程池執行器。
- ProcessPoolExecutor :進程池執行器。
- GeventExecutor : Gevent 程序執行器。
- TornadoExecutor : Tornado 程序執行器。
- TwistedExecutor : Twisted 程序執行器。
- AsyncIOExecutor : asyncio 程序執行器。
這里我們用一個例子來說明。比如我想這樣配置
- 執行器:
- 配置 default 執行器為 ThreadPoolExecutor ,並且設置最多的線程數是20個。
- <
- 存儲器:
- 配置 default 的任務存儲器為 SQLAlchemyJobStore (使用 SQLite ) 。
- <
- 任務配置:
- 設置 coalesce 為 False :設置這個目的是,比如由於某個原因導致某個任務積攢了很多次沒有執行(比如有一個任務是1分鍾跑一次,但是系統原因斷了5分鍾),如果 coalesce = True ,那么下次恢復運行的時候,會只執行一次,而如果設置 coalesce = False ,那么就不會合並,會5次全部執行。
- max_instances = 5 :同一個任務同一時間最多只能有5個實例在運行。比如一個耗時10分鍾的job,被指定每分鍾運行1次,如果我 max_instance 值5,那么在第 6 ~ 10 分鍾上,新的運行實例不會被執行,因為已經有5個實例在跑了。
那么代碼如下:
- from apscheduler . schedulers . blocking import BlockingScheduler
- from datetime import datetime
- from apscheduler . jobstores . sqlalchemy import SQLAlchemyJobStore
- from apscheduler . executors . pool import ThreadPoolExecutor
- def interval_task :
- jobstores = {
- 'default' : SQLAlchemyJobStore ( url = 'sqlite:///jobs.sqlite' )
- }
- executors = {
- 'default' : ThreadPoolExecutor ( 20 )
- }
- job_defaults = {
- 'coalesce' : False ,
- 'max_instances' : 3
- }
- scheduler = BlockingScheduler ( jobstores = jobstores , executors = executors , job_defaults = job_defaults )
- scheduler . add_job ( tick , "interval" , minutes = 1 )
- scheduler . start
使用 scheduler.add_job(job_obj,args,id,trigger,**trigger_kwargs)。
2. 刪除任務:
使用 scheduler.remove_job(job_id,jobstore=None)。
3. 暫停任務:
使用 scheduler.pause_job(job_id,jobstore=None)。
4. 恢復任務:
使用 scheduler.resume_job(job_id,jobstore=None)。
5. 修改某個任務屬性信息:
使用 scheduler.modify_job(job_id,jobstore=None,**changes)。
6. 修改單個作業的觸發器並更新下次運行時間:
使用 scheduler.reschedule_job(job_id,jobstore=None,trigger=None,**trigger_args)
7. 輸出作業信息:
使用 scheduler.print_jobs(jobstore=None,out=sys.stdout)
十、異常監聽:
當我們的任務拋出異常后,我們可以監聽到,然后把錯誤信息進行記錄。示例代碼如下:
- from apscheduler . schedulers . blocking import BlockingScheduler
- from apscheduler . events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
- import datetime
- import logging
- # 配置日志顯示
- logging . basicConfig ( level = logging . INFO ,
- format = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s' ,
- datefmt = '%Y-%m-%d %H:%M:%S' ,
- filename = 'log1.txt' ,
- filemode = 'a' )
- def aps_test ( x ):
- print ( datetime . datetime . now . strftime ( '%Y-%m-%d %H:%M:%S' ), x )
- def date_test ( x ):
- print ( datetime . datetime . now . strftime ( '%Y-%m-%d %H:%M:%S' ), x )
- # 故意拋出異常
- print ( 1 / 0 )
- def my_listener ( event ):
- if event . exception :
- print ( '任務出錯了!!!!!!' )
- else :
- print ( '任務照常運行...' )
- scheduler = BlockingScheduler
- scheduler . add_job ( func = date_test , args =( '一次性任務,會出錯' ,), next_run_time = datetime . datetime . now + datetime . timedelta ( seconds = 15 ), id = 'date_task' )
- scheduler . add_job ( func = aps_test , args =( '循環任務' ,), trigger = 'interval' , seconds = 3 , id = 'interval_task' )
- # 配置任務執行完成和執行錯誤的監聽
- scheduler . add_listener ( my_listener , EVENT_JOB_EXECUTED | EVENT_JOB_ERROR )
- # 設置日志
- scheduler . _logger = logging
- scheduler . start
