python中定時任務的實現有很多種方法
1、最簡單的方法:在一個死循環中每隔一定時間執行一次任務
2、threading.Timer模塊:
在規定的時間間隔后執行一次任務
from datetime import datetime from threading import Timer def job(): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) def jobTask(): Timer(5, job).start() # 間隔時間, 任務名稱 jobTask() ============== 5秒過后 2020-04-13 14:40:57
3、標准庫sched
scheduler
類來調度一次事件,從而達到定時執行任務的效果。
操作步驟:
1、構造一個sched.scheduler
類
它接受兩個參數:timefunc(當前時間)
和 delayfunc(暫停運行的時間單元), 一般使用默認參數就行,即
time.time
和 time.sleep。
2、添加調度任務
enter(delay, priority, action, argument=(), kwargs={})
delay: 延遲多少秒執行
priority: 數字越低優先級越高
action:執行函數
argument
和 kwargs
分別是函數的位置和關鍵字參數
scheduler.enterabs(time, priority, action, argument=(), kwargs={})
time:任務會在 time
這時刻執行, 其它參數含義同上
from datetime import datetime import sched import time def job(): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) def jobTask(): scheduler = sched.scheduler(time.time, time.sleep) # 初始化任務 scheduler.enter(2, 1, job) # 2秒后執行一次job任務 scheduler.run() jobTask()
4、高級python高度器Advanced Python Scheduler, 簡稱APScheduler。
是一個輕量級的 Python 定時任務調度框架。APScheduler 支持三種調度任務:
固定時間間隔
固定時間點(日期)
Linux 下的 Crontab 命令
。
同時還支持異步執行、后台執行調度任務。
安裝:
pip install APScheduler -i https://pypi.tuna.tsinghua.edu.cn/simple # windows
pip install apscheduler -i https://pypi.tuna.tsinghua.edu.cn/simple # linux
使用方法:
1、新建調度器
2、添加后台定時任務
3、啟動
簡單示例:
import datetime import time from apscheduler.schedulers.background import BackgroundScheduler # 將執行的任務 def job(): print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) scheduler = BackgroundScheduler() # 組件:新建調度器 scheduler.add_job(job, 'interval', seconds=2) # 添加后台任務, 每2秒執行一次 scheduler.start() # 開始任務 while True: print(time.time()) time.sleep(1) # 執行結果 1586764381.963936 1586764382.9640276 2020-04-13 07:53:03.963 1586764383.9644287 1586764384.9647956 2020-04-13 07:53:05.963 ……
調度器:
1、BlockingScheduler : 阻塞當前線程, 在當前進程的主線程中運行。
2、BackgroundScheduler : 不阻塞當前線程, 在后台線程運行。
3、AsyncIOScheduler : 結合 asyncio
模塊(一個異步框架)一起使用
4、GeventScheduler : 程序中使用 gevent
(高性能的Python並發框架)作為IO模型,和 GeventExecutor
配合使用
5、TornadoScheduler : 程序中使用 Tornado
(一個web框架)的IO模型,用 ioloop.add_timeout
完成定時喚醒。
6、TwistedScheduler : 配合 TwistedExecutor
,用 reactor.callLater
完成定時喚醒
7、QtScheduler : 應用是一個 Qt 應用,需使用QTimer完成定時喚醒。
2、觸發器
1、date觸發器
最基本的調度, 在特定的時間點觸發, 任務只會執行一次。
參數 | 說明 |
run_date(date,datetime或str) | 作業的運行日期或時間 |
timezone (datetime.tzinfo 或 str) | 指定時區 |
from datetime import datetime from datetime import date from apscheduler.schedulers.background import BackgroundScheduler import time # 被執行的函數 def job(text): print(text) # 初始化調度程序 scheduler = BackgroundScheduler() # 被執行的函數 date觸發器 執行時間(執行一次) 參數 scheduler.add_job(job, 'date', run_date='2020-04-14 11:36:00', args=['text']) scheduler.add_job(job, 'date', run_date=datetime(2020, 4, 14, 11, 37, 0), args=['text']) scheduler.start() while True: time.sleep(10) print(time.time())
2、interval觸發器
時間間隔觸發器, 參數如下
示例:
import datetime import time from apscheduler.schedulers.background import BackgroundScheduler # 被執行的任務 def job(): print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) # 初始化調度程序 scheduler = BackgroundScheduler() # 被執行函數 interval觸發器 間隔參數1分鍾 scheduler.add_job(job, 'interval', minutes=1) # 被執行函數 interval觸發器 間隔參數1分鍾 間隔參數開始時間 間隔參數結束時間 scheduler.add_job(job, 'interval',minutes=1, start_date='2020-04-14 12:00:10', end_date='2020-04-15 12:10:10') scheduler.start() while True: time.sleep(10) print("ok")
3、cron觸發器
功能最強大的觸發器, 在特定時間周期性觸發, 和Linux crontab格式兼容。
參數如下:
支持的運算格式
示例:
import datetime from apscheduler.schedulers.background import BackgroundScheduler # 被執行的任務 def job(): print("當前時間:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) # 初始化調度函數 scheduler = BackgroundScheduler() # 任務名稱 cron調度器 月份為1-3, 7-9 日為星期一星期二 小時為1:00, 2:00, 3:00 scheduler.add_job(job, 'cron', month='1-3, 7-9', day='0, tue', hour='0-3') scheduler.start()
作業存儲 job store
添加job:
1、add_job(): apscheduler.job.Job 實例, 可改變或移除job
2、scheduled_job()
修飾器來修飾函數: 不會改變的job
scheduled_job 示例:
import datetime import time from apscheduler.schedulers.background import BackgroundScheduler #初始化調度實例 scheduler = BackgroundScheduler() # 間隔觸發器 每1分鍾執行一次 @scheduler.scheduled_job('interval', minutes=1) def job(): print("當前時間:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) scheduler.start() while True: time.sleep(10) print(time.time()) 執行結果: [INFO][2020-04-14 14:48:40,293][base.py:440]Adding job tentatively -- it will be properly scheduled when the scheduler starts [INFO][2020-04-14 14:48:40,294][base.py:881]Added job "job" to job store "default" [INFO][2020-04-14 14:48:40,294][base.py:166]Scheduler started 1586846930.2962093 1586846940.297304 1586846950.2980711 1586846960.2989216 1586846970.2994986 [INFO][2020-04-14 14:49:40,298][base.py:123]Running job "job (trigger: interval[0:01:00], next run at: 2020-04-14 14:49:40 CST)" (scheduled at 2020-04-14 14:49:40.293767+08:00) 當前時間: 2020-04-14 06:49:40.299 1586846980.3026278 [INFO][2020-04-14 14:49:40,299][base.py:144]Job "job (trigger: interval[0:01:00], next run at: 2020-04-14 14:49:40 CST)" executed successfully 1586846990.3045704
移除job
remove_job(): 創建job時指定一個id, 根據此id刪除job
job.remove():對job執行remove方法
import datetime from apscheduler.schedulers.background import BackgroundScheduler # 初始化調度實例 scheduler = BackgroundScheduler() # 被執行的任務 def job(): print("當前時間:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) # 添加作業, 指定id scheduler.add_job(job,'interval', minutes=2, id='job_one') print("job1: ", scheduler.get_jobs()) # 根據id刪除job scheduler.remove_job('job_one') print("job2: ", scheduler.get_jobs()) # 添加作業, 返回對象 job = scheduler.add_job(job,'interval', minutes=2) print("job3: ", scheduler.get_jobs()) # 根據對象刪除job job.remove() print("job4: ", scheduler.get_jobs()) 執行結果: [INFO][2020-04-14 15:08:32,005][base.py:440]Adding job tentatively -- it will be properly scheduled when the scheduler starts job1: [<Job (id=job_one name=job)>] job2: [] [INFO][2020-04-14 15:08:32,006][base.py:627]Removed job job_one job3: [<Job (id=aaba0c4a60ab439492a5e0a4c4c6f879 name=job)>] job4: [] [INFO][2020-04-14 15:08:32,006][base.py:440]Adding job tentatively -- it will be properly scheduled when the scheduler starts [INFO][2020-04-14 15:08:32,007][base.py:627]Removed job aaba0c4a60ab439492a5e0a4c4c6f879
獲取job列表
通過 scheduler.get_jobs()
方法能夠獲取當前調度器中的所有 job 的列表
修改job
modify_job(): 根據job id來修改job
Job.modify()
: 根據job實例來修改job
示例:
import datetime from apscheduler.schedulers.background import BackgroundScheduler # 初始化調度實例 scheduler = BackgroundScheduler() # 被執行的任務 def job(): print("當前時間:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) # 添加作業, 指定id scheduler.add_job(job,'interval', minutes=2, id='job_one') # 根據id修改job scheduler.modify_job('job_one', minutes=5) # 添加作業, 返回對象 job = scheduler.add_job(job,'interval', minutes=2) # 根據對象修改job job.modify( minutes=2)
關閉job
scheduler.shutdown() scheduler.shutdown(wait=false)
執行器
ProcessPoolExecutor:
ThreadPoolExecutor: