Python下定時任務框架APScheduler的使用


1.APScheduler簡介: 

      APScheduler是Python的一個定時任務框架,可以很方便的滿足用戶定時執行或者周期執行任務的需求,它提供了基於日期date、固定時間間隔interval 、以及類似於Linux上的定時任務crontab類型的定時任務。並且該框架不僅可以添加、刪除定時任務,還可以將任務存儲到數據庫中,實現任務的持久化,所以使用起來非常方便。

2.APScheduler安裝:

      APScheduler的安裝相對來說也非常簡單,可以直接利用pip安裝,如果沒有pip可以下載源碼,利用源碼安裝。

      1).利用pip安裝:(推薦) 

      # pip install apscheduler 

      2).基於源碼安裝:https://pypi.python.org/pypi/APScheduler/ 

      # python setup.py install 

      3.基本概念

      APScheduler有四種組件及相關說明:

      1) triggers(觸發器):觸發器包含調度邏輯,每一個作業有它自己的觸發器,用於決定接下來哪一個作業會運行,除了他們自己初始化配置外,觸發器完全是無狀態的。

      2)job stores(作業存儲):用來存儲被調度的作業,默認的作業存儲器是簡單地把作業任務保存在內存中,其它作業存儲器可以將任務作業保存到各種數據庫中,支持MongoDB、Redis、SQLAlchemy存儲方式。當對作業任務進行持久化存儲的時候,作業的數據將被序列化,重新讀取作業時在反序列化。

      3) executors(執行器):執行器用來執行定時任務,只是將需要執行的任務放在新的線程或者線程池中運行。當作業任務完成時,執行器將會通知調度器。對於執行器,默認情況下選擇ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任務如比較消耗CPU的任務則可以選擇ProcessPoolExecutor,當然根據根據實際需求可以同時使用兩種執行器。

      4) schedulers(調度器):調度器是將其它部分聯系在一起,一般在應用程序中只有一個調度器,應用開發者不會直接操作觸發器、任務存儲以及執行器,相反調度器提供了處理的接口。通過調度器完成任務的存儲以及執行器的配置操作,如可以添加。修改、移除任務作業。  

    APScheduler提供了多種調度器,可以根據具體需求來選擇合適的調度器,常用的調度器有:

      BlockingScheduler:適合於只在進程中運行單個任務的情況,通常在調度器是你唯一要運行的東西時使用。

      BackgroundScheduler: 適合於要求任何在程序后台運行的情況,當希望調度器在應用后台執行時使用。

      AsyncIOScheduler:適合於使用asyncio框架的情況

      GeventScheduler: 適合於使用gevent框架的情況

      TornadoScheduler: 適合於使用Tornado框架的應用

      TwistedScheduler: 適合使用Twisted框架的應用

      QtScheduler: 適合使用QT的情況

 

   1)下面一個簡單的示例:

      import time
      from apscheduler.schedulers.blocking import BlockingScheduler
      def test_job():
        print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
      scheduler = BlockingScheduler()
      '''
      #該示例代碼生成了一個BlockingScheduler調度器,使用了默認的默認的任務存儲MemoryJobStore,以及默認的執行器ThreadPoolExecutor,並且最大線程數為10。
      '''
      scheduler.add_job(test_job, 'interval', seconds=5, id='test_job')
      '''
      #該示例中的定時任務采用固定時間間隔(interval)的方式,每隔5秒鍾執行一次。
      #並且還為該任務設置了一個任務id
    scheduler.start()

  2)如果想執行一些復雜任務,如上邊所說的同時使用兩種執行器,或者使用多種任務存儲方式,並且需要根據具體情況對任務的一些默認參數進行調整。可以參考下面的方式。(源碼解析:http://apscheduler.readthedocs.io/en/latest/userguide.html)

第一種方式:

      from pytz import utc
      from apscheduler.schedulers.background import BackgroundScheduler  # 導入調度器
from apscheduler.jobstores.mongodb import MongoDBJobStore # 導入作業存儲
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore # 導入作業存儲
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor # 導入執行器
jobstores
= {   'mongo': MongoDBJobStore(),   'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = {   'default': ThreadPoolExecutor(20),   'processpool': ProcessPoolExecutor(5) } job_defaults = {   'coalesce': False,   'max_instances': 3 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

第二種方式:

      from apscheduler.schedulers.background import BackgroundScheduler
      scheduler = BackgroundScheduler({
        'apscheduler.jobstores.mongo': {
          'type': 'mongodb'
        },
        'apscheduler.jobstores.default': {
          'type': 'sqlalchemy',
          'url': 'sqlite:///jobs.sqlite'
        },
        'apscheduler.executors.default': {
          'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
          'max_workers': '20'
        },
        'apscheduler.executors.processpool': {
          'type': 'processpool',
          'max_workers': '5'
        },
        'apscheduler.job_defaults.coalesce': 'false',
        'apscheduler.job_defaults.max_instances': '3',
        'apscheduler.timezone': 'UTC',
      })

 第三種方式:

      from pytz import utc
      from apscheduler.schedulers.background import BackgroundScheduler
      from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
      from apscheduler.executors.pool import ProcessPoolExecutor
      jobstores = {
        'mongo': {'type': 'mongodb'},
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
      }
      executors = {
        'default': {'type': 'threadpool', 'max_workers': 20},
        'processpool': ProcessPoolExecutor(max_workers=5)
      }
      job_defaults = {
        'coalesce': False,
        'max_instances': 3
      }
      scheduler = BackgroundScheduler()
      scheduler.configure(jobstores=jobstores, executors=executors,job_defaults=job_defaults, timezone=utc)

5.對任務作業的基本操作:

      1).添加作業有兩種方式:第一種可以直接調用add_job(),第二種使用scheduled_job()修飾器。

      而add_job()是使用最多的,它可以返回一個apscheduler.job.Job實例,因而可以對它進行修改或者刪除,而使用修飾器添加的任務添加之后就不能進行修改。

      #!/usr/bin/env python
      #-*- coding:UTF-8
      import time
      import datetime
      from apscheduler.schedulers.blocking import BlockingScheduler
      def job1(f):
          print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), f
      def job2(arg1, args2, f):
          print f, args1, args2
      def job3(**args):
          print args

 APScheduler支持以下三種定時任務:

      cron: crontab類型任務

      interval: 固定時間間隔任務

      date: 基於日期時間的一次性任務

scheduler = BlockingScheduler()

      #循環任務示例

      scheduler.add_job(job1, 'interval', seconds=5, args=(1,), id='test_job1')

      #定時任務示例

      scheduler.add_job(job1, 'cron', second='*/5', args=(1,2,3,), id='test_job2')

      #一次性任務示例

      scheduler.add_job(job1, next_run_time=(datetime.datetime.now() + datetime.timedelta(seconds=10)), args=(1,), id='test_job3')

傳遞參數的方式有元組(tuple)、列表(list)、字典(dict)

      注意:不過需要注意采用元組傳遞參數時后邊需要多加一個逗號     

      #基於list

      scheduler.add_job(job2, 'interval', seconds=5, args=['a','b','list'], id='test_job4')

      #基於tuple

      scheduler.add_job(job2, 'interval', seconds=5, args=('a','b','tuple',), id='test_job5')

      #基於dict

      scheduler.add_job(job3, 'interval', seconds=5, kwargs={'f':'dict', 'a':1,'b':2}, id='test_job6)

      print scheduler.get_jobs()

      scheduler.start()

或者使用scheduled_job()修飾器來添加作業: 

      @sched.scheduled_job('cron', second='*/5' ,id='my_job_id',)

      def test_task():

        print("Hello world!")

2).獲得任務列表:

      可以通過get_jobs方法來獲取當前的任務列表,也可以通過get_job()來根據job_id來獲得某個任務的信息。並且apscheduler還提供了一個print_jobs()方法來打印格式化的任務列表。

      例如: 

      scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')

      print scheduler.get_job('my_job_id')

      print scheduler.get_jobs()

3).修改任務:

      修改任務的屬性可以使用apscheduler.job.Job.modify()或者modify_job()方法,可以修改除了id的其它任何屬性。

      例如: 

      job = scheduler.add_job(my_job, 'interval', seconds=5, id='my_job' name='test_job')

      job.modify(max_instances=5, name='my_job')

 4).刪除任務:

      刪除調度器中的任務有可以用remove_job()根據job ID來刪除指定任務或者使用remove(),如果使用remove()需要事先保存在添加任務時返回的實例對象,任務刪除后就不會在執行。

      注意:通過scheduled_job()添加的任務只能使用remove_job()進行刪除。

      例如: 

      job = scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')

      job.remove() 

      或者 

      scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')

      scheduler.remove_job('my_job')

5).暫停與恢復任務:

      暫停與恢復任務可以直接操作任務實例或者調度器來實現。當任務暫停時,它的運行時間會被重置,暫停期間不會計算時間。

      暫停任務: 

      apscheduler.job.Job.pause()

      apscheduler.schedulers.base.BaseScheduler.pause_job() 

      恢復任務 

      apscheduler.job.Job.resume()

      apscheduler.schedulers.BaseScheduler.resume_job()

      

6).啟動調度器

      可以使用start()方法啟動調度器,BlockingScheduler需要在初始化之后才能執行start(),對於其他的Scheduler,調用start()方法都會直接返回,然后可以繼續執行后面的初始化操作。

      例如: 

      from apscheduler.schedulers.blocking import BlockingScheduler

      def my_job():

        print "Hello world!"

      scheduler = BlockingScheduler()

      scheduler.add_job(my_job, 'interval', seconds=5)

      scheduler.start()

 7).關閉調度器:

      使用下邊方法關閉調度器: 

      scheduler.shutdown() 

      默認情況下調度器會關閉它的任務存儲和執行器,並等待所有正在執行的任務完成,如果不想等待,可以進行如下操作: 

      scheduler.shutdown(wait=False)

 

注意:

      當出現No handlers could be found for logger “apscheduler.scheduler”次錯誤信息時,說明沒有 logging模塊的logger存在,所以需要添加上,對應新增內容如下所示(僅供參):

      import logging

      logging.basicConfig(

    level=logging.DEBUG,

        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',

        datafmt='%a, %d %b %Y %H:%M:%S',

        filename='/var/log/aaa.txt',

        filemode='a'

  )

 

   

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM