celery在django中的使用


曾經有一個叫django-celery的模塊,大家都用它來做django的異步任務。后來因為它對django、celery還有django-celery的版本要求太高了\
,稍有不對就用不了,而且至今那個django-celery模塊已經很長時間沒更新過了,所以大家就都單獨使用celery了。但在django中使用需要注意幾點也是我遇見的幾個坑,后面會講到。

 

1.安裝celery

pip install celery

2.celery簡介

  Celery 是一個 基於python開發的分布式異步消息任務隊列,通過它可以輕松的實現任務的異步處理, 如果你的業務場景中需要用到異步任務,就可以考慮使用celery

  celery有以下優點:

  1. 簡單:一單熟悉了celery的工作流程后,配置和使用還是比較簡單的
  2. 高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務
  3. 快速:一個單進程的celery每分鍾可處理上百萬個任務
  4. 靈活: 幾乎celery的各個組件都可以被擴展及自定制

  celery組成結構

    消息中間件(message broker)
      Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等
    任務執行單元(worker)
      Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中。
    任務執行結果存儲(task result store)
      Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, redis等

 

  

  工作流程圖 

   

3.celery的使用

  因為celery本身不提供消息服務,所以需要配置第三方消息中間件,如redis,RebbitMq,若不配置,默認用的RebbitMq
  安裝RebbitMq
    https://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id3
  安裝reids
    https://www.runoob.com/redis/redis-install.html
  在本篇中使用redis

  1.目錄結構

  假設項目名為Demo,app名為app01

    Demo
      Demo
        __init__.py
        settings.py
        celery.py
        ...
      app01
        views.py
        task.py
        ...

  2.settings中配置

BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}  # 設置超時時間,一定要設置
BACKEND='redis://127.0.0.1:6379/3' BROKER='redis://127.0.0.1:6379/4'
CELERY_TIMEZONE='Asia/Shanghai'
CELERY_ENABLE_UTC=True

  3.__init__.py中配置

from __future__ import absolute_import
from .celery import app as celery_app

  4. celery_task可以放在app中也可以放在app外,根據項目配置。celery.py中需要做配置

import os
from celery import Celery
from NP.settings import BACKEND,BROKER
from django.conf import settings
from celery.schedules import crontab
from datetime import timedelta

# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "NP.settings")

# 若不配置backend和broker,則默認使用rebbitmq作為消息中間件
app = Celery('app01',backend=BACKEND,broker=BROKER)

# 將celery的settings設置為django的settings,需要配置celery,只需要在settings中配置即可
app.config_from_object('django.conf:settings')

# 使用這句,則在task中就不需要根據相對路徑導入celery對象了,只需要導入shared_task
app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)

# 設置定時任務
app.conf.update(
    CELERYBEAT_SCHEDULE={
        'sum-add': {  # 名字隨意起
            'task': 'app01.task.add',  # 指定定時任務路徑
            'schedule': timedelta(seconds=20),  # 每20執行一次
            'args': (5, 6)
        },
        'send_email': {
            'task': 'app01.task.email',
            'schedule': crontab(hour='18',minute='45'),  # 每天18點45分執行
            'args':(1,2)
        }
    }
)

  5.task中寫任務處理邏輯

  一般來說,一個app中只需要寫一個task文件就可以了,異步、定時、延時任務都可配置

from celery import shared_task

@shared_task
def add_user(data):
    ...
    return '用戶添加成功!'

@shared_task
def email(data):
    ...
    return '郵件發送成功!'

  6.views中觸發任務

from task import add_user,emial
from datetime import datetime

class userViewset():
  def post(request):
    ret = add_user.delay(request.data)  # 觸發異步任務

class EmailViewset():
  def post(request):
       ctime = datetime.now()  # 當前時間
       utc_time = datetime.utcfromtimestamp(ctime.timestamp())  # 轉成本地時間
       time_delta = timedelta(seconds=30)  # 設置延時30s
       task_time = utc_time + time_delta  # 設定時間點為30s后
       ret = add_user.apply_async(args=(request.data,), eta=task_time)  # 觸發延時任務

 

需要注意的幾點:
  1.delay()方法只能傳遞可被Json序列化的數據,也就是說不能傳遞對象。如果傳了不可序列化的數據則會報錯:
    kombu.exceptions.EncodeError: Object of type *args is not JSON serializable

  2.如果配置完成后,執行定時任務時,發現只要添加hour就會無法在指定時間觸發任務,則是時區問題

    解決方法:在settings中配置

CELERY_TIMEZONE='Asia/Shanghai'
CELERY_ENABLE_UTC=True

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM