APScheduler的使用


定時框架APScheduler

APSScheduler是python的一個定時任務框架,它提供了基於日期date、固定時間間隔interval、以及linux上的crontab類型的定時任務。該框架不僅可以添加、刪除定時任務,還可以將任務存儲到數據庫中、實現任務的持久化。

APScheduler有四種組件

  • triggers(觸發器):觸發器包含調度邏輯,每一個作業有它自己的觸發器,用於決定接下來哪一個作業會運行,除了他們自己初始化配置外,觸發器完全是無狀態的。

  • job stores(作業存儲):用來存儲被調度的作業,默認的作業存儲器是簡單地把作業任務保存在內存中,其它作業存儲器可以將任務作業保存到各種數據庫中,支持MongoDB、Redis、SQLAlchemy存儲方式。當對作業任務進行持久化存儲的時候,作業的數據將被序列化,重新讀取作業時在反序列化。

  • executors(執行器):執行器用來執行定時任務,只是將需要執行的任務放在新的線程或者線程池中運行。當作業任務完成時,執行器將會通知調度器。對於執行器,默認情況下選擇ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任務如比較消耗CPU的任務則可以選擇ProcessPoolExecutor,當然根據根據實際需求可以同時使用兩種執行器。

  • schedulers(調度器):調度器是將其它部分聯系在一起,一般在應用程序中只有一個調度器,應用開發者不會直接操作觸發器、任務存儲以及執行器,相反調度器提供了處理的接口。通過調度器完成任務的存儲以及執行器的配置操作,如可以添加。修改、移除任務作業

APScheduler提供了七種調度器

  • BlockingScheduler:適合於只在進程中運行單個任務的情況,通常在調度器是你唯一要運行的東西時使用。
  • BackgroundScheduler: 適合於要求任何在程序后台運行的情況,當希望調度器在應用后台執行時使用。
  • AsyncIOScheduler:適合於使用asyncio異步框架的情況
  • GeventScheduler: 適合於使用gevent框架的情況
  • TornadoScheduler: 適合於使用Tornado框架的應用
  • TwistedScheduler: 適合使用Twisted框架的應用
  • QtScheduler: 適合使用QT的情況

APScheduler提供了四種存儲方式

  • MemoryJobStore
  • sqlalchemy
  • mongodb
  • redis

APScheduler提供了三種任務觸發器

  • data:固定日期觸發器:任務只運行一次,運行完畢自動清除;若錯過指定運行時間,任務不會被創建
  • interval:時間間隔觸發器
  • cron:cron風格的任務觸發

示例

示例1 BlockingScheduler

  • BlockingScheduler:在進程中運行單個任務,調度器是唯一運行的東西
  • 該示例代碼生成了一個BlockingScheduler調度器,使用了默認的任務存儲MemoryJobStore,以及默認的執行器ThreadPoolExecutor,並且最大線程數為10。

定義job

def jod1():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ' #####job1 ' + str(random.randint(0, 10)))

定義run方法

def run1():
    scheduler = BlockingScheduler()
    scheduler.add_job(jod1, 'interval', seconds=5)
    scheduler.start()

完整代碼

import time
import random
from apscheduler.schedulers.blocking import BlockingScheduler


def jod1():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ' #####job1 ' + str(random.randint(0, 10)))


def run1():
    scheduler = BlockingScheduler()
    scheduler.add_job(jod1, 'interval', seconds=5)
    scheduler.start()


if __name__ == '__main__':
    run1()

運行結果

2019-05-16 11:09:05 #####job1 2
2019-05-16 11:09:10 #####job1 5
2019-05-16 11:09:15 #####job1 1
2019-05-16 11:09:20 #####job1 0
2019-05-16 11:09:25 #####job1 8
2019-05-16 11:09:30 #####job1 0
2019-05-16 11:09:35 #####job1 9
2019-05-16 11:09:40 #####job1 1
2019-05-16 11:09:45 #####job1 4
......

測試1

我們在示例1的基礎上再加一個job2,看看什么情況

job2示例代碼

def jod2():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ' #####job2 ' + str(random.randint(0, 10)))

run2示例代碼

def run2():
    scheduler = BlockingScheduler()
    scheduler.add_job(jod1, 'interval', seconds=5)
    scheduler.add_job(jod2, 'interval', seconds=5)
    scheduler.start()

實行結果:

2019-05-16 11:10:46 #####job2 9
2019-05-16 11:10:46 #####job1 2
2019-05-16 11:10:51 #####job2 3
2019-05-16 11:10:51 #####job1 2
2019-05-16 11:10:56 #####job2 9
2019-05-16 11:10:56 #####job1 1
2019-05-16 11:11:01 #####job2 9
2019-05-16 11:11:01 #####job1 1
2019-05-16 11:11:06 #####job2 2
2019-05-16 11:11:06 #####job1 5
2019-05-16 11:11:11 #####job2 6
2019-05-16 11:11:11 #####job1 7
2019-05-16 11:11:16 #####job2 1
2019-05-16 11:11:16 #####job1 6
......

從執行結果來看,在同一個調度器中添加2個job,這兩個job會在同一時刻同時執行

測試2

我們將job的運行時間強制改為6秒,看看調度器會怎么處理

修改后的job1

def jod1():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ' #####job1 ' + str(random.randint(0, 10)))
    time.sleep(6)

run1 不變,運行結果

2019-05-16 11:18:11 #####job1 1
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 11:18:16 CST)" skipped: maximum number of running instances reached (1)
2019-05-16 11:18:21 #####job1 9
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 11:18:26 CST)" skipped: maximum number of running instances reached (1)
2019-05-16 11:18:31 #####job1 5
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 11:18:36 CST)" skipped: maximum number of running instances reached (1)
2019-05-16 11:18:41 #####job1 0
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 11:18:46 CST)" skipped: maximum number of running instances reached (1)

從執行結果來看,由於job的運行時間超過了執行器需要執行時的時間,所以本次任務跳過,就變成了10秒執行一次的任務了。

示例2 BackgroundScheduler

完整示例:

import time
import random
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler


def jod1():
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + ' #####job1 ' + str(random.randint(0, 10)))
    time.sleep(6)
    
def run3():
    scheduler = BackgroundScheduler()
    scheduler.add_job(jod1, 'interval', seconds=5)
    scheduler.start()

    while True:
        print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(2)
        print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        

if __name__ == '__main__':
    # run1()
    # run2()
    run3()

測試1

開啟一個任務,然后另外開啟一個線程

def run3():
    scheduler = BackgroundScheduler()
    scheduler.add_job(jod1, 'interval', seconds=5)
    scheduler.start()

    while True:
        print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(2)
        print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

運行結果

main-start: 2019-05-16 18:42:13
main-end: 2019-05-16 18:42:15
main-start: 2019-05-16 18:42:15
main-end: 2019-05-16 18:42:17
main-start: 2019-05-16 18:42:17
2019-05-16 18:42:18 #####job1 6
main-end: 2019-05-16 18:42:19
main-start: 2019-05-16 18:42:19
main-end: 2019-05-16 18:42:21
main-start: 2019-05-16 18:42:21
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 18:42:23 CST)" skipped: maximum number of running instances reached (1)
main-end: 2019-05-16 18:42:23
main-start: 2019-05-16 18:42:23
main-end: 2019-05-16 18:42:25
main-start: 2019-05-16 18:42:25
main-end: 2019-05-16 18:42:27
main-start: 2019-05-16 18:42:27
2019-05-16 18:42:28 #####job1 5
main-end: 2019-05-16 18:42:29
main-start: 2019-05-16 18:42:29
main-end: 2019-05-16 18:42:31
main-start: 2019-05-16 18:42:31
Execution of job "jod1 (trigger: interval[0:00:05], next run at: 2019-05-16 18:42:33 CST)" skipped: maximum number of running instances reached (1)
main-end: 2019-05-16 18:42:33
main-start: 2019-05-16 18:42:33
main-end: 2019-05-16 18:42:35
main-start: 2019-05-16 18:42:35
main-end: 2019-05-16 18:42:37
main-start: 2019-05-16 18:42:37
2019-05-16 18:42:38 #####job1 1
......

示例3 采用cron的方式

采用cron的方式來調度任務

def run4():
    scheduler = BackgroundScheduler()
    scheduler.add_job(jod1, 'cron', day_of_week='fri', second='*/5')
    scheduler.start()

    while True:
        print('main-start:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        time.sleep(2)
        print('main-end:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM