Celery動態添加定時任務


背景

業務需求:用戶可創建多個多人任務,需要在任務截止時間前一天提醒所有參與者

技術選型:

Celery:分布式任務隊列。實現異步與定時

django-celery-beat:實現動態添加定時任務,即在創建多人任務時添加定時。django-celery-beat插件本質上是對數據庫表變化檢查,一旦有數據庫表改變,調度器重新讀取任務進行調度

安裝與配置

安裝

pip install celery
pip install django-celery-beat

配置

INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)

# settings.py
TIME_ZONE = 'Asia/Shanghai'
USE_TZ = False

# =================Celery 配置=================
# 使用redis作為broker
REDIS_HOST = 'redis://127.0.0.1:6379/0'
# 關閉 UTC
CELERY_ENABLE_UTC = False
# 設置 django-celery-beat 真正使用的時區
CELERY_TIMEZONE = TIME_ZONE
# 使用 timezone naive 模式,不存儲時區信息,只存儲經過時區轉換后的時間
DJANGO_CELERY_BEAT_TZ_AWARE = False
# 配置 celery 定時任務使用的調度器,使用django_celery_beat插件用來動態配置任務
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

創建django-celery-beat所需要的數據表

python manage.py migrate

創建celery實例,並定義任務

# 由於django_celery_beat用到了Django的ORM,因此首先需要setup django,否則會報錯
import os
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "qaboard.settings")
django.setup()

from celery import Celery
from project import settings
from utils.send_msg import send_msg

# 使用redis作為消息隊列,backend也默認為broker使用的隊列服務
app = Celery('test', broker=settings.REDIS_HOST)
# 載入django配置文件中以 CELERY 開頭的配置
app.config_from_object('project.settings', namespace='CELERY')


@app.task
def test_task():
    send_msg("test celery")

啟動celery worker和celery beat

celery -A project_celery worker --pool=solo -l info -f logs/celery.log

'-A' 是一個全局配置,定義了APP的位置

'--pool' 是POOL的配置,默認是prefork(並發),選擇solo之后,發送的任務不會被並發執行,在worker執行任務過程中,再次發送給worker的任務會排隊,執行完一個再執行另一個。不需要並發時可以選擇此模式以節約服務器資源

'-l' 定義了log級別

'-f' 定義日志文件路徑

celery -A project_celery beat -l info -f logs/beat.log --pidfile=logs/celerybeat.pid

'--pidfile' 用於定位pidfile,pidfile是一個存儲了beat進程的進程id的文件,如果此文件存在且此文件中的進程正在運行中,則不會啟動新的beat進程

由於配置中已經聲明了調度器,因此這里不需要重新聲明,否則需要使用

--scheduler django_celery_beat.schedulers:DatabaseScheduler

聲明使用DatabaseScheduler

在linux上可以用-B參數同步啟動celery beat

celery -A qaboard_celery worker --pool=solo -l info -f logs/celery.log -B

beat的log會輸出到celery.log中

動態添加定時任務

PeriodicTask

此模型定義要運行的單個周期性任務。

  1. 必須為任務指定一種Schedule,即clocked, interval, crontab, solar四個字段必須填寫一個,且只能填寫一個
  2. name字段給任務命名,它是unique的
  3. task字段指定運行的Celery任務,如“proj.tasks.test_task”
  4. one_off:默認值為False,如果one_off=True,任務被運行一次后enabled字段將被置為False,即任務只會運行一次
  5. args:傳遞給任務的參數,是一個json字符串,如 ["arg1", "arg2"]
  6. expires:過期時間,過期的任務將不再會被驅動觸發

使用ClockedSchedule

會在特定的時間觸發任務

def test_clock():
    clock = ClockedSchedule.objects.create(clocked_time=datetime.now() + timedelta(seconds=10))
    PeriodicTask.objects.create(
        name="%s" % str(datetime.now()),
        task="project_celery.celery_app.test_task",
        clocked=clock,
        # 如果使用ClockedSchedule,則one_off必須為True
        one_off=True
    )

不知道為什么我的任務就是無法通過clock觸發,beat.log中有DatabaseScheduler: Schedule changed.的記錄,但是到了clock指定的時間任務不會被觸發,其他的調度器都是可以正常運行的,如果有知道解決方法的同學可以評論告訴我,感謝

使用IntervalSchedule

以特定間隔運行的Schedule

用IntervalSchedule能夠實現與ClockedSchedule同樣的功能:計算目標時間與當前時間的時間差,令此時間差作為IntervalSchedule的周期,並且將任務的one_off參數置為True

def time_diff(target_time):
    diff = target_time - datetime.now()
    return int(diff.total_seconds())

def test_interval():
    seconds = time_diff(datetime.strptime("2020-3-19 15:39:00", "%Y-%m-%d %H:%M:%S"))
    schedule = IntervalSchedule.objects.create(every=seconds, period=IntervalSchedule.SECONDS)
    PeriodicTask.objects.create(
        name="%s" % str(datetime.now()),
        task="project_celery.celery_app.test_task",
        interval=schedule,
        one_off=True
    )

使用CrontabSchedule

使用CrontabSchedule一定要注意將時區設置為當前地區時區

model參數與crontab表達式的對應關系:

minite, hour, day_of_week, day_of_month, month_of_year

全部默認為"*"

def test_crontab():
    # 表示 * * * * * ,即每隔一分鍾觸發一次
    schedule = CrontabSchedule.objects.create(timezone='Asia/Shanghai')
    PeriodicTask.objects.create(
        name="%s" % str(datetime.now()),
        task="project_celery.celery_app.test_task",
        crontab=schedule,
        one_off=True
    )


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM