APScheduler庫的詳細用法


 

python定時任務最強框架APScheduler詳細教程

原標題:python定時任務最強框架APScheduler詳細教程

APScheduler定時任務

上次測試女神聽了我的建議,已經做好了要給項目添加定時任務的決定了。但是之前提供的四種方式中,她不知道具體選擇哪一個。為了和女神更近一步,我把我入行近10年收藏的干貨免費拿出來分享給女神,希望女神凌晨2點再找我的時候,不再是因為要給他調程序了。

Python中定時任務的解決方案,總體來說有四種,分別是: crontabschedulerCeleryAPScheduler,其中 crontab不適合多台服務器的配置、 scheduler太過於簡單、 Celery依賴的軟件比較多,比較耗資源。最好的解決方案就是 APScheduler

APScheduler使用起來十分方便。提供了基於日期、固定時間間隔以及 crontab類型的任務。還可以在程序運行過程中動態的新增任務和刪除任務。在任務運行過程中,還可以把任務存儲起來,下次啟動運行依然保留之前的狀態。另外最重要的一個特點是,因為他是基於 Python語言的庫,所以是可以跨平台的,一段代碼,處處運行!

在這里我來給大家詳細介紹一下具體的用法。

一、安裝:

安裝非常簡單,通過 pip install apscheduler即可。

二、基本使用:

先來看一段代碼,然后再來給大家詳細講解其中的細節:

  1. from apscheduler . schedulers . blocking import BlockingScheduler
  2. from datetime import datetime
  3. def my_clock :
  4. print ( "Hello! The time is:%s" % datetime . now )
  5. if __name__ == '__main__' :
  6. scheduler = BlockingScheduler
  7. scheduler . add_job ( my_clock , "interval" , seconds = 3 )
  8. scheduler . start

其中 BlockingScheduler是阻塞性的調度器,是最基本的調度器,下面調用 start方法就會阻塞當前進程,所以如果你的程序除了調度進程沒有其他后台進程,那么是可以是否的,否則這個調度器會阻塞你程序的正常執行。

接下來就是定義一個 my_clock函數,這個函數就是需要定時調度的任務代碼。

然后就是實例化一個 BlockingScheduler對象,並把 my_clock添加到任務調度中。然后看 interval參數,這里用的是間隔的方式來調度,調度頻率是 seconds=3,也就是每3秒執行一次。

執行結果如下:

可以看到每隔3秒鍾的時間會執行一次。說明定時任務已經成功執行了!

在了解了 APScheduler的基本使用后,再來對 APScheduler的四個基本對象做個了解,這樣才能從全局掌握 APScheduler

三、四個基本對象: 1. 觸發器(triggers):

觸發器就是根據你指定的觸發方式,比如是按照時間間隔,還是按照 crontab觸發,觸發條件是什么等。每個任務都有自己的觸發器。

2. 任務存儲器(job stores):

任務存儲器是可以存儲任務的地方,默認情況下任務保存在內存,也可將任務保存在各種數據庫中。任務存儲進去后,會進行序列化,然后也可以反序列化提取出來,繼續執行。

3. 執行器(executors):

執行器的目的是安排任務到線程池或者進程池中運行的。

4. 調度器(schedulers):

任務調度器是屬於整個調度的總指揮官。他會合理安排作業存儲器、執行器、觸發器進行工作,並進行添加和刪除任務等。調度器通常是只有一個的。開發人員很少直接操作觸發器、存儲器、執行器等。因為這些都由調度器自動來實現了。

四、觸發器:

觸發器有兩種,第一種是 interval,第二種是 crontabinterval可以具體指定多少時間間隔執行一次。 crontab可以指定執行的日期策略。以下分別進行講解。

1. date觸發器:

在某個日期時間只觸發一次事件。示例代碼如下:

  1. from datetime import date
  2. from apscheduler . schedulers . blocking import BlockingScheduler
  3. sched = BlockingScheduler
  4. def my_job ( text ):
  5. print ( text )
  6. sched . add_job ( my_job , 'date' , run_date = date ( 2020 , 5 , 22 ), args =[ 'text' ])
  7. sched . start

更多請參考:https://apscheduler.readthedocs.io/en/stable/modules/triggers/date.html

2. interval觸發器:

想要在固定的時間間隔觸發事件。 interval的觸發器可以設置以下的觸發參數:

  1. weeks :周。整形。
  2. days :一個月中的第幾天。整形。
  3. hours :小時。整形。
  4. minutes :分鍾。整形。
  5. seconds :秒。整形。
  6. start_date :間隔觸發的起始時間。
  7. end_date :間隔觸發的結束時間。
  8. jitter :觸發的時間誤差。
  1. def cron_task :
  2. scheduler = BlockingScheduler
  3. scheduler . add_job ( tick , "cron" , hour = 11 , minute = 24 )
  4. scheduler . start

在每天的11點24分觸發事件。更多請參考:https://apscheduler.readthedocs.io/en/stable/modules/triggers/interval.html

3. crontab觸發器:

在某個確切的時間周期性的觸發事件。可以使用的參數如下:

  1. year :4位數字的年份。
  2. month :1-12月份。
  3. day :1-31日。
  4. week :1-53周。
  5. day_of_week :一個禮拜中的第幾天( 0 - 6 或者 mon 、 tue 、 wed 、 thu 、 fri 、 sat 、 sun )。
  6. hour : 0 - 23 小時。
  7. minute : 0 - 59 分鍾。
  8. second : 0 - 59 秒。
  9. start_date : datetime 類型或者字符串類型,起始時間。
  10. end_date : datetime 類型或者字符串類型,結束時間。
  11. timezone :時區。
  12. jitter :任務觸發的誤差時間。

也可以用表達式類型,可以用以下方式:

表達式 字段 描述
* 任何 在每個值都觸發
*/a 任何 每隔 a觸發一次
a-b 任何 a-b區間內任何一個時間觸發( a必須小於 b
a-b/c 任何 a-b區間內每隔 c觸發一次
xth y day x個星期 y觸發
lastx day 最后一個星期 x觸發
last day 一個月中的最后一天觸發
x,y,z 任何 可以把上面的表達式進行組合

示例如下:

  1. def cron_task :
  2. scheduler = BlockingScheduler
  3. scheduler . add_job ( tick , "cron" , day = "4th sun" , hour = 20 , minute = 1 )
  4. scheduler . start
五、調度器:
  1. BlockingScheduler :適用於調度程序是進程中唯一運行的進程,調用 start 函數會阻塞當前線程,不能立即返回。
  2. BackgroundScheduler :適用於調度程序在應用程序的后台運行,調用 start 后主線程不會阻塞。
  3. AsyncIOScheduler :適用於使用了 asyncio 模塊的應用程序。
  4. GeventScheduler :適用於使用 gevent 模塊的應用程序。
  5. TwistedScheduler :適用於構建 Twisted 的應用程序。
  6. QtScheduler :適用於構建 Qt 的應用程序。
六、任務存儲器:

任務存儲器的選擇有兩種。一是內存,也是默認的配置。二是數據庫。使用內存的方式是簡單高效,但是不好的是,一旦程序出現問題,重新運行的話,會把之前已經執行了的任務重新執行一遍。數據庫則可以在程序崩潰后,重新運行可以從之前中斷的地方恢復正常運行。有以下幾種選擇:

  1. MemoryJobStore :沒有序列化,任務存儲在內存中,增刪改查都是在內存中完成。
  2. SQLAlchemyJobStore :使用 SQLAlchemy 這個 ORM 框架作為存儲方式。
  3. MongoDBJobStore :使用 mongodb 作為存儲器。
  4. RedisJobStore :使用 redis 作為存儲器。
七、執行器:

執行器的選擇取決於應用場景。通常默認的 ThreadPoolExecutor已經在大部分情況下是可以滿足我們需求的。如果我們的任務涉及到一些 CPU密集計算的操作。那么應該考慮 ProcessPoolExecutor。然后針對每種程序, apscheduler也設置了不同的 executor

  1. ThreadPoolExecutor :線程池執行器。
  2. ProcessPoolExecutor :進程池執行器。
  3. GeventExecutor : Gevent 程序執行器。
  4. TornadoExecutor : Tornado 程序執行器。
  5. TwistedExecutor : Twisted 程序執行器。
  6. AsyncIOExecutor : asyncio 程序執行器。
八、定時任務調度配置:

這里我們用一個例子來說明。比如我想這樣配置

  1. 執行器:
    • 配置 default 執行器為 ThreadPoolExecutor ,並且設置最多的線程數是20個。
    • <
  2. 存儲器:
    • 配置 default 的任務存儲器為 SQLAlchemyJobStore (使用 SQLite ) 。
    • <
  3. 任務配置:
    • 設置 coalesce 為 False :設置這個目的是,比如由於某個原因導致某個任務積攢了很多次沒有執行(比如有一個任務是1分鍾跑一次,但是系統原因斷了5分鍾),如果 coalesce = True ,那么下次恢復運行的時候,會只執行一次,而如果設置 coalesce = False ,那么就不會合並,會5次全部執行。
    • max_instances = 5 :同一個任務同一時間最多只能有5個實例在運行。比如一個耗時10分鍾的job,被指定每分鍾運行1次,如果我 max_instance 值5,那么在第 6 ~ 10 分鍾上,新的運行實例不會被執行,因為已經有5個實例在跑了。

那么代碼如下:

  1. from apscheduler . schedulers . blocking import BlockingScheduler
  2. from datetime import datetime
  3. from apscheduler . jobstores . sqlalchemy import SQLAlchemyJobStore
  4. from apscheduler . executors . pool import ThreadPoolExecutor
  5. def interval_task :
  6. jobstores = {
  7. 'default' : SQLAlchemyJobStore ( url = 'sqlite:///jobs.sqlite' )
  8. }
  9. executors = {
  10. 'default' : ThreadPoolExecutor ( 20 )
  11. }
  12. job_defaults = {
  13. 'coalesce' : False ,
  14. 'max_instances' : 3
  15. }
  16. scheduler = BlockingScheduler ( jobstores = jobstores , executors = executors , job_defaults = job_defaults )
  17. scheduler . add_job ( tick , "interval" , minutes = 1 )
  18. scheduler . start
九、任務操作: 1. 添加任務:

使用 scheduler.add_job(job_obj,args,id,trigger,**trigger_kwargs)

2. 刪除任務:

使用 scheduler.remove_job(job_id,jobstore=None)

3. 暫停任務:

使用 scheduler.pause_job(job_id,jobstore=None)

4. 恢復任務:

使用 scheduler.resume_job(job_id,jobstore=None)

5. 修改某個任務屬性信息:

使用 scheduler.modify_job(job_id,jobstore=None,**changes)

6. 修改單個作業的觸發器並更新下次運行時間:

使用 scheduler.reschedule_job(job_id,jobstore=None,trigger=None,**trigger_args)

7. 輸出作業信息:

使用 scheduler.print_jobs(jobstore=None,out=sys.stdout)

十、異常監聽:

當我們的任務拋出異常后,我們可以監聽到,然后把錯誤信息進行記錄。示例代碼如下:

  1. from apscheduler . schedulers . blocking import BlockingScheduler
  2. from apscheduler . events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
  3. import datetime
  4. import logging
  5. # 配置日志顯示
  6. logging . basicConfig ( level = logging . INFO ,
  7. format = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s' ,
  8. datefmt = '%Y-%m-%d %H:%M:%S' ,
  9. filename = 'log1.txt' ,
  10. filemode = 'a' )
  11. def aps_test ( x ):
  12. print ( datetime . datetime . now . strftime ( '%Y-%m-%d %H:%M:%S' ), x )
  13. def date_test ( x ):
  14. print ( datetime . datetime . now . strftime ( '%Y-%m-%d %H:%M:%S' ), x )
  15. # 故意拋出異常
  16. print ( 1 / 0 )
  17. def my_listener ( event ):
  18. if event . exception :
  19. print ( '任務出錯了!!!!!!' )
  20. else :
  21. print ( '任務照常運行...' )
  22. scheduler = BlockingScheduler
  23. scheduler . add_job ( func = date_test , args =( '一次性任務,會出錯' ,), next_run_time = datetime . datetime . now + datetime . timedelta ( seconds = 15 ), id = 'date_task' )
  24. scheduler . add_job ( func = aps_test , args =( '循環任務' ,), trigger = 'interval' , seconds = 3 , id = 'interval_task' )
  25. # 配置任務執行完成和執行錯誤的監聽
  26. scheduler . add_listener ( my_listener , EVENT_JOB_EXECUTED | EVENT_JOB_ERROR )
  27. # 設置日志
  28. scheduler . _logger = logging
  29. scheduler . start


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM