python APScheduler模塊


簡介

一般來說, Celery是python可以執行定時任務的常用第三方庫, 但是Celery不支持動態添加定時任務 (Django有插件可以動態添加). 除此之外, APScheduler相對於Celery來說更加輕量級, 有學習的價值.

APScheduler有很多優點, 如: 支持持久化, 且可以動態添加定時任務.

官方文檔

$pip install apscheduler

APScheduler的各個組件的關系, 如下圖:
APScheduler各個部件

一般使用

步驟:

  1. 創建調度器

  2. 配置調度器

    • 任務存儲器
    • 執行器
    • 全局配置
  3. 添加任務

  4. 運行調度任務

  5. 修改/刪除任務

除此之外, 可以監聽事件, 執行自定義的函數

import datetime
from pytz import timezone

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore

job_stores = {
    'default': MemoryJobStore()
}
executors = {
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}


def hello_world():
    print("hello world")


# 阻塞調度器
scheduler = BlockingScheduler()
scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults)

# 在當前時間的3秒后, 觸發執行hello_world, 詳情見: "觸發器與調度器API"
scheduler.add_job(hello_world, "date", run_date=datetime.datetime.now() + datetime.timedelta(seconds=3),
                  timezone=timezone("Asia/Shanghai"))

scheduler.start()

調度器

配置作業存儲器和執行器可以在調度器中完成。例如添加、修改、移除作業,根據不同的應用場景,可以選擇不同的調度器,可選擇的調度器如下:

# 阻塞式調度器 [ 調度器是你程序中唯一要運行的東西 ]
from apscheduler.schedulers.blocking import BlockingScheduler

# 后台調度器 [ 應用程序后台靜默運行 ]
from apscheduler.schedulers.background import BackgroundScheduler

# AsyncIO調度器 [ 如果你的程序使用了 asyncio 庫 ]
from apscheduler.schedulers.asyncio import AsyncIOScheduler

# Gevent調度器  [ 如果你的程序使用了 gevent 庫 ]
from apscheduler.schedulers.gevent import GeventScheduler

# Tornado調度器 [ 如果你打算構建一個 Tornado 程序 ]
from apscheduler.schedulers.tornado import TornadoScheduler

# Twisted調度器 [ 如果你打算構建一個 Twisted 程序 ]
from apscheduler.schedulers.twisted import TwistedScheduler

# Qt調度器 [ 如果你打算構建一個 Qt 程序 ]
from apscheduler.schedulers.qt import QtScheduler

在使用非阻塞的調度器時需要注意:程序是否會退出從而無法執行任務

配置

有3種方式配置

方式一

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# 鍵為名稱
# 值為字典 實例化對象作為值, 參數直接在實例化時傳入

jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, 
	job_defaults=job_defaults, timezone=utc)

方式二

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor

# 鍵為名稱,值要為字典,type指定調度器, 其它鍵值對指定參數

jobstores = {
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler()

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

方式三

from apscheduler.schedulers.background import BackgroundScheduler

# 前綴 "apscheduler." 是硬編碼的

# apscheduler.jobstores指定任務存儲器
# apscheduler.executors指定執行器
# 最后的 "." 指定名稱

scheduler = BackgroundScheduler({
    'apscheduler.jobstores.mongo': {
        'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})

default這個名字是可以手動指定,但不指定時,APScheduler會使用默認值(調用add_executor/add_jobstore

執行器

處理作業的運行,通常通過在作業中提交指定的可調用對象到一個線程或者進程池來進行,當作業完成時,執行器會將通知調度器
步驟:

  1. 將執行器加入到調度器
  2. 添加任務時,指定執行器
# 線程池執行器
from apscheduler.executors.pool import ThreadPoolExecutor

# 進程池執行器
from apscheduler.executors.pool import ProcessPoolExecutor

# AsyncIO事件循環執行器
from apscheduler.executors.asyncio import AsyncIOExecutor

# Gevent事件循環執行器
from apscheduler.executors.gevent import GeventExecutor

# Tornado事件循環執行器
from apscheduler.executors.tornado import TornadoExecutor

默認ThreadPoolExecutor
ThreadPoolExecutorProcessPoolExecutor分別調用concurrent.futures.ThreadPoolExecutorconcurrent.futures.ProcessPoolExecutor, 參數有:max_workers=10, pool_kwargs=None

使用例子

import datetime
from pytz import timezone

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor

executors = {
    'pool': ThreadPoolExecutor(max_workers=5)
}


def hello_world():
    print("hello world")


scheduler = BlockingScheduler()
# 添加到配置文件
scheduler.configure(executors=executors)
# 指定執行器
scheduler.add_job(hello_world, "date", run_date=datetime.datetime.now() + datetime.timedelta(seconds=3),
                  timezone=timezone("Asia/Shanghai"), executor="pool")
scheduler.start()

任務存儲器

存儲被調度的作業,默認的作業存儲器只是簡單地把作業保存在內存中,其他的作業存儲器則是將作業保存在數據庫中,當作業被保存在一個持久化的作業存儲器中的時候,該作業的數據會被序列化,並在加載時被反序列化,需要說明的是,作業存儲器不能共享調度器
步驟:

  1. 定義任務存儲器
  2. 使用任務存儲器
# 內存任務存儲器
from apscheduler.jobstores.memory import MemoryJobStore

# 使用SQLAlchemy ORM的任務存儲器
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

# MongoDB任務存儲器
from apscheduler.jobstores.mongodb import MongoDBJobStore

# Redis任務存儲器
from apscheduler.jobstores.redis import RedisJobStore

# RethinkDB任務存儲器
from apscheduler.jobstores.rethinkdb import RethinkDBJobStore

# ZooKeeper任務存儲器
from apscheduler.jobstores.zookeeper import ZooKeeperJobStore

默認MemoryJobStore

一般使用

import datetime
from pytz import timezone

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore

job_stores = {
    'redis': RedisJobStore()
}


def hello_world():
    print("hello world")


scheduler = BlockingScheduler()
scheduler.configure(jobstores=job_stores)
scheduler.add_job(hello_world, "date", run_date=datetime.datetime.now() + datetime.timedelta(seconds=3),
                  timezone=timezone("Asia/Shanghai"), jobstore="redis")

scheduler.start()

SQLAlchemyJobStore使用

sqlalchemy + mysql

"""
SQLAlchemyJobStore(url=None, engine=None, tablename='apscheduler_jobs', 
	metadata=None, ..., tableschema=None, engine_options=None):

指定URL時,內部調用,create_engine

URL的字符串形式為 dialect[+driver]://user:password@host/dbname[?key=value..]
在哪里 dialect 是數據庫名稱,例如 mysql , oracle , postgresql 等,
以及 driver DBAPI的名稱,例如 psycopg2 , pyodbc , cx_oracle 或者

# 使用DB API格式建立建立連接, 見PEP: https://www.python.org/dev/peps/pep-0249/
"""

import datetime
from pytz import timezone

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

MYSQL = {
    "url": "mysql+pymysql://root:123456@localhost/test"

}
job_stores = {
    'mysql': SQLAlchemyJobStore(**MYSQL)
}


def hello_world():
    print("hello world")


scheduler = BlockingScheduler()
scheduler.configure(jobstores=job_stores)
scheduler.add_job(hello_world, "date", run_date=datetime.datetime.now() + datetime.timedelta(seconds=3),
                  timezone=timezone("Asia/Shanghai"), jobstore="mysql")

scheduler.start()

RedisJobStore使用

"""
RedisJobStore(db=0, jobs_key='apscheduler.jobs', run_times_key='apscheduler.run_times', ..., **connect_args)

調用 Redis(db=int(db), **connect_args)

Redis的參數:
host='localhost', port=6379,
db=0, password=None, socket_timeout=None,
socket_connect_timeout=None,
socket_keepalive=None, socket_keepalive_options=None,
connection_pool=None, unix_socket_path=None,
encoding='utf-8', encoding_errors='strict',
charset=None, errors=None,
decode_responses=False, retry_on_timeout=False,
ssl=False, ssl_keyfile=None, ssl_certfile=None,
ssl_cert_reqs='required', ssl_ca_certs=None,
ssl_check_hostname=False,
max_connections=None, single_connection_client=False,
health_check_interval=0, client_name=None, username=None
"""

import datetime
from pytz import timezone

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore

REDIS = {
    'host': '127.0.0.1',
    'port': '6379',
    'db': 0,
}
job_stores = {
    'redis': RedisJobStore(**REDIS)
}


def hello_world():
    print("hello world")


scheduler = BlockingScheduler()
scheduler.configure(jobstores=job_stores)
scheduler.add_job(hello_world, "date", run_date=datetime.datetime.now() + datetime.timedelta(seconds=3),
                  timezone=timezone("Asia/Shanghai"), jobstore="redis")

scheduler.start()

其它自己查資料

全局配置

from apscheduler.schedulers.blocking import BlockingScheduler


job_defaults = {
    'coalesce': False,  # 關閉聚合(coalescing)功能
    'max_instances': 3,  # 默認限制最大實例數為 3
    "timezone": "UTC",  # 調度器的時區
}

scheduler = BlockingScheduler()
scheduler.configure(job_defaults=job_defaults)

關於coalescing, 見: 錯過的作業執行以及合並操作

調度器API

以下方法為調度器的API

添加任務

使用.add_job直接添加或使用.scheduled_job作為裝飾器添加任務,

例如:

# ....

def hello_world():
    print("hello_world")
	
	
scheduler = BlockingScheduler()
scheduler.add_job(hello_world, ...)

# ....

# ############## 或

# ...

scheduler = BlockingScheduler()

@scheduler.scheduled_job(...)
def hello_world():
    print("hello_world")
	
# ...

add_job簽名 (scheduled_job參數相同)

def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
			misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
			next_run_time=undefined, jobstore='default', executor='default',
			replace_existing=False, **trigger_args)
  • func: 任務函數
  • trigger: 觸發器
  • args: 給func的位置參數
  • kwargs: 給func的關鍵字參數
  • id: 指定任務的標識
  • name: 任務的說明
  • misfire_grace_time: 見: 錯過的作業執行以及合並操作
  • coalesce: 如果調度器確定任務應該連續運行一次以上,則運行一次而不是多次, 見: 錯過的作業執行以及合並操作
  • max_instances: 任務允許的最大並發運行實例數
  • next_run_time: 沒用過
  • jobstore 指定任務存儲器
  • executor 指定執行器
  • replace_existing : True時, 用相同的 id 替換現有任務
  • trigger_args: 給觸發器的關鍵字參數

.add_job返回apscheduler.job.Job實例, 見: Job

觸發器為空時, 立即執行

例子

import datetime
from apscheduler.schedulers.blocking import BlockingScheduler


def hello_world():
    print("hello_world")


scheduler = BlockingScheduler()
scheduler.add_job(hello_world, "date", run_date=datetime.datetime.now() + datetime.timedelta(seconds=3),
                  timezone="Asia/Shanghai")
scheduler.start()

觸發器

由於add_jobscheduled_jobtrigger_args參數, 所以可以通過使用關鍵字參數傳入到觸發器中
但是, 假如不用字符串的方式傳入觸發器而是用對象的話, 可以直接傳入
.add_job(f, 'date', run_date=...).add_job(f, DateTrigger(run_date=...), ...)的區別

觸發器中包含調度邏輯,每個作業都有自己的觸發器來決定下次運行時間。除了它們自己初始配置以外,觸發器完全是無狀態的。

  • date日期觸發

    只執行一次

    簽名:

    classapscheduler.triggers.date.DateTrigger(run_time: datetime.datetime, 
    	timezone: Union[datetime.tzinfo, str] = 'local')
    
    # run_time: 任務執行時間 datetime
    # timezone: 時區
    

    例子:

    from datetime import date
    
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    scheduler = BlockingScheduler()
    
    
    def my_job(text):
    	print(text)
    
    
    # 在2021年12月3日執行
    scheduler.add_job(my_job, 'date', run_date=date(2021, 12, 3), args=['text'])
    
    scheduler.start()
    
    

    立即執行

    from datetime import date
    
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    scheduler = BlockingScheduler()
    
    
    def my_job(text):
    	print(text)
    
    
    # 立刻運行
    scheduler.add_job(my_job, 'date', args=['text'], timezone="Asia/shanghai")
    
    scheduler.start()
    
    
  • interval間隔觸發

    每隔一段時間執行一次

    簽名:

    classapscheduler.triggers.interval.IntervalTrigger(*, weeks: float = 0, days: float = 0,
    	hours: float = 0, minutes: float = 0, seconds: float = 0,  
    	microseconds: float = 0, start_time: Optional[datetime.datetime] = None,
    	end_time: Optional[datetime.datetime] = None, timezone: Union[datetime.tzinfo, str] = 'local')
    
    
    # weeks 間隔禮拜數 (int)
    # days 間隔天數 (int)
    # hours 間隔小時數 (int)
    # minutes 間隔分鍾數 (int)
    # seconds 間隔秒數 (int)
    # start_date 周期執行的起始時間點(datetime|str) 
    # end_date 最后可能觸發時間 (datetime|str) 
    # timezone 計算date/time類型時需要使用的時區 (datetime.tzinfo|str)
    
    

    例子:

    from datetime import datetime
    
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    
    def job_function():
    	print("Hello World")
    
    
    schedulers = BlockingScheduler()
    
    # 每隔2秒, 執行一次
    schedulers.add_job(job_function, 'interval', seconds=2, timezone="Asia/shanghai")
    
    schedulers.start()
    
    
  • cron周期觸發

    使用類似crontab的格式定義觸發時間

    簽名:

    classapscheduler.triggers.cron.CronTrigger(*, year: Optional[Union[int, str]] = None,
    	month: Optional[Union[int, str]] = None, day: Optional[Union[int, str]] = None, 
    	week: Optional[Union[int, str]] = None, day_of_week: Optional[Union[int, str]] = None,
    	hour: Optional[Union[int, str]] = None, minute: Optional[Union[int, str]] = None, 
    	second: Optional[Union[int, str]] = None, 
    	start_time: Optional[Union[datetime.datetime, str]] = None,
    	end_time: Optional[Union[datetime.datetime, str]] = None,
    	timezone: Optional[Union[str, datetime.tzinfo]] = None)
    
    """
    參數:
    
    year(int|str)  4 位年份
    month(int|str)  2 位月份(1-12)
    day(int|str)  一個月內的第幾天(1-31)
    week(int|str)  ISO 禮拜數(1-53)
    day_of_week(int|str)  一周內的第幾天(0-6 或者 mon, tue, wed, thu, fri, sat, sun)
    hour(int|str)  小時(0-23)
    minute(int|str)  分鍾(0-59)
    second(int|str)  秒(0-59)
    start_date(datetime|str)  最早可能觸發的時間(date/time),含該時間點
    end_date(datetime|str)  最后可能觸發的時間(date/time),含該時間點
    timezone(datetime.tzinfo|str)  計算 date/time 時所指定的時區(默認為 scheduler 的時區)
    """
    

    不指定參數時, 為*

    一般使用:

    from apscheduler.schedulers.blocking import BlockingScheduler
    
    
    def job_function():
    	print("Hello World")
    
    
    scheduler = BlockingScheduler()
    
    # 每分鍾的第2秒執行一次
    scheduler.add_job(job_function, 'cron', second=2, timezone="Asia/shanghai")
    
    scheduler.start()
    
    

    假如熟練使用corn, 可以使用corntab語法,

    表達 應用字段 描述
    * any 任意時間
    */a any 每隔多長時間, 如: */10 4 * * *, 4點每隔10分鍾執行一次(4:10 4:20 ...)
    a-b any a-b范圍內的通配符
    a-b/c any a-b范圍內可被c整除的通配符
    xth y day 表示一個月內的第x個禮拜的星期y
    last x day 表示一個月內最后的星期x觸發
    last day 表示月末當天觸發
    x,y,z any 其他表達式可以組合的形式, 即不連續的時間

    例子:
    注意沒有cron, 直接指定觸發器

    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.triggers.cron import CronTrigger
    
    
    def job_function():
    	print("Hello World")
    
    
    scheduler = BlockingScheduler()
    
    # 五個占位符:
    # 第一個 一小時的第幾分鍾
    # 第二個 一天的第幾個小時
    # 第三個 一月的第幾天
    # 第四個 一年的第幾月
    # 第五個 一周的星期幾
    
    # 例子:
    """
    45 22 * * * 每天22:45
    0 17 * * 1 每周一的17:00
    0 4 1,15 * * 1號或15號的4:00
    40 4 * * 1-5 周一到周五的4:40
    */10 4 * * * 四點的每10分鍾(4:10、4:20......)
    """
    
    # 每2分鍾執行一次
    scheduler.add_job(job_function, CronTrigger.from_crontab("*/2 * * * *", timezone="Asia/shanghai"))
    
    scheduler.start()
    
    
  • calendarinterval
    見: apscheduler.triggers.calendarinterval

  • combining
    見: apscheduler.triggers.combining

移除任務

當從 scheduler 中移除一個 job 時,它會從關聯的 job store 中被移除,不再被執行。

兩種方法:

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

# 或使用ID
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

修改任務

例子:

job = scheduler.add_job(myfunc, 'interval', minutes=2, id="my_job_id")
job.modify(args=["lczmx", ]max_instances=6, name='Alternate name')

# 根據ID修改
scheduler.modify_job("my_job_id", args=["lczmx", ])

# 重新調度
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

暫停或恢復任務

通過 Job 實例或者 scheduler 本身你可以輕易地暫停和恢復 job 。當一個 job 被暫停,它的下一次運行時間將會被清空,同時不再計算之后的運行時間,直到這個 job 被恢復。

from apscheduler.schedulers.blocking import BlockingScheduler


def job_function():
    print("Hello world")


scheduler = BlockingScheduler()
job = scheduler.add_job(job_function, "interval", seconds=2, timezone="Asia/shanghai", id="my_job_id")
# ################# 暫停 ###########
job.pause()
# 或
scheduler.pause_job("my_job_id")

# ################# 恢復 ###########
job.resume()
# 或
scheduler.resume_job("my_job_id")

scheduler.start()

查看任務信息

from apscheduler.schedulers.blocking import BlockingScheduler


def job_function():
    print("Hello world")


scheduler = BlockingScheduler()
job = scheduler.add_job(job_function, "interval", seconds=2, timezone="Asia/shanghai", id="my_job_id")
# 獲取某個任務的信息, 需要id, 可以指定job store
print(scheduler.get_job("my_job_id"))

# 獲取全部任務信息列表, 可以指定job store
print(scheduler.get_jobs())

# 格式化輸出任務信息, 可以指定job store
# !! 內部調用print
scheduler.print_jobs()

scheduler.start()

終止調度器

# 一般使用
# 默認會等待 目前 正在執行 所有任務執行完
scheduler.shutdown()

# 使用wait參數指定不等待
scheduler.shutdown(wait=False)

暫停/恢復調度器

from apscheduler.schedulers.blocking import BlockingScheduler


def job_function():
    print("Hello world")


scheduler = BlockingScheduler()
job = scheduler.add_job(job_function, "interval", seconds=2, timezone="Asia/shanghai", id="my_job_id")

# 休眠這個調度器
scheduler.pause()

# 恢復這個調度器
scheduler.resume()

# 使用 .start , 喚醒處於暫停狀態的調度器
scheduler.start(paused=True)

scheduler.start()

添加事件

你可以為 scheduler 綁定事件監聽器(event listen)。Scheduler 事件在某些情況下會被觸發,而且它可能攜帶有關特定事件的細節信息。

使用.add_listener來添加時間監聽, 參數:

  • callback 回調函數
  • mask 事件

所有事件有如下表

事件 說明 回調函數的參數(Event類)
EVENT_SCHEDULER_STARTED 調度器已啟動 SchedulerEvent
EVENT_SCHEDULER_SHUTDOWN 調度器被關閉 SchedulerEvent
EVENT_SCHEDULER_PAUSED 調度器任務暫停 SchedulerEvent
EVENT_SCHEDULER_RESUMED 調度器任務處理恢復 SchedulerEvent
EVENT_EXECUTOR_ADDED 在調度器中添加任務 SchedulerEvent
EVENT_EXECUTOR_REMOVED 在調度器中移除任務 SchedulerEvent
EVENT_JOBSTORE_ADDED 在調度器中添加任務存儲器 SchedulerEvent
EVENT_JOBSTORE_REMOVED 在調度器中移除任務存儲器 SchedulerEvent
EVENT_ALL_JOBS_REMOVED 調度器中的所有任務被移除 SchedulerEvent
EVENT_JOB_ADDED 在任務存儲器中添加任務 JobEvent
EVENT_JOB_REMOVED 在任務存儲器中移除任務 JobEvent
EVENT_JOB_MODIFIED 在調度器外部修改任務 JobEvent
EVENT_JOB_SUBMITTED 將任務提交到執行器 JobSubmissionEvent
EVENT_JOB_MAX_INSTANCES 執行器的可執行任務數達到最大值 JobSubmissionEvent
EVENT_JOB_EXECUTED 成功執行一個任務 JobExecutionEvent
EVENT_JOB_ERROR 一個任務在執行時發生錯誤 JobExecutionEvent
EVENT_JOB_MISSED 一個任務在執行時錯過 JobExecutionEvent
EVENT_ALL 所有事件 根據上面事件動態傳入類

與調度器相關事件: apscheduler.events.SchedulerEvent屬性

  • code 事件代碼
  • alias 任務存儲器或執行器添加或刪除的別名

與任務相關事件: apscheduler.events.JobEvent屬性

  • code 事件代碼
  • job_id 任務id
  • jobstore 任務存儲器別名

向執行器提交任務的相關事件: apscheduler.events.JobSubmissionEvent屬性

  • code 事件代碼
  • job_id 任務id
  • jobstore 任務存儲器別名
  • scheduled_run_times 任務調度的時間列表 (datetime.datetime列表)

任務執行在執行器的相關事件: apscheduler.events.JobExecutionEvent屬性

  • code 事件代碼
  • job_id 任務id
  • jobstore 任務存儲器別名
  • scheduled_run_time 任務調度的時間 (datetime.datetime)
  • retval 任務的返回值
  • exception 任務拋出的異常
  • traceback 異常追蹤信息

例子:

from apscheduler.schedulers.blocking import Blockin gScheduler
from apscheduler.events import *
from apscheduler.events import SchedulerEvent


def my_listener(event):
    if event.exception:
        print('發生異常')
    else:
        print('任務已經執行')


def job_function():
    print("Hello world")


scheduler = BlockingScheduler()
# 立即執行
job = scheduler.add_job(job_function, timezone="Asia/shanghai")

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

scheduler.start()

故障排查

如果 scheduler 沒有如預期般正常運行,可以嘗試將apschedulerlogger的日志級別提升到DEBUG等級。
如果你還沒有在一開始就將日志啟用起來,那么你可以:

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

這會提供 scheduler 運行時大量的有用信息。

最大允許實例

默認情況下,每個任務同時只會有一個實例在運行。這意味着如果 一個任務到達計划運行時間點時,前一個任務尚未完成,那么這個 任務最近的一次運行計划將會 misfire(錯過)。
可以通過在添加任務時指定max_instances關鍵字參數, 來設置具體任務的最大實例數目,以便 scheduler隨后可以並發地執行它。

錯過的作業執行以及合並操作

即: coalescing

有時候scheduler無法在被調度的任務的計划運行時間點去執行這個任務。
常見的原因是: 這個 任務是在持久化的job store中,恰好在其打算運行的時刻scheduler被關閉或重啟了。
這樣,這個 任務 就被定義為 misfired (錯過)。scheduler 稍后會檢查 任務每個被錯過的執行時間的misfire_grace_time選項(可以單獨給每個 任務設置或者給 scheduler 做全局設置),以此來確定這個執行操作是否要繼續被觸發。這可能到導致連續多次執行。
如果這個行為不符合你的實際需要,可以使用coalescing來, 回滾所有的被錯過的執行操作為唯一的一個操作。如果對 任務啟用coalescing ,那么即便 scheduler 在隊列中看到這個 任務一個或多個執行計划,scheduler 都只會觸發一次

注意:
如果因為進程(線程)池中沒有可用的進程(線程)而導致 任務的運行被推遲了,那么 執行器 會直接跳過它,因為相對於原計划的執行時間來說實在太 "晚" 了。
如果在你的應用程序中出現了這種情況,你可以增加 執行器的線程(進程)的數目,或者調整misfire_grace_time,設置一個更高的值。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM