APscheduler使用總結
APscheduler是執行定時任務的python庫,其作用可以代替Linux系統下的crontab,github上有該庫的例子。
APsheduler基本使用
該模塊由4個基本組件組成:
- triggers 觸發器
- job stores 任務儲存
- executors 執行器
- schedulers 調度器
其中triggers定義了定時任務的類別、觸發條件以及具體要執行的任務名。
job stores可以將任務儲存起來,有些需要重復執行的任務,或有數據生成的任務,有將數據儲存的要求,可以通過數據庫或文件的形式將數據儲存起來以便下次使用。
executors負責具體的任務執行。
schedulers是最高級的組件,負責其它的管理和調度工作,該模塊有幾種可選的調度器可供選擇,使用方法都是一樣的:
BlockingScheduler: use when the scheduler is the only thing running in your process
BackgroundScheduler: use then you’re not using any of the frameworks below, and want the scheduler to run in the background inside your application
AsyncIOScheduler: use if your application uses the asyncio module
GeventScheduler: use if your application uses gevent
TornadoScheduler: use if you’re building a Tornado application
TwistedScheduler: use if you’re building a Twisted application
QtScheduler: use if you’re building a Qt application
定時任務的執行內容放在一個函數中使用,而調度的類型和時間等設置則在創建這個job的時候進行定義,在這個模塊中,一個具體的任務是一個JOB實例,因此可以通過修改這個實例的屬性對任務進行重新設置,而每一個job實例都是依賴於一個scheduler,所以也可以通過sheduler對sheduler進行重新設定。
定時任務的類型說明
參考源碼中scheduler.add_job()的參數說明
'cron'跟linux下crontab一樣的定時類型寫法,
'interval'使用時間間隔的寫法
...
說明
from apscheduler.schedulers.background import BackgroundScheduler
# 創建scheduler
scheduler = BackgroundScheduler()
# 創建job
# 方法1
job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
# 方法2
@scheduler.scheduled_job
def myfunc():
# do something
pass
# 啟動
scheduler.start()
# 添加job
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
# 刪除job
scheduler.remove_job('my_job_id')
# 暫停job
scheduler.pause_job()
# 恢復job
scheduler.resume_job()
# 查看jobs
scheduler.get_jobs()
scheduler.print_jobs()
# 修改job
scheduler.modify_job(max_instances=6, name='Alternate name')
# 重新設置定時任務類型
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
# 關閉
scheduler.shutdown(wait=False)
scheduler配置
配置內容
- a MongoDBJobStore named “mongo”
- an SQLAlchemyJobStore named “default” (using SQLite)
- a ThreadPoolExecutor named “default”, with a worker count of 20
- a ProcessPoolExecutor named “processpool”, with a worker count of 5
- utc標准時區
- coalescing turned off for new jobs by default
- 默認最多3個jobs
方法1
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
方法2
from apscheduler.schedulers.background import BackgroundScheduler
# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
'apscheduler.jobstores.mongo': {
'type': 'mongodb'
},
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs.sqlite'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '5'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '3',
'apscheduler.timezone': 'UTC',
})
方法3
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
jobstores = {
'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler()
# .. do something else here, maybe add jobs etc.
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
flask下使用APscheduler
flask下使用該模塊可以使用flask擴展模塊flask-apsheduler
基本使用
from flask import Flask
from flask_apscheduler import APScheduler
class Config(object):
JOBS = [
{
'id': 'job1',
'func': 'jobs:job1',
'args': (1, 2),
'trigger': 'interval',
'seconds': 10
}
]
SCHEDULER_API_ENABLED = True
def job1(a, b):
print(str(a) + ' ' + str(b))
if __name__ == '__main__':
app = Flask(__name__)
app.config.from_object(Config())
scheduler = APScheduler()
# it is also possible to enable the API directly
# scheduler.api_enabled = True
scheduler.init_app(app)
scheduler.start()
app.run()
故障排除
在使用gunicorn對flask進行管理的時候如何保證只定時任務只啟動一次
問題在使用gunicorn對flask應用進行控制的時候如果設置了gunicorn的--worker參數大於1時,會出現一個定時任務執行多次的問題,此時要給gunicorn提供一個額外的--preload參數,這樣,flask的app在run之前的所有操作只會執行一次,因此定時任務就只會執行一次。
env/bin/gunicorn module_containing_app:app -b 0.0.0.0:8080 --workers 3 --preload
flask在調試模式下調用flask-apscheduler定時任務時會執行多次
問題與上面的問題類似,調試模式下只需給app.run()添加一個參數use_reloader即可。
app.run(debug=True, use_reloader=False)