Celery在Django中的使用


celery+django使用教程

建議先看一下最下面的一些坑,總結的不是太全,celery與django-celery最好版本保持一致,總之坑比較多,但不放棄就可以爬出來

目錄結構

taskproj
├── app01
│   ├── __init__.py
│   ├── apps.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py
│   └── views.py
├── manage.py
├── taskproj
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── templates
1、安裝軟件
pip install celery==3.1.22
pip install django-celery==3.1.22
pip install -U "celery[redis]"

2、新建celery文件

#! /usr/bin/env python
# coding: utf-8

from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproj.settings')
app = Celery('taskproj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)  
# 不添加(lambda: settings.INSTALLED_APPS) 可能會報錯

3、配置setting里的內容

INSTALLED_APPS = [
...,
  'djcelery',
]

LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
USE_TZ = False

# broker是代理人,它負責分發任務給worker去執行
REDIS_URL = '47.98.xxx.xx'
#《!不設置找不到下面的路徑 !》
BROKER_URL = 'redis://{0}:6379/6'.format(REDIS_URL) 
CELERY_BROKER_URL = 'redis://47.98.xxx.xx:6379/5' # Broker配置,使用Redis作為消息中間件
CELERY_RESULT_BACKEND = 'redis://47.98.xxx.xx:6379/5' # BACKEND配置,這里使用redis
CELERY_RESULT_SERIALIZER = 'json' # 結果序列化方案
######################
#       隊列         #
######################
# 導入目標任務文件
# # CELERY_IMPORTS = ('task.tasks',) # 這里可以不用設置,否則會重復導入任務
# CELERY_ACCEPT_CONTENT = ['json']
# CELERY_TASK_SERIALIZER = 'json'
# CELERY_RESULT_SERIALIZER = 'json'
# # CELERY_TIMEZONE = TIME_ZONE
# CELERY_BROKER_URL = 'redis://{0}:6379/5'.format(REDIS_URL) # Broker配置,使用Redis作為消息中間件
# CELERY_RESULT_BACKEND = 'redis://{0}:6379/5'.format(REDIS_URL) # BACKEND配置,這里使用redis
# # CELERY_RESULT_BACKEND = 'django-db'
# CELERY_ENABLE_UTC = False
# DJANGO_CELERY_BEAT_TZ_AWARE = False
# CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# CELERYD_MAX_TASKS_PER_CHILD = 3
# # 設置結果的保存時間
# CELERY_TASK_RESULT_EXPIRES = 10 * 60
# # 設置默認不存結果
# CELERY_IGNORE_RESULT = False
# CELERYD_FORCE = True
# CELERY_TIMEZONE = 'Asia/Shanghai'

4、在 /taskproj/taskproj/__init__.py里添加如下內容

#!/usr/bin/env python
# encoding: utf-8
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

import pymysql
pymysql.install_as_MySQLdb()

5、添加tasks

from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task()
def add(x, y):
  return x + y

@shared_task()
def mul(x, y):
  print("%d * %d = %d" % (x, y, x * y))
  return x * y

6、View視圖函數

from django.http import JsonResponse
from app01 import tasks

# Create your views here.

def index(request,*args,**kwargs):
  res=tasks.add.delay(2,5)
  data = {
      'status':'successful',
      'task_id':res.task_id
  }
  return JsonResponse(data)

6.5 )進行結果驗證

python manage.py runserver
celery worker -A taskproj -l info  # 啟動woker

7、結果的查詢,先進入redis數據庫,選擇進行查看

1、redis服務器開啟命令    redis-server /etc/redis/redis.conf
(注釋:/etc/redis/redis.conf,安裝時的配置 文件路徑,咱們的可能不一樣,不添加可能報錯)
2、redis客戶端開啟命令    redis-cli --raw           
(注釋:--raw參數為了查看原生數據)
3、選擇settings配置的redis庫(本人配置的是5)
1) select 5
2) keys *      
(注釋:查看該庫所有的鍵)
3) get celery-task-meta-458ac683-5054-497e-b9e5-794a19f8c6a9

!1577503953859](/tmp/1577503953859.png)

8、把結果存儲到mysql里

1.安裝
pip install django-celery-results
2.配置settings.py,注冊app
INSTALLED_APPS = (
  ...,
  'django_celery_results',
)
3.修改backend配置,將redis改為django-db
# CELERY_RESULT_BACKEND = 'redis://47.98.xxx.xx:6379/5' # BACKEND配置,這里使用redis
CELERY_RESULT_BACKEND = 'django-db' #使用django orm 作為結果存儲
4.修改數據庫
python manage.py makemigrations
python manage.py migrate django_celery_results

9、Task源碼表結構

class TaskResult(models.Model):
  """Task result/status."""
  task_id = models.CharField(_('task id'), max_length=255, unique=True)
  task_name = models.CharField(_('task name'), null=True, max_length=255)
  task_args = models.TextField(_('task arguments'), null=True)
  task_kwargs = models.TextField(_('task kwargs'), null=True)
  status = models.CharField(_('state'), max_length=50,
                            default=states.PENDING,
                            choices=TASK_STATE_CHOICES
                            )
  content_type = models.CharField(_('content type'), max_length=128)
  content_encoding = models.CharField(_('content encoding'), max_length=64)
  result = models.TextField(null=True, default=None, editable=False)
  date_done = models.DateTimeField(_('done at'), auto_now=True)
  traceback = models.TextField(_('traceback'), blank=True, null=True)
  hidden = models.BooleanField(editable=False, default=False, db_index=True)
  meta = models.TextField(null=True, default=None, editable=False)
  objects = managers.TaskResultManager()
  class Meta:
      """Table information."""
      ordering = ['-date_done']
      verbose_name = _('task result')
      verbose_name_plural = _('task results')
  def as_dict(self):
      return {
          'task_id': self.task_id,
          'task_name': self.task_name,
          'task_args': self.task_args,
          'task_kwargs': self.task_kwargs,
          'status': self.status,
          'result': self.result,
          'date_done': self.date_done,
          'traceback': self.traceback,
          'meta': self.meta,
      }
  def __str__(self):
      return '<Task: {0.task_id} ({0.status})>'.format(self)
Django中使用定時任務
  如果想要在django中使用定時任務功能同樣是靠beat完成任務發送功能,當在Django中使用定時任務時,需要安裝django-celery-beat插件。以下將介紹使用過程。

安裝配置
1.beat插件安裝
pip install django-celery-beat
2.注冊APP
INSTALLED_APPS = [
  ....  
  'django_celery_beat',
]
3.數據庫變更
python manage.py migrate django_celery_beat
4.分別啟動woker和beta
celery -A taskproj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler#啟動beta 調度器使用數據庫
celery worker -A taskproj -l info #啟動woker
5.配置admin
urls.py
復制代碼
# urls.py
from django.conf.urls import url
from django.contrib import admin
urlpatterns = [
  url(r'^admin/', admin.site.urls),
]
復制代碼
6.創建用戶
python manage.py createsuperuser
7.登錄admin進行管理(地址http://127.0.0.1:8000/admin)並且還可以看到我們上次使用orm作為結果存儲的表。
http://127.0.0.1:8000/admin/login/?next=/admin/

celery使用過程中的坑

1、版本之間相互不兼容:

1)django-celery不兼容celery4.0.0以上版本

2)python3.7以上版本不兼容celery,因為python3.7使用了async關鍵字,而celery用這個關鍵字了,解決辦法可以直接把錯誤直接百度,我用的是修改async.py文件名,然后再把導包的位置進行修改

1、application(task producer)"任務的生產者",我們通過代碼層面,產生任務然后發送給broker(task queue)
2、celery beat(task scheduler)"任務調度器",常見為生成定時任務
3、broker(task queue)"任務隊列",我們需要將任務送往任務隊列去,官網強烈推薦的broker有:redis和rabbitmq
4、worker(taks consumer)"任務消費者",在任務送往broker后,worker從中進行操作,完成任務后生成result
5、result 完成任務后產生的結果

Broker的選擇broker我們可選的有redis和rabbitmq,官網推薦的兩種,我們這里選redis,相對於rabbitmq更加輕量級,安裝方便。所以我們需要安裝redis,redis很容易安裝,這里就不細講了。

2、當然如果想要實現定時任務的話還需要安裝django-celery-beta插件,需要注意的是Celery4.0只支持Django版本>=1.8的,如果是小於1.8版本需要使用Celery3.1。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM