python-APScheduler


python-APScheduler

APScheduler定時框架

APScheduler 全稱是Advanced Python Scheduler是一個 Python 定時任務框架,用於執行周期或者定時任務,APScheduler 支持三種調度任務:固定時間間隔,固定時間點(日期)Linux 下的 Crontab 命令 ,同時支持異步執行、后台執行調度任務。

可以在主程序的運行過程中快速增加新作業或刪除舊作業.apscheduler 可以當作一個跨平台的調度工具來使用,可以做為 linux 系統crontab 工具或 windows 計划任務程序的替換,不僅可以添加、刪除定時任務,還可以將任務存儲到數據庫中、實現任務的持久化

特點:

  • 類似於 Liunx Cron 的調度程序(可選的開始/結束時間)
  • 基於時間間隔的執行調度(周期性調度,可選的開始/結束時間)
  • 一次性執行任務(在設定的日期/時間運行一次任務)

安裝

pip install apscheduler

基礎概念

  • 觸發器(trigger)

某一個工作到來時引發的事件,包含調度的邏輯,每一個作業都有它自己的觸發器,用於決定哪個作業任務會執行,除了它們初始化配置之外,其完全是無狀態的。總的來說就是一個任務應該在什么時候執行

  • 執行器(executor)

主要是處理作業的運行,它將要執行的作業放在新的線程或者線程池中運行。執行完畢之后,再通知調度器。基於線程池的操作,可以針對不同類型的作業任務,更為高效的使用CPU的計算資源。

  • 作業存儲(job stores)

保存要調度的任務,其中除了默認的作業存儲是把作業保存在內存中,其他的作業存儲是將作業保存在數據庫中。一個作業的數據將在保存在持久化的作業存儲之前,會對作業執行序列化操作,當重新讀取作業時,再執行反序列化操作。同時,調度器不能分享同一個作業存儲。作業存儲支持主流的存儲機制:如redis,mongodb,關系型數據庫,內存等等。

  • 調度器(scheduler)

負責將上面幾個組件聯系在一起,一般在應用中只有一個調度器,程序開發者不會直接操作觸發器、作業存儲或執行器,而是利用調度器提供了處理這些合適的接口,作業存儲和執行器的配置都是通過在調度器中完成的。

使用步驟和流程

  1. 新建一個 schedulers (調度器) 。

  2. 添加一個調度任務(job stores)。

  3. 運行調度任務。

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
# 創建執行函數
def job():
    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# BlockingScheduler

#創建調度器
sched = BlockingScheduler()

# 添加執行函數
sched.add_job(my_job, 'interval', seconds=5, id='my_job_id')

# 執行定時調度
sched.start()


# 或者采用裝飾器的方式
sched = BlockingScheduler()

@sched.scheduled_job('interval', seconds=5)
def my_job():
    print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

sched.start()

Job 作業

Job作為APScheduler最小執行單位。創建Job時指定執行的函數,函數中所需參數,Job執行時的一些設置信息

  • func:Job執行的函數

  • trigger:apscheduler定義的觸發器,用於確定Job的執行時間,根據設置的trigger規則,計算得到下次執行此job的時間, 滿足時將會執行

  • args=None, kwargs=None

  • id:指定作業的唯一ID

  • name:指定作業的名字

  • misfire_grace_time:Job的延遲執行時間,例如Job的計划執行時間是21:00:00,但因服務重啟或其他原因導致21:00:31才執行,如果設置此key為40,則該job會繼續執行,否則將會丟棄此job

  • max_instances:執行此job的最大實例數,executor執行job時,根據job的id來計算執行次數,根據設置的最大實例數來確定是否可執行

  • next_run_time:Job下次的執行時間,創建Job時可以指定一個時間[datetime],不指定的話則默認根據trigger獲取觸發時間

  • coalesce:Job是否合並執行,是一個bool值。例如scheduler停止20s后重啟啟動,而job的觸發器設置為5s執行一次,因此此job錯過了4個執行時間,如果設置為是,則會合並到一次執行,否則會逐個執行

  • executor:apscheduler定義的執行器,job創建時設置執行器的名字,根據字符串你名字到scheduler獲取到執行此job的 執行器,執行job指定的函數

調度器(schedulers)

APScheduler 提供 7 種調度器,能夠滿足我們各種場景的需要

BlockingScheduler : 調度器在當前進程的主線程中運行,調用start函數會阻塞當前線程,不能立即返回。
BackgroundScheduler : 調度器在后台線程中運行,調用start主線程不會阻塞。
AsyncIOScheduler : 適用於 asyncio 模塊(一個異步框架)一起使用。
GeventScheduler : 適用於 gevent(高性能的Python並發框架)。
TornadoScheduler : 適用於 Tornado(一個web框架)的IO模型,用 ioloop.add_timeout 完成定時喚醒。
TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定時喚醒。
QtScheduler : 適用於 構建 Qt 應用程序,需使用QTimer完成定時喚醒。

導入方式

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.gevent import GeventScheduler
from apscheduler.schedulers.tornado import TornadoScheduler
from apscheduler.schedulers.qt import QtScheduler

觸發器(triggers)

作用:根據觸發器的規則計算出Job的觸發時間,然后與當前時間比較確定此Job是否會被執行,總之就是根據trigger規則計算出下一個執行時間。

目前APScheduler 提供三種任務觸發器:

  • date 固定日期出發,它表示特定的時間點觸發, 作業任務只會執行一次
  • interval 間隔時間調度, 固定時間間隔觸發。interval 間隔調度
  • cron 特定時間周期性地觸發,和Linux crontab 格式兼容。它是功能最強大的觸發器

date 觸發

sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])

interval 觸發

  • weeks (int) – 間隔幾周
  • days (int) – 間隔幾天
  • hours (int) – 間隔幾小時
  • minutes (int) – 間隔幾分鍾
  • seconds (int) – 間隔多少秒
  • start_date (datetime|str) – 開始日期
  • end_date (datetime|str) – 結束日期
  • timezone (datetime.tzinfo|str) – 時區
sched.add_job(job_function, 'interval', hours=2)

cron 觸發(某一定時時刻執行)

(int|str) 表示參數既可以是int類型,也可以是str類型
(datetime | str) 表示參數既可以是datetime類型,也可以是str類型

year=(int|str)       month (1-12) -(表示取值范圍為1-12月)
month=(int|str)      day of the (1-31) -(表示取值范圍為1-31日)
day=(int|str)
week=(int|str)
hour=(int|str)       hour (0-23) - (表示取值范圍為0-23時)
minute=(int|str)     minute (0-59) - (表示取值范圍為0-59分)
second=(int|str)     second (0-59) - (表示取值范圍為0-59秒)
start_date=(datetime|str)
end_date=(datetime|str)
timezone=(datetime,tzinfo)

字符串參數格式

字符形式 類型 描述
* 所有 通配符。例:minutes=* 即每分鍾觸發
*/a 所有 可被a整除的通配符。
a-b 所有 范圍a-b觸發
a-b/c 所有 范圍a-b,且可被c整除時觸發
x th y 第幾個星期幾觸發。x為第幾個,y為星期幾
last x 一個月中,最后個星期幾觸發
last 一個月最后一天觸發
x,y,z 所有 組合表達式,可以組合確定值或上方的表達式
# 6-8,11-12月第三個周五 00:00, 01:00, 02:00, 03:00運行
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 每周一到周五運行 直到2024-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'

https://blog.csdn.net/abc_soul/article/details/88875643

作業存儲(job store)

作業存儲器的選擇有兩種:一是內存,也是默認的配置;二是數據庫。
具體選哪一種看我們的應用程序在崩潰時是否重啟整個應用程序,如果重啟整個應用程序,那么作業會被重新添加到調度器中,此時簡單的選取內存作為作業存儲器即簡單又高效。

提供了四種存儲方式:

  • MemoryJobStore 內存
  • sqlalchemy 支持sqlalchemy的數據庫如mysql,sqlite等
  • mongodb 存儲在mongodb
  • redis 存儲在redis

執行器(executor)

執行調度任務的模塊。最常用的 executor 有兩種:ProcessPoolExecutorThreadPoolExecutor

Event 事件

Event是APScheduler在進行某些操作時觸發相應的事件,用戶可以自定義一些函數來監聽這些事件,
當觸發某些Event時,做一些具體的操作

常見的比如。Job執行異常事件 EVENT_JOB_ERROR。Job執行時間錯過事件 EVENT_JOB_MISSED。

案例

每隔5秒鍾執行一次

import time
from apscheduler.schedulers.blocking import BlockingScheduler

def job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))


if __name__ == '__main__':
    # 該示例代碼生成了一個BlockingScheduler調度器,使用了默認的任務存儲MemoryJobStore,以及默認的執行器ThreadPoolExecutor,並且最大線程數為10。
    
    # BlockingScheduler:在進程中運行單個任務,調度器是唯一運行的東西
    scheduler = BlockingScheduler()
    # 采用阻塞的方式

    # 采用固定時間間隔(interval)的方式,每隔5秒鍾執行一次
    scheduler.add_job(job, 'interval', seconds=5)
    
    scheduler.start()

在特定的時間執行一次

import time
from apscheduler.schedulers.blocking import BlockingScheduler

def job():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    
if __name__ == '__main__':
    # BlockingScheduler:在進程中運行單個任務,調度器是唯一運行的東西
    scheduler = BlockingScheduler()
    # 采用阻塞的方式
    
    # 采用date的方式,在特定時間只執行一次
    scheduler.add_job(job, 'date', run_date='2018-09-21 15:30:00')

    scheduler.start() 

每天間隔10分鍾執行一個任務

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
# 輸出時間
def job():
    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# BlockingScheduler

scheduler = BlockingScheduler()
scheduler.add_job(job, 'cron', day_of_week='*', hour='*', minute='10')
scheduler.start()	

定時調度(每一天上午10點半或者18點半執行任務)

from apscheduler.schedulers.blocking import BlockingScheduler

def everyday_job():
    print('Hello World!')
 
sched = BlockingScheduler()
#每隔一天 執行一次程序
# sched.add_job(everyday_job, 'interval', days=1)
#每天早上十點半和十八點半各執行一次程序
sched.add_job(everyday_job, 'cron', hour='10, 18', minute='30')
sched.start()

每隔2個整分鍾執行

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def everyday_job():
    print(datetime.datetime.now())
    print('Hello World!')


sched = BlockingScheduler()
# 每隔一天 執行一次程序
# sched.add_job(everyday_job, 'interval', days=1)
# 每天早上十點半和十八點半各執行一次程序
sched.add_job(everyday_job, 'cron', minute='*/2')
sched.start()

參考文獻:

https://zhuanlan.zhihu.com/p/95563033


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM