閱讀目錄
一、apSheduler
二、Flask-APScheduler
三、動態定時任務
四、uwsgi部署注意事項
一、apSheduler
第一部分內容限於apSheduler3.0以下版本,以上版本可移步至 FastAPI+apSheduler動態定時任務
1. 引子(Introduction)
Advanced Python Scheduler (APScheduler) 是一個輕量級但功能強大的進程內任務調度器,允許您調度函數(或任何其他python可調用文件)在您選擇的時間執行。
2. 特性(Features)
- 沒有(硬)外部依賴性
- api線程安全
- 支持CPython、Jython、PyPy
- 可配置的調度機制(觸發器):
- 類似cron調度
- 單次運行延遲調度(如UNIX“at”命令)
- 基於時間間隔(以指定的時間間隔運行)
- 支持多種存儲空間
- RAM
- 基於文件的簡單數據庫
- SQLAlchem
- MongoDB
- Redis
3. 使用(Usage)
3.1 安裝
- pip install apscheduler
3.2 啟動調度程序
from apscheduler.scheduler import Scheduler
sched = Scheduler()
sched.start()
3.3 調度job
3.3.1 簡單日期調度job
在指定時間執行一次job。這是相當於UNIX“at”命令的進程內命令
from datetime import date
from apscheduler.scheduler import Scheduler
# Start the scheduler
sched = Scheduler()
sched.start()
# Define the function that is to be executed
def my_job(text):
print text
# The job will be executed on November 6th, 2009
exec_date = date(2009, 11, 6)
# 添加一個job
job = sched.add_date_job(my_job, exec_date, ['text'])
更具體地安排時間
from datetime import datetime
# The job will be executed on November 6th, 2009 at 16:30:05
job = sched.add_date_job(my_job, datetime(2009, 11, 6, 16, 30, 5), ['text'])
甚至可以將日期指定為字符串文本
job = sched.add_date_job(my_job, '2009-11-06 16:30:05', ['text'])
# 支持微秒級別
job = sched.add_date_job(my_job, '2009-11-06 16:30:05.720400', ['text'])
3.3.2 基於時間間隔的調度job
job的執行在給定延遲后開始,或者在start_date(如果指定)開始,start_date參數可以作為date/datetime對象或字符串文本給出。
from datetime import datetime
from apscheduler.scheduler import Scheduler
# Start the scheduler
sched = Scheduler()
sched.start()
def job_function():
print "Hello World"
# Schedule job_function to be called every two hours
sched.add_interval_job(job_function, hours=2)
# The same as before, but start after a certain time point
sched.add_interval_job(job_function, hours=2, start_date='2010-10-10 09:30')
裝飾語法
from apscheduler.scheduler import Scheduler
# Start the scheduler
sched = Scheduler()
sched.start()
# Schedule job_function to be called every two hours
@sched.interval_schedule(hours=2)
def job_function():
print "Hello World"
如果需要取消對裝飾功能的job,可以這樣做
scheduler.unschedule_job(job_function.job)
3.3.3 cron調度job
與crontab表達式不同,您可以省略不需要的字段。大於最低有效明確定義字段的字段默認為,而較小的字段默認為其最小值,除了默認為。例如,如果僅指定day=1,minute=20,則作業將在每年每月的第一天以每小時20分鍾的速度執行。下面的代碼示例應該進一步說明這種行為。
省略字段默認為*
from apscheduler.scheduler import Scheduler
# Start the scheduler
sched = Scheduler()
sched.start()
def job_function():
print "Hello World"
# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_cron_job(job_function, month='6-8,11-12', day='3rd fri', hour='0-3')
# Schedule a backup to run once from Monday to Friday at 5:30 (am)
sched.add_cron_job(job_function, day_of_week='mon-fri', hour=5, minute=30)
裝飾語法
@sched.cron_schedule(day='last sun')
def some_decorated_task():
print "I am printed at 00:00:00 on the last Sunday of every month!"
如果需要取消對裝飾功能的job,可以這樣做
scheduler.unschedule_job(job_function.job)
3.3.4 使用自定義觸發器調度
以上事例基於內置觸發器調度job,如果需要使用自定義觸發器調度需要使用add_job()方法
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def aps_test(x):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5')
scheduler.add_job(func=aps_test, args=('一次性任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))
scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3)
scheduler.start()
3.4 關閉調度器
sched.shutdown()
# 默認情況下,調度程序關閉其線程池,並等待直到所有當前正在執行的job完成。為了更快地退出,可以:
sched.shutdown(wait=False)
# 這仍然會關閉線程池,但不會等待任何正在運行的任務完成。此外,如果您給調度程序一個要在其他地方管理的線程池,您可能希望完全跳過線程池關閉:
sched.shutdown(shutdown_threadpool=False)
# 自動關閉調度程序的一個巧妙方法是為此使用atexit掛鈎:
import atexit
sched = Scheduler(daemon=True)
atexit.register(lambda: sched.shutdown(wait=False))
# Proceed with starting the actual application
3.5 Job stores
如果沒有指定stores存儲位置,則將轉到默認job存儲 -> ramjobstore不提供持久化保存
其它存儲stores:
ShelveJobStore
SQLAlchemyJobStore
MongoDBJobStore
RedisJobStore
通過配置選項或add_jobstore()方法添加作業存儲。因此,以下是相等的:
config = {'apscheduler.jobstores.file.class': 'apscheduler.jobstores.shelve_store:ShelveJobStore',
'apscheduler.jobstores.file.path': '/tmp/dbfile'}
sched = Scheduler(config)
3.6 獲取調度器列表
sched.print_jobs()
二、Flask-APScheduler
1. 引子(Introduction)
- Flask-APScheduler 是Flask框架的一個擴展庫,增加了Flask對apScheduler的支持
2. 特性(Features)
- 根據Flask配置加載調度器配置
- 根據Flask配置加載調度器job
- 允許指定調度程序將運行的主機名
- 提供REST API來管理調度job
- 為REST API提供認證
3. 安裝(Installation)
pip install Flask-APScheduler
4. 使用(Usage)
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from flask import Flask
from flask_apscheduler import APScheduler
class Config(object):
# 配置執行job
JOBS = [
{
'id': 'job1',
'func': 'advanced:job1',
'args': (1, 2),
'trigger': 'interval',
'seconds': 10
}
]
# 存儲位置
SCHEDULER_JOBSTORES = {
'default': SQLAlchemyJobStore(url='sqlite://')
}
# 線程池配置
SCHEDULER_EXECUTORS = {
'default': {'type': 'threadpool', 'max_workers': 20}
}
SCHEDULER_JOB_DEFAULTS = {
'coalesce': False,
'max_instances': 3
}
# 調度器開關
SCHEDULER_API_ENABLED = True
def job1(a, b):
print(str(a) + ' ' + str(b))
if __name__ == '__main__':
app = Flask(__name__)
app.config.from_object(Config())
scheduler = APScheduler()
# 注冊app
scheduler.init_app(app)
scheduler.start()
app.run()
三、動態定時任務
- Flask + flask_apscheduler實現一個類似Jenkins的定時任務的功能,前端設置crontab,后端可以創建,修改,暫停,移除,恢復一個執行任務
文件目錄
|--app
|----config.py 配置文件
|----run_tasks.py 開啟任務
|----tasks.py 任務job
|----apSheduler.py 提供接口函數
|----extensions.py flask擴展
|----__init__.py 初始化文件
|----views.py 業務代碼
|--manage.py 項目啟動文件
1. config.py配置flask_apscheduler
class Config(object):
# 開關
SCHEDULER_API_ENABLED = True
# 持久化配置
SCHEDULER_JOBSTORES = {
'default': SQLAlchemyJobStore(url='sqlite:///flask_context.db')
}
SCHEDULER_EXECUTORS = {
'default': {'type': 'threadpool', 'max_workers': 20}
}
2. init.py創建app
from app.config import Config
from app.extensions import scheduler
# 創建app
def create_app(config=None, app_name=None, blueprints=None):
app = Flask(app_name, static_folder='thanos/static',
template_folder='thanos/resource/report')
# 導入flask配置 -> 這里根據自己的項目導入配置就好哇
# config = Config.get_config_from_host(app.name)
app.config.from_object(config)
# 初始化調度器配置
configure_scheduler(app)
def configure_scheduler(app):
"""Configure Scheduler"""
scheduler.init_app(app)
scheduler.start()
# 加載任務,選擇了第一次請求flask后端時加載,可以選擇別的方式...
@app.before_first_request
def load_tasks():
# 開啟任務
from app import run_tasks
3. extensions.py實例化scheduler
from flask_apscheduler import APScheduler
scheduler = APScheduler()
4. apSheduler.py提供調度器接口
"""此文件可以根據具體業務復雜化選擇寫或者直接調用原apscheduler接口"""
from flask import current_app
# from .extensions import scheduler 直接導入單例對象操作也行
class APScheduler(object):
"""調度器控制方法"""
def add_job(self, jobid, func, args, **kwargs):
"""
添加任務
:param args: 元祖 -> (1,2)
:param jobstore: 存儲位置
:param trigger:
data -> run_date datetime表達式
cron -> second/minute/day_of_week
interval -> seconds 延遲時間
next_run_time -> datetime.datetime.now() + datetime.timedelta(seconds=12))
:return:
"""
job_def = dict(kwargs)
job_def['id'] = jobid
job_def['func'] = func
job_def['args'] = args
job_def = self.fix_job_def(job_def)
self.remove_job(jobid) # 刪除原job
current_app.apscheduler.scheduler.add_job(**job_def)
def remove_job(self, jobid, jobstore=None):
"""刪除任務"""
current_app.apscheduler.remove_job(jobid, jobstore=jobstore)
def resume_job(self, jobid, jobstore=None):
"""恢復任務"""
current_app.apscheduler.resume_job(jobid, jobstore=jobstore)
def pause_job(self, jobid, jobstore=None):
"""恢復任務"""
current_app.apscheduler.pause_job(jobid, jobstore=jobstore)
def fix_job_def(self, job_def):
"""維修job工程"""
if job_def.get('trigger') == 'date':
job_def['run_date'] = job_def.get('run_date') or None
elif job_def.get('trigger') == 'cron':
job_def['hour'] = job_def.get('hour') or "*"
job_def['minute'] = job_def.get('minute') or "*"
job_def['week'] = job_def.get('week') or "*"
job_def['day'] = job_def.get('day') or "*"
job_def['month'] = job_def.get('month') or "*"
elif job_def.get('trigger') == 'interval':
job_def['seconds'] = job_def.get('seconds') or "*"
else:
if job_def.get("andTri"):
job_def['trigger'] = AndTrigger([job_def.pop("andTri", None), ])
# job_def['next_run_time'] = job_def.get('next_run_time') or None
return job_def
5. views.py 實現調度器接口
from app.apSheduler import APScheduler
# croniter庫解析Linux cron格式的計划
# 以添加為例子 暫停 刪除 恢復可以根據業務場景自己寫接口
def add_crontab_task(self, params):
"""添加一個crontab任務"""
try:
self.crontab = params.get("crontab")
self.id = params.get("id")
self.task_id = params.get("task_id")
except Exception as e:
return False, str(e)
# 記錄數據庫
res = addSql()
# 更新任務信息
APScheduler().add_job(jobid=self.id, func=task_func,
args=(self.task_id,), andTri=CronTrigger.from_crontab(self.crontab))
if res is False:
return False, "數據庫操作異常"
return True, croniter(self.crontab, datetime.now()).get_next(datetime)
def get_next_execute_time(self, params):
"""獲取下一次執行時間"""
try:
self.crontab = params.get("crontab")
except Exception as e:
return False, str(e)
return True, str(croniter(self.crontab, datetime.now()).get_next(datetime))
6. tasks.py 任務job
def task_func(task_id):
"""業務邏輯"""
# 發郵件、寫詩、畫畫 -> 愛干啥干啥
7. run_tasks.py 開啟任務調度大門
from .task import task_func
from apscheduler.triggers.cron import CronTrigger # 可以很友好的支持添加一個crontab表達式
def run_task():
# 查詢數據庫的crontab信息 -> 定時任務信息
res = fetall("select * from crontab_table")
# 遍歷添加任務
shche = APScheduler()
for rs in res:
shche.add_job(jobid=rs.get(id), func=task_func,
args=(rs.get(task_id)), andTri=CronTrigger.from_crontab(rs.get(crontab)))
# 最重要的
run_task() # 這樣當__init__.py創建app時加載這個文件,就會執行添加歷史任務啦!
8. manage.py 啟動項目
from app import create_app
app = create_app()
app.run()
四、uwsgi部署注意事項
1. 常見問題及解決方案
1.1 線上部署uWSGI+APScheduler執行定時任務卡死
1.1.1問題分析:
APScheduler運行環境需要為多線程,uwsgi默認是one thread ,one process,需要在配置文件里面加上一條 enable-thread = true,也就是允許程序內部啟動多線程。
1.1.2解決方案:
# uwsgi.ini文件追加以下配置
enable-threads = true
preload=True #用--preload啟動uWSGI,確保scheduler只在loader的時候創建一次
lazy-apps=true
1.2 定時任務多次執行的問題
1.2.1問題分析:
1.本地原因,錯過了上次執行時間,下次會多次執行
2.線上部署的,如uWSGI部署,配置了processes>1導致加載了多此apscheduler(apscheduler當前沒有任何進程間同步和信令方案)
1.2.3解決方案:
1. 本地多次執行可以在Flask啟動方法中加use_reloader=False
app.run(host="0.0.0.0", port=8888, use_reloader=False)
2.線上linux可以借鑒下面的方法,網上借鑒的
在__init__.py文件中修改中configure_scheduler(),用全局鎖確保scheduler只運行一次, 代碼如下:
import atexit
import fcntl # 只能用於linux
from .extensions import scheduler
def configure_scheduler(app):
"""Configure Scheduler"""
f = open("scheduler.lock", "wb")
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
scheduler.init_app(app)
scheduler.start()
# 加載任務
@app.before_first_request
def load_tasks():
from thanos import run_tasks
except:
pass
def unlock():
fcntl.flock(f, fcntl.LOCK_UN)
f.close()
atexit.register(unlock)
init函數為flask項目初始化所調用,這里為scheduler模塊的初始化部分。首先打開(或創建)一個scheduler.lock文件,並加上非阻塞互斥鎖。成功后創建scheduler並啟動。如果加文件鎖失敗,說明scheduler已經創建,就略過創建scheduler的部分。
最后注冊一個退出事件,如果這個flask項目退出,則解鎖並關閉scheduler.lock文件的鎖。
3.官網推薦rpyc/grpc解決
可以查看 FastAPI+apSheduler動態定時任務