python定時任務APScheduler


APScheduler定時任務

APScheduler 支持三種調度任務:固定時間間隔,固定時間點(日期),Linux 下的 Crontab 命令。同時,它還支持異步執行、后台執行調度任務。

一、基本架構

  1. 觸發器 triggers:設定觸發任務的條件

    描述一個任務何時被觸發,按日期或按時間間隔或按 cronjob 表達式三種方式觸發

  2. 任務存儲器 job stores:存放任務,可以放內存(默認)或數據庫

    注:調度器之間不能共享任務存儲器

  3. 執行器 executors:用於執行任務,可設定執行模式

    將指定的作業提交到線程池或者進程池中運行,任務完成通知調度器觸發相應的事件。

  4. 調度器 schedulers:將上方三個組件作為參數,創建調度器實例執行。

    協調三個組件的運行。

二、調度器組件(schedulers)

  • BlockingScheduler阻塞式調度器

    調度程序是進程中唯一運行的進程,調用start函數會阻塞當前線程,不能立即返回。

from apscheduler.schedulers.blocking import BlockingScheduler
import time

scheduler = BlockingScheduler()
 
def job1():
    print "%s: 執行任務"  % time.asctime()

scheduler.add_job(job1, 'interval', seconds=3)
scheduler.start()
  • BackgroundScheduler后台調度器

    當前線程不會阻塞,調度器后台執行

from apscheduler.schedulers.background import BackgroundScheduler
import time

scheduler = BackgroundScheduler()
 
def job1():
    print "%s: 執行任務"  % time.asctime()

scheduler.add_job(job1, 'interval', seconds=3)
scheduler.start()
time.sleep(10)

注:10秒執行完后,程序結束。

  • AsyncIOSchedulerAsyncIO調度器

    適用於使用了asyncio的情況

from apscheduler.schedulers.asyncio import AsyncIOScheduler
import asyncio
...
...
try:
    asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
    pass
  • GeventSchedulerGevent調度器

    使用了Gevent的情況

from apscheduler.schedulers.gevent import GeventScheduler
...
...
g = scheduler.start()
try:
    g.join()
except (KeyboardInterrupt, SystemExit):
    pass
  • TornadoSchedulerTornado調度器

    適用於構建Tornado應用

  • TwistedScheduler Twisted調度器

    適用於構建Twisted應用

  • QtScheduler Qt調度器

    適用於構建Qt應用

三、觸發器組件(trigger)

  1. date:只在某個時間點執行一次,具體日期

    run_date(datetime|str)

    scheduler.add_job(my_job, 'date', run_date=datetime(2019, 7, 12, 15, 30, 5), args=[])
    scheduler.add_job(my_job, 'date', run_date="2019-07-12", args=[])
    

    timezone 指定時區

  2. interval:每隔一段時間允許一次,時間間隔

    weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None

    scheduler.add_job(my_job, 'interval', hours=2)
    scheduler.add_job(my_job, 'interval', hours=2, start_date='2017-9-8 21:30:00', end_date='2018-06-15 21:30:00)
    
  3. cron:任務的運行周期

    (year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)

    除了week和 day_of_week,它們的默認值是*

    例如day=1, minute=20,這就等於year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0,工作將在每個月的第一天以每小時20分鍾的時間執行

    表達式類型

    表達式 參數類型 描述
    * 所有 通配符。例:minutes=*即每分鍾觸發
    */a 所有 可被a整除的通配符。
    a-b 所有 范圍a-b觸發
    a-b/c 所有 范圍a-b,且可被c整除時觸發
    xth y 第幾個星期幾觸發。x為第幾個,y為星期幾
    last x 一個月中,最后個星期幾觸發
    last 一個月最后一天觸發
    x,y,z 所有 組合表達式,可以組合確定值或上方的表達式

注:當設置的時間間隔小於,任務的執行時間,線程會阻塞住,等待執行完了才能執行下一個任務,可以設置max_instance指定一個任務同一時刻有多少個實例在運行,默認為1

四、配置調度器

轉自:https://www.jianshu.com/p/4f5305e220f0

線程池執行器默認為10,內存任務存儲器為memoryjobstore,如果想自己配置的話可以執行以下操作

需求:

兩個任務儲存器分別搭配兩個執行器;同時,還要修改任務的默認參數;最后還要改時區
  • 名稱為“mongo”的MongoDBJobStore
  • 名稱為“default”的SQLAlchemyJobStore
  • 名稱為“ThreadPoolExecutor ”的ThreadPoolExecutor,最大線程20個
  • 名稱“processpool”的ProcessPoolExecutor,最大進程5個
  • UTC時間作為調度器的時區
  • 默認為新任務關閉合並模式()
  • 設置新任務的默認最大實例數為3
方法一:
from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
方法二:
from apscheduler.schedulers.background import BackgroundScheduler


# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
    'apscheduler.jobstores.mongo': {
         'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})

方法三:
from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler()

# ..這里可以添加任務

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

五、啟動調度器

除了BlockingScheduler外,其他非阻塞的調度器都會立即返回,運行之后的代碼。

BlockingScheduler需要將運行的代碼放在start()之前

1.添加任務

1.調用add_job()  #可以傳參max_instance,同一任務的運行實例個數
  當有任務中途中斷,后面恢復后,有N個任務沒有執行  
    coalesce:true ,恢復的任務會執行一次
    coalesce:false,恢復后的任務會執行N次配合misfire_grace_time使用
    misfire_grace_time設置時間差值,由於某些原因沒有運行,再次提交時,大於設置的時間,實例不會運行。
2.裝飾器scheduled_job()
立即運行可以不設置trigger參數

2.移除任務

# 根據任務實例刪除
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

# 根據任務id刪除
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

3.暫停任務

job = scheduler.add_job(myfunc, 'interval', minutes=2)
# 根據任務實例
job.pause()  #暫停
job.resume() #繼續

# 根據任務id暫停
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')

4.調度器操作

scheduler.start() #開啟
scheduler.shotdown(wait=True|False) #關閉  False 無論任務是否執行,強制關閉

異常捕獲

# 可以添加apscheduler日志至DEBUG級別,這樣就能捕獲異常信息
import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM