python設定時間間隔和時間范圍執行任務: Python任務調度模塊APScheduler


https://segmentfault.com/a/1190000011084828

 

介紹

官網文檔:http://apscheduler.readthedoc...
API:http://apscheduler.readthedoc...

APScheduler是一個python的第三方庫,用來提供python的后台程序。包含四個組件,分別是:

  • triggers: 任務觸發器組件,提供任務觸發方式

  • job stores: 任務商店組件,提供任務保存方式

  • executors: 任務調度組件,提供任務調度方式

  • schedulers: 任務調度組件,提供任務工作方式

安裝

pip 安裝

$ pip install apscheduler

源碼安裝

$ python setup.py install

簡單的實例

from apscheduler.schedulers.blocking import BlockingScheduler
import time

# 實例化一個調度器 scheduler = BlockingScheduler() def job1(): print "%s: 執行任務" % time.asctime() # 添加任務並設置觸發方式為3s一次 scheduler.add_job(job1, 'interval', seconds=3) # 開始運行調度器 scheduler.start()

輸出:

$ python first.py Fri Sep 8 20:41:55 2017: 執行任務 Fri Sep 8 20:41:58 2017: 執行任務 ...

各組件功能

trigger組件

trigger提供任務的觸發方式,共三種方式:

  • date:只在某個時間點執行一次run_date(datetime|str)

scheduler.add_job(my_job, 'date', run_date=date(2017, 9, 8), args=[]) scheduler.add_job(my_job, 'date', run_date=datetime(2017, 9, 8, 21, 30, 5), args=[]) scheduler.add_job(my_job, 'date', run_date='2017-9-08 21:30:05', args=[]) # The 'date' trigger and datetime.now() as run_date are implicit sched.add_job(my_job, args=[[])
  • interval: 每隔一段時間執行一次weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None

scheduler.add_job(my_job, 'interval', hours=2) scheduler.add_job(my_job, 'interval', hours=2, start_date='2017-9-8 21:30:00', end_date='2018-06-15 21:30:00) @scheduler.scheduled_job('interval', id='my_job_id', hours=2) def my_job(): print("Hello World")
  • cron: 使用同linux下crontab的方式(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)

sched.add_job(my_job, 'cron', hour=3, minute=30) sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2017-10-30') @sched.scheduled_job('cron', id='my_job_id', day='last sun') def some_decorated_task(): print("I am printed at 00:00:00 on the last Sunday of every month!")

scheduler組件

scheduler組件提供執行的方式,在不同的運用環境中選擇合適的方式

  • BlockingScheduler: 進程中只運行調度器時的方式

from apscheduler.schedulers.blocking import BlockingScheduler import time scheduler = BlockingScheduler() def job1(): print "%s: 執行任務" % time.asctime() scheduler.add_job(job1, 'interval', seconds=3) scheduler.start()
  • BackgroundScheduler: 不想使用任何框架時的方式

from apscheduler.schedulers.background import BackgroundScheduler import time scheduler = BackgroundScheduler() def job1(): print "%s: 執行任務" % time.asctime() scheduler.add_job(job1, 'interval', seconds=3) scheduler.start() while True: pass
  • AsyncIOScheduler: asyncio module的方式(Python3)

from apscheduler.schedulers.asyncio import AsyncIOScheduler try: import asyncio except ImportError: import trollius as asyncio ... ... # while True:pass try: asyncio.get_event_loop().run_forever() except (KeyboardInterrupt, SystemExit): pass
  • GeventScheduler: gevent方式

from apscheduler.schedulers.gevent import GeventScheduler ... ... g = scheduler.start() # while True:pass try: g.join() except (KeyboardInterrupt, SystemExit): pass
  • TornadoScheduler: Tornado方式

from tornado.ioloop import IOLoop from apscheduler.schedulers.tornado import TornadoScheduler ... ... # while True:pass try: IOLoop.instance().start() except (KeyboardInterrupt, SystemExit): pass
  • TwistedScheduler: Twisted方式

from twisted.internet import reactor from apscheduler.schedulers.twisted import TwistedScheduler ... ... # while True:pass try: reactor.run() except (KeyboardInterrupt, SystemExit): pass
  • QtScheduler: Qt方式

executors組件

executors組件提供任務的調度方式

  • base

  • debug

  • gevent

  • pool(max_workers=10)

  • twisted

jobstore組件

jobstore提供任務的各種持久化方式

  • base

  • memory

  • mongodb
    scheduler.add_jobstore('mongodb', collection='example_jobs')

  • redis
    scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times')

  • rethinkdb
    scheduler.add_jobstore('rethinkdb', database='apscheduler_example')

  • sqlalchemy
    scheduler.add_jobstore('sqlalchemy', url=url)

  • zookeeper
    scheduler.add_jobstore('zookeeper', path='/example_jobs')

任務操作

添加任務add_job(如上)

如果使用了任務的存儲,開啟時最好添加replace_existing=True,否則每次開啟都會創建任務的副本
開啟后任務不會馬上啟動,可修改trigger參數

刪除任務remove_job

# 根據任務實例刪除
job = scheduler.add_job(myfunc, 'interval', minutes=2) job.remove() # 根據任務id刪除 scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.remove_job('my_job_id')

任務的暫停pause_job和繼續resume_job

job = scheduler.add_job(myfunc, 'interval', minutes=2) # 根據任務實例 job.pause() job.resume() # 根據任務id暫停 scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.pause_job('my_job_id') scheduler.resume_job('my_job_id')

任務的修飾modify和重設reschedule_job

修飾:job.modify(max_instances=6, name='Alternate name')
重設:scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

調度器操作

  • 開啟 scheduler.start()

  • 關閉 scheduler.shotdown(wait=True | False)

  • 暫停 scheduler.pause()

  • 繼續 scheduler.resume()

  • 監聽 http://apscheduler.readthedoc...

def my_listener(event):
    if event.exception: print('The job crashed :(') else: print('The job worked :)') scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

官方實例

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstores = { 'mongo': MongoDBJobStore(), 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaul

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM