celery是一個python的分布式任務隊列框架,支持 分布的 機器/進程/線程的任務調度。采用典型的生產者-消費者模型
包含三部分:
1. 隊列 broker :可使用redis ,rabbitmq ,或關系數據庫作為broker
2.處理任務的消費者workers : 隊列中有任務時就發出通知,worker收到通知就去處理
3.任務結果存儲 backend: 存儲任務的返回值
celery 4.2,django 1.11.7 ,rabbitmq 3.7.3,centos 6.5
事先需安裝 rabbitmq , (安裝rabbitmq前需安裝erlang ,esl-erlang )
yum install rabbitmq-server
(開啟 rabbitmq : systemctl start rabbitmq-server ; 查看rabbitmq的狀態 ,切換到合適的目錄與用戶(一般為安裝時的用戶,目錄一般為/usr/lib/bin): rabbitmqctl status ;)
安裝celery包
pip install celery ==4.2
通常django的項目目錄為
- proj /
- manage.py
-proj/
-__init__.py
-settings.py
- urls.py
-myapp/
-urls.py
-views.py
-models.py
首先要創建 一個celery實例 proj/proj/celery.py
from __future__import absolute_import,unicode_literals
import os
from celery import Celery
#為這個celery項目設置系統環境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE','proj.settings')
#實例化Celery對象
app=Celery('proj')
#配置傳入字符串而不是配置對象,使worker不需要在子進程中序列化配置對象;命名空間設置為’CELERY‘意味着所有celery相關的配置鍵值都應該以'CELERY_'開頭
app.config_from_object('django.conf:settings',namespace='CELERY')
#自動加載task模塊
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
#debug_task是一個復制它自己的request的信息的task ,使用bind=True簡單地將其綁定到當前的task實例上。
2.在 proj/proj/__init__.py中導入第一步中創建的app,是為了保證django項目開啟時就加載這個app,讓后續的@shared_task裝飾器可以使用它。
from __future__ import absolute_import,unicode_literals
from .celery import app as celery_app
__all__=('celery_app',)
3.創建myapp/tasks.py ,使用@shared_task裝飾器
from __future__ import absolute_import,unicode_literals
from celery import shared_task
@shared_task
def add(x,y):
return x+y
@shared_task
def mul(x,y):
return x*y
@shared_task
def xsum(numbers):
return sum(numbers)
#一般將比較耗時的操作,定義在task.py中,這樣在view中使用這個函數時,就可以異步調用,不必等操作完成再返回頁面結果,而是可以異步調用完后直接進行下一步。
而這個操作會在子進程中繼續執行,執行結果保存在隊列中,也可指定保存在django的orm中
4.將celery隊列的任務執行結果保存到 django的orm或緩存框架中
*需要安裝 django-celery-results
pip install django-celery-results
*將其('django_celery_results')添加到settings.py的INSTALL_APPS中
*在數據庫中創建保存celery結果的表
python manage.py migrate django_celery_results
*在settings.py為celery配置后端存儲
CELERY_RESULT_BACKEND='django-db'
或
CELERY_RESULT_BACKEND='django-cache'
啟動完django 的 manage.py runserver后運行 celery -A proj worker -l info 開啟celery隊列