python測試開發django-161.Celery 定時任務保存到數據庫 (djcelery)


前言

接着前面Celery 定時任務,這篇使用Celery + djcelery 把定時任務存到數據庫。

djcelery 環境准備

定時任務基礎環境准備,就不多說了,接着前面一篇https://www.cnblogs.com/yoyoketang/p/15432907.html.
Celery的使用方式有兩種:

  • Celery 只用Celery,本身自帶worker 和 beat (定時任務)功能,定時任務在setting配置
  • Celery + djcelery 使用了djcelery,可以在任務中方便的直接操作 Django 數據庫,而且最終的任務可以在 Django 的后台中查看和修改相關的任務。

多安裝一個 djcelery 主要是把定時任務放到數據庫中,方便配置和管理。

pip 安裝django-celery

pip install django-celery==3.3.1

在 setting 里面配置 INSTALLED_APPS,添加'djcelery'

INSTALLED_APPS = [
    ......
    'djcelery'
]

同步數據庫

python manage.py makemigrations
python manage.py migrate

執行完成后會看到djcelery相關的幾張表

CELERY 配置

在setting.py 添加CELERY 相關配置

import djcelery
djcelery.setup_loader()

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

BROKER_URL = 'redis://192.168.1.1:6379'
# RESULT_BACKEND 結果保存數據庫
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# SCHEDULER 定時任務保存數據庫
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

相對於前面一篇,修改了以下內容

# 新增這2句
import djcelery
djcelery.setup_loader()

# RESULT_BACKEND 結果保存數據庫
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# SCHEDULER 定時任務保存數據庫
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

tasks任務

在app下新建tasks.py,必須要是tasks.py文件名稱,django會自動查找到app下的該文件

from __future__ import absolute_import
from celery import shared_task


@shared_task
def add(x, y):
    print("task----------111111----------")
    return x + y


@shared_task
def mul(x, y):
    print("task----------22222----------")
    return x * y

views視圖創建任務

創建視圖,把定時任務信息寫入數據庫

from django.http import JsonResponse
import datetime
import json
from djcelery.models import PeriodicTask, CrontabSchedule


def create_task(request):
    task_name = "test"	 # 唯一值,自定義不能重復, 這里是測試下,先寫死
    task = 'yoyo.tasks.add'  # 任務的注冊路徑
    # task_args = [10, 11]   # 任務參數
    task_kwargs = {'x': 10, 'y': 11}  # 關鍵字參數
    # 定時任務規則
    crontab_time = {
        'minute': '*/2',       # 每2分鍾執行一次
        'hour': '*',
        'day_of_week': '*',
        'day_of_month': '*',
        'month_of_year': '*'
    }
    # 寫入 schedule表
    schedule = CrontabSchedule.objects.create(**crontab_time)
    # 任務和 schedule 關聯
    task, created = PeriodicTask.objects.get_or_create(
        name=task_name,		# 名稱保持唯一
        task=task,
        crontab=schedule,
        enabled=True,	  # 是否開啟任務
        # args=json.dumps(task_args),
        kwargs=json.dumps(task_kwargs),
        # 任務過期時間,設置當前時間往后1天
        expires=datetime.datetime.now()+datetime.timedelta(days=1)
    )
    if created:
        return JsonResponse({"code": 0, "msg": "success"})
    else:
        return JsonResponse({"code": 111, "msg": "create failed"})

分別啟動django, worker 和 beat服務

python manage.py runserver 0.0.0.0:8000
celery -A MyDjango beat -l info
celery -A MyDjango worker -l info

訪問接口觸發后,數據庫 djcelery_crontabschedule 表會寫入定時信息

djcelery_periodictask 表記錄任務信息


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM