Python Django +Celery +flower


 

准備工作

1.創建django項目,添加應用到setting文件

2.pip安裝celery + eventlet + flower

3.文件目錄如下:

 

 4.文件配置如下

celery_app目錄下:

# -*- coding: utf-8 -*-
from celery import Celery
app = Celery('demo')# 創建 Celery 實例
app.config_from_object('celery_app.celeryconfig')# 通過 Celery 實例加載配置模塊
__init__.py文件
BROKER_URL = 'redis://xxx.xxx.xxx.xxx:6379'# 指定 Broker
CELERY_RESULT_BACKEND = 'redis://xxx.xxx.xxx.xxx:6379/0'# 指定 Backend
CELERY_TIMEZONE='Asia/Shanghai'# 指定時區,默認是 UTC
# CELERY_TIMEZONE='UTC'                            
CELERY_IMPORTS = (
    # 指定導入的任務模塊
    'celery_app.task1'
)
celeryconfig.py
from celery_app import app
@app.task()
def add(a, b):
    return 'hello world: %i' % (a + b)
task1.py

Django_Celery目錄下:

from django.contrib import admin
from django.urls import path
from djocelery.views import test,getresult
urlpatterns = [
    path('admin/', admin.site.urls),
    path('test/',test.as_view()),
    path('getresult/',getresult.as_view())
]
urls.py文件

djocelery目錄下:

from django.shortcuts import HttpResponse
from django.views import View
from celery_app.task1 import add
# Create your views here.

res = {}
class test(View):
    def get(self,request,*args,**kwargs):
        a,b = 1,2
        result = add.apply_async((a, b))
        res[str(result.id)] = result
        return HttpResponse(result.id)
    def post(self,request,*args,**kwargs):
        return HttpResponse("POST")
    def put(self,request,*args,**kwargs):

        return HttpResponse("PUT")
    def delete(self,request,*args,**kwargs):
        return HttpResponse("DELETE")


class getresult(View):
    def get(self,request,*args,**kwargs):
        id = str(request.GET.get('id'))
        print(id)
        t1 = res[id].get()
        print(t1)
        return HttpResponse(t1)
views.py文件

快速開始

1.python manage.py runserver 8080 (啟動django)

2.celery -A celery_app worker --loglevel=info -P eventlet (啟動eventlet) 注意此條命令的執行位置,在celery_app上一級目錄下執行

其他常用參數

celery -A proj worker --loglevel=INFO -P prefork  --concurrency=10  -n worker1
celery -A proj worker --loglevel=INFO --P eventlet -c 100  -n worker2
-P 指定運行模式 建議使用eventlet和prefork
--concurrency或者-C  指定eventlet池或者進程池大小(默認進程數為cpu核心數)
-n 指定worker名字

3.celery flower --broker=redis://xxx.xxx.xxx.xxx:6379/0  (啟動可視化監控)

celery flower --broker=amqp://guest:guest@localhost:5672//  或
celery flower --broker=redis://guest:guest@localhost:6379/0

4.在瀏覽器訪問http://127.0.0.1:8080/test/,提交任務。

5.查看celery shell窗口,如下:

 

6.訪問http://127.0.0.1:5555/tasks

 

多任務,多對列配置

from kombu import Queue
BROKER_URL = 'redis://123.207.251.121:6379'# 指定 Broker
CELERY_RESULT_BACKEND = 'redis://123.207.251.121:6379/0'# 指定 Backend
CELERY_TIMEZONE='Asia/Shanghai'# 指定時區,默認是 UTC
# CELERY_TIMEZONE='UTC'                            
CELERY_IMPORTS = (
    # 指定導入的任務模塊
    'celery_app.task1'
)

# 配置隊列
CELERY_QUEUES = (
    Queue('default', routing_key='default'),
    Queue('q1', routing_key='testtask1'),
    Queue('q2',  routing_key='testtask2'),
)

# 路由(哪個任務放入哪個隊列)
CELERY_ROUTES = {
    'celery_app.task1.test_add': {'queue': 'q1', 'routing_key': 'testtask1'},
    'celery_app.task1.test_mul': {'queue': 'q2', 'routing_key': 'testtask2'},
}
celeryconfig.py
from celery_app import app
@app.task()
def test_add(a, b):
    return 'hello world: %i' % (a + b)


@app.task()
def test_mul(a, b):
    return 'hello world: %i' % (a * b)
task1.py
"""Django_Celery URL Configuration

The `urlpatterns` list routes URLs to views. For more information please see:
    https://docs.djangoproject.com/en/3.0/topics/http/urls/
Examples:
Function views
    1. Add an import:  from my_app import views
    2. Add a URL to urlpatterns:  path('', views.home, name='home')
Class-based views
    1. Add an import:  from other_app.views import Home
    2. Add a URL to urlpatterns:  path('', Home.as_view(), name='home')
Including another URLconf
    1. Import the include() function: from django.urls import include, path
    2. Add a URL to urlpatterns:  path('blog/', include('blog.urls'))
"""
from django.contrib import admin
from django.urls import path
from djocelery.views import test_Add,getresult,test_Mul
urlpatterns = [
    path('admin/', admin.site.urls),
    path('test_add/',test_Add.as_view()),
    path('test_mul/',test_Mul.as_view()),
    path('getresult/',getresult.as_view())
]
views.py文件

重新啟動flower:顯示設置隊列

 

 

 

 

Flower使用介紹

原文鏈接:https://www.jianshu.com/p/4a408657ef76

官方文檔:https://flower-docs-cn.readthedocs.io/zh/latest/

Dashboard

Dashboard 頁面是展示異步任務隊列的主要情況,該頁面包括如下幾種狀態的任務:

  • Active: 表示worker從隊列中獲取到任務,且正在執行的任務數
  • Processed: 表示worker從隊列中獲取到任務的總數據量
  • Failed: 表示worker從隊列中獲取到任務,且執行失敗了的(異常退出)
  • Successed: 表示worker從隊列中獲取到任務,且執行成功了的
  • Retried: 表示worker從隊列中獲取到任務,因為一些其他原因重新執行的數量

所以,上述這些數量的關系如下:
Processed = Active + Received + Failed + Successed + Retried
其中 Received 表示該任務分配到了worker中,但是還沒有被處理的任務數量

 
Dashboard頁面

Worker Name 表示的是執行celery任務的worker名稱;
Status 表示的是該worker的狀態,包括 Online (在線) 、 Offline(離線),重啟flower進程,即可將Offline狀態的worker剔除掉;
Active / Processed / Failed / Successed / Retried 分別表示該worker正在執行的任務數、該worker處理的總任務數、處理失敗的任務數、處理成功的任務數、重試的任務數;
Load Average 表示系統在 1min / 5min / 15min 內的CPU平均負載(百分比)

Tasks

Tasks 頁面是展示所有worker接收到的任務的處理情況。下面對該表格中的做一些介紹

 
Tasks頁面
  • Name: 表示該任務的名稱,默認規則為該函數的路徑規則,例如 {模塊名}.{文件名}.{函數名}
  • UUID: 表示一個唯一字符串ID用於表示該任務
  • State: 表示該任務的狀態,包括: SUCCESS / FAILURE / STARTED / RECEIVED
    • SUCCESS 表示該任務執行完畢且成功
    • FAILURE 表示該任務執行失敗
    • STARTED 表示該任務正在執行
    • RECEIVED 表示該任務在worker中,只是被接收而已
  • args: 表示該任務的列表參數
  • kwargs: 表示該任務的字典參數
  • Result: 表示該任務函數的返回結果
  • Received: 表示該任務在worker接收到的時間
  • Started: 表示該任務在worker開始執行的時間
  • Runtime: 表示該任務在worker真正執行的耗時(單位:秒)
  • Worker: 表示該任務所在的worker名稱

Broker

Broker 頁面展示的是celery連接消息隊列的信息,包括消息隊列的訪問URL,下面的截圖展示的是鏈接的RabbitMQ,當然也可以鏈接Redis等。

 
Broker頁面
  • Name: 表示隊列的名稱
  • Messages: 表示該隊列的消息數量
  • Unacked: 表示該隊列中還沒收到ack確認的消息數量(該消息可能處於 RECEIVED 或是 STARTED
  • Ready: 表示該隊列中還未分配到worker的消息數量
  • Consumers: 表示消費者數量(即worker數量)
  • Idle since: 表示該隊列空閑的最初時間,否則為 N/A

Monitor

Monitor 頁面展示的是celery后台任務的曲線展示狀況。

 


 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM