Python任務調度模塊APScheduler


一、APScheduler 是什么&APScheduler四種組成部分?

APScheduler全程為Advanced Python Scheduler,是一款輕量級的Python任務調度框架。它允許你像Cron那樣安排定期執行的任務,並且支持Python函數或任意可調用的對象。

1、調度器(scheduler)

調度器(scheduler)是其他的組成部分。你通常在應用只有一個調度器,應用的開發者通常不會直接處理作業存儲、調度器和觸發器,相反,調度器提供了處理這些的合適的接口。配置作業存儲和執行器可以在調度器中完成,例如添加、修改和移除作業。 

對於不同的不場景,可以選擇的調度器:

  • BlockingScheduler : 當調度器是你應用中唯一要運行的東西時。
  • BackgroundScheduler : 當你沒有運行任何其他框架並希望調度器在你應用的后台執行時使用(常用)。
  • AsyncIOScheduler : 當你的程序使用了asyncio(一個異步框架)的時候使用。
  • GeventScheduler : 當你的程序使用了gevent(高性能的Python並發框架)的時候使用。
  • TornadoScheduler : 當你的程序基於Tornado(一個web框架)的時候使用。
  • TwistedScheduler : 當你的程序使用了Twisted(一個異步框架)的時候使用
  • QtScheduler : 如果你的應用是一個Qt應用的時候可以使用。
# BackgroundScheduler: 調度器在后台線程中運行,不會阻塞當前線程。


from apscheduler.schedulers.background import BackgroundScheduler
import time

scheduler = BackgroundScheduler()
 
def job1():
    print "%s: 執行任務"  % time.asctime()

scheduler.add_job(job1, 'interval', seconds=3)
scheduler.start()

while True:
    pass

2、作業存儲(job store)

作業存儲(job store)主要用來存儲被調度的作業,默認的作業存儲是簡單地把作業保存在內存中。

jobstore提供對scheduler中job的增刪改查接口,根據存儲方式的不同,分以下幾種:

  • MemoryJobStore:沒有序列化,jobs就存在內存里,增刪改查也都是在內存中操作
  • SQLAlchemyJobStore:所有sqlalchemy支持的數據庫都可以做為backend,增刪改查操作轉化為對應backend的sql語句
  • MongoDBJobStore:用mongodb作backend
  • RedisJobStore: 用redis作backend
  • RethinkDBJobStore: 用rethinkdb 作backend
  • ZooKeeperJobStore:用ZooKeeper做backend

3、執行器(executor)

執行器(executor)主要處理任務的運行,主要是把定時任務中的可調用對象(function)提交給一個一個線程或者進程來進行。當任務完成時,執行器將會通知調度器。

最常用的 執行器(executor) 有兩種:

  • ProcessPoolExecutor(進程池)
  • ThreadPoolExecutor(線程池,max:10)

4、觸發器(triggers)

當調度一個任務時,需要為它設置一個觸發器(triggers)。觸發器決定在什么日期/時間、用什么樣的形式來執行執行這個定時任務。

APScheduler 有三種內建的 trigger:

  • date: 指定某個確定的時間點,job僅執行一次。
  • interval: 指定時間間隔(fixed intervals)周期性執行。
  • cron: 使用cron風格表達式周期性執行,用於(在指定時間內)定期運行job的場景。使用同linux下crontab的方式。

4.1、date 定時調度(作業只會執行一次)

參數如下:

  • run_date (datetime|str) – 作業的運行日期或時間
  • timezone (datetime.tzinfo|str) – 指定時區
<!--# 2016-12-12運行一次job_function-->
sched.add_job(job_function, 'date', run_date=date(2016, 12, 12), args=['text'])

<!--# 2016-12-12 12:00:00運行一次job_function-->
<!--args=[]中是傳給job_function的參數-->
sched.add_job(job_function, 'date', run_date=datetime(2016, 12, 12, 12, 0, 0), args=['text'])

4.2、interval: 每隔一段時間執行一次

weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None

  • weeks (int) – 間隔幾周
  • days (int) – 間隔幾天
  • hours (int) – 間隔幾小時
  • minutes (int) – 間隔幾分鍾
  • seconds (int) – 間隔多少秒
  • start_date (datetime|str) – 開始日期
  • end_date (datetime|str) – 結束日期
  • timezone (datetime.tzinfo|str) – 時區
<!--每隔2小時執行一次-->
scheduler.add_job(my_job, 'interval', hours=2)


<!--設置時間范圍,在設置的時間范圍內每隔2小時執行一次-->
scheduler.add_job(my_job, 'interval', hours=2, start_date='2017-9-8 21:30:00', end_date='2018-06-15 21:30:00)


<!--使用裝飾器的方式添加定時任務-->
@scheduler.scheduled_job('interval', id='my_job_id', hours=2)
def my_job():
    print("Hello World")

4.3、cron: 使用同linux下crontab的方式

(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)

除了week和 day_of_week,它們的默認值是 *

  • 例如 day=1, minute=20 ,這就等於
year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0

工作將在每個月的第一天以每小時20分鍾的時間執行

表達式 參數類型 描述
* 所有 通配符。例: minutes=* 即每分鍾觸發
*/a 所有 可被a整除的通配符。
a-b 所有 范圍a-b觸發
a-b/c 所有 范圍a-b,且可被c整除時觸發
xth y 第幾個星期幾觸發。x為第幾個,y為星期幾
last x 一個月中,最后個星期幾觸發
last 一個月最后一天觸發
x,y,z 所有 組合表達式,可以組合確定值或上方的表達式
  • year (int|str) – 年,4位數字
  • month (int|str) – 月 (范圍1-12)
  • day (int|str) – 日 (范圍1-31)
  • week (int|str) – 周 (范圍1-53)
  • day_of_week (int|str) – 周內第幾天或者星期幾 (范圍0-6 或者 mon,tue,wed,thu,fri,sat,sun)
  • hour (int|str) – 時 (范圍0-23)
  • minute (int|str) – 分 (范圍0-59)
  • second (int|str) – 秒 (范圍0-59)
  • start_date (datetime|str) – 最早開始日期(包含)
  • end_date (datetime|str) – 最晚結束時間(包含)
  • timezone (datetime.tzinfo|str) – 指定時區
sched.add_job(my_job, 'cron', hour=3, minute=30)
sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2017-10-30')

@sched.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")
    

二、 How:APSched 怎么用?

安裝

  • pip 安裝
pip install apscheduler
python setup.py install

快速上手

# first.py

from apscheduler.schedulers.blocking import BlockingScheduler
import time

# 實例化一個調度器
scheduler = BlockingScheduler()
 
def job1():
    print "%s: 執行任務"  % time.asctime()

# 添加任務並設置觸發方式為3s一次
scheduler.add_job(job1, 'interval', seconds=3)

# 開始運行調度器
scheduler.start()
  • 執行輸出:
> python first.py

Fri Sep  8 20:41:55 2017: 執行任務
Fri Sep  8 20:41:58 2017: 執行任務
...

任務操作

1、添加任務

方法一:調用add_job()方法

調用add_job()方法返回一個apscheduler.job.Job 的實例,可以用來改變或者移除 job

job = scheduler.add_job(myfunc, 'interval', minutes=2)

方法二:使用裝飾器scheduled_job()

適用於應用運行期間不會改變的 job

from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
# 裝飾器
@sched.scheduled_job('interval', id='my_job_id', seconds=5)
def job_function():
    print("Hello World")
# 開始
sched.start()

2、刪除任務

 <!--方法一:通過作業ID或別名調用remove_job()刪除作業-->
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')


<!-- 方法二:通過add_job()返回的job實例調用remove()方法刪除作業-->
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

3、暫停&繼續任務

可以通過Job實例或調度程序本身輕松暫停和恢復作業。 當作業暫停時,下一個運行時間將被清除,直到作業恢復,不會再計算運行時間。 要暫停作業,請使用以下任一方法:

<!--根據任務實例-->
job = scheduler.add_job(myfunc, 'interval', minutes=2)
<!--暫停-->
job.pause() 
<!--繼續-->
job.resume() 


 <!--根據任務id暫停-->
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')

4、修改任務屬性

<!--修飾:-->
job.modify(max_instances=6, name='Alternate name')


<!--重設:-->
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

5、獲得job列表

  • 使用get_jobs()來獲取所有的job實例。
  • 使用print_jobs()來輸出所有格式化的作業列表。
  • 使用get_job(任務ID)獲取指定任務的作業列表。
<!--獲取所有的job實例-->
apscheduler.get_jobs()

6、開始&關閉任務


scheduler.start() #開啟
scheduler.shotdown(wait=True|False) #關閉 False 無論任務是否執行,強制關閉

三、一些定時任務腳本

1、定時任務運行腳本每日凌晨00:30:30執行

import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from app.untils.log_builder import sys_logging

scheduler = BlockingScheduler()   # 后台運行

 # 設置為每日凌晨00:30:30時執行一次調度程序
@scheduler.scheduled_job("cron", day_of_week='*', hour='1', minute='30', second='30')
def rebate():
        print "schedule execute"
        sys_logging.debug("statistic scheduler execute success" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


if __name__ == '__main__':
    try:
        scheduler.start()
        sys_logging.debug("statistic scheduler start success")
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
        sys_logging.debug("statistic scheduler start-up fail")

2、每天晚上0點 - 早上8點期間,每5秒執行一次任務。

# 每天晚上0點 - 早上8點期間,每5秒執行一次任務。
scheduler.add_job(my_job, 'cron',day_of_week='*',hour = '0-8',second = '*/5')

3、在0、10、20、30、40、50分時執行任務。

<!--# 可以被10整除的時間點執行任務,這個任務就表示在0、10、20、30、40、50分時都會執行任務-->
scheduler.add_job(my_job, 'cron',day_of_week='*',minute = '*/10')

4、直到2020-05-30,每周從周一到周五的早上5:30都執行一次定時任務

<!--直到2020-05-30 00:00:00,每周星期從星期一到星期五的早上5:30都執行一次定時任務-->
sched.add_job(my_job(),'cron', day_of_week='mon-fri', hour=5, minute=30,end_date='2020-05-30')


<!--# 截止到2016-12-30 00:00:00,每周一到周五早上五點半運行job_function-->
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2016-12-31')

5、在6,7,8,11,12月的第3個周五的1,2,3點執行定時任務

<!--job_function將會在6,7,8,11,12月的第3個周五的1,2,3點運行-->

sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

6、每5秒執行該程序一次

<!--#表示每5秒執行該程序一次,相當於interval 間隔調度中seconds = 5-->
sched.add_job(my_job, 'cron',second = '*/5')

參考資料


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM