曾經有一個叫django-celery的模塊,大家都用它來做django的異步任務。后來因為它對django、celery還有django-celery的版本要求太高了\
,稍有不對就用不了,而且至今那個django-celery模塊已經很長時間沒更新過了,所以大家就都單獨使用celery了。但在django中使用需要注意幾點也是我遇見的幾個坑,后面會講到。
1.安裝celery
pip install celery
2.celery簡介
Celery 是一個 基於python開發的分布式異步消息任務隊列,通過它可以輕松的實現任務的異步處理, 如果你的業務場景中需要用到異步任務,就可以考慮使用celery
celery有以下優點:
- 簡單:一單熟悉了celery的工作流程后,配置和使用還是比較簡單的
- 高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務
- 快速:一個單進程的celery每分鍾可處理上百萬個任務
- 靈活: 幾乎celery的各個組件都可以被擴展及自定制
celery組成結構
消息中間件(message broker)
Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等
任務執行單元(worker)
Worker是Celery提供的任務執行的單元,worker並發的運行在分布式的系統節點中。
任務執行結果存儲(task result store)
Task result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, redis等
工作流程圖
3.celery的使用
因為celery本身不提供消息服務,所以需要配置第三方消息中間件,如redis,RebbitMq,若不配置,默認用的RebbitMq
安裝RebbitMq
https://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id3
安裝reids
https://www.runoob.com/redis/redis-install.html
在本篇中使用redis
1.目錄結構
假設項目名為Demo,app名為app01
Demo
Demo
__init__.py
settings.py
celery.py
...
app01
views.py
task.py
...
2.settings中配置
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200} # 設置超時時間,一定要設置
BACKEND='redis://127.0.0.1:6379/3' BROKER='redis://127.0.0.1:6379/4'
CELERY_TIMEZONE='Asia/Shanghai'
CELERY_ENABLE_UTC=True
3.__init__.py中配置
from __future__ import absolute_import from .celery import app as celery_app
4. celery_task可以放在app中也可以放在app外,根據項目配置。celery.py中需要做配置
import os from celery import Celery from NP.settings import BACKEND,BROKER from django.conf import settings from celery.schedules import crontab from datetime import timedelta # set the default Django settings module for the 'celery' program. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "NP.settings") # 若不配置backend和broker,則默認使用rebbitmq作為消息中間件 app = Celery('app01',backend=BACKEND,broker=BROKER) # 將celery的settings設置為django的settings,需要配置celery,只需要在settings中配置即可 app.config_from_object('django.conf:settings') # 使用這句,則在task中就不需要根據相對路徑導入celery對象了,只需要導入shared_task app.autodiscover_tasks(lambda :settings.INSTALLED_APPS) # 設置定時任務 app.conf.update( CELERYBEAT_SCHEDULE={ 'sum-add': { # 名字隨意起 'task': 'app01.task.add', # 指定定時任務路徑 'schedule': timedelta(seconds=20), # 每20執行一次 'args': (5, 6) }, 'send_email': { 'task': 'app01.task.email', 'schedule': crontab(hour='18',minute='45'), # 每天18點45分執行 'args':(1,2) } } )
5.task中寫任務處理邏輯
一般來說,一個app中只需要寫一個task文件就可以了,異步、定時、延時任務都可配置
from celery import shared_task @shared_task def add_user(data): ... return '用戶添加成功!' @shared_task def email(data): ... return '郵件發送成功!'
6.views中觸發任務
from task import add_user,emial from datetime import datetime class userViewset(): def post(request): ret = add_user.delay(request.data) # 觸發異步任務 class EmailViewset(): def post(request): ctime = datetime.now() # 當前時間 utc_time = datetime.utcfromtimestamp(ctime.timestamp()) # 轉成本地時間 time_delta = timedelta(seconds=30) # 設置延時30s task_time = utc_time + time_delta # 設定時間點為30s后 ret = add_user.apply_async(args=(request.data,), eta=task_time) # 觸發延時任務
需要注意的幾點:
1.delay()方法只能傳遞可被Json序列化的數據,也就是說不能傳遞對象。如果傳了不可序列化的數據則會報錯:
kombu.exceptions.EncodeError: Object of type *args is not JSON serializable
2.如果配置完成后,執行定時任務時,發現只要添加hour就會無法在指定時間觸發任務,則是時區問題
解決方法:在settings中配置
CELERY_TIMEZONE='Asia/Shanghai' CELERY_ENABLE_UTC=True