apscheduler 定時任務框架


一、APScheduler簡介:

    Python的一個定時任務框架,滿足用戶定時執行或者周期性執行任務的需求,提供了基於日期date、固定的時間間隔interval、以及類似於Linux上的定時任務crontab類型的定時任務。並且該框架不僅可以添加、刪除定時任務,還可以將任務存儲到數據庫中,實現任務的持久化。

Python的第三方庫,用來提供Python的后台程序。包含四個組件,分別是:

triggers:任務觸發器組件,提供任務觸發方式

job stores: 任務商店組件,提供任務保存方式

executors:任務調度組件,提供任務調度方式

schedulers: 任務調度組件,提供任務工作方式

 二、APScheduler安裝

1)利用pip安裝(推薦)

# pip install apscheduler
2 基於源碼:https://pypi.python.org/pypi/APScheduler/

# python setup.py install

三、基本概念

1、 APScheduler有四種組件及相關說明

1) triggers(觸發器):觸發器包含調度邏輯,每一個作業有它自己的觸發器,用於決定接下來哪一個作業會運行,除了他們自己初始化配置外,觸發器完全是無狀態的。
2 )job stores(作業存儲):用來存儲被調度的作業,默認的作業存儲器是簡單地把作業任務保存在內存中,其他作業存儲器可以將任務保存到各種數據庫中,支持MongoDB、Redis、SQLAlchemy存儲方式。當對作業任務進行持久化存儲的時候,作業的數據將被序列化,重新讀取作業時在反序列化。

3)executors(執行器):執行器用來執行定時任務,只是將需要執行的任務放在新的線程或者線程池中運行。當作業任務完成時,執行器就會通知調度器。對於執行器,默認情況下選擇ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任務如比較消耗CPU的任務則可以選擇ProcessPoolExecutor,當然根據實際需求可以同時使用兩種執行器

4)schedulers(調度器):調度器是將其他部分聯系在一起,一般在應用程序中只有一個調度器,應用開發者不會直接操作觸發器、任務存儲以及執行器。相反調度器提供了處理的接口。通過調度器完成任務的存儲以及執行器的配置操作,如可以添加、移除、修改任務作業。

APScheduler提供了多種調度器,可以根據具體需求來選擇合適的調度器,常用的調度器有:

BlockingScheduler:適合於只在進程中運行單個任務的情況,通常在調度器是你唯一要運行的東西時使用

BackgroundScheduler:適合於要求任務在程序后台運行的情況,當希望調度器在應用后台執行時使用。

AsyncIOScheduler:適合於使用asyncio框架的情況

GeventScheduler:適合於使用gevent框架的情況

TornadoScheduler:適合於使用Tornado框架的應用

TwistedScheduler:適合使用Twisted框架的應用

QtScheduler:適合使用QT的情況

2、配置調度器

APScheduler提供了許多不同的方式來配置調度器,你可以使用一個配置字典或者作為參數關鍵字的方式傳入。你也可以先創建調度器。在配置和添加作業,這樣可以在不同的環境中得到更大的靈活性。

3、簡單的實例

 

from apscheduler.schedulers.blocking import BlockingScheduler
import time
#實例化一個調度器
scheduler = BlockingScheduler()

def job1():
print "%s: 執行任務" % time.asctime()

# 添加任務並設置觸發方式為3s一次

scheduler.add_job(job1, 'interval', seconds=3)
#開始運行調度器
scheduler.start()

 

四、各組件功能

 1、trigger組件

trigger提供任務的觸發方式,共三種方式:

      date:只在某個時間點執行一次run_date(datetime|str)

scheduler.add_job(my_job, 'date', run_date=date(2017, 9, 8), args=[])
scheduler.add_job(my_job, 'date', run_date=datetime(2017, 9, 8, 21, 30, 5), args=[])
scheduler.add_job(my_job, 'date', run_date='2019-6-12 21:30:05', args=[])
# The 'date' trigger and datetime.now() as run_date are implicit
sched.add_job(my_job, args=[[])

 interval:每隔一段時間執行一次weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0,

start_date=None, end_date=None, timezone=None
scheduler.add_job(my_job, 'interval', hours=2)
scheduler.add_job(my_job, 'interval', hours=2, start_date='2017-9-8 21:30:00',
end_date='2019-06-12 21:30:00)
@scheduler.scheduled_job('interval', id='my_job_id', hours=2)
def my_job():
    print("Hello World")

 

cron:使用Linux下crontab的方式(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)

       sched.add_job(my_job, 'cron', hour=3, minute=30)

    sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2017-10-
30')
    @sched.scheduled_job('cron', id='my_job_id', day='last sun')
    def some_decorated_task():
        print("I am printed at 00:00:00 on the last Sunday of every month!")

2、scheduler組件

scheduler組件提供執行的方式,在不同的運行環境中選擇合適的方式

 BlockingScheduler:進程中只運行調度器時的方式

from apscheduler.schedulers.blocking import BlockingScheduler
import time
scheduler = BlockingScheduler()
def job1():

print "%s: 執行任務" % time.asctime()

scheduler.add_job(job1, 'interval', seconds=3)
scheduler.start()

BackgroundScheduler:不想使用任何框架時的方式

from apscheduler.schedulers.background import BackgroundScheduler
import time
scheduler = BackgroundScheduler()
def job1():

print "%s:執行任務 " % time.asctime() 

scheduler.add_job(job1, 'interval', seconds=3)

scheduler.start()
while True:
    pass

AsyncIOScheduler: asyncio module的方式( Python3)

from apscheduler.schedulers.asyncio import AsyncIOScheduler
try:
    import asyncio
except ImportError:
    import trollius as asyncio
...
...

# while True pass

try:
    asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
    pass

GeventScheduler: gevent方式 

from apscheduler.schedulers.gevent import GeventScheduler
...

...

g = scheduler.start()
# while True:pass
try:
    g.join()
except (KeyboardInterrupt, SystemExit):
    pass

TornadoScheduler: Tornado方式

from tornado.ioloop import IOLoop
from apscheduler.schedulers.tornado import TornadoScheduler

... 

...

# while True:pass
try:
    IOLoop.instance().start()
except (KeyboardInterrupt, SystemExit):
    pass

TwistedScheduler: Twisted方式

from twisted.internet import reactor
from apscheduler.schedulers.twisted import TwistedScheduler

... 

...

# while True:pass
try:
    reactor.run()
except (KeyboardInterrupt, SystemExit):
    pass

 

QtScheduler: Qt方式

3、executors組件

executors組件提供任務的調度方式

base

debug
gevent 

pool(max_workers=10) 

twisted 

4、jobstore組件

jobstore提供任務的各種持久化方式

base

memory
mongodb
    scheduler.add_jobstore('mongodb', collection='example_jobs') 

redis

    scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times')
rethinkdb
    scheduler.add_jobstore('rethinkdb', database='apscheduler_example') 

sqlalchemy

  scheduler.add_jobstore('sqlalchemy', url=url)

zookeeper

  scheduler.add_jobstore('zookeeper', path='/example_jobs')

 

五、任務操作

1、添加任務add_job(如上)

如果使用了任務的存儲,開啟時最好添加replace_existing=True,否則每次開啟時都會創建任務的副本,開啟后任務不會馬上啟動,可修改triger參數

2、刪除任務remove_job

#根據任務實例刪除

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

# 根據任務id刪除

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

3、任務的暫停pause_job和繼續resume_job

job = scheduler.add_job(myfunc, 'interval', minutes=2)
#根據任務實例
job.pause()
job.resume()

# 根據任務id暫停

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
4、任務的修飾modify和重設reschedule_job

修飾:job.modify(max_instances=6, name='Alternate name') 

重設:scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

5、調度器操作

開啟:scheduler.start()

關閉:scheduler.shotdown(wait=True | False)

暫停:scheduler.pause()

繼續:scheduler.resume() 

監聽:http://apscheduler.readthedocs.io/en/v3.3.0/modules/events.html#module-apscheduler.events

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
官方實例
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors,
job_defaults=job_default

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM