Celery-4.1 用戶指南: Monitoring and Management Guide (監測和管理指南)


簡介


Celery 提供了監控和探查celery集群的工具。

這篇文檔描述了一些工具,以及與監控相關的一些特性,例如事件和廣播命令。

工作單元


命令行管理工具(inspect/control)


Celery 可以用來探查和管理工作單元節點(以及一定程度上對任務管理)。

列出所有可用的命令:

$ celery help

或者對指定的命令獲取幫助:

$ celery <command> --help

命令


  • shell: 進入一個 Python shell
    本地環境將包含 celery 變量:這是當前的應用實例。另外,所有已知的任務會自動添加到本地環境中(除非 --without-tasks 標記被設置)。

celery 將按 Ipython, bpython, 以及常規 python 的順序尋找交互式解釋器。使用 --ipython, --bpython, --python 選項,你可以強制使用一中實現。

  • status: 列出集群中的活動節點
$ celery -A proj status
  • result: 顯示一個任務的結果
$ celery -A proj result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577

注意只要任務沒有使用一個自定義的存儲后端,你可以忽略任務的名稱。

  • purge: 清除所有配置的任務隊列的任務
    這個命令將清除在 CELERY_QUEUES 設置的隊列中的所有消息

告警:
這個操作是無法取消的,消息將永久被刪除!

$ celery -A proj purge

你可以使用 -Q 選項聲明要執行清除操作的隊列:

$ celery -A proj purge -Q celery,foo,bar

或者使用 -X 選項聲明不會被清除的隊列:

$ celery -A proj purge -X celery
  • inspect active: 列出活動任務
$ celery -A proj inspect active

這是當前正在執行的所有任務。

  • inspect scheduled: 列出被調度的ETA任務

這是當任務設置了 eta 或者 countdown 參數時工作單元預留的任務。

  • inspect reserved: 列出預留的任務
    這是被工作單元獲取並正在等待被執行的任務(不包含帶有 ETA 設置的任務)。

  • inspect revoked: 列出被取消的任務的歷史

$ celery -A proj inspect revoked
  • inspect registered: 列出注冊過的任務
$ celery -A proj inspect registered
  • inspect stats: 顯示統計信息 (查看統計信息這一節)
$ celery -A proj inspect stats
  • inspect query_task: 根據任務 ID 顯示任務信息

任何包含預留的或者正在執行的指定任務id的工作單元將回復任務狀態和信息

$ celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8

你還可以一次詢問多個任務:

$ celery -A proj inspect query_task id1 id2 ... idN
  • control enable_events: 啟用事件
$ celery -A proj control enable_events
  • control disable_events: 禁用事件
$ celery -A proj control disable_events

遷移:將任務從一個消息中間件遷移到另一個消息中間件(實驗特性)

$ celery -A proj migrate redis://localhost amqp://localhost

這個命令會將所有任務從一個消息中間件遷移到另一個消息中間件。因為這個命令是新的並且是實驗性的,所以你在操作前確保數據有備份。

注意:
所有的 inspectcontrol 命令都支持 --timeout 參數,這表示等待回復的超時時間。如果由於延遲不能按時得到回復你可能需要增加超時時間。

聲明目的節點


默認情況下,inspectcontrol 命令將發送給所有工作單元。你可以通過 --destination 參數聲明一個或者多個工作單元。

$ celery -A proj inspect -d w1@e.com,w2@e.com reserved

$ celery -A proj control -d w1@e.com,w2@e.com enable_events

Flower: Celery 實時web監控


Flower 是 celery 的一個基於 web 的實時監控和管理工具。它正在活躍的開發中,但是已經是一個必不可少的工具。作為 celery 推薦的監控工具,它廢棄了 Django-Admin 監控、celerymon 以及基於 ncurses 的監控。

Flower 的發音就像 “flow”,但是如果你願意你還可以使用植物版的flower。

特性


  • 使用 celery 事件實時監控

    • 任務進度和歷史
    • 顯示任務詳細信息 (參數, 開始時間, 運行時間, 以及更多)
    • 圖和統計信息
  • 遠程控制

    • 查看工作單元狀態和統計信息
    • 關閉和重啟工作單元實例
    • 控制工作單元池大小和自動擴展設置
    • 查看和修改工作單元獲取消息的隊列
    • 查看當前執行的任務
    • 查看被調度的任務(ETA/countdown)
    • 查看預留的任務和被取消的任務
    • 應用時間和速率限制
    • 配置查看器
    • 取消和中止任務
  • HTTP API

    • 列出工作單元
    • 關閉工作單元
    • 重啟工作單元池
    • 擴大工作單元池
    • 縮小工作單元池
    • 自動擴展工作單元池
    • 開始從一個隊列消費
    • 停止從一個隊列消費
    • 列出任務
    • 列出任務類型
    • 獲取一個任務的信息
    • 執行一個任務
    • 根據名稱執行一個任務
    • 獲取任務結果
    • 獲取一個任務的軟時間限制和硬時間限制
    • 修改一個任務的速率
    • 取消一個任務
  • OpenID 授權驗證

屏幕截圖
這里寫圖片描述
這里寫圖片描述

使用


你可以使用pip安裝Flower:

$pip install flower

運行flower命令會啟動一個web服務器,你可以訪問它:

$ celery -A proj flower

默認的端口是 http://localhost:5555,但是你可以通過 --port 參數修改:

$ celery -A proj flower --port=5555

消息中間件可以通過 --broker 參數聲明:

$ celery flower --broker=amqp://guest:guest@localhost:5672//
or
$ celery flower --broker=redis://guest:guest@localhost:6379/0

此時,你可以在web瀏覽器中訪問flower:

$ open http://localhost:5555

Flower 有比在這描述的更多的特性,包括授權認證選項。查看官方文檔獲取更多的信息。

celery 事件:Curses 監控


2.0 版本新特性。

celery events 是一個顯示任務和工作單元歷史的字符界面監控。你可以查看任務的結果和跟蹤信息,並且它還支持一些管理命令,如速率限制、關閉工作單元。這個監控是作為概念的驗證開啟的,並且你也許更想使用 flower。

啟動:

$ celery -A proj events

你應該可以看到類似如下的屏幕:
這里寫圖片描述

celery events 還用來開啟快照相機(查看 Snapshot 這一節):

$ celery -A proj events --camera=<camera-class> --frequency=1.0

並且它還包含一個工具可以將事件dump到標准輸出:

$ celery -A proj events --dump

獲取可用選項的完整列表,使用 --help:

$ celery events --help

RabbitMQ


要管理一個 Celery 集群,知道如何監控 RabbitMQ 非常重要。

RabbitMQ 帶 rabbitmqctl(1) 命令,使用這個命令你可以列出隊列、消息交換器、綁定、隊列長度、每個隊列使用的內存,以及管理用戶、虛擬主機和他們的權限。

注意:
默認的虛擬主機(”/”)在這些示例中使用,如果你要使用一個定制化的虛擬主機,你應該添加 -p參數,例如: rabbitmqctl list_queues -p my_vhost ...

Inspecting queues


查找隊列中任務的數量:

$ rabbitmqctl list_queues name messages messages_ready \
                          messages_unacknowledged

這里 messages_ready 是准備遞送的消息的數量(已經發送但是還沒有被接收),messages_unacknowledged是已經被工作單元接收但是還沒有被確認的消息數量(意味它正在進行中,或者已經被預留)。messages是正在執行和沒有確認消息總數。

查找從一個隊列中消費消息的工作單元的數量:

$ rabbitmqctl list_queues name ,consumers

查看為一個隊列分配的總內存:

$ rabbitmqctl list_queues name memory

提示:給 rabbitmqctl 命令添加 -q 選項使得輸出更容易被解析。

Redis


如果你使用 Redis 作為消息中間件,你可以使用 redis-cli(1) 命令監控 Celery 集群。

Inspecting queues


查找隊列中的任務數量:

$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

默認隊列的名稱是 celery。獲取所有的可用隊列,調用:

$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER keys \*

注意:
   只有在隊列中有任務時隊列鍵才會存在,所以如果一個隊列鍵不存在,這意味着這個隊列中沒有任何消息。這是因為在Redis中,如果一個列表中沒有任何元素,這個列表將自動被刪除,因此在命令行輸出中不會有相應的鍵,並且這個列表的 llen 值返回 0。

  另外,如果你還出於其他原因使用 Redis,keys 命令的輸出將包含存儲在數據庫中的不相關的值。推薦的方式是為 Celery 指定一個專用DATABASE_NUMBER,你還可以使用數據庫編號隔離不同的 celery 應用(虛擬主機),但是這不會影響到例如被Flower使用的監控事件,因為Redis pub/sub 命令是全局的而不是基於數據庫的。

Munin


下列是一些知名的能用來監控Celery集群的Munin插件:

事件


工作單元可以在一些事件發生時發送一個消息。這些事件之后被類似 Flower、celery events 的工具捕獲到用來監控集群。

快照


2.1版本新特性。

即使是一個工作單元也能產生大量的事件,所以在硬盤上存儲所有事件的歷史是非常昂貴的。

快照是在一個時間段的一系列描述集群狀態的事件,通過周期性的執行快照你可以保存所有的歷史,但是只需要周期性的寫到硬盤。

你需要一個照相機類來執行快照,在這個類中你可以定義每次狀態被捕獲時將發生什么;你可以將它寫道一個數據庫,通過郵件發送或者其他操作。

celery events 用來使用照相機進行快照,例如你想使用 myapp.Camera 照相機每2秒中獲取一個狀態快照,你可以使用如下參數運行 celery events:

$ celery -A proj events -c myapp.Camera --frequency=2.0

Custom Camera


如果你想捕獲事件並且定時針對這些事件作一些操作,照相機將非常有用。對於實時事件處理,你可以直接使用 app.events.Receiver,就像實時處理這一節。

下面是一個照相機的示例,將快照dump到屏幕:

from pprint import pformat

from celery.events.snapshot import Polaroid

class DumpCam(Polaroid):
    clear_after = True  # clear after flush (incl, state.event_count).

    def on_shutter(self, state):
        if not state.event_count:
            # No new events since last snapshot.
            return
        print('Workers: {0}'.format(pformat(state.workers, indent=4)))
        print('Tasks: {0}'.format(pformat(state.tasks, indent=4)))
        print('Total: {0.event_count} events, {0.task_count} tasks'.format(state))

查看 celery.events.state 的API獲取更多關於狀態對象的信息。

現在,你可以通過給 celery events 命令聲明 -c 選項使用這個照相機:

$ celery -A proj events -c myapp.DumpCam --frequency=2.0

或者你可以在程序中如下使用:

from celery import Celery
from myapp import DumpCam

def main(app, freq=1.0):
    state = app.events.State()
    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={'*': state.event})
        with DumpCam(state, freq=freq):
            recv.capture(limit=None, timeout=None)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    main(app)

實時處理


實時處理事件,你需要:

  • 一個事件消費者(這里是 Receiver

  • 當事件發生時將被調用的一個處理函數的集合
    對每個事件類型,你可以有不同的處理函數,后者捕獲所有事件可以寫成 *

  • 狀態(可選)
    app.events.State 是任務和工作單元內存中表示的一種方便的手段,它隨着事件到來不斷更新。

它為很多通用的東西包裝了解決方案,如檢查一個工作單元是否還活着(通過驗證心跳),隨着事件到來將事件字段合並,確保時間戳同步等等。

組合這些,你可以很容易實時處理事件:

from celery import Celery


def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

注意:
capture函數的wakeup參數使得強制讓所有的工作單元發送一個心跳。當這個監控開始,你可以立馬看到工作單元。

你可以通過聲明處理函數來監聽指定的事件:

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

Event Reference


這個列表包含工作單元發送的事件以及事件的參數

任務事件


task-sent


signature: task-sent(uuid, name, args, kwargs, retries, eta, expires, queue, exchange, routing_key, root_id, parent_id)
如果使能了 task_send_sent_event 設置,那么當有任務消息發布時發送該事件。

task-received


signature: task-received(uuid, name, args, kwargs, retries, eta, hostname, timestamp, root_id, parent_id)
當工作單元接收到一個任務時發送該事件。

task-started


signature: task-started(uuid, hostname, timestamp, pid)
工作單元將要執行任務之前

task-succeeded


signature: task-succeeded(uuid, result, runtime, hostname, timestamp)
任務執行成功

Run-time 是指使用池運行使用所花費的時間。(從任務發送到工作單元池,到池結果處理回調函數被調用)

task-failed


signature: task-failed(uuid, exception, traceback, hostname, timestamp)
任務執行失敗時

task-rejected


signature: task-rejected(uuid, requeued)
任務被工作單元拒絕,可能被重新入隊或者移除到死信隊列

task-revoked


signature: task-revoked(uuid, terminated, signum, expired)
任務被取消時(注意這可能被多個工作單元發送).

如果任務進程被中止,將terminated設置為真,並且 signum 字段設置成使用的信號。如果任務過期, expired設置成真。

task-retried


signature: task-retried(uuid, exception, traceback, hostname, timestamp)
任務失敗,但是將在將來重試

工作單元事件


worker-online


signature: worker-online(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)
工作單元已經連接到消息中間件,並且在線

hostname: 工作單元的節點名稱
timestamp: 事件時間戳
freq: 心跳頻率,以秒為單位 (float)
sw_ident: 工作單元軟件名稱 (例如, py-celery)
sw_ver: 軟件版本 (例如, 2.2.0)
sw_sys: 操作系統 (例如, Linux/Darwin)

woker-heartbeat


signature: worker-heartbeat(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys, active, processed)
每分鍾發送一次,如果工作單元在兩分鍾內沒有發送心跳,它將被認為離線。

hostname: 工作單元的節點名稱
timestamp: 事件時間戳
freq: 心跳頻率,以秒為單位 (float)
sw_ident: 工作單元軟件名稱 (例如, py-celery)
sw_ver: 軟件版本 (例如, 2.2.0)
sw_sys: 操作系統 (例如, Linux/Darwin)
active: 當前執行的任務數量
processed: 被工作單元處理的總任務數量

woker-offline


signature: worker-offline(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)
工作單元與消息中間件斷開連接。

 

轉自:https://blog.csdn.net/libing_thinking/article/details/78592801


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM