Python定時任務APScheduler的實例實例詳解


APScheduler 支持三種調度任務:固定時間間隔,固定時間點(日期),Linux 下的 Crontab 命令。同時,它還支持異步執行、后台執行調度任務。

一、基本架構

  1. 觸發器 triggers:設定觸發任務的條件
  2. 描述一個任務何時被觸發,按日期或按時間間隔或按 cronjob 表達式三種方式觸發
  3. 任務存儲器 job stores:存放任務,可以放內存(默認)或數據庫
  4. 注:調度器之間不能共享任務存儲器
  5. 執行器 executors:用於執行任務,可設定執行模式
  6. 將指定的作業提交到線程池或者進程池中運行,任務完成通知調度器觸發相應的事件。
  7. 調度器 schedulers:將上方三個組件作為參數,創建調度器實例執行。

協調三個組件的運行。

二、調度器組件(schedulers)

  • BlockingScheduler 阻塞式調度器

調度程序是進程中唯一運行的進程,調用start函數會阻塞當前線程,不能立即返回。

1
2
3
4
5
6
7
from apscheduler.schedulers.blocking import BlockingScheduler
import time
scheduler = BlockingScheduler()
def job1():
  print "%s: 執行任務" % time.asctime()
scheduler.add_job(job1, 'interval' , seconds = 3 )
scheduler.start()
  • BackgroundScheduler 后台調度器

當前線程不會阻塞,調度器后台執行

1
2
3
4
5
6
7
8
from apscheduler.schedulers.background import BackgroundScheduler
import time
scheduler = BackgroundScheduler()
def job1():
  print "%s: 執行任務" % time.asctime()
scheduler.add_job(job1, 'interval' , seconds = 3 )
scheduler.start()
time.sleep( 10 )

注:10秒執行完后,程序結束。

  • AsyncIOScheduler AsyncIO調度器

適用於使用了asyncio的情況

1
2
3
4
5
6
7
8
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import asyncio
...
...
try :
  asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
  pass
  • GeventScheduler Gevent調度器

使用了Gevent的情況

1
2
3
4
5
6
7
8
from apscheduler.schedulers.gevent import GeventScheduler
...
...
g = scheduler.start()
try :
  g.join()
except (KeyboardInterrupt, SystemExit):
  pass
  • TornadoScheduler Tornado調度器

適用於構建Tornado應用

  • TwistedScheduler Twisted調度器

適用於構建Twisted應用

  • QtScheduler Qt調度器

適用於構建Qt應用

三、觸發器組件(trigger)

date :只在某個時間點執行一次,具體日期

run_date(datetime|str)

1
2
scheduler.add_job(my_job, 'date' , run_date = datetime( 2019 , 7 , 12 , 15 , 30 , 5 ), args = [])
scheduler.add_job(my_job, 'date' , run_date = "2019-07-12" , args = [])

timezone 指定時區

interval :每隔一段時間允許一次,時間間隔

1
2
3
weeks = 0 | days = 0 | hours = 0 | minutes = 0 | seconds = 0 , start_date = None , end_date = None , timezone = None
scheduler.add_job(my_job, 'interval' , hours = 2 )
scheduler.add_job(my_job, 'interval' , hours = 2 , start_date = '2017-9-8 21:30:00' , end_date = ' 2018 - 06 - 15 21 : 30 : 00 )

cron :任務的運行周期

1
(year = None , month = None , day = None , week = None , day_of_week = None , hour = None , minute = None , second = None , start_date = None , end_date = None , timezone = None )

除了week和 day_of_week,它們的默認值是 *

例如 day=1, minute=20 ,這就等於 year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0 ,工作將在每個月的第一天以每小時20分鍾的時間執行

表達式類型

 

表達式 參數類型 描述
* 所有 通配符。例: minutes=* 即每分鍾觸發
*/a 所有 可被a整除的通配符。
a-b 所有 范圍a-b觸發
a-b/c 所有 范圍a-b,且可被c整除時觸發
xth y 第幾個星期幾觸發。x為第幾個,y為星期幾
last x 一個月中,最后個星期幾觸發
last 一個月最后一天觸發
x,y,z 所有 組合表達式,可以組合確定值或上方的表達式

 

注:當設置的時間間隔小於,任務的執行時間,線程會阻塞住,等待執行完了才能執行下一個任務,可以設置 max_instance 指定一個任務同一時刻有多少個實例在運行,默認為1

四、配置調度器

線程池執行器默認為10,內存任務存儲器為 memoryjobstore ,如果想自己配置的話可以執行以下操作

需求:

  • 兩個任務儲存器分別搭配兩個執行器;同時,還要修改任務的默認參數;最后還要改時區
  • 名稱為“mongo”的 MongoDBJobStore
  • 名稱為“default”的 SQLAlchemyJobStore
  • 名稱為“ThreadPoolExecutor ”的 ThreadPoolExecutor ,最大線程20個
  • 名稱“processpool”的 ProcessPoolExecutor ,最大進程5個
  • UTC時間作為調度器的時區
  • 默認為新任務關閉 合並模式 ()
  • 設置新任務的默認最大實例數為3

方法一:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
  'mongo' : MongoDBJobStore(),
  'default' : SQLAlchemyJobStore(url = 'sqlite:///jobs.sqlite' )
}
executors = {
  'default' : ThreadPoolExecutor( 20 ),
  'processpool' : ProcessPoolExecutor( 5 )
}
job_defaults = {
  'coalesce' : False ,
  'max_instances' : 3
}
scheduler = BackgroundScheduler(jobstores = jobstores, executors = executors, job_defaults = job_defaults, timezone = utc)

方法二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from apscheduler.schedulers.background import BackgroundScheduler
# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
  'apscheduler.jobstores.mongo' : {
    'type' : 'mongodb'
  },
  'apscheduler.jobstores.default' : {
   'type' : 'sqlalchemy' ,
   'url' : 'sqlite:///jobs.sqlite'
  },
  'apscheduler.executors.default' : {
   'class' : 'apscheduler.executors.pool:ThreadPoolExecutor' ,
   'max_workers' : '20'
  },
  'apscheduler.executors.processpool' : {
   'type' : 'processpool' ,
   'max_workers' : '5'
  },
  'apscheduler.job_defaults.coalesce' : 'false' ,
  'apscheduler.job_defaults.max_instances' : '3' ,
  'apscheduler.timezone' : 'UTC' ,
})

方法三:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
jobstores = {
  'mongo' : { 'type' : 'mongodb' },
  'default' : SQLAlchemyJobStore(url = 'sqlite:///jobs.sqlite' )
}
executors = {
  'default' : { 'type' : 'threadpool' , 'max_workers' : 20 },
  'processpool' : ProcessPoolExecutor(max_workers = 5 )
}
job_defaults = {
  'coalesce' : False ,
  'max_instances' : 3
}
scheduler = BackgroundScheduler()
# ..這里可以添加任務
scheduler.configure(jobstores = jobstores, executors = executors, job_defaults = job_defaults, timezone = utc)

五、啟動調度器

除了 BlockingScheduler 外,其他非阻塞的調度器都會立即返回,運行之后的代碼。

BlockingScheduler 需要將運行的代碼放在start()之前

1.添加任務

1.調用add_job()  #可以傳參max_instance,同一任務的運行實例個數

  當有任務中途中斷,后面恢復后,有N個任務沒有執行 
    coalesce:true ,恢復的任務會執行一次
    coalesce:false,恢復后的任務會執行N次配合misfire_grace_time使用
    misfire_grace_time設置時間差值,由於某些原因沒有運行,再次提交時,大於設置的時間,實例不會運行。

2.裝飾器scheduled_job()

立即運行可以不設置trigger參數

2.移除任務

1
2
3
4
5
6
# 根據任務實例刪除
job = scheduler.add_job(myfunc, 'interval' , minutes = 2 )
job.remove()
# 根據任務id刪除
scheduler.add_job(myfunc, 'interval' , minutes = 2 , id = 'my_job_id' )
scheduler.remove_job( 'my_job_id' )

3.暫停任務

1
2
3
4
5
6
7
8
job = scheduler.add_job(myfunc, 'interval' , minutes = 2 )
# 根據任務實例
job.pause() #暫停
job.resume() #繼續
# 根據任務id暫停
scheduler.add_job(myfunc, 'interval' , minutes = 2 , id = 'my_job_id' )
scheduler.pause_job( 'my_job_id' )
scheduler.resume_job( 'my_job_id' )

4.調度器操作

1
2
scheduler.start() #開啟
scheduler.shotdown(wait = True | False ) #關閉 False 無論任務是否執行,強制關閉

異常捕獲

1
2
3
4
# 可以添加apscheduler日志至DEBUG級別,這樣就能捕獲異常信息
import logging
logging.basicConfig()
logging.getLogger( 'apscheduler' ).setLevel(logging.DEBUG)

總結

以上所述是小編給大家介紹的Python定時任務APScheduler的實例實例詳解,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對腳本之家網站的支持!
如果你覺得本文對你有幫助,歡迎轉載,煩請注明出處,謝謝!

https://www.jb51.net/article/165895.htm


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM