记Python的scheduler自动调度


前言:因为去年后半年岗位调整,然后一直没更新,现在有机会了一下。现在岗位是做一个python开发,所以记录一下Python的自动调度。

网上也有Apscheduler的官方文档,但是不是很详细。所以本文编写一些详细的地方。 官网文档翻译版https://www.jianshu.com/p/4f5305e220f0

1.python的自动调度配置

def setup(self):
self.scheduler = BackgroundScheduler({
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs.sqlite' #可以使用sqlite及系统配置的数据库本文是以sqlite为例子
# 'url': self.app.config["TRIGGER_DATABASE_URL"]
},

'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '50'
},
'apscheduler.job_defaults.coalesce': 'true',
'apscheduler.job_defaults.max_instances': '1000',
'apscheduler.job_defaults.misfire_grace_time': 600, # 600秒的任务超时容错
# 'apscheduler.timezone': 'UTC', UTC加上的话会导致时间相差8小时,刚好为东八区的时区,根据个人需要使用
})
2.添加调度任务
  
with self.app.app_context():
projects = AutoTestPlan.query.all() #将自动调度的信息存储在这张表
for p in projects:
if p.enable and self.scheduler.get_job(p.id) is None: enable 标志任务是否启动
if p.cron is None: #判断cron表达式是否为空
continue
cron = p.cron.replace("\n", "").strip().split(" ")
if len(cron) < 5:
continue
j = self.scheduler.add_job(func=run_job, trigger='cron', name=p.name, replace_existing=True,
minute=cron[0], hour=cron[1], day=cron[2], month=cron[3], day_of_week=cron[4], id="%d" % (p.id,),
args=(p.id, p.project_type, p.project_id))

else:
self.update_job(p.id)
上述中 run_job是你自动调度需要执行的方法,args是方法传递的参数
import requests
import os

def run_job(id, project_type, project_id):
s = requests.Session()
port = os.environ.get("PORT")
if project_type == '1':
s.get("http://127.0.0.1:%s/test_run_auto/auto/%d" % (port, id))
else:
s.get("http://127.0.0.1:%s/test_run_auto_robot/%d" % (port, id))
上述就可以完成自动调度了,需要注意的是cron表达式,前端表达式的插件很多,要保持cron表达式正确否则可能会出现调度时间有误。
下面是一个更为简单的例子
def setup():
scheduler = BackgroundScheduler({
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs1.sqlite'
# 'url': self.app.config["TRIGGER_DATABASE_URL"]
},

'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '50'
},
'apscheduler.job_defaults.coalesce': 'true',
'apscheduler.job_defaults.max_instances': '1000',
'apscheduler.job_defaults.misfire_grace_time': 600, # 600秒的任务超时容错
# 'apscheduler.timezone': 'UTC',
})
try:
p = scheduler.add_job(func=task_status, trigger='cron', name='task_status', replace_existing=True,
second='*/1', id='01')
scheduler.start()
except Exception as e:
print(e)
上述自动调度的task_status方法调用的逻辑
def task_status():
with app.app_context():
print('你要做的事情')



免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM