記Python的scheduler自動調度


前言:因為去年后半年崗位調整,然后一直沒更新,現在有機會了一下。現在崗位是做一個python開發,所以記錄一下Python的自動調度。

網上也有Apscheduler的官方文檔,但是不是很詳細。所以本文編寫一些詳細的地方。 官網文檔翻譯版https://www.jianshu.com/p/4f5305e220f0

1.python的自動調度配置

def setup(self):
self.scheduler = BackgroundScheduler({
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs.sqlite' #可以使用sqlite及系統配置的數據庫本文是以sqlite為例子
# 'url': self.app.config["TRIGGER_DATABASE_URL"]
},

'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '50'
},
'apscheduler.job_defaults.coalesce': 'true',
'apscheduler.job_defaults.max_instances': '1000',
'apscheduler.job_defaults.misfire_grace_time': 600, # 600秒的任務超時容錯
# 'apscheduler.timezone': 'UTC', UTC加上的話會導致時間相差8小時,剛好為東八區的時區,根據個人需要使用
})
2.添加調度任務
  
with self.app.app_context():
projects = AutoTestPlan.query.all() #將自動調度的信息存儲在這張表
for p in projects:
if p.enable and self.scheduler.get_job(p.id) is None: enable 標志任務是否啟動
if p.cron is None: #判斷cron表達式是否為空
continue
cron = p.cron.replace("\n", "").strip().split(" ")
if len(cron) < 5:
continue
j = self.scheduler.add_job(func=run_job, trigger='cron', name=p.name, replace_existing=True,
minute=cron[0], hour=cron[1], day=cron[2], month=cron[3], day_of_week=cron[4], id="%d" % (p.id,),
args=(p.id, p.project_type, p.project_id))

else:
self.update_job(p.id)
上述中 run_job是你自動調度需要執行的方法,args是方法傳遞的參數
import requests
import os

def run_job(id, project_type, project_id):
s = requests.Session()
port = os.environ.get("PORT")
if project_type == '1':
s.get("http://127.0.0.1:%s/test_run_auto/auto/%d" % (port, id))
else:
s.get("http://127.0.0.1:%s/test_run_auto_robot/%d" % (port, id))
上述就可以完成自動調度了,需要注意的是cron表達式,前端表達式的插件很多,要保持cron表達式正確否則可能會出現調度時間有誤。
下面是一個更為簡單的例子
def setup():
scheduler = BackgroundScheduler({
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs1.sqlite'
# 'url': self.app.config["TRIGGER_DATABASE_URL"]
},

'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '50'
},
'apscheduler.job_defaults.coalesce': 'true',
'apscheduler.job_defaults.max_instances': '1000',
'apscheduler.job_defaults.misfire_grace_time': 600, # 600秒的任務超時容錯
# 'apscheduler.timezone': 'UTC',
})
try:
p = scheduler.add_job(func=task_status, trigger='cron', name='task_status', replace_existing=True,
second='*/1', id='01')
scheduler.start()
except Exception as e:
print(e)
上述自動調度的task_status方法調用的邏輯
def task_status():
with app.app_context():
print('你要做的事情')



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM