python定時任務框架APSchedule


一直想做一個分布式的調度系統,但不知道如何實現類似crontab那樣的定時調度,發現有現成的輪子可用,那就是——APSchedule

它有三種調度方式,分別是

date 日期:觸發任務運行的具體日期

interval 間隔:觸發任務運行的時間間隔

cron 周期:觸發任務運行的周期

下面以案例的方式,給大家做個簡單的介紹

date.py

from datetime import datetime

from apscheduler.schedulers.blocking import BlockingScheduler

#任務會在指定的固定時刻啟動
def job(text):
    now = datetime.now()
    print(now.strftime('%Y-%m-%d %H:%M:%S'))
    print(text)


scheduler = BlockingScheduler()
scheduler.add_job(job, 'date', run_date=date(2020, 9, 4), args=['text1'])
scheduler.add_job(job, 'date', run_date=datetime(2020, 9, 4, 11, 4, 0), args=['text2'])
scheduler.start()

 

 

interval.py

from datetime import datetime

from apscheduler.schedulers.blocking import BlockingScheduler


# 任務會每間隔一段時間執行一次,也可以設置開始和結束的時間
def job(text):
    now = datetime.now()
    print(now.strftime('%Y-%m-%d %H:%M:%S'))
    print(text)


scheduler = BlockingScheduler()
scheduler.add_job(job, 'interval', minutes=1, args=['text1'])
scheduler.add_job(job, 'interval', seconds=30, start_date='2020-09-04 11:02:01', end_date='2020-09-04 11:05:04',
                  args=['text1'])
scheduler.start()

 

 

cron.py

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

# 最接近於Linux系統的crontab
# 以裝飾器的方式運行
@scheduler.scheduled_job('cron', second='*/7', args=['job'])
def job(text):
    now = datetime.now()
    print(now.strftime('%Y-%m-%d %H:%M:%S'))
    print(text)


# 調用add_job運行
# scheduler.add_job(job, 'cron', hour=11, minute='*/1', args=['job1'])
scheduler.start()

 

 

 

添加任務

添加任務的方法有兩種:

  1. 通過調用add_job()
  2. 通過裝飾器scheduled_job()

第一種方法是最常用的方法。第二種方法主要是方便地聲明在應用程序運行時不會更改的任務。該 add_job()方法返回一個apscheduler.job.Job實例,可以使用該實例稍后修改或刪除該任務。

 

參考:

https://www.cnblogs.com/BlueSkyyj/p/11678665.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM