准備工作
1.創建django項目,添加應用到setting文件
2.pip安裝celery + eventlet + flower
3.文件目錄如下:
4.文件配置如下
celery_app目錄下:

# -*- coding: utf-8 -*- from celery import Celery app = Celery('demo')# 創建 Celery 實例 app.config_from_object('celery_app.celeryconfig')# 通過 Celery 實例加載配置模塊

BROKER_URL = 'redis://xxx.xxx.xxx.xxx:6379'# 指定 Broker CELERY_RESULT_BACKEND = 'redis://xxx.xxx.xxx.xxx:6379/0'# 指定 Backend CELERY_TIMEZONE='Asia/Shanghai'# 指定時區,默認是 UTC # CELERY_TIMEZONE='UTC' CELERY_IMPORTS = ( # 指定導入的任務模塊 'celery_app.task1' )

from celery_app import app @app.task() def add(a, b): return 'hello world: %i' % (a + b)
Django_Celery目錄下:

from django.contrib import admin from django.urls import path from djocelery.views import test,getresult urlpatterns = [ path('admin/', admin.site.urls), path('test/',test.as_view()), path('getresult/',getresult.as_view()) ]
djocelery目錄下:

from django.shortcuts import HttpResponse from django.views import View from celery_app.task1 import add # Create your views here. res = {} class test(View): def get(self,request,*args,**kwargs): a,b = 1,2 result = add.apply_async((a, b)) res[str(result.id)] = result return HttpResponse(result.id) def post(self,request,*args,**kwargs): return HttpResponse("POST") def put(self,request,*args,**kwargs): return HttpResponse("PUT") def delete(self,request,*args,**kwargs): return HttpResponse("DELETE") class getresult(View): def get(self,request,*args,**kwargs): id = str(request.GET.get('id')) print(id) t1 = res[id].get() print(t1) return HttpResponse(t1)
快速開始
1.python manage.py runserver 8080 (啟動django)
2.celery -A celery_app worker --loglevel=info -P eventlet (啟動eventlet) 注意此條命令的執行位置,在celery_app上一級目錄下執行
其他常用參數
celery -A proj worker --loglevel=INFO -P prefork --concurrency=10 -n worker1
celery -A proj worker --loglevel=INFO --P eventlet -c 100 -n worker2
-P 指定運行模式 建議使用eventlet和prefork
--concurrency或者-C 指定eventlet池或者進程池大小(默認進程數為cpu核心數)
-n 指定worker名字
3.celery flower --broker=redis://xxx.xxx.xxx.xxx:6379/0 (啟動可視化監控)
celery flower --broker=amqp://guest:guest@localhost:5672// 或
celery flower --broker=redis://guest:guest@localhost:6379/0
4.在瀏覽器訪問http://127.0.0.1:8080/test/,提交任務。
5.查看celery shell窗口,如下:
6.訪問http://127.0.0.1:5555/tasks
多任務,多對列配置

from kombu import Queue BROKER_URL = 'redis://123.207.251.121:6379'# 指定 Broker CELERY_RESULT_BACKEND = 'redis://123.207.251.121:6379/0'# 指定 Backend CELERY_TIMEZONE='Asia/Shanghai'# 指定時區,默認是 UTC # CELERY_TIMEZONE='UTC' CELERY_IMPORTS = ( # 指定導入的任務模塊 'celery_app.task1' ) # 配置隊列 CELERY_QUEUES = ( Queue('default', routing_key='default'), Queue('q1', routing_key='testtask1'), Queue('q2', routing_key='testtask2'), ) # 路由(哪個任務放入哪個隊列) CELERY_ROUTES = { 'celery_app.task1.test_add': {'queue': 'q1', 'routing_key': 'testtask1'}, 'celery_app.task1.test_mul': {'queue': 'q2', 'routing_key': 'testtask2'}, }

from celery_app import app @app.task() def test_add(a, b): return 'hello world: %i' % (a + b) @app.task() def test_mul(a, b): return 'hello world: %i' % (a * b)

"""Django_Celery URL Configuration The `urlpatterns` list routes URLs to views. For more information please see: https://docs.djangoproject.com/en/3.0/topics/http/urls/ Examples: Function views 1. Add an import: from my_app import views 2. Add a URL to urlpatterns: path('', views.home, name='home') Class-based views 1. Add an import: from other_app.views import Home 2. Add a URL to urlpatterns: path('', Home.as_view(), name='home') Including another URLconf 1. Import the include() function: from django.urls import include, path 2. Add a URL to urlpatterns: path('blog/', include('blog.urls')) """ from django.contrib import admin from django.urls import path from djocelery.views import test_Add,getresult,test_Mul urlpatterns = [ path('admin/', admin.site.urls), path('test_add/',test_Add.as_view()), path('test_mul/',test_Mul.as_view()), path('getresult/',getresult.as_view()) ]
重新啟動flower:顯示設置隊列
Flower使用介紹
原文鏈接:https://www.jianshu.com/p/4a408657ef76
官方文檔:https://flower-docs-cn.readthedocs.io/zh/latest/
Dashboard
Dashboard 頁面是展示異步任務隊列的主要情況,該頁面包括如下幾種狀態的任務:
- Active: 表示worker從隊列中獲取到任務,且正在執行的任務數
- Processed: 表示worker從隊列中獲取到任務的總數據量
- Failed: 表示worker從隊列中獲取到任務,且執行失敗了的(異常退出)
- Successed: 表示worker從隊列中獲取到任務,且執行成功了的
- Retried: 表示worker從隊列中獲取到任務,因為一些其他原因重新執行的數量
所以,上述這些數量的關系如下:
Processed = Active + Received + Failed + Successed + Retried
其中 Received 表示該任務分配到了worker中,但是還沒有被處理的任務數量

Worker Name 表示的是執行celery任務的worker名稱;
Status 表示的是該worker的狀態,包括 Online (在線) 、 Offline(離線),重啟flower進程,即可將Offline狀態的worker剔除掉;
Active / Processed / Failed / Successed / Retried 分別表示該worker正在執行的任務數、該worker處理的總任務數、處理失敗的任務數、處理成功的任務數、重試的任務數;
Load Average 表示系統在 1min / 5min / 15min 內的CPU平均負載(百分比)
Tasks
Tasks 頁面是展示所有worker接收到的任務的處理情況。下面對該表格中的做一些介紹

- Name: 表示該任務的名稱,默認規則為該函數的路徑規則,例如
{模塊名}.{文件名}.{函數名}
- UUID: 表示一個唯一字符串ID用於表示該任務
- State: 表示該任務的狀態,包括: SUCCESS / FAILURE / STARTED / RECEIVED
- SUCCESS 表示該任務執行完畢且成功
- FAILURE 表示該任務執行失敗
- STARTED 表示該任務正在執行
- RECEIVED 表示該任務在worker中,只是被接收而已
- args: 表示該任務的列表參數
- kwargs: 表示該任務的字典參數
- Result: 表示該任務函數的返回結果
- Received: 表示該任務在worker接收到的時間
- Started: 表示該任務在worker開始執行的時間
- Runtime: 表示該任務在worker真正執行的耗時(單位:秒)
- Worker: 表示該任務所在的worker名稱
Broker
Broker 頁面展示的是celery連接消息隊列的信息,包括消息隊列的訪問URL,下面的截圖展示的是鏈接的RabbitMQ,當然也可以鏈接Redis等。

- Name: 表示隊列的名稱
- Messages: 表示該隊列的消息數量
- Unacked: 表示該隊列中還沒收到ack確認的消息數量(該消息可能處於 RECEIVED 或是 STARTED)
- Ready: 表示該隊列中還未分配到worker的消息數量
- Consumers: 表示消費者數量(即worker數量)
- Idle since: 表示該隊列空閑的最初時間,否則為 N/A
Monitor
Monitor 頁面展示的是celery后台任務的曲線展示狀況。
