APScheduler(定時任務二配置調度器)


一. 配置調度器

  APScheduler 有多種不同的配置方法,你可以選擇直接傳字典或傳參的方式創建調度器;

也可以先實例一個調度器對象,再添加配置信息。靈活的配置方式可以滿足各種應用場景的需要。

 

整套的配置選項可以參考API文檔BaseScheduler類。

一些調度器子類可能有它們自己特有的配置選項,以及獨立的任務儲存器和執行器也可能有自己特有的配置選項,可以查閱API文檔了解。

 1.1 舉一個例子,創建一個使用默認任務儲存器和執行器的BackgroundScheduler

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

# 因為是非阻塞的后台調度器,所以程序會繼續向下執行

這樣就可以創建了一個后台調度器。這個調度器有一個名稱為defaultMemoryJobStore(內存任務儲存器)和一個名稱是default且最大線程是10的ThreadPoolExecutor(線程池執行器)。

假如你現在有這樣的需求,兩個任務儲存器分別搭配兩個執行器;同時,還要修改任務的默認參數;最后還要改時區。可以參考下面例子,它們是完全等價的。

 

  • 名稱為“mongo”的MongoDBJobStore
  • 名稱為“default”的SQLAlchemyJobStore
  • 名稱為“default”的ThreadPoolExecutor,最大線程20個
  • 名稱“processpool”的ProcessPoolExecutor,最大進程5個
  • UTC時間作為調度器的時區
  • 默認為新任務關閉合並模式()
  • 設置新任務的默認最大實例數為3
方法一:
#-*- coding:utf-8 -*-

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
    'mongo': MongoDBJobStore(), #調度器名稱為mongo的MemoryJobStore(內存任務儲存器)
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')#調度器名稱為“default”的SQLAlchemyJobStore
}
executors = {
    'default': ThreadPoolExecutor(20),#名稱為“default ”的ThreadPoolExecutor,最大線程20個
    'processpool': ProcessPoolExecutor(5)#名稱“processpool”的ProcessPoolExecutor,最大進程5個
}
job_defaults = {
    'coalesce': False, #默認為新任務關閉合並模式()
    'max_instances': 3#設置新任務的默認最大實例數為3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

方法二:

#-*- coding:utf-8 -*-
from apscheduler.schedulers.background import BackgroundScheduler


# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
    'apscheduler.jobstores.mongo': {
         'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})

方法三:

#-*- coding:utf-8 -*-
from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler()

# ..這里可以添加任務

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

1.2  啟動調度器

啟動調度器是只需調用start()即可。除了BlockingScheduler,非阻塞調度器都會立即返回,可以繼續運行之后的代碼,比如添加任務等。

對於BlockingScheduler,程序則會阻塞在start()位置,所以,要運行的代碼必須寫在start()之前。

注!調度器啟動后,就不能修改配置了。

1.3  添加任務

添加任務的方法有兩種:

  1. 通過調用add_job()
  2. 通過裝飾器scheduled_job()

第一種方法是最常用的;

第二種方法是最方便的,但缺點就是運行時,不能修改任務。

第一種add_job()方法會返回一個apscheduler.job.Job實例,這樣就可以在運行時,修改或刪除任務。在任何時候你都能配置任務。但是如果調度器還沒有啟動,此時添加任務,那么任務就處於一個暫存的狀態。只有當調度器啟動時,才會開始計算下次運行時間。

還有一點要注意,如果你的執行器或任務儲存器是會序列化任務的,那么這些任務就必須符合:

  1. 回調函數必須全局可用
  2. 回調函數參數必須也是可以被序列化的

內置任務儲存器中,只有MemoryJobStore不會序列化任務;內置執行器中,只有ProcessPoolExecutor會序列化任務。

重要提醒!
如果在程序初始化時,是從數據庫讀取任務的,那么必須為每個任務定義一個明確的ID,並且使用replace_existing=True,否則每次重啟程序,你都會得到一份新的任務拷貝,也就意味着任務的狀態不會保存。

建議
如果想要立刻運行任務,可以在添加任務時省略trigger參數

1.4  移除任務

如果想從調度器移除一個任務,那么你就要從相應的任務儲存器中移除它,這樣才算移除了。有兩種方式:

  1. 調用remove_job(),參數為:任務ID,任務儲存器名稱
  2. 在通過add_job()創建的任務實例上調用remove()方法

第二種方式更方便,但前提必須在創建任務實例時,實例被保存在變量中。對於通過scheduled_job()創建的任務,只能選擇第一種方式。

當任務調度結束時(比如,某個任務的觸發器不再產生下次運行的時間),任務就會自動移除。

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

同樣,通過任務的具體ID:

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

 

1.5 暫停和恢復任務

通過任務實例或調度器,就能暫停和恢復任務。如果一個任務被暫停了,那么該任務的下一次運行時間就會被移除。在恢復任務前,運行次數計數也不會被統計。

暫停任務,有以下兩個方法:

  • apscheduler.job.Job.pause()
  • apscheduler.schedulers.base.BaseScheduler.pause_job()

恢復任務,

  • apscheduler.job.Job.resume()
  • apscheduler.schedulers.base.BaseScheduler.resume_job()

1.6   獲取任務列表

通過get_jobs()就可以獲得一個可修改的任務列表。get_jobs()第二個參數可以指定任務儲存器名稱,那么就會獲得對應任務儲存器的任務列表。

print_jobs()可以快速打印格式化的任務列表,包含觸發器,下次運行時間等信息。

1.7  修改任務

通過apscheduler.job.Job.modify()modify_job(),你可以修改任務當中除了id的任何屬性。

比如:

job.modify(max_instances=6, name='Alternate name')

 

如果想要重新調度任務(就是改變觸發器),你能通過apscheduler.job.Job.reschedule()reschedule_job()來實現。這些方法會重新創建觸發器,並重新計算下次運行時間。

比如:

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

 

1.8  關閉調度器

關閉方法如下:

scheduler.shutdown()

默認情況下,調度器會先把正在執行的任務處理完,再關閉任務儲存器和執行器。但是,如果你就直接關閉,你可以添加參數:

scheduler.shutdown(wait=False)
 

上述方法不管有沒有任務在執行,會強制關閉調度器。


1.9  暫停、恢復任務進程

調度器可以暫停正在執行的任務:

scheduler.pause()
 

也可以恢復任務:

 
scheduler.resume()

同時,也可以在調度器啟動時,默認所有任務設為暫停狀態。

scheduler.start(paused=True)

 

2.0 限制任務執行的實例並行數

默認情況下,在同一時間,一個任務只允許一個執行中的實例在運行。

比如說,一個任務是每5秒執行一次,但是這個任務在第一次執行的時候花了6秒,也就是說前一次任務還沒執行完,

后一次任務又觸發了,由於默認一次只允許一個實例執行,所以第二次就丟失了。為了杜絕這種情況,可以在添加任務時,

設置max_instances參數,為指定任務設置最大實例並行數。

2.1  丟失任務的執行與合並

有時,任務會由於一些問題沒有被執行。

最常見的情況就是,在數據庫里的任務到了該執行的時間,但調度器被關閉了,那么這個任務就成了“啞彈任務”。

錯過執行時間后,調度器才打開了。這時,調度器會檢查每個任務的misfire_grace_time參數int值,即啞彈上限,來確定是否還執行啞彈任務(這個參數可以全局設定的或者是為每個任務單獨設定)。

此時,一個啞彈任務,就可能會被連續執行多次。

但這就可能導致一個問題,有些啞彈任務實際上並不需要被執行多次。coalescing合並參數就能把一個多次的啞彈任務揉成一個一次的啞彈任務。

也就是說,coalescingTrue能把多個排隊執行的同一個啞彈任務,變成一個,而不會觸發啞彈事件。

注!如果是由於線程池/進程池滿了導致的任務延遲,執行器就會跳過執行。要避免這個問題,可以添加進程或線程數來實現或把 misfire_grace_time值調高。

2.2   調度器事件

調度器允許添加事件偵聽器。部分事件會有特有的信息,比如當前運行次數等。add_listener(callback,mask)中,第一個參數是回調對象,mask是指定偵聽事件類型,mask參數也可以是邏輯組合。回調對象會有一個參數就是觸發的事件。

具體可以查看文檔中events模塊,里面有關於事件類型以及事件參數的詳細說明。

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

# 當任務執行完或任務出錯時,調用my_listener
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

事件類型

Constant Description Event class
EVENT_SCHEDULER_STARTED

計划程序已啟動 

SchedulerEvent
EVENT_SCHEDULER_SHUTDOWN 調度程序已關閉 SchedulerEvent
EVENT_SCHEDULER_PAUSED 計划程序中的作業處理已暫停 SchedulerEvent
EVENT_SCHEDULER_RESUMED 計划程序中的作業處理已恢復 SchedulerEvent
EVENT_EXECUTOR_ADDED 已將執行器添加到計划程序中 SchedulerEvent
EVENT_EXECUTOR_REMOVED 一個執行器被移到調度程序中 SchedulerEvent
EVENT_JOBSTORE_ADDED 已將作業存儲添加到計划程序 SchedulerEvent
EVENT_JOBSTORE_REMOVED

已從計划程序中刪除作業存儲

SchedulerEvent
EVENT_ALL_JOBS_REMOVED 所有作業都已從所有作業存儲或一個特定作業存儲中刪除 SchedulerEvent
EVENT_JOB_ADDED

作業已添加到作業存儲中

JobEvent
EVENT_JOB_REMOVED 已從作業存儲中刪除作業 JobEvent
EVENT_JOB_MODIFIED 已從計划程序外部修改作業 JobEvent
EVENT_JOB_SUBMITTED

作業已提交給其執行者以運行

JobSubmissionEvent
EVENT_JOB_MAX_INSTANCES

提交給其執行者的作業未被執行者接受,因為該作業已達到其最大並發執行實例數

JobSubmissionEvent
EVENT_JOB_EXECUTED

作業已成功執行

JobExecutionEvent
EVENT_JOB_ERROR

作業在執行期間引發異常

JobExecutionEvent
EVENT_JOB_MISSED

錯失了一份工作

JobExecutionEvent
EVENT_ALL

包含所有事件類型的全部捕獲掩碼

N/A


2.3  異常捕獲

通過logging模塊,可以添加apscheduler日志至DEBUG級別,這樣就能捕獲異常信息。

關於logging初始化的方式如下:

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

 

日志會提供很多調度器的內部運行信息。

文章參考鏈接:https://apscheduler.readthedocs.io/en/latest/index.html




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM