Table of Contents
I. APScheduler
II. Practical Applications
APScheduler
1. Installation
pip install apscheduler
2. Core Components
- triggers
- job stores
- executors
- schedulers
3. Choosing the Right Scheduler, Job Store, Executor, and Trigger
3.1 Schedulers
- BlockingScheduler: the scheduler is the only thing running in the process
- BackgroundScheduler: you are not using any of the frameworks below and want the scheduler to run in the background of your application
- AsyncIOScheduler: your application uses asyncio
- GeventScheduler: your application uses gevent
- TornadoScheduler: you are building a Tornado application
- TwistedScheduler: you are building a Twisted application
- QtScheduler: you are building a Qt application
3.2 Job Stores
- For persistent jobs, use SQLAlchemyJobStore and point it at your database URL
- For non-persistent jobs (re-created on every restart), the default MemoryJobStore keeps them in memory
3.3 Executors
- ProcessPoolExecutor: for CPU-intensive work; can be used alongside the thread pool
- ThreadPoolExecutor: the default
3.4 Triggers
- date: run the job once at a specific point in time
- interval: run the job at fixed time intervals
- cron: run the job periodically at certain times, like linux crontab (a sketch of all three follows this list)
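A minimal sketch of all three trigger types (the job function and the specific times below are arbitrary examples):
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler

def my_job():
    print(f'ran at {datetime.now()}')

scheduler = BackgroundScheduler()
# date: run once at the given point in time
scheduler.add_job(my_job, 'date', run_date='2030-01-01 00:00:00')
# interval: run every 10 seconds
scheduler.add_job(my_job, 'interval', seconds=10)
# cron: run at minute 30 of every hour, like crontab "30 * * * *"
scheduler.add_job(my_job, 'cron', minute=30)
scheduler.start()  # in a real script, keep the process alive afterwards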
4. Configuring the Scheduler
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
jobstores = {
    # multiple job stores can be configured
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore takes the database URL
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},  # at most 20 worker threads
    'processpool': ProcessPoolExecutor(max_workers=5)  # at most 5 worker processes
}
job_defaults = {
    'coalesce': False,  # don't coalesce missed runs: if a job was delayed or failed to run, run each missed run individually
    'max_instances': 3  # default maximum number of concurrently running instances per job
}
scheduler = BackgroundScheduler()
# .. do something else here, maybe add jobs etc.
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)  # use utc as the scheduler's timezone
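Equivalently, the same options can be passed straight to the scheduler constructor, which avoids the separate configure() call:
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)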
5. Adding, Removing, Modifying, and Querying Jobs
import os
import time
from apscheduler.schedulers.background import BackgroundScheduler
def print_time(name):
    print(f'{name} - {time.ctime()}')

def add_job(job_id, func, args, seconds):
    """Add a job."""
    print(f"add job - {job_id}")
    scheduler.add_job(id=job_id, func=func, args=args, trigger='interval', seconds=seconds)

def remove_job(job_id):
    """Remove a job."""
    scheduler.remove_job(job_id)
    print(f"removed job - {job_id}")

def pause_job(job_id):
    """Pause a job."""
    scheduler.pause_job(job_id)
    print(f"paused job - {job_id}")

def resume_job(job_id):
    """Resume a paused job."""
    scheduler.resume_job(job_id)
    print(f"resumed job - {job_id}")

def get_jobs():
    """Get all jobs, including paused ones."""
    res = scheduler.get_jobs()
    print(f"all jobs - {res}")

def print_jobs():
    print("detailed job info:")
    scheduler.print_jobs()

def start():
    """Start the scheduler."""
    scheduler.start()

def shutdown():
    """Shut down the scheduler."""
    scheduler.shutdown()

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    start()
    print('Press Ctrl+{0} to exit \n'.format('Break' if os.name == 'nt' else 'C'))
    add_job('job_A', func=print_time, args=("A",), seconds=1)
    add_job('job_B', func=print_time, args=("B",), seconds=2)
    time.sleep(6)
    pause_job('job_A')
    get_jobs()
    time.sleep(6)
    print_jobs()
    resume_job('job_A')
    time.sleep(6)
    remove_job('job_A')
    time.sleep(6)
    try:
        shutdown()
    except RuntimeError:
        pass
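The section title also promises "modify": APScheduler covers that with modify_job() (change job attributes) and reschedule_job() (swap in a new trigger). A minimal sketch, assuming a job with id 'job_A' exists:
# change the arguments the job is called with
scheduler.modify_job('job_A', args=("A+",))
# replace the trigger: run every 5 seconds from now on
scheduler.reschedule_job('job_A', trigger='interval', seconds=5)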
6. Scheduler Events
Event listeners can be attached to the scheduler. Scheduler events are fired in certain situations and may carry additional details about that particular event. By passing an appropriate mask argument to add_listener(), OR-ing the constants together, you can listen to only the event types you care about. The listener callable is invoked with a single argument: the event object.
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
7. Configuring Logging
import logging
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
Practical Applications
1. Dynamically Adding Scheduled Jobs in FastAPI
import asyncio
import datetime
import uvicorn
from fastapi import FastAPI, Body
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.triggers.cron import CronTrigger
app = FastAPI(title='fast-api')
scheduler = None
@app.on_event('startup')
def init_scheduler():
    """Initialize the scheduler."""
    jobstores = {
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore takes the database URL
    }
    executors = {
        'default': {'type': 'threadpool', 'max_workers': 20},  # at most 20 worker threads
        'processpool': ProcessPoolExecutor(max_workers=5)  # at most 5 worker processes
    }
    global scheduler
    scheduler = AsyncIOScheduler()
    scheduler.configure(jobstores=jobstores, executors=executors)
    # adding a coroutine job here works out rather badly...
    scheduler.add_job(tick, 'interval', seconds=3)
    print("starting scheduler...")
    scheduler.start()

def print_time(name):
    print(f'{name} - {datetime.datetime.now()}')

async def tick():
    print('Tick! The time is: %s' % datetime.datetime.now())
    await asyncio.sleep(1)

@app.post('/add-job')
async def add_job(job_id: str = Body(...), cron: str = Body(...)):
    """Add a job."""
    scheduler.add_job(id=job_id, func=print_time, args=(job_id,), trigger=CronTrigger.from_crontab(cron))
    return {"msg": "success!"}

@app.post('/remove-job')
async def remove_job(job_id: str = Body(..., embed=True)):
    """Remove a job."""
    scheduler.remove_job(job_id)
    return {"msg": "success!"}

if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=5566)
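With the service running, jobs can then be managed over HTTP. A quick sanity check using the requests package (the job id and cron expression are arbitrary examples):
import requests

resp = requests.post('http://127.0.0.1:5566/add-job',
                     json={'job_id': 'demo', 'cron': '*/1 * * * *'})
print(resp.json())  # {'msg': 'success!'}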
1.1 Problems
- Adding coroutine jobs with AsyncIOScheduler() is problematic
- In a multi-process deployment, apscheduler loads the jobs once per process, so jobs run multiple times. How can that be solved?
- The official recommendation is to put the scheduler behind http, rpyc, grpc, or similar
2. Adding Scheduled Jobs via an rpyc Service
2.1 Put the scheduled business code inside the rpyc service
2.2 Add scheduled jobs through rpyc callbacks
server.py
import datetime
import uvicorn
from fastapi import FastAPI, Body
import rpyc
app = FastAPI(title='fast-api')
conn = None
bgsrv = None
mon = None
@app.on_event('startup')
def init_scheduler():
    """Initialize the rpyc connection."""
    global conn, bgsrv, mon
    conn = rpyc.connect("localhost", 12345)
    # create a bg thread to process incoming events
    bgsrv = rpyc.BgServingThread(conn)  # the background serving thread must be running for callbacks to execute
    mon = conn.root.Monitor(print_time)

def print_time(name):
    print(f'{name} - {datetime.datetime.now()}')

def from_crontab(cron):
    """Split a five-field crontab expression into cron trigger keyword arguments."""
    values = cron.split(' ')
    return {
        'minute': values[0],
        'hour': values[1],
        'day': values[2],
        'month': values[3],
        'day_of_week': values[4],
    }

@app.post('/add-job')
async def add_job(job_id: str = Body(...), cron: str = Body(...)):
    """Add a job."""
    mon.add_job(id=job_id, args=(job_id,), trigger='cron', **from_crontab(cron))
    return {"msg": "success!"}

@app.post('/remove-job')
async def remove_job(job_id: str = Body(..., embed=True)):
    """Remove a job."""
    mon.remove_job(job_id)
    return {"msg": "success!"}

if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=5566)
rpc.py
import rpyc
from rpyc.utils.server import ThreadedServer
from apscheduler.schedulers.background import BackgroundScheduler
class SchedulerService(rpyc.Service):
    class exposed_Monitor(object):  # exposing names is not limited to methods :)
        def __init__(self, callback):
            # wrap the callback with rpyc.async_ so invoking it does not block
            self.callback = rpyc.async_(callback)

        def exposed_add_job(self, *args, **kwargs):
            print("adding job:", args, kwargs)
            return scheduler.add_job(self.callback, *args, **kwargs)

        def exposed_pause_job(self, job_id, jobstore=None):
            return scheduler.pause_job(job_id, jobstore)

        def exposed_resume_job(self, job_id, jobstore=None):
            return scheduler.resume_job(job_id, jobstore)

        def exposed_remove_job(self, job_id, jobstore=None):
            scheduler.remove_job(job_id, jobstore)

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.start()
    protocol_config = {'allow_public_attrs': True}
    server = ThreadedServer(SchedulerService, port=12345, protocol_config=protocol_config)
    try:
        server.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        scheduler.shutdown()
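To try the pair out, start rpc.py first so the scheduler service is listening on port 12345, then start server.py; the /add-job and /remove-job endpoints then manage jobs through the exposed Monitor object.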
2.3 An rpyc service whose scheduled jobs are HTTP requests
2.3.1 rpyc service backend
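The original backend code is not reproduced here. A minimal sketch of what such a service might look like, assuming (as the client in 2.3.2 implies) an exposed add_request_job method, and that each scheduled job simply issues an HTTP request to the /run-task endpoint from 2.3.3; the URL and names are hypothetical:
import requests
import rpyc
from rpyc.utils.server import ThreadedServer
from apscheduler.schedulers.background import BackgroundScheduler

RUN_TASK_URL = 'http://127.0.0.1:5566/run-task'  # assumed address of the FastAPI service

def request_task(task_id):
    # the scheduled job is nothing more than an HTTP request back to the web service
    requests.get(RUN_TASK_URL, params={'task_id': task_id})

class SchedulerService(rpyc.Service):
    def exposed_add_request_job(self, task_id, **kwargs):
        # str() copies the value out of the rpyc netref so the job
        # keeps working after the client connection closes
        return scheduler.add_job(request_task, args=(str(task_id),), **kwargs)

    def exposed_remove_job(self, job_id):
        scheduler.remove_job(job_id)

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.start()
    server = ThreadedServer(SchedulerService, port=12345,
                            protocol_config={'allow_public_attrs': True})
    server.start()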
2.3.2 Invocation example
#!/usr/bin/env python
import os
import rpyc
import logging
from app.conf import common_config
fast_api_env = os.environ.get('FAST_API_ENV')
def from_crontab(cron):
    values = cron.split(' ')
    return {
        'minute': values[0],
        'hour': values[1],
        'day': values[2],
        'month': values[3],
        'day_of_week': values[4],
    }
def run_task():
    """Kick off the scheduled tasks."""
    try:
        rpyc_conn = rpyc.connect(common_config.HOST, common_config.PORT)
    except ConnectionRefusedError:
        logging.error('run scheduler tasks err, connect err!')
        return False
    # bgsrv = rpyc.BgServingThread(rpyc_conn)  # async callbacks would need a background serving thread
    # creates a bg thread to process incoming events
    logging.info('run scheduler tasks')
    # add job1
    rpyc_conn.root.add_request_job(id='job1', task_id='job1', trigger='cron', **from_crontab('* * * * *'))
    # add job2
    rpyc_conn.root.add_request_job(id='job2', task_id='job2', trigger='cron', **from_crontab('* * * * *'))
    # bgsrv.stop()
    rpyc_conn.close()
    logging.info('run scheduler tasks success!')
2.3.3 Server-side API
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.schemas.response import Response
from app.api.utils.common import get_db
# the actual business logic for the scheduled tasks
from app.service.jobs import job1, job2

router = APIRouter()

@router.get("/run-task", response_model=Response)
def run_task(
    task_id: str = Query(...),
    db: Session = Depends(get_db),
):
    """Each custom task_id maps to one job; the mapping can optionally be kept in the database."""
    if task_id == 'job1':
        job1(db)
    elif task_id == 'job2':
        job2(db)
    else:
        return {'code': 1, 'error': 'invalid task type'}
    return {'msg': 'completed!'}
2.4 Problems
- With 2.1, the scheduled business code has to be written a second time inside the rpyc service, which inevitably detaches it from the main project
  - acceptable if the scheduled business code is unrelated to the project code anyway
- With 2.2 (jobs added through rpyc callbacks), the FastAPI service and the rpyc service need a long-lived connection, persistent job stores cannot be used (the callback cannot be serialized), and the two services are tightly coupled
  - acceptable if the long-lived connection is not a concern
- 2.3 (scheduled jobs as HTTP requests) takes quite a detour
  - but it decouples the services, and it is the approach I personally recommend; if it feels too clumsy, use another stack (e.g. celery + MQ) for scheduled tasks