前提:使用 MySQL
作為存儲器
import time
from datetime import date, datetime
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from pytz import utc
# 調度器配置方式一
# scheduler = BackgroundScheduler({
# 'apscheduler.jobstores.default': {
# 'type': 'sqlalchemy',
# 'url': 'mysql+pymysql://root:root@127.0.0.1:3306/test_apscheduler?charset=utf8',
# 'tablename': 'api_job'
# },
# 'apscheduler.executors.default': {
# 'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
# 'max_workers': '20'
# },
# 'apscheduler.executors.processpool': {
# 'type': 'processpool',
# 'max_workers': '10'
# },
# 'apscheduler.job_defaults.coalesce': 'false',
# 'apscheduler.job_defaults.max_instances': '10',
# 'apscheduler.timezone': 'UTC',
# })
# 調度器配置方式二
# 作業存儲器
jobstores = {
'default': SQLAlchemyJobStore(url='mysql+pymysql://root:root@127.0.0.1:3306/test_apscheduler?charset=utf8')
}
# 執行器
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False, # 關閉作業合並
'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
def my_listener(event):
"""調度事件監聽"""
if event.exception:
print('The job crashed :(')
else:
print('The job worked :)')
def my_job():
print('當前時間:', datetime.now())
# replace_existing表示如果有重名的任務,直接覆蓋
# 其中id表示任務的唯一標識符,coalesce表示忽略服務器宕機時間段內的任務執行(否則就會出現服務器恢復之后一下子執行多次任務的情況)
scheduler.add_job(my_job, 'interval', seconds=3, id="135", coalesce=True, replace_existing=True)
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.start()
while True:
time.sleep(15)
注意:
BackgroundScheduler
運行在Backgroud
,但是並不會阻止主程序自己終止,而主程序終止后,BackgroundScheduler
也會終止。
參考