熟悉Python的人可能都知道,Apscheduler是python里面一款非常優秀的任務調度框架,這個框架是從鼎鼎大名的Quartz移植而來。
之前有用過Flask版本的Apscheduler做定時任務。剛好前不久接觸了Tornado,順便玩玩Tornado版本的Apscheduler。
本篇做了一個簡單的Wdb頁面,用於添加和刪除定時任務,小伙伴們可以基於這個做一些擴展,比如把定時定時任務寫入數據庫,改變cron規則等等。
主要功能點如下:
#新增任務(需要動態改變job_id的值)
http://localhost:8888/scheduler?job_id=1&action=add
#刪除任務(需要動態改變job_id的值)
http://localhost:8888/scheduler?job_id=1&action=remov
執行結果可以在console看到
from datetime import datetime from tornado.ioloop import IOLoop, PeriodicCallback from tornado.web import RequestHandler, Application from apscheduler.schedulers.tornado import TornadoScheduler scheduler = None job_ids = [] # 初始化
def init_scheduler(): global scheduler scheduler = TornadoScheduler() scheduler.start() print('[Scheduler Init]APScheduler has been started') # 要執行的定時任務在這里
def task1(options): print('{} [APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), options)) class MainHandler(RequestHandler): def get(self): self.write('<a href="/scheduler?job_id=1&action=add">add job</a><br><a href="/scheduler?job_id=1&action=remove">remove job</a>') class SchedulerHandler(RequestHandler): def get(self): global job_ids job_id = self.get_query_argument('job_id', None) action = self.get_query_argument('action', None) if job_id: # add
if 'add' == action: if job_id not in job_ids: job_ids.append(job_id) scheduler.add_job(task1, 'interval', seconds=3, id=job_id, args=(job_id,)) self.write('[TASK ADDED] - {}'.format(job_id)) else: self.write('[TASK EXISTS] - {}'.format(job_id)) # remove
elif 'remove' == action: if job_id in job_ids: scheduler.remove_job(job_id) job_ids.remove(job_id) self.write('[TASK REMOVED] - {}'.format(job_id)) else: self.write('[TASK NOT FOUND] - {}'.format(job_id)) else: self.write('[INVALID PARAMS] INVALID job_id or action') if __name__ == "__main__": routes = [ (r"/", MainHandler), (r"/scheduler/?", SchedulerHandler), ] init_scheduler() app = Application(routes, debug=True) app.listen(8888) IOLoop.current().start()