作者:shhnwangjian
APScheduler定時框架
APScheduler是一個Python定時任務框架,使用起來十分方便。提供了基於日期,固定時間間隔及crontab類型的任務,並且可以持久化任務,並以daemon方式運行應用。
github:https://github.com/agronholm/apscheduler
官網文檔:https://apscheduler.readthedocs.io/en/latest/
安裝
pip install apscheduler
簡單示例(首先添加一個周一到周五定時執行任務)
from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime # 輸出時間 def job(): print(datetime.now().strtime("%Y-%m-%d %H:%M:%S")) # BlockingScheduler scheduler = BlockingScheduler() scheduler.add_job(job, "cron", day_of_week="1-5", hour=6, minute=30) schduler.start()
BlockingScheduler是APScheduler中的調度器,APScheduler中有兩種常用的調度器,BlockingScheduler和BackgroundScheduler,當調度器是應用中唯一要運行的定時任務時,使用BlockingScheduler,如果希望調度器在后台執行,使用BackgroundScheduler。
APScheduler組成
APScheduler五個組成分別為:觸發器(trigger),作業存儲(job store),執行器(executor),調度器(scheduler)、任務或作業(task)。
任務(job)
描述一個任務本身。
觸發器(trigger)
包含調度邏輯,每一個作業有它自己的觸發器,用於決定接下來哪一個作業會運行。除了他們自己初始配置意外,觸發器完全是無狀態的
APScheduler 有三種內建的 trigger:
- interval: 固定時間間隔觸發
- date: 特定的時間點觸發
- cron: 在特定時間周期性地觸發
interval 間隔調度(每隔多久執行)
weeks (
int
) – number of weeks to wait
days (
int
) – number of days to wait
hours (
int
) – number of hours to wait
minutes (
int
) – number of minutes to wait
seconds (
int
) – number of seconds to wait
start_date (datetime|
str
) – starting point
for
the interval calculation
end_date (datetime|
str
) – latest possible date
/
time to trigger on
timezone (datetime.tzinfo|
str
) – time zone to use
for
the date
/
time calculations
|
例子:
|
#表示每隔3天17時19分07秒執行一次任務
sched.add_job(my_job,
'interval'
, days
=
03
, hours
=
17
, minutes
=
19
, seconds
=
07
)
#間隔3秒鍾執行一次
scheduler.add_job(job3,
'interval'
, seconds
=
3
)
|
date 定時調度(作業只會執行一次)
|
run_date (datetime|
str
) – the date
/
time to run the job at
-
(任務開始的時間)
timezone (datetime.tzinfo|
str
) – time zone
for
run_date
if
it doesn’t have one already
|
例子:
|
#在指定的時間,只執行一次
scheduler.add_job(tick,
'date'
, run_date
=
'2016-02-14 15:01:05'
)
# The job will be executed on November 6th, 2009
sched.add_job(my_job,
'date'
, run_date
=
date(
2009
,
11
,
6
), args
=
[
'text'
])
# The job will be executed on November 6th, 2009 at 16:30:05
sched.add_job(my_job,
'date'
, run_date
=
datetime(
2009
,
11
,
6
,
16
,
30
,
5
), args
=
[
'text'
])
|
cron定時調度(某一定時時刻執行)
|
(
int
|
str
) 表示參數既可以是
int
類型,也可以是
str
類型
(datetime |
str
) 表示參數既可以是datetime類型,也可以是
str
類型
year (
int
|
str
) –
4
-
digit year
-
(表示四位數的年份,如
2008
年)
month (
int
|
str
) – month (
1
-
12
)
-
(表示取值范圍為
1
-
12
月)
day (
int
|
str
) – day of the (
1
-
31
)
-
(表示取值范圍為
1
-
31
日)
week (
int
|
str
) – ISO week (
1
-
53
)
-
(格里歷
2006
年
12
月
31
日可以寫成
2006
年
-
W52
-
7
(擴展形式)或
2006W527
(緊湊形式))
day_of_week (
int
|
str
) – number
or
name of weekday (
0
-
6
or
mon,tue,wed,thu,fri,sat,sun)
-
(表示一周中的第幾天,既可以用
0
-
6
表示也可以用其英語縮寫表示)
hour (
int
|
str
) – hour (
0
-
23
)
-
(表示取值范圍為
0
-
23
時)
minute (
int
|
str
) – minute (
0
-
59
)
-
(表示取值范圍為
0
-
59
分)
second (
int
|
str
) – second (
0
-
59
)
-
(表示取值范圍為
0
-
59
秒)
start_date (datetime|
str
) – earliest possible date
/
time to trigger on (inclusive)
-
(表示開始時間)
end_date (datetime|
str
) – latest possible date
/
time to trigger on (inclusive)
-
(表示結束時間)
timezone (datetime.tzinfo|
str
) – time zone to use
for
the date
/
time calculations (defaults to scheduler timezone)
-
(表示時區取值)
|
參數的取值格式:
例子:
|
#表示2017年3月22日17時19分07秒執行該程序
sched.add_job(my_job,
'cron'
, year
=
2017
,month
=
03
,day
=
22
,hour
=
17
,minute
=
19
,second
=
07
)
#表示任務在6,7,8,11,12月份的第三個星期五的00:00,01:00,02:00,03:00 執行該程序
sched.add_job(my_job,
'cron'
, month
=
'6-8,11-12'
, day
=
'3rd fri'
, hour
=
'0-3'
)
#表示從星期一到星期五5:30(AM)直到2014-05-30 00:00:00
sched.add_job(my_job(),
'cron'
, day_of_week
=
'mon-fri'
, hour
=
5
, minute
=
30
,end_date
=
'2014-05-30'
)
#表示每5秒執行該程序一次,相當於interval 間隔調度中seconds = 5
sched.add_job(my_job,
'cron'
,second
=
'*/5'
)
|
作業儲存(job store)
存儲被調度的作業,默認的作業存儲是簡單地把作業保存在內存中,其他的作業存儲是將作業保存在數據庫中。一個作業的數據保存在持久化作業存儲時被序列化,並在加載時被反序列化。調度器不能分享同一個作業存儲。
APScheduler 默認使用 MemoryJobStore,可以修改使用 db 存儲方案。
jobstore提供給scheduler一個序列化jobs的統一抽象,提供對scheduler中job的增刪改查接口,根據存儲backend的不同,分以下幾種
- MemoryJobStore:沒有序列化,jobs就存在內存里,增刪改查也都是在內存中操作
- SQLAlchemyJobStore:所有sqlalchemy支持的數據庫都可以做為backend,增刪改查操作轉化為對應backend的sql語句
- MongoDBJobStore:用mongodb作backend
- RedisJobStore: 用redis作backend
- ZooKeeperJobStore:用ZooKeeper做backend
執行器(executor)
處理作業的運行,他們通常通過在作業中提交制定的可調用對象到一個線程或者進城池來進行。當作業完成時,執行器將會通知調度器。
最常用的 executor 有兩種:
- ProcessPoolExecutor
- ThreadPoolExecutor
調度器(scheduler)
通常在應用中只有一個調度器,應用的開發者通常不會直接處理作業存儲、調度器和觸發器,相反,調度器提供了處理這些的合適的接口。配置作業存儲和執行器可以在調度器中完成,例如添加、修改和移除作業。
調度器(scheduler)的IO模型
scheduler由於IO模型的不同,可以有多種實現:
- BlockingScheduler:main_loop就在當前進程的主線程內運行,所以調用start函數后會阻塞當前線程。通過一個threading.Event條件變量對象完成scheduler的定時喚醒。
- BackgroundScheduler:和BlockingScheduler基本一樣,除了main_loop放在了單獨線程里,所以調用start后主線程不會阻塞
- AsyncIOScheduler:使用asyncio作為IO模型的scheduler,和AsyncIOExecutor配合使用,用asynio中event_loop的call_later完成定時喚醒
- GeventScheduler:和BlockingScheduler基本一樣,使用gevent作為IO模型,和GeventExecutor配合使用
- QtScheduler:使用QTimer完成定時喚醒
- TornadoScheduler:使用tornado的IO模型,用ioloop.add_timeout完成定時喚醒
- TwistedScheduler:配合TwistedExecutor,用reactor.callLater完成定時喚醒
1、BlockingScheduler簡單應用:
|
from
apscheduler.schedulers.blocking
import
BlockingScheduler
def
my_job():
print
'hello world'
scheduler
=
BlockingScheduler()
scheduler.add_job(my_job,
'interval'
, seconds
=
5
)
scheduler.start()
|
上面的例子表示每隔5s執行一次my_job函數,輸出hello world
2、BackgroundScheduler簡單應用
from
apscheduler.schedulers.background
import
BackgroundScheduler
def
job3():
print
(time.strftime(
'%Y-%m-%d %H:%M:%S'
,time.localtime(time.time())))
print
(
"I'm working job_3"
)
scheduler
=
BackgroundScheduler()
scheduler.add_job(job3,
'interval'
, seconds
=
3
)
#間隔3秒鍾執行一次
scheduler.start()
|
配置調度器
APScheduler提供了許多不同的方式來配置調度器,你可以使用一個配置字典或者作為參數關鍵字的方式傳入。你也可以先創建調度器,再配置和添加作業,這樣你可以在不同的環境中得到更大的靈活性。
下面來看一個簡單的 BlockingScheduler 例子
from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime def job(): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # 定義BlockingScheduler sched = BlockingScheduler() sched.add_job(job, 'interval', seconds=5) sched.start()
上述代碼創建了一個 BlockingScheduler,並使用默認內存存儲和默認執行器。(默認選項分別是 MemoryJobStore 和 ThreadPoolExecutor,其中線程池的最大線程數為10)。配置完成后使用 start() 方法來啟動。
如果想要顯式設置 job store(使用mongo存儲)和 executor 可以這樣寫:
from datetime import datetime from pymongo import MongoClient from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor # MongoDB 參數 host = '127.0.0.1' port = 27017 client = MongoClient(host, port) # 輸出時間 def job(): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # 存儲方式 jobstores = { 'mongo': MongoDBJobStore(collection='job', database='test', client=client), 'default': MemoryJobStore() } executors = { 'default': ThreadPoolExecutor(10), 'processpool': ProcessPoolExecutor(3) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) scheduler.add_job(job, 'interval', seconds=5, jobstore='mongo') scheduler.start()
配置
import
time
import
redis
from
pytz
import
utc
from
pymongo
import
MongoClient
from
apscheduler.schedulers.background
import
BackgroundScheduler
from
apscheduler.jobstores.mongodb
import
MongoDBJobStore
from
apscheduler.jobstores.memory
import
MemoryJobStore
from
apscheduler.jobstores.redis
import
RedisJobStore
from
apscheduler.executors.pool
import
ThreadPoolExecutor, ProcessPoolExecutor
from
apscheduler.events
import
EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_MISSED
client
=
MongoClient(host
=
'127.0.0.1'
, port
=
27017
)
pool
=
redis.ConnectionPool(host
=
'127.0.0.1'
, port
=
16379
)
jobstores
=
{
'mongo'
: MongoDBJobStore(collection
=
'job'
, database
=
'test'
, client
=
client),
'redis'
: RedisJobStore(connection_pool
=
pool),
'default'
: MemoryJobStore(),
'default_test'
: MemoryJobStore()
}
executors
=
{
'default'
: ThreadPoolExecutor(
200
),
'processpool'
: ProcessPoolExecutor(
10
)
}
job_defaults
=
{
'coalesce'
:
True
,
'max_instances'
:
1
,
'misfire_grace_time'
:
60
}
scheduler
=
BackgroundScheduler(jobstores
=
jobstores, executors
=
executors, job_defaults
=
job_defaults, timezone
=
utc)
|
- coalesce:當由於某種原因導致某個job積攢了好幾次沒有實際運行(比如說系統掛了5分鍾后恢復,有一個任務是每分鍾跑一次的,按道理說這5分鍾內本來是“計划”運行5次的,但實際沒有執行),如果coalesce為True,下次這個job被submit給executor時,只會執行1次,也就是最后這次,如果為False,那么會執行5次(不一定,因為還有其他條件,看后面misfire_grace_time的解釋)
- max_instance: 就是說同一個job同一時間最多有幾個實例再跑,比如一個耗時10分鍾的job,被指定每分鍾運行1次,如果我們max_instance值為5,那么在第6~10分鍾上,新的運行實例不會被執行,因為已經有5個實例在跑了。默認情況下同一個job,只允許一個job實例運行。這在某個job在下次運行時間到達之后仍未執行完畢時,能達到控制的目的。你也可以該變這一行為,在你調用add_job時可以傳遞max_instances=5來運行同時運行同一個job的5個job實例。
- misfire_grace_time:設想和上述coalesce類似的場景,如果一個job本來14:00有一次執行,但是由於某種原因沒有被調度上,現在14:01了,這個14:00的運行實例被提交時,會檢查它預訂運行的時間和當下時間的差值(這里是1分鍾),大於我們設置的30秒限制,那么這個運行實例不會被執行。
- executor的選擇:線程池和進程池。默認default是線程池方式。這個數是執行任務的實際並發數,如果你設置的小了而job添加的比較多,可能出現丟失調度的情況。同時對於python多線程場景,如果是計算密集型任務,實際的並發度達不到配置的數量。所以這個數字要根據具體的要求設置。
對job的操作
添加jop
添加job有兩種方式:
(1)add_job():調用函數添加,返回返回一個apscheduler.job.Job 的實例,可以用來改變或者移除 job。
from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime # 輸出時間 def job(): print(datetime.now().strtime("%Y-%m-%d %H:%M:%S")) # BlockingScheduler scheduler = BlockingScheduler() scheduler.add_job(job, "cron", day_of_week="1-5", hour=6, minute=30) schduler.start()
(2)scheduled_job():裝飾器方式,只適用於應用運行期間不會改變的 job
from apscheduler.schedulers.blocking import BlockingScheduler sched = BlockingScheduler() # 裝飾器 @sched.scheduled_job('interval', id='my_job_id', seconds=5) def job_function(): print("Hello World") # 開始 sched.start()
移除job
移除job有兩種方法:
1,remove_job():apscheduler實例調用remove_job使用jobID移除。
# id scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.remove_job('my_job_id')
2,job.remove():apscheduler.job.Job 實例調用自身函數屬性。
job = scheduler.add_job(myfunc, 'interval', minutes=2) job.remove()
暫停jod
apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()
apscheduler.job.Job 是 add_job() 返回的實例
獲取job列表
獲得可調度 job 列表,可以使用get_jobs() 來完成,它會返回所有的 job 實例。
也可以使用print_jobs() 來輸出所有格式化的 job 列表。
修改job
除了 jobID 之外 job 的所有屬性都可以修改,使用 apscheduler.job.Job.modify() 或者 modify_job() 修改一個 job 的屬性
job.modify(max_instances=6, name='Alternate name') modify_job('my_job_id', trigger='cron', minute='*/5')
關閉job
默認情況下調度器會等待所有的 job 完成后,關閉所有的調度器和作業存儲。將 wait 選項設置為 False 可以立即關閉。
scheduler.shutdown()
scheduler.shutdown(wait=False)