Flask-APScheduler 是Flask框架的一個擴展庫,增加了Flask對apScheduler的支持,可以用作特定於平台的調度程序(如cron守護程序或Windows任務調度程序)的跨平台。
APScheduler有三個可以使用的內置調度系統:
- Cron式調度(可選的開始/結束時間)
- 基於區間的執行(偶數間隔運行作業,可選的開始/結束時間)
- 一次性延遲執行(在設定的日期/時間運行一次作業)
APScheduler
是一個python的第三方庫,用來提供python的后台程序。包含四個組件,分別是:
- triggers: 任務觸發器組件,提供任務觸發方式
- job stores: 任務商店組件,提供任務保存方式
- executors: 任務調度組件,提供任務調度方式
- schedulers: 任務調度組件,提供任務工作方式
支持多種存儲空間
- RAM
- 基於文件的簡單數據庫
- SQLAlchem
- MongoDB
- Redis
簡單使用
appblueprints/task.py
import uuid import utils from flask_apscheduler import APScheduler from flask import Blueprint, jsonify, request Taskapi = Blueprint("task", __name__, url_prefix="/task") Scheduler = None taskdict = {} def init(): global Scheduler Scheduler = APScheduler() return Scheduler # 暫停任務 # http://127.0.0.1:5000/task/pause?id=2 @Taskapi.route('/pause', methods=['GET']) def pause_job(): job_id = request.args.get('id') Scheduler.pause_job(str(job_id)) response = {} response["msg"] = "success" return jsonify(response) # 恢復任務 # http://127.0.0.1:5000/task/resume?id=2 @Taskapi.route('/resume', methods=['GET']) def resume_job(): job_id = request.args.get('id') Scheduler.resume_job(str(job_id)) response = {} response["msg"] = "success" return jsonify(response) # 獲取任務 # http://127.0.0.1:5000/task/getjobs @Taskapi.route('/getjobs', methods=['GET']) def get_task(): # jobs = Scheduler.get_jobs() # print(str(pickle.dumps(jobs))) return jsonify(taskdict) # 移除任務 @Taskapi.route('/removejob', methods=['GET']) def remove_job(): job_id = request.args.get('id') Scheduler.remove_job(str(job_id)) response = {} response["msg"] = "success" return jsonify(response) # 添加任務 # http://cab912dac880.ngrok.io/task/addjob?tasktype=interval&minute=10&psm=caijing.charge.union_service&tag=prod&env=product&chat_id=6911623998451269634 @Taskapi.route('/addjob', methods=['GET']) def add_task(): global taskdict psm = request.args.get('psm', "cmp.ecom.settle") tag = request.args.get('tag', "prod") env = request.args.get('env', "boe") chat_id = request.args.get('chat_id', "6911623998451269634") tasktype = request.args.get('tasktype', "interval") minute = request.args.get('minute', "10") minute = float(minute) response = {} response["msg"] = "success" seconds = minute * 60 # trigger='cron' 表示是一個定時任務 # if tasktype == 'corn': # id = str(uuid.uuid4()) # response["taskid"] = id # Scheduler.add_job(func=test, id='1', args=(1, 1), trigger='cron', day_of_week='0-6', hour=18, minute=24, # second=10, replace_existing=True) # trigger='interval' 表示是一個循環任務,每隔多久執行一次 if tasktype == "interval": id = str(uuid.uuid4()) response["taskid"] = id response["data"] = [psm,env,tag,str(seconds)+"秒"] taskdict[id] = response["data"] Scheduler.add_job(func=utils.start, id=id, args=(psm,env,tag,seconds,id,minute,chat_id), trigger='interval', seconds=seconds, replace_existing=True) else: response["id"] = "" response["msg"] = "tasktype 類型不存在" return jsonify(response)
main.py
from flask import Flask from appblueprints import task # 創建app app = Flask(__name__)# 注冊藍圖 app.register_blueprint(task.Taskapi) if __name__ == '__main__': scheduler = task.init() scheduler.init_app(app=app) scheduler.start() app.run(host="0.0.0.0", port=6000)
triggers: 支持三種任務觸發方式
-
date:固定日期觸發器,任務只運行一次,運行完畢自動清除;若錯過指定運行時間,任務不會被創建
| 參數 | 說明 |
| :——————————– | :——————- |
| run_date (datetime 或 str) | 作業的運行日期或時間 |
| timezone (datetime.tzinfo 或 str) | 指定時區 |1
2例如# 在 2019-4-24 00:00:01 時刻運行一次 start_system 方法
scheduler .add_job(start_system, 'date', run_date='2019-4-24 00:00:01', args=['text'])
-
interval:時間間隔觸發器,每個一定時間間隔執行一次。
| 參數 | 說明 |
| —————————- | ———- |
| weeks (int) | 間隔幾周 |
| days (int) | 間隔幾天 |
| hours (int) | 間隔幾小時 |
| minutes (int) | 間隔幾分鍾 |
| seconds (int) | 間隔多少秒 |
| start_date (datetime 或 str) | 開始日期 |
| end_date (datetime 或 str) | 結束日期 |1
2# 在 2019-4-24 00:00:00 - 2019-4-24 08:00:00 之間, 每隔兩小時執行一次 alarm_job 方法
scheduler .add_job(alarm_job, 'interval', hours=2, start_date='2019-4-24 00:00:00' , end_date='2019-4-24 08:00:00')
- cron:cron風格的任務觸發
參數 | 說明 | |
---|---|---|
year (int 或 str) | 表示四位數的年份 (2019) | |
month(int\ | str) | 月 (范圍1-12) |
day(int\ | str) | 日 (范圍1-31) |
week(int\ | str) | 周 (范圍1-53) |
day_of_week (int\ | str) | 表示一周中的第幾天,既可以用0-6表示也可以用其英語縮寫表示 |
hour (int\ | str) | 表示取值范圍為0-23時 |
minute (int\ | str) | 表示取值范圍為0-59分 |
second (int\ | str) | 表示取值范圍為0-59秒 |
start_date (datetime\ | str) | 表示開始時間 |
end_date (datetime\ | str) | 表示結束時間 |
timezone (datetime.tzinfo\ | str) | 表示時區取值 |
(
int
|str
) 表示參數既可以是int
類型,也可以是str
類型
(datetime |str
) 表示參數既可以是datetime類型,也可以是str
類型
例如:表示每5秒執行該程序一次,相當於interval 間隔調度中seconds = 5
1 |
sched.add_job(my_job, 'cron',second = '*/5') |
job stores: 支持四種任務存儲方式
- memory:默認配置任務存在內存中
- mongdb:支持文檔數據庫存儲
- sqlalchemy:支持關系數據庫存儲
- redis:支持鍵值對數據庫存儲
schedulers: 調度器主要分三種,一種獨立運行的,一種是后台運行的,最后一種是配合其它程序使用
- BlockingScheduler: 當這個調度器是你應用中
唯一要運行
的東西時使用 - BackgroundScheduler: 當
不運行其它框架
的時候使用,並使你的任務在后台運行
- AsyncIOScheduler: 當你的程序是
異步IO模型
的時候使用 - GeventScheduler: 和
gevent
框架配套使用 - TornadoScheduler: 和
tornado
框架配套使用 - TwistedScheduler: 和
Twisted
框架配套使用 - QtScheduler: 開發
qt
應用的時候使用