背景
業務需求:用戶可創建多個多人任務,需要在任務截止時間前一天提醒所有參與者
技術選型:
Celery:分布式任務隊列。實現異步與定時
django-celery-beat:實現動態添加定時任務,即在創建多人任務時添加定時。django-celery-beat插件本質上是對數據庫表變化檢查,一旦有數據庫表改變,調度器重新讀取任務進行調度
安裝與配置
安裝
pip install celery
pip install django-celery-beat
配置
INSTALLED_APPS = (
...,
'django_celery_beat',
)
# settings.py
TIME_ZONE = 'Asia/Shanghai'
USE_TZ = False
# =================Celery 配置=================
# 使用redis作為broker
REDIS_HOST = 'redis://127.0.0.1:6379/0'
# 關閉 UTC
CELERY_ENABLE_UTC = False
# 設置 django-celery-beat 真正使用的時區
CELERY_TIMEZONE = TIME_ZONE
# 使用 timezone naive 模式,不存儲時區信息,只存儲經過時區轉換后的時間
DJANGO_CELERY_BEAT_TZ_AWARE = False
# 配置 celery 定時任務使用的調度器,使用django_celery_beat插件用來動態配置任務
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
創建django-celery-beat所需要的數據表
python manage.py migrate
創建celery實例,並定義任務
# 由於django_celery_beat用到了Django的ORM,因此首先需要setup django,否則會報錯
import os
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "qaboard.settings")
django.setup()
from celery import Celery
from project import settings
from utils.send_msg import send_msg
# 使用redis作為消息隊列,backend也默認為broker使用的隊列服務
app = Celery('test', broker=settings.REDIS_HOST)
# 載入django配置文件中以 CELERY 開頭的配置
app.config_from_object('project.settings', namespace='CELERY')
@app.task
def test_task():
send_msg("test celery")
啟動celery worker和celery beat
celery -A project_celery worker --pool=solo -l info -f logs/celery.log
'-A' 是一個全局配置,定義了APP的位置
'--pool' 是POOL的配置,默認是prefork(並發),選擇solo之后,發送的任務不會被並發執行,在worker執行任務過程中,再次發送給worker的任務會排隊,執行完一個再執行另一個。不需要並發時可以選擇此模式以節約服務器資源
'-l' 定義了log級別
'-f' 定義日志文件路徑
celery -A project_celery beat -l info -f logs/beat.log --pidfile=logs/celerybeat.pid
'--pidfile' 用於定位pidfile,pidfile是一個存儲了beat進程的進程id的文件,如果此文件存在且此文件中的進程正在運行中,則不會啟動新的beat進程
由於配置中已經聲明了調度器,因此這里不需要重新聲明,否則需要使用
--scheduler django_celery_beat.schedulers:DatabaseScheduler
聲明使用DatabaseScheduler
在linux上可以用-B參數同步啟動celery beat
celery -A qaboard_celery worker --pool=solo -l info -f logs/celery.log -B
beat的log會輸出到celery.log中
動態添加定時任務
PeriodicTask
此模型定義要運行的單個周期性任務。
- 必須為任務指定一種Schedule,即clocked, interval, crontab, solar四個字段必須填寫一個,且只能填寫一個
- name字段給任務命名,它是unique的
- task字段指定運行的Celery任務,如“proj.tasks.test_task”
- one_off:默認值為False,如果one_off=True,任務被運行一次后enabled字段將被置為False,即任務只會運行一次
- args:傳遞給任務的參數,是一個json字符串,如 ["arg1", "arg2"]
- expires:過期時間,過期的任務將不再會被驅動觸發
使用ClockedSchedule
會在特定的時間觸發任務
def test_clock():
clock = ClockedSchedule.objects.create(clocked_time=datetime.now() + timedelta(seconds=10))
PeriodicTask.objects.create(
name="%s" % str(datetime.now()),
task="project_celery.celery_app.test_task",
clocked=clock,
# 如果使用ClockedSchedule,則one_off必須為True
one_off=True
)
不知道為什么我的任務就是無法通過clock觸發,beat.log中有DatabaseScheduler: Schedule changed.
的記錄,但是到了clock指定的時間任務不會被觸發,其他的調度器都是可以正常運行的,如果有知道解決方法的同學可以評論告訴我,感謝
使用IntervalSchedule
以特定間隔運行的Schedule
用IntervalSchedule能夠實現與ClockedSchedule同樣的功能:計算目標時間與當前時間的時間差,令此時間差作為IntervalSchedule的周期,並且將任務的one_off參數置為True
def time_diff(target_time):
diff = target_time - datetime.now()
return int(diff.total_seconds())
def test_interval():
seconds = time_diff(datetime.strptime("2020-3-19 15:39:00", "%Y-%m-%d %H:%M:%S"))
schedule = IntervalSchedule.objects.create(every=seconds, period=IntervalSchedule.SECONDS)
PeriodicTask.objects.create(
name="%s" % str(datetime.now()),
task="project_celery.celery_app.test_task",
interval=schedule,
one_off=True
)
使用CrontabSchedule
使用CrontabSchedule一定要注意將時區設置為當前地區時區
model參數與crontab表達式的對應關系:
minite, hour, day_of_week, day_of_month, month_of_year
全部默認為"*"
def test_crontab():
# 表示 * * * * * ,即每隔一分鍾觸發一次
schedule = CrontabSchedule.objects.create(timezone='Asia/Shanghai')
PeriodicTask.objects.create(
name="%s" % str(datetime.now()),
task="project_celery.celery_app.test_task",
crontab=schedule,
one_off=True
)