前言
接着前面Celery 定時任務,這篇使用Celery + djcelery 把定時任務存到數據庫。
djcelery 環境准備
定時任務基礎環境准備,就不多說了,接着前面一篇https://www.cnblogs.com/yoyoketang/p/15432907.html.
Celery的使用方式有兩種:
- Celery 只用Celery,本身自帶worker 和 beat (定時任務)功能,定時任務在setting配置
- Celery + djcelery 使用了djcelery,可以在任務中方便的直接操作 Django 數據庫,而且最終的任務可以在 Django 的后台中查看和修改相關的任務。
多安裝一個 djcelery 主要是把定時任務放到數據庫中,方便配置和管理。
pip 安裝django-celery
pip install django-celery==3.3.1
在 setting 里面配置 INSTALLED_APPS,添加'djcelery'
INSTALLED_APPS = [
......
'djcelery'
]
同步數據庫
python manage.py makemigrations
python manage.py migrate
執行完成后會看到djcelery相關的幾張表
CELERY 配置
在setting.py 添加CELERY 相關配置
import djcelery
djcelery.setup_loader()
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
BROKER_URL = 'redis://192.168.1.1:6379'
# RESULT_BACKEND 結果保存數據庫
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# SCHEDULER 定時任務保存數據庫
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
相對於前面一篇,修改了以下內容
# 新增這2句
import djcelery
djcelery.setup_loader()
# RESULT_BACKEND 結果保存數據庫
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# SCHEDULER 定時任務保存數據庫
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
tasks任務
在app下新建tasks.py,必須要是tasks.py文件名稱,django會自動查找到app下的該文件
from __future__ import absolute_import
from celery import shared_task
@shared_task
def add(x, y):
print("task----------111111----------")
return x + y
@shared_task
def mul(x, y):
print("task----------22222----------")
return x * y
views視圖創建任務
創建視圖,把定時任務信息寫入數據庫
from django.http import JsonResponse
import datetime
import json
from djcelery.models import PeriodicTask, CrontabSchedule
def create_task(request):
task_name = "test" # 唯一值,自定義不能重復, 這里是測試下,先寫死
task = 'yoyo.tasks.add' # 任務的注冊路徑
# task_args = [10, 11] # 任務參數
task_kwargs = {'x': 10, 'y': 11} # 關鍵字參數
# 定時任務規則
crontab_time = {
'minute': '*/2', # 每2分鍾執行一次
'hour': '*',
'day_of_week': '*',
'day_of_month': '*',
'month_of_year': '*'
}
# 寫入 schedule表
schedule = CrontabSchedule.objects.create(**crontab_time)
# 任務和 schedule 關聯
task, created = PeriodicTask.objects.get_or_create(
name=task_name, # 名稱保持唯一
task=task,
crontab=schedule,
enabled=True, # 是否開啟任務
# args=json.dumps(task_args),
kwargs=json.dumps(task_kwargs),
# 任務過期時間,設置當前時間往后1天
expires=datetime.datetime.now()+datetime.timedelta(days=1)
)
if created:
return JsonResponse({"code": 0, "msg": "success"})
else:
return JsonResponse({"code": 111, "msg": "create failed"})
分別啟動django, worker 和 beat服務
python manage.py runserver 0.0.0.0:8000
celery -A MyDjango beat -l info
celery -A MyDjango worker -l info
訪問接口觸發后,數據庫 djcelery_crontabschedule 表會寫入定時信息
djcelery_periodictask 表記錄任務信息