安裝apscheduler 模塊
pip install apscheduler pip install django-apscheduler
將 django-apscheduler 加到項目中settings的INSTALLED_APPS中
INSTALLED_APPS = [ .... 'django_apscheduler', ]
執行:
# python manage.py migrate 沒有其他表結構不必運行 python manage.py makemigrations
會創建兩張表:django_apscheduler_djangojob/django_apscheduler_djangojobexecution
通過進入后台管理能方便管理定時任務。
在Django工程目錄下的urls.py文件中,或者說主urls.py中引入如下內容
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), "default")
# 時間間隔3秒鍾打印一次當前的時間
@register_job(scheduler, "interval", seconds=3, id='test_job')
def test_job():
print("我是apscheduler任務")
# per-execution monitoring, call register_events on your scheduler
register_events(scheduler)
scheduler.start()
print("Scheduler started!")

運行結果如下:

APScheduler中兩種調度器的區別及使用過程中要注意的問題
APScheduler中有很多種不同類型的調度器,BlockingScheduler與BackgroundScheduler是其中最常用的兩種調度器。區別主要在於BlockingScheduler會阻塞主線程的運行,而BackgroundScheduler不會阻塞。所以,我們在不同的情況下,選擇不同的調度器:
BlockingScheduler: 調用start函數后會阻塞當前線程。當調度器是你應用中唯一要運行的東西時(如上例)使用。BackgroundScheduler: 調用start后主線程不會阻塞。當你不運行任何其他框架時使用,並希望調度器在你應用的后台執行。
BlockingScheduler的真實例子
from apscheduler.schedulers.blocking import BlockingScheduler
import time
def job():
print('job 3s')
if __name__=='__main__':
sched = BlockingScheduler(timezone='MST')
sched.add_job(job, 'interval', id='3_second_job', seconds=3)
sched.start()
while(True):
print('main 1s')
time.sleep(1)
運行結果:
job 3s
job 3s
job 3s
job 3s
可見,BlockingScheduler調用start函數后會阻塞當前線程,導致主程序中while循環不會被執行到。
BackgroundScheduler的真實例子
from apscheduler.schedulers.background import BackgroundScheduler
import time
def job():
print('job 3s')
if __name__=='__main__':
sched = BackgroundScheduler(timezone='MST')
sched.add_job(job, 'interval', id='3_second_job', seconds=3)
sched.start()
while(True):
print('main 1s')
time.sleep(1)
運行結果:
main 1s
main 1s
main 1s
job 3s
main 1s
main 1s
main 1s
job 3s
可見,BackgroundScheduler調用start函數后並不會阻塞當前線程,所以可以繼續執行主程序中while循環的邏輯。
通過這個輸出,我們也可以發現,調用start函數后,job()並不會立即開始執行。而是等待3s后,才會被調度執行。
如何讓job在start()后就開始運行
如何才能讓調度器調用start函數后,job()就立即開始執行呢?
其實APScheduler並沒有提供很好的方法來解決這個問題,但有一種最簡單的方式,就是在調度器start之前,就運行一次job(),如下
from apscheduler.schedulers.background import BackgroundScheduler
import time
def job():
print('job 3s')
if __name__=='__main__':
job()
sched = BackgroundScheduler(timezone='MST')
sched.add_job(job, 'interval', id='3_second_job', seconds=3)
sched.start()
while(True):
print('main 1s')
time.sleep(1)
運行結果:
job 3s
main 1s
main 1s
main 1s
job 3s
main 1s
main 1s
main 1s
這樣雖然沒有絕對做到“讓job在start()后就開始運行”,但也能做到“不等待調度,而是剛開始就運行job”。
如果job執行時間過長會怎么樣
如果執行job()的時間需要5s,但調度器配置為每隔3s就調用一下job(),會發生什么情況呢?我們寫了如下例子:
from apscheduler.schedulers.background import BackgroundScheduler
import time
def job():
print('job 3s')
time.sleep(5)
if __name__=='__main__':
sched = BackgroundScheduler(timezone='MST')
sched.add_job(job, 'interval', id='3_second_job', seconds=3)
sched.start()
while(True):
print('main 1s')
time.sleep(1)
運行結果:
main 1s
main 1s
main 1s
job 3s
main 1s
main 1s
main 1s
Execution of job "job (trigger: interval[0:00:03], next run at: 2018-05-07 02:44:29 MST)" skipped: maximum number of running instances reached (1)
main 1s
main 1s
main 1s
job 3s
main 1s
可見,3s時間到達后,並不會“重新啟動一個job線程”,而是會跳過該次調度,等到下一個周期(再等待3s),又重新調度job()。
為了能讓多個job()同時運行,我們也可以配置調度器的參數max_instances,如下例,我們允許2個job()同時運行
from apscheduler.schedulers.background import BackgroundScheduler
import time
def job():
print('job 3s')
time.sleep(5)
if __name__=='__main__':
job_defaults = { 'max_instances': 2 }
sched = BackgroundScheduler(timezone='MST', job_defaults=job_defaults)
sched.add_job(job, 'interval', id='3_second_job', seconds=3)
sched.start()
while(True):
print('main 1s')
time.sleep(1)
運行結果:
main 1s
main 1s
main 1s
job 3s
main 1s
main 1s
main 1s
job 3s
main 1s
main 1s
main 1s
job 3s
每個job是怎么被調度的
通過上面的例子,我們發現,調度器是定時調度job()函數,來實現調度的。
那job()函數會被以進程的方式調度運行,還是以線程來運行呢?
為了弄清這個問題,我們寫了如下程序:
from apscheduler.schedulers.background import BackgroundScheduler
import time,os,threading
def job():
print('job thread_id-{0}, process_id-{1}'.format(threading.get_ident(), os.getpid()))
time.sleep(50)
if __name__=='__main__':
job_defaults = { 'max_instances': 20 }
sched = BackgroundScheduler(timezone='MST', job_defaults=job_defaults)
sched.add_job(job, 'interval', id='3_second_job', seconds=3)
sched.start()
while(True):
print('main 1s')
time.sleep(1)
運行結果:
main 1s
main 1s
main 1s
job thread_id-10644, process_id-8872
main 1s
main 1s
main 1s
job thread_id-3024, process_id-8872
main 1s
main 1s
main 1s
job thread_id-6728, process_id-8872
main 1s
main 1s
main 1s
job thread_id-11716, process_id-8872
可見,每個job()的進程ID都相同,但線程ID不同。所以,job()最終是以線程的方式被調度執行。
BlockingScheduler定時任務及其他方式的實現
#BlockingScheduler定時任務
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
首先看看周一到周五定時執行任務
# 輸出時間
def job():
print(datetime.now().strtime("%Y-%m-%d %H:%M:%S"))
# BlockingScheduler
scheduler = BlockingScheduler()
scheduler.add_job(job, "cron", day_of_week="1-5", hour=6, minute=30)
schduler.start()
scheduler.add_job(job, 'cron', hour=1, minute=5)
hour =19 , minute =23 這里表示每天的19:23 分執行任務
hour ='19', minute ='23' 這里可以填寫數字,也可以填寫字符串
hour ='19-21', minute= '23' 表示 19:23、 20:23、 21:23 各執行一次任務
#每300秒執行一次
scheduler .add_job(job, 'interval', seconds=300)
#在1月,3月,5月,7-9月,每天的下午2點,每一分鍾執行一次任務
scheduler .add_job(func=job, trigger='cron', month='1,3,5,7-9', day='*', hour='14', minute='*')
# 當前任務會在 6、7、8、11、12 月的第三個周五的 0、1、2、3 點執行
scheduler .add_job(job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
#從開始時間到結束時間,每隔倆小時運行一次
scheduler .add_job(job, 'interval', hours=2, start_date='2018-01-10 09:30:00', end_date='2018-06-15 11:00:00')
#自制定時器
from datetime import datetime
import time
# 每n秒執行一次
def timer(n):
while True:
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
time.sleep(n)
timer(5)
BackgroundScheduler定時任務及其他方式的實現
啟動異步定時任務
import time
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
try:
# 實例化調度器
scheduler = BackgroundScheduler()
# 調度器使用DjangoJobStore()
scheduler.add_jobstore(DjangoJobStore(), "default")
# 'cron'方式循環,周一到周五,每天9:30:10執行,id為工作ID作為標記
# ('scheduler',"interval", seconds=1) #用interval方式循環,每一秒執行一次
@register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_time')
def test_job():
t_now = time.localtime()
print(t_now)
# 監控任務
register_events(scheduler)
# 調度器開始
scheduler.start()
except Exception as e:
print(e)
# 報錯則調度器停止執行
scheduler.shutdown()
