Python+flask+flask-apscheduer實現定時下發任務
背景:
使用python+flask+mamaca實現的自動化用例管理平台,可以下發任務到具體的節點,進行執行測試用例,沒有定時執行的功能,使用flask-apscheduler加上定時下發任務
一、Flask-apscheduler的基本內容介紹和基本操作
安裝:
Pip install flask-APScheduler 或者pycharm如下截圖,點擊【+】輸入名稱查找,install package
二、簡單的定時任務的例子
網上找了一個簡單的例子,先來試試定時任務能不能根據配置啟動起來
采用配置文件的方式,進行加載flask項目
定義一個config類,里面配置jobs的基本信息,
例子:定時器為cron,在指定的12月,1號,17小時,45分,20s,執行方法task
from flask_apscheduler import APScheduler from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.triggers.cron import CronTrigger import datetime from apscheduler.jobstores.mongodb import MongoDBJobStore aps = APScheduler(scheduler=BackgroundScheduler(timezone='Asia/Shanghai')) # 指定時區 class APScheduler(object): JOBS = [ { 'id': 'job31', 'func': 'APSchedulerJob:task', # 執行的方法,這個可以寫死,多層模塊之間用[.],文件模塊和方法要用[:]分割,否則類型會報錯Invalid reference # 'func': 'task', # 執行的方法,這個可以寫死 'args': (1, 2), # 參數可以不傳 'trigger': 'cron', 'month': '12', 'day': '01', # 具體到某一天的時間,這個取day傳進來 'hour': '17', # 取時間傳進來 'minute': '45', # 取具體的分鍾傳進來 'second': '20' # 具體的那一秒 } ] # 調度器開關 SCHEDULER_API_ENABLED = True # 通過數據庫mongodb存放job SCHEDULER_JOBSTORES = { 'default': MongoDBJobStore(host="127.0.0.1", port=27017, database="apscheduler", collection="jobs")} # 設置時區 SCHEDULER_TIMEZONE = 'Asia/Shanghai'
配置信息配置完成后,即可開始獲取配置信息,和添加定時任務
在app.config.from_object(APScheduler()) 獲取配置信息后,獲取到jobs的信息,會調起添加定時任務,檢查scheduler的狀態為開啟,即可添加定時任務,代碼執行到aps.start()時,代表scheduler開啟,正在等待接收定時任務,並在指定的時間執行
另外一個添加定時任務的方法,直接通過aps.add_job(添加對應的一些配置信息)
根據()括號內給定的參數,進行執行定時任務
時區問題,啟動的時候加載local報錯,類似timezone的錯,可能是時區的問題,設置一下時區
三、定時基本組成部分的介紹
基本的例子看完之后,可以從例子里面看到三個基本部分,第一個是使用的調度器scheduler、trigger、作業存儲job store,還有一個執行器,例子沒有加入,程序使用默認的執行器,下面可以繼續看看這些組成部分的詳細內容
APScheduler有四種組成部分
調度器(scheduler)是其他的組成部分。你通常在應用只有一個調度器,應用的開發者通常不會直接處理作業存儲、調度器和觸發器,相反,調度器提供了處理這些的合適的接口。配置作業存儲和執行器可以在調度器中完成,例如添加、修改和移除作業。Apscheduler提供的調度器有7種:
BlockingScheduler和BackgroundScheduler。
BlockingScheduler : 調度器在當前進程的主線程中運行,也就是會阻塞當前線程。
BackgroundScheduler : 調度器在后台線程中運行,不會阻塞當前線程。
AsyncIOScheduler : 結合 asyncio 模塊(一個異步框架)一起使用。
GeventScheduler : 程序中使用 gevent(高性能的Python並發框架)作為IO模型,和 GeventExecutor 配合使用。
TornadoScheduler : 程序中使用 Tornado(一個web框架)的IO模型,用 ioloop.add_timeout 完成定時喚醒。
TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定時喚醒。
QtScheduler : 你的應用是一個 Qt 應用,需使用QTimer完成定時喚醒。
觸發器(trigger)包含調度邏輯,每一個作業有它自己的觸發器,用於決定接下來哪一個作業會運行。除了他們自己初始配置意外,觸發器完全是無狀態的。
Date:是最基本的一種調度,任務只會在指定日期時間執行一次
interval觸發器:根據設置的時間間隔,每間隔執行一次
crontab觸發器:在特定時間周期性地觸發,如每天,周循環等場景
作業存儲(job store)存儲被調度的作業,默認的作業存儲是簡單地把作業保存在內存中,其他的作業存儲是將作業保存在數據庫中。一個作業的數據講在保存在持久化作業存儲時被序列化,並在加載時被反序列化。調度器不能分享同一個作業存儲。如果任務不考慮持久執行,可以不加入任務存儲器,如果考慮到任務可能需要重跑等,則可以加上任務存儲器,這個配置mongodb,配置對應的數據的地址,端口,數據庫以及具體的collections
SCHEDULER_JOBSTORES = {
'default': MongoDBJobStore(host="127.0.0.1", port=27017, database="apscheduler", collection="jobs")}
執行器(executor)處理作業的運行,他們通常通過在作業中提交制定的可調用對象到一個線程或者進城池來進行。當作業完成時,執行器將會通知調度器。
ThreadPoolExecutor: 線程池執行器。
ProcessPoolExecutor: 進程池執行器。
GeventExecutor: Gevent程序執行器。
TornadoExecutor: Tornado程序執行器。
TwistedExecutor: Twisted程序執行器。
AsyncIOExecutor: asyncio程序執行器。
觸發器(作業運行的控制)的例子
date觸發器:是最基本的一種調度,任務只會在指定日期時間執行一次。
參數說明:
run_date:任務的運行日期或時間 (datetime 或 str)
timezone:指定時區(datetime.tzinfo 或 str)
# 在2020年5月22日執行一次 scheduler.add_job(func=func, trigger="date", run_date=date(2020, 5, 22), timezone="Asia/Shanghai") # 在2020年8月13日 14:00:01執行一次 scheduler.add_job(func=func, trigger="date", run_date='2020-08-13 14:00:01')
2.interval觸發器:根據設置的時間間隔,每間隔執行一次。間隔開始計算時間,為任務創建時間
參數說明
weeks:間隔幾周。int
days:間隔天數。int。
hours:間隔小時數。int。
minutes:間隔分鍾數。int。
seconds:間隔秒數。int。
start_date:間隔觸發的起始時間。(datetime 或 str)
end_date:間隔觸發的結束時間。(datetime 或 str)
timezone:指定時區。(datetime 或 str)
# 每5秒執行一次func()函數 scheduler.add_job(func=func, trigger="interval", seconds=5) # 在8月13~20日之間,每天執行一次 scheduler.add_job(func=func, trigger="interval", days=1,start_date='2020-08-13 14:00:01', end_date='2020-08-20 14:00:01')
3. crontab觸發器:在特定時間周期性地觸發,如每天,周循環等場景,它是功能最強大的觸發器。
參數范圍只適用於int類型,str類型有無限可能,自己踩坑去吧
year: 年,4位數字。int 或 str
month: 月 (范圍1-12)。int 或 str
day: 日 (范圍1-31)。int 或 str
week:周 (范圍1-53)。int 或 str
day_of_week: 周內第幾天或者星期幾 (范圍0-6,0是周一,6是周天 或者 mon,tue,wed,thu,fri,sat,sun)。int 或 str
hour: 時 (范圍0-23)。(int 或 str)
minute: 分 (范圍0-59)。(int 或 str)
second: 秒 (范圍0-59)。(int 或 str)
start_date: 最早開始日期(包含)。(datetime 或 str)
end_date: 最晚結束時間(包含)。(datetime 或 str)
timezone: 指定時區。(datetime 或 str)
# 每天23點定時執行 scheduler.add_job(func=func, trigger="cron", day_of_week="0-6", hour=23 ) # 在每年 1、2、3、7、8、9 月份中的每月第4天和星期日中的 00:00, 01:00, 02:00 和 03:00 執行 func 任務 scheduler.add_job(func=func,trigger="cron",month="1-3,7-9",day="4th sun", hour="0-3") # 每周一早晨9點30分執行func任務 scheduler.add_job(func=func, trigger="cron", day_of_week=0, hour=9, minute=30) # 間隔5分鍾執行一次,與interval觸發器使用功能相同 scheduler.add_job(func=func, trigger="cron", minute="*/5" )
原文鏈接:https://www.jianshu.com/p/4c5bc85fc3fd
原文鏈接:https://blog.csdn.net/weixin_39241397/article/details/82746096
四、定時任務配置參數config
l id:可以隨便起;
l func:要執行的方法,這個可以寫死,多層模塊之間用【.】,文件模塊和方法要用【:】分割,【:】前配置的模塊不對的,后面添加jobs的時候,會無法導入module,這個時候要檢查模塊的導入地址,【:】后的為具體指定的方法,這個方法要在前面導入模塊的基礎上可以找的到,否則類型會報錯Invalid reference,即沒辦法在當前模塊下,找到這個方法
l args:參數,若func方法需要參數,可使用args傳入
l trigger:觸發器,date cron interval
l month、day、hour、minute、second:配置定時器執行的時間
l SCHEDULER_API_ENABLED:調度器開關,默認開啟
l SCHEDULER_JOBSTORES:jobs任務存儲器配置
l SCHEDULER_TIMEZONE:設置時區
l SCHEDULER_EXECUTORS:設置執行器,包括執行器類型、是否允許最大多少個同時
l SCHEDULER_JOB_DEFAULTS: 設置容錯時間為 2min,coalesce積攢得任務跑幾次,在時間允許得范圍內 True:默認最后一次,False:在時間允許范圍內全部提,max_instances 同時允許並發的最大並發量,misfire_grace_time 如果重啟任務在這個時間范圍內,就能繼續重啟
五、定時任務的執行順序
5.1添加任務
1、調起add_job方法
進而調到scheduler.py里面的_scheduler.add_job(**job_def)
2、調到schedulers\base.py
文件里面的add_jobs
進到add_job方法后,開始創建觸發器、分別讀取傳入參數
3、Job(self,**job_kwargs)初始化job
進到job.py里面的初始化任務,調起_modify()方法,對傳入的參數進行檢驗
4、參數校驗
具體的_modify()方法
先看傳入的func方法的參數,根據參數的類型,走不同的分支,方法進到的ref_to_obj,將func的文件名和具體的方法轉化為對象
Ref_to_obj:方法用於對傳進來的func參數,進行分割,分割出對應的模塊名和具體的方法,來導入模塊和檢查方法是否存在
和func相關聯的參數是args用來給func傳遞方法的參數,args一樣有程序的檢查規則,比如傳進來的參數個數和給定的func方法定義的參數一致,如果不一樣會報錯
如果傳入的參數個數不對,會報相應的錯誤,這個要檢查參數,對應類里面的定義的方法,一般第一個參數是帶有self的,這個也要作為參數,傳入到args里面
5、判斷調度器的狀態是否正常
一般啟動flask項目的時候,會加上aps.start(),啟動調度器,調度器正確,狀態為正常,添加定時任務成功
5.2 執行任務
前面添加了定時任務,Scheduler在指定時間內,執行任務
六、定時任務結合項目的實際操作
1、Jobs任務存儲,Mongodb存儲
有多種方式,可存放到mongo
1、 安裝mongodb服務
2、 安裝可視化工具,查看mongodb數據庫
3、 創建定時任務jobstore時,設置為mongodb,配置mongodb的配置文件
Robo 3t連接本地的數據庫報錯
原因:本機沒有安裝mongodb的服務,沒安裝之前,沒有MongoDB Server的服務,安裝成功之后,有MongoDB Server的服務
參考:https://www.cnblogs.com/simple-li/p/11334484.html
服務安裝成功后,再次使用robo 3t連接,即可連接成功
Flask配置信息里面配置Mongodb信息,配置具體的host,具體的數據庫、collections
2、定時任務的配置信息
定時任務的配置信息,和當前的flask項目的配置信息放到同一個config文件
BOOTSTRAP_SERVE_LOCAL = True from flask_apscheduler import APScheduler as _BaseAPScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.executors.pool import ThreadPoolExecutor # # 重寫實現上下文機制 class APScheduler(_BaseAPScheduler): def run_job(self, id, jobstore=None): with self.app.app_context(): super().run_job(id=id, jobstore=jobstore) # 定時器配置 # 持久化配置,數據持久化到mongodb SCHEDULER_JOBSTORES = {'default': MongoDBJobStore(host="127.0.0.1", port=27017, database="apscheduler", collection="jobs")} # 線程池配置,最大6個線程 SCHEDULER_EXECUTORS = {'default': ThreadPoolExecutor(6)} # 調度器開關開啟 SCHEDULER_API_ENABLED = True # 設置容錯時間為 2min # coalesce積攢得任務跑幾次,在時間允許得范圍內 True:默認最后一次,False:在時間允許范圍內全部提交 # max_instances 同時允許並發的最大並發量 # misfire_grace_time 如果重啟任務在這個時間范圍內,就能繼續重啟 SCHEDULER_JOB_DEFAULTS = {'coalesce': False, 'max_instances': 3, 'misfire_grace_time': 60} # 配置時區 SCHEDULER_TIMEZONE = 'Asia/Shanghai'
有的項目可能存在多個config,這個時候,要看flask項目啟動的時候,讀取是哪一個config文件,建議在app.run()斷點debug,大概就知道使用的是哪一個config配置文件,如果配置錯config文件,可能會出現flask項目啟動不了的情況,逐個debug找到問題,然后再解決
3、配置到flask項目
前端點擊定時執行,設置定時執行的時間
靜態頁面逐個配置,調到view里面的方法,進行add_jobs方法
aps = APScheduler(scheduler=BackgroundScheduler(timezone='Asia/Shanghai')) # test_suite_manage.test_suite_manage().new_test_run_list(id) # fun:前面的模塊要用[.],后面的具體的方法名用[:],這個方法應該是更新測試用例集的狀態,然后main方法會去檢查 # fun:需要傳入參數,要用args,不能直接在fun里面傳 # aps.add_job(id='run_job', func='app.core.coreservice:coreservice', trigger='cron', month=month, # day=day, hour=hour, minute=time, second='00') aps.add_job(id='run_job', func='app.db.test_suite_manage:test_suite_manage.new_test_run_list',args=('self',str(id)), trigger='cron', month=month, day=day, hour=hour, minute=time, second='00') aps.start()
如果有配置job持久化存儲,這個時候就可以到對應文件或數據庫,查看有沒有一個叫run_job的定時任務,如果有,定時任務就創建成功了,等待執行即可,如果沒有,則需要add_job debug逐層查看是不是在哪一步出現了錯誤,可參考上面的添加任務的步驟
和項目結合,看着雖然只有三個步驟,在參考資料較少的情況下,用了不少的時間啊。