Python APScheduler 定時任務


下載

pip install apscheduler

Hello World

#!/usr/bin/env python

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime

sched = BlockingScheduler()

def my_job():
    print(f'{datetime.now():%H:%M:%S} Hello World ')

sched.add_job(my_job, 'interval', seconds=5)
# 每5秒執行一次
sched.add_job(my_job, 'cron', hour = '20',minute ='30' ,second = '00')
# 每天的20:30:00執行一次
sched.start()
17:17:05 Hello World
17:17:10 Hello World
17:17:15 Hello World
17:17:20 Hello World
17:17:25 Hello World
17:17:30 Hello World

 

APScheduler 使用起來非常簡單,上面的代碼完成了每個五秒輸出一次信息的功能,它通過如下幾個步驟是實現

BlockingScheduler 調度器中的一種,該種表示在進程中只運行調度程序時使用。
* sched.add_job() 添加作業,並指定調度方式為 interval,時間間隔為 5 秒
* sched.start() 開始任務

除了上述添加作業的方法,還可以使用裝飾器

@sched.scheduled_job('interval', seconds=5)
def my_job():
    print(f'{datetime.now():%H:%M:%S} Hello World ')
如果同一個方法被添加到多個任務重,則需要指定任務 id

@sched.scheduled_job('interval', id='my_job', seconds=5)
@sched.scheduled_job('interval', id='my_job1', seconds=3)
def my_job():
print(f'{datetime.now():%H:%M:%S} Hello World ')

 

基本概念
APScheduler四大組件:

* 觸發器 triggers :用於設定觸發任務的條件
* 任務儲存器 job stores:用於存放任務,把任務存放在內存或數據庫中
* 執行器 executors: 用於執行任務,可以設定執行模式為單線程或線程池
* 調度器 schedulers: 把上方三個組件作為參數,通過創建調度器實例來運行

 

觸發器

每一個任務都有自己的觸發器,觸發器用於決定任務下次運行的時間。

任務儲存器

默認情況下,任務存放在內存中。也可以配置存放在不同類型的數據庫中。如果任務存放在數據庫中,那么任務的存取有一個序列化和反序列化的過程,同時修改和搜索任務的功能也是由任務儲存器實現。

注!一個任務儲存器不要共享給多個調度器,否則會導致狀態混亂

 

執行器

任務會被執行器放入線程池或進程池去執行,執行完畢后,執行器會通知調度器。

 

調度器
一個調度器由上方三個組件構成,一般來說,一個程序只要有一個調度器就可以了。開發者也不必直接操作任務儲存器、執行器以及觸發器,因為調度器提供了統一的接口,通過調度器就可以操作組件,比如任務的增刪改查。
除了剛才用到的調度器,總共有如下幾種

BlockingScheduler 阻塞式調度器:進程中只運行調度程序時使用。
BackgroundScheduler 后台調度器: 當沒有使用任何框架時使用,並希望調度程序在應用程序的后台運行。
AsyncIOScheduler AsyncIO調度器:當應用程序使用 asyncio 模塊時使用
GeventScheduler Gevent調度器:當應用程序使用 gevent 時使用
TornadoScheduler Tornado調度器:當創建 Tornado 應用時使用
TwistedScheduler Twisted調度器:當創建 Twisted 應用時使用
QtScheduler Qt調度器:當創建 Qt 應用時使用

比較常用的是前兩個

任務儲存器的選擇,要看任務是否需要持久化。如果你運行的任務是無狀態的,選擇默認任務儲存器MemoryJobStore就可以應付。但是,如果你需要在程序關閉或重啟時,保存任務的狀態,那么就要選擇持久化的任務儲存器。SQLAlchemyJobStore並搭配PostgreSQL作為后台數據庫。這個方案可以提供強大的數據整合與保護功能。
執行器的選擇,同樣要看你的實際需求。默認的ThreadPoolExecutor線程池執行器方案可以滿足大部分需求。如果,你的程序是計算密集型的,那么最好用ProcessPoolExecutor進程池執行器方案來充分利用多核算力。也可以將ProcessPoolExecutor作為第二執行器,混合使用兩種不同的執行器。

調用方式
add_job() 中 trigger 參數為調用方式,有 interval, day, cron 三種值

date 日期:觸發任務運行的具體日期
interval 間隔:觸發任務運行的時間間隔
cron 周期:觸發任務運行的周期
一個任務也可以設定多種觸發器,比如,可以設定同時滿足所有觸發器條件而觸發,或者滿足一項即觸發。復合觸發器,請查閱一下文檔:鏈接

cron
 
         
cron
指定時間調度,參數如下 year (int|str) – 4-digit year month (int|str) – month (1-12) day (int|str) – day of the (1-31) week (int|str) – ISO week (1-53) day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) hour (int|str) – hour (0-23) minute (int|str) – minute (0-59) second (int|str) – second (0-59) start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) end_date (datetime|str) – latest possible date/time to trigger on (inclusive) timezone (datetime.tzinfo|str) – time zone to use forthe date/time calculations (defaults to scheduler timezone) jitter (int|None) – advance or delay the job execution by jitter seconds at most.

注!month和day_of_week參數分別接受的是英語縮寫jan– dec 和 mon – sun

當參數指定字符串時有許多種用法,比如:

# 當前任務會在 6、7、8、11、12 月的第三個周五的 0、1、2、3 點執行
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
當省略時間參數時,在顯式指定參數之前的參數會被設定為*,之后的參數會被設定為最小值,week
和day_of_week的最小值為*。比如,設定day=1, minute=20等同於設定year=’’, month=’’,
day=1, week=’’, day_of_week=’’, hour=’*’, minute=20,
second=0,即每個月的第一天,且當分鍾到達20時就觸發。

start_date 和 end_date 可以用來適用時間范圍

# 在2014-05-30 00:00:00前,每周一到每周五 5:30運行
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')

通過 scheduled_job() 裝飾器實現:

@sched.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")

使用標准crontab表達式:

sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))

也可以添加jitter振動參數

# 每小時上下浮動120秒觸發
sched.add_job(job_function, 'cron', hour='*', jitter=120)

 

 

interval

時間間隔調用,參數如下

weeks (int) – number of weeks to wait
days (int) – number of days to wait
hours (int) – number of hours to wait
minutes (int) – number of minutes to wait
seconds (int) – number of seconds to wait
start_date (datetime|str) – starting point for(int i = 0; i < the interval calculation
end_date (datetime|str) – latest possible date/time to trigger on
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
jitter (int|None) – advance or delay the job execution by jitter seconds at most.

 

假如你要做一個活動,定時任務需要指定一個時間區間,就不需要手動去開啟或停止了,只需要像下邊這樣設置 start_date, end_date 即可

sched.add_job(job_function, 'interval', hours=2, start_date='2019-03-23 09:30:00', end_date='2019-04-15 11:00:00')

你可以框定周期開始時間start_date和結束時間end_date。

還可以每2小時觸發

# 每2小時觸發
sched.add_job(job_function, 'interval', hours=2)

jitter振動參數,給每次觸發添加一個隨機浮動秒數,一般適用於多服務器,避免同時運行造成服務擁堵。

# 每小時(上下浮動120秒區間內)運行`job_function`
sched.add_job(job_function, 'interval', hours=1, jitter=120)

 

date

指定時間,但只會執行一次

run_date (datetime|str) – the date/time to run the job at
timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already

例子

@sched.scheduled_job('date', id='my_job', run_date='2019-03-23 18:25:30')

其中run_date參數可以是date類型、datetime類型或文本類型。

datetime類型(用於精確時間)

 在2009年11月6日 16:30:05執行
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])

文本類型

sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['text'])

未指定時間,則會立即執行

# 未顯式指定,那么則立即執行
sched.add_job(my_job, args=['text'])

夏令時問題

有些timezone時區可能會有夏令時的問題。這個可能導致令時切換時,任務不執行或任務執行兩次。避免這個問題,可以使用UTC時間,或提前預知並規划好執行的問題。

APScheduler 有多種不同的配置方法,你可以選擇直接傳字典或傳參的方式創建調度器;也可以先實例一個調度器對象,再添加配置信息。靈活的配置方式可以滿足各種應用場景的需要。
整套的配置選項可以參考API文檔BaseScheduler類。一些調度器子類可能有它們自己特有的配置選項,以及獨立的任務儲存器和執行器也可能有自己特有的配置選項,可以查閱API文檔了解。
下面舉一個例子,創建一個使用默認任務儲存器和執行器的BackgroundScheduler:

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

# 因為是非阻塞的后台調度器,所以程序會繼續向下執行

這樣就可以創建了一個后台調度器。這個調度器有一個名稱為default的MemoryJobStore(內存任務儲存器)和一個名稱是default且最大線程是10的ThreadPoolExecutor(線程池執行器)。
假如你現在有這樣的需求,兩個任務儲存器分別搭配兩個執行器;同時,還要修改任務的默認參數;最后還要改時區。可以參考下面例子,它們是完全等價的。

名稱為“mongo”的MongoDBJobStore
名稱為“default”的SQLAlchemyJobStore
名稱為“ThreadPoolExecutor ”的ThreadPoolExecutor,最大線程20個
名稱“processpool”的ProcessPoolExecutor,最大進程5個
UTC時間作為調度器的時區
默認為新任務關閉合並模式()
設置新任務的默認最大實例數為3

方法一:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

方法二:

from apscheduler.schedulers.background import BackgroundScheduler


# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
    'apscheduler.jobstores.mongo': {
         'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})

方法三:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler()

# ..這里可以添加任務

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

啟動調度器
啟動調度器是只需調用start()即可。除了BlockingScheduler,非阻塞調度器都會立即返回,可以繼續運行之后的代碼,比如添加任務等。
對於BlockingScheduler,程序則會阻塞在start()位置,所以,要運行的代碼必須寫在start()之前。

注!調度器啟動后,就不能修改配置了。

 

 

添加任務
添加任務的方法有兩種:

通過調用add_job()
通過裝飾器scheduled_job()
第一種方法是最常用的;第二種方法是最方便的,但缺點就是運行時,不能修改任務。第一種add_job()方法會返回一個apscheduler.job.Job實例,這樣就可以在運行時,修改或刪除任務。

在任何時候你都能配置任務。但是如果調度器還沒有啟動,此時添加任務,那么任務就處於一個暫存的狀態。只有當調度器啟動時,才會開始計算下次運行時間。

還有一點要注意,如果你的執行器或任務儲存器是會序列化任務的,那么這些任務就必須符合:

回調函數必須全局可用
回調函數參數必須也是可以被序列化的
內置任務儲存器中,只有MemoryJobStore不會序列化任務;內置執行器中,只有ProcessPoolExecutor會序列化任務。

重要提醒!
如果在程序初始化時,是從數據庫讀取任務的,那么必須為每個任務定義一個明確的ID,並且使用replace_existing=True,否則每次重啟程序,你都會得到一份新的任務拷貝,也就意味着任務的狀態不會保存。

建議 如果想要立刻運行任務,可以在添加任務時省略trigger參數

 

 

移除任務
如果想從調度器移除一個任務,那么你就要從相應的任務儲存器中移除它,這樣才算移除了。有兩種方式:

調用remove_job(),參數為:任務ID,任務儲存器名稱
在通過add_job()創建的任務實例上調用remove()方法
第二種方式更方便,但前提必須在創建任務實例時,實例被保存在變量中。對於通過scheduled_job()創建的任務,只能選擇第一種方式。

當任務調度結束時(比如,某個任務的觸發器不再產生下次運行的時間),任務就會自動移除

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

同樣,通過任務的具體ID:

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

 

 

暫停和恢復任務


通過任務實例或調度器,就能暫停和恢復任務。如果一個任務被暫停了,那么該任務的下一次運行時間就會被移除。在恢復任務前,運行次數計數也不會被統計。

暫停任務,有以下兩個方法:

apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()

 

恢復任務,

apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()

 

獲取任務列表
通過get_jobs()就可以獲得一個可修改的任務列表。get_jobs()第二個參數可以指定任務儲存器名稱,那么就會獲得對應任務儲存器的任務列表。

print_jobs()可以快速打印格式化的任務列表,包含觸發器,下次運行時間等信息。

修改任務
通過apscheduler.job.Job.modify()或modify_job(),你可以修改任務當中除了id的任何屬性。

比如:

job.modify(max_instances=6, name='Alternate name')

 

1
如果想要重新調度任務(就是改變觸發器),你能通過apscheduler.job.Job.reschedule()或reschedule_job()來實現。這些方法會重新創建觸發器,並重新計算下次運行時間。

比如:

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

 


關閉調度器

關閉方法如下:

scheduler.shutdown()

默認情況下,調度器會先把正在執行的任務處理完,再關閉任務儲存器和執行器。但是,如果你就直接關閉,你可以添加參數:

scheduler.shutdown(wait=False)

 

上述方法不管有沒有任務在執行,會強制關閉調度器。

 

 

 

暫停、恢復任務進程
調度器可以暫停正在執行的任務:

scheduler.pause()

也可以恢復任務:

scheduler.resume()

同時,也可以在調度器啟動時,默認所有任務設為暫停狀態。

scheduler.start(paused=True)

限制任務執行的實例並行數
默認情況下,在同一時間,一個任務只允許一個執行中的實例在運行。比如說,一個任務是每5秒執行一次,但是這個任務在第一次執行的時候花了6秒,也就是說前一次任務還沒執行完,后一次任務又觸發了,由於默認一次只允許一個實例執行,所以第二次就丟失了。為了杜絕這種情況,可以在添加任務時,設置max_instances參數,為指定任務設置最大實例並行數。

丟失任務的執行與合並
有時,任務會由於一些問題沒有被執行。最常見的情況就是,在數據庫里的任務到了該執行的時間,但調度器被關閉了,那么這個任務就成了“啞彈任務”。錯過執行時間后,調度器才打開了。這時,調度器會檢查每個任務的misfire_grace_time參數int值,即啞彈上限,來確定是否還執行啞彈任務(這個參數可以全局設定的或者是為每個任務單獨設定)。此時,一個啞彈任務,就可能會被連續執行多次。
但這就可能導致一個問題,有些啞彈任務實際上並不需要被執行多次。coalescing合並參數就能把一個多次的啞彈任務揉成一個一次的啞彈任務。也就是說,coalescing為True能把多個排隊執行的同一個啞彈任務,變成一個,而不會觸發啞彈事件。

注!如果是由於線程池/進程池滿了導致的任務延遲,執行器就會跳過執行。要避免這個問題,可以添加進程或線程數來實現或把
misfire_grace_time值調高。

 

 

調度器事件
調度器允許添加事件偵聽器。部分事件會有特有的信息,比如當前運行次數等。add_listener(callback,mask)中,第一個參數是回調對象,mask是指定偵聽事件類型,mask參數也可以是邏輯組合。回調對象會有一個參數就是觸發的事件。

具體可以查看文檔中events模塊,里面有關於事件類型以及事件參數的詳細說明。

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

# 當任務執行完或任務出錯時,調用my_listener
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

 


異常捕獲
通過logging模塊,可以添加apscheduler日志至DEBUG級別,這樣就能捕獲異常信息。

關於logging初始化的方式如下:

 

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

日志會提供很多調度器的內部運行信息。


————————————————
版權聲明:本文為CSDN博主「abc_soul」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/abc_soul/article/details/88875643

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM