from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler
def my_job():
print 'hello world'
def your_job():
print 'hello python'
'''
sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=1)
sched.add_job(your_job, 'interval', seconds=2)
sched.start()
'''
'''
sched = BlockingScheduler()
# Runs from Monday to Friday at 5:30 (am) until 2016-09-30 00:00:00
sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=21, minute=39 end_date='2016-09-30' )
sched.start()
'''
'''
sched = BlockingScheduler()
# Runs from 2016-09-21 at 21:46 (pm) until ....
sched.add_job(my_job, 'cron', start_date ='2016-09-21', hour=21, minute=46 )
sched.start()
'''
sched = BlockingScheduler()
#從2016-09-21 22:16:00 開始執行,間隔5秒執行一次
sched.add_job(my_job, 'interval', start_date ='2016-09-21 22:16:00', seconds=5 )
sched.start()
轉: http://debugo.com/apscheduler/
zzzsdfaskdkdkdddddddddw111ddd
APScheduler是一個Python定時任務框架,使用起來十分方便。提供了基於日期、固定時間間隔以及crontab類型的任務,並且可以持久化任務、並以daemon方式運行應用。目前最新版本為3.0.x。
在APScheduler中有四個組件:
觸發器(trigger)包含調度邏輯,每一個作業有它自己的觸發器,用於決定接下來哪一個作業會運行。除了他們自己初始配置意外,觸發器完全是無狀態的。
作業存儲(job store)存儲被調度的作業,默認的作業存儲是簡單地把作業保存在內存中,其他的作業存儲是將作業保存在數據庫中。一個作業的數據講在保存在持久化作業存儲時被序列化,並在加載時被反序列化。調度器不能分享同一個作業存儲。
執行器(executor)處理作業的運行,他們通常通過在作業中提交制定的可調用對象到一個線程或者進城池來進行。當作業完成時,執行器將會通知調度器。
調度器(scheduler)是其他的組成部分。你通常在應用只有一個調度器,應用的開發者通常不會直接處理作業存儲、調度器和觸發器,相反,調度器提供了處理這些的合適的接口。配置作業存儲和執行器可以在調度器中完成,例如添加、修改和移除作業。
你需要選擇合適的調度器,這取決於你的應用環境和你使用APScheduler的目的。通常最常用的兩個:
– BlockingScheduler: 當調度器是你應用中唯一要運行的東西時使用。
– BackgroundScheduler: 當你不運行任何其他框架時使用,並希望調度器在你應用的后台執行。
安裝APScheduler非常簡單:pip install apscheduler
選擇合適的作業存儲,你需要決定是否需要作業持久化。如果你總是在應用開始時重建job,你可以直接使用默認的作業存儲(MemoryJobStore).但是如果你需要將你的作業持久化,以避免應用崩潰和調度器重啟時,你可以根據你的應用環境來選擇具體的作業存儲。例如:使用Mongo或者SQLAlchemyJobStore (用於支持大多數RDBMS)
然而,調度器的選擇通常是為你如果你使用上面的框架之一。然而,默認的ThreadPoolExecutor 通常用於大多數用途。如果你的工作負載中有較大的CPU密集型操作,你可以考慮用ProcessPoolExecutor來使用更多的CPU核。你也可以在同一時間使用兩者,將進程池調度器作為第二執行器。
配置調度器
APScheduler提供了許多不同的方式來配置調度器,你可以使用一個配置字典或者作為參數關鍵字的方式傳入。你也可以先創建調度器,再配置和添加作業,這樣你可以在不同的環境中得到更大的靈活性。
下面是一個簡單使用BlockingScheduler,並使用默認內存存儲和默認執行器。(默認選項分別是MemoryJobStore和ThreadPoolExecutor,其中線程池的最大線程數為10)。配置完成后使用start()方法來啟動。
1
2
3
4
5
6
7
|
from apscheduler.schedulers.blocking import BlockingScheduler
def my_job():
print 'hello world'
sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=5)
sched.start()
|
在運行程序5秒后,將會輸出第一個Hello world。
下面進行一個更復雜的配置,使用兩個作業存儲和兩個調度器。在這個配置中,作業將使用mongo作業存儲,信息寫入到MongoDB中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
from pymongo import MongoClient
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
def my_job():
print 'hello world'
host = '127.0.0.1'
port = 27017
client = MongoClient(host, port)
jobstores = {
'mongo': MongoDBJobStore(collection='job', database='test', client=client),
'default': MemoryJobStore()
}
executors = {
'default': ThreadPoolExecutor(10),
'processpool': ProcessPoolExecutor(3)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(my_job, 'interval', seconds=5)
try:
scheduler.start()
except SystemExit:
client.close()
|
查詢MongoDB可以看到作業的運行情況如下:
1
2
3
4
5
|
{
"_id" : "55ca54ee4bb744f8a5ab08cc4319bc24",
"next_run_time" : 1434017278.797,
"job_state" : new BinData(0, "gAJ9cQEoVQRhcmdzcQIpVQhleGVjdXRvcnEDVQdkZWZhdWx0cQRVDW1heF9pbnN0YW5jZXNxBUsDVQRmdW5jcQZVD19fbWFpbl9fOm15X2pvYnEHVQJpZHEIVSA1NWNhNTRlZTRiYjc0NGY4YTVhYjA4Y2M0MzE5YmMyNHEJVQ1uZXh0X3J1bl90aW1lcQpjZGF0ZXRpbWUKZGF0ZXRpbWUKcQtVCgffBgsSBzoMKUhjcHl0egpfcApxDChVDUFzaWEvU2hhbmdoYWlxDU2AcEsAVQNDU1RxDnRScQ+GUnEQVQRuYW1lcRFVBm15X2pvYnESVRJtaXNmaXJlX2dyYWNlX3RpbWVxE0sBVQd0cmlnZ2VycRRjYXBzY2hlZHVsZXIudHJpZ2dlcnMuaW50ZXJ2YWwKSW50ZXJ2YWxUcmlnZ2VyCnEVKYFxFn1xF1UPaW50ZXJ2YWxfbGVuZ3RocRhHQBQAAAAAAABzfXEZKFUIdGltZXpvbmVxGmgMKGgNTehxSwBVA0xNVHEbdFJxHFUIaW50ZXJ2YWxxHWNkYXRldGltZQp0aW1lZGVsdGEKcR5LAEsFSwCHUnEfVQpzdGFydF9kYXRlcSBoC1UKB98GCxIHIQwpSGgPhlJxIVUIZW5kX2RhdGVxIk51hmJVCGNvYWxlc2NlcSOJVQd2ZXJzaW9ucSRLAVUGa3dhcmdzcSV9cSZ1Lg==")
}
|
操作作業
1. 添加作業
上面是通過add_job()來添加作業,另外還有一種方式是通過scheduled_job()修飾器來修飾函數。
1
2
3
4
|
@sched.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():
print("I am printed at 00:00:00 on the last Sunday of every month!")
|
2. 移除作業
1
2
3
4
5
6
|
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
Same, using an explicit job ID:
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
|
3. 暫停和恢復作業
暫停作業:
– apscheduler.job.Job.pause()
– apscheduler.schedulers.base.BaseScheduler.pause_job()
恢復作業:
– apscheduler.job.Job.resume()
– apscheduler.schedulers.base.BaseScheduler.resume_job()
4. 獲得job列表
獲得調度作業的列表,可以使用get_jobs()
來完成,它會返回所有的job實例。或者使用print_jobs()
來輸出所有格式化的作業列表。
5. 修改作業
1
2
|
def some_decorated_task():
print("I am printed at 00:00:00 on the last Sunday of every month!")</pre>
|
6. 關閉調度器
默認情況下調度器會等待所有正在運行的作業完成后,關閉所有的調度器和作業存儲。如果你不想等待,可以將wait選項設置為False。
1
2
|
scheduler.shutdown()
scheduler.shutdown(wait=False)
|
作業運行的控制
add_job的第二個參數是trigger,它管理着作業的調度方式。它可以為date, interval或者cron。對於不同的trigger,對應的參數也相同。
(1). cron定時調度
year (int|str) – 4-digit year
month (int|str) – month (1-12)
day (int|str) – day of the (1-31)
week (int|str) – ISO week (1-53)
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – hour (0-23)
minute (int|str) – minute (0-59)
second (int|str) – second (0-59)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
和Linux的Crontab一樣,它的值格式為:
Expression | Field | Description |
---|---|---|
* | any | Fire on every value |
*/a | any | Fire every a values, starting from the minimum |
a-b | any | Fire on any value within the a-b range (a must be smaller than b) |
a-b/c | any | Fire every c values within the a-b range |
xth y | day | Fire on the x -th occurrence of weekday y within the month |
last x | day | Fire on the last occurrence of weekday x within the month |
last | day | Fire on the last day within the month |
x,y,z | any | Fire on any matching expression; can combine any number of any of the above expressions |
幾個例子如下:
1
2
3
4
5
|
# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# Runs from Monday to Friday at 5:30 (am) until 2014-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
|
(2). interval 間隔調度
它的參數如下:
weeks (int) – number of weeks to wait
days (int) – number of days to wait
hours (int) – number of hours to wait
minutes (int) – number of minutes to wait
seconds (int) – number of seconds to wait
start_date (datetime|str) – starting point for the interval calculation
end_date (datetime|str) – latest possible date/time to trigger on
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
例子:
1
2
|
# Schedule job_function to be called every two hours
sched.add_job(job_function, 'interval', hours=2)
|
(3). date 定時調度
最基本的一種調度,作業只會執行一次。它的參數如下:
run_date (datetime|str) – the date/time to run the job at
timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
例子:
1
2
3
4
|
# The job will be executed on November 6th, 2009
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
# The job will be executed on November 6th, 2009 at 16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
|
^^