簡介
Celery 提供了監控和探查celery集群的工具。
這篇文檔描述了一些工具,以及與監控相關的一些特性,例如事件和廣播命令。
工作單元
命令行管理工具(inspect/control)
Celery 可以用來探查和管理工作單元節點(以及一定程度上對任務管理)。
列出所有可用的命令:
$ celery help
或者對指定的命令獲取幫助:
$ celery <command> --help
命令
- shell: 進入一個 Python shell
本地環境將包含celery
變量:這是當前的應用實例。另外,所有已知的任務會自動添加到本地環境中(除非--without-tasks
標記被設置)。
celery 將按 Ipython, bpython, 以及常規 python
的順序尋找交互式解釋器。使用 --ipython, --bpython, --python
選項,你可以強制使用一中實現。
- status: 列出集群中的活動節點
$ celery -A proj status
- result: 顯示一個任務的結果
$ celery -A proj result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577
注意只要任務沒有使用一個自定義的存儲后端,你可以忽略任務的名稱。
- purge: 清除所有配置的任務隊列的任務
這個命令將清除在CELERY_QUEUES
設置的隊列中的所有消息
告警:
這個操作是無法取消的,消息將永久被刪除!
$ celery -A proj purge
你可以使用 -Q
選項聲明要執行清除操作的隊列:
$ celery -A proj purge -Q celery,foo,bar
或者使用 -X
選項聲明不會被清除的隊列:
$ celery -A proj purge -X celery
- inspect active: 列出活動任務
$ celery -A proj inspect active
這是當前正在執行的所有任務。
- inspect scheduled: 列出被調度的ETA任務
這是當任務設置了 eta 或者 countdown 參數時工作單元預留的任務。
-
inspect reserved: 列出預留的任務
這是被工作單元獲取並正在等待被執行的任務(不包含帶有 ETA 設置的任務)。 -
inspect revoked: 列出被取消的任務的歷史
$ celery -A proj inspect revoked
- inspect registered: 列出注冊過的任務
$ celery -A proj inspect registered
- inspect stats: 顯示統計信息 (查看統計信息這一節)
$ celery -A proj inspect stats
- inspect query_task: 根據任務 ID 顯示任務信息
任何包含預留的或者正在執行的指定任務id的工作單元將回復任務狀態和信息
$ celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8
你還可以一次詢問多個任務:
$ celery -A proj inspect query_task id1 id2 ... idN
- control enable_events: 啟用事件
$ celery -A proj control enable_events
- control disable_events: 禁用事件
$ celery -A proj control disable_events
遷移:將任務從一個消息中間件遷移到另一個消息中間件(實驗特性)
$ celery -A proj migrate redis://localhost amqp://localhost
這個命令會將所有任務從一個消息中間件遷移到另一個消息中間件。因為這個命令是新的並且是實驗性的,所以你在操作前確保數據有備份。
注意:
所有的 inspect
和 control
命令都支持 --timeout
參數,這表示等待回復的超時時間。如果由於延遲不能按時得到回復你可能需要增加超時時間。
聲明目的節點
默認情況下,inspect
和 control
命令將發送給所有工作單元。你可以通過 --destination
參數聲明一個或者多個工作單元。
$ celery -A proj inspect -d w1@e.com,w2@e.com reserved $ celery -A proj control -d w1@e.com,w2@e.com enable_events
Flower: Celery 實時web監控
Flower 是 celery 的一個基於 web 的實時監控和管理工具。它正在活躍的開發中,但是已經是一個必不可少的工具。作為 celery 推薦的監控工具,它廢棄了 Django-Admin 監控、celerymon 以及基於 ncurses
的監控。
Flower 的發音就像 “flow”,但是如果你願意你還可以使用植物版的flower。
特性
-
使用 celery 事件實時監控
- 任務進度和歷史
- 顯示任務詳細信息 (參數, 開始時間, 運行時間, 以及更多)
- 圖和統計信息
-
遠程控制
- 查看工作單元狀態和統計信息
- 關閉和重啟工作單元實例
- 控制工作單元池大小和自動擴展設置
- 查看和修改工作單元獲取消息的隊列
- 查看當前執行的任務
- 查看被調度的任務(ETA/countdown)
- 查看預留的任務和被取消的任務
- 應用時間和速率限制
- 配置查看器
- 取消和中止任務
-
HTTP API
- 列出工作單元
- 關閉工作單元
- 重啟工作單元池
- 擴大工作單元池
- 縮小工作單元池
- 自動擴展工作單元池
- 開始從一個隊列消費
- 停止從一個隊列消費
- 列出任務
- 列出任務類型
- 獲取一個任務的信息
- 執行一個任務
- 根據名稱執行一個任務
- 獲取任務結果
- 獲取一個任務的軟時間限制和硬時間限制
- 修改一個任務的速率
- 取消一個任務
-
OpenID 授權驗證
屏幕截圖
使用
你可以使用pip安裝Flower:
$pip install flower
運行flower命令會啟動一個web服務器,你可以訪問它:
$ celery -A proj flower
默認的端口是 http://localhost:5555
,但是你可以通過 --port
參數修改:
$ celery -A proj flower --port=5555
消息中間件可以通過 --broker
參數聲明:
$ celery flower --broker=amqp://guest:guest@localhost:5672// or $ celery flower --broker=redis://guest:guest@localhost:6379/0
此時,你可以在web瀏覽器中訪問flower:
$ open http://localhost:5555
Flower 有比在這描述的更多的特性,包括授權認證選項。查看官方文檔獲取更多的信息。
celery 事件:Curses 監控
2.0 版本新特性。
celery events 是一個顯示任務和工作單元歷史的字符界面監控。你可以查看任務的結果和跟蹤信息,並且它還支持一些管理命令,如速率限制、關閉工作單元。這個監控是作為概念的驗證開啟的,並且你也許更想使用 flower。
啟動:
$ celery -A proj events
你應該可以看到類似如下的屏幕:
celery events 還用來開啟快照相機(查看 Snapshot 這一節):
$ celery -A proj events --camera=<camera-class> --frequency=1.0
並且它還包含一個工具可以將事件dump到標准輸出:
$ celery -A proj events --dump
獲取可用選項的完整列表,使用 --help
:
$ celery events --help
RabbitMQ
要管理一個 Celery 集群,知道如何監控 RabbitMQ 非常重要。
RabbitMQ 帶 rabbitmqctl(1)
命令,使用這個命令你可以列出隊列、消息交換器、綁定、隊列長度、每個隊列使用的內存,以及管理用戶、虛擬主機和他們的權限。
注意:
默認的虛擬主機(”/”)在這些示例中使用,如果你要使用一個定制化的虛擬主機,你應該添加 -p
參數,例如: rabbitmqctl list_queues -p my_vhost ...
Inspecting queues
查找隊列中任務的數量:
$ rabbitmqctl list_queues name messages messages_ready \ messages_unacknowledged
這里 messages_ready
是准備遞送的消息的數量(已經發送但是還沒有被接收),messages_unacknowledged
是已經被工作單元接收但是還沒有被確認的消息數量(意味它正在進行中,或者已經被預留)。messages
是正在執行和沒有確認消息總數。
查找從一個隊列中消費消息的工作單元的數量:
$ rabbitmqctl list_queues name ,consumers
查看為一個隊列分配的總內存:
$ rabbitmqctl list_queues name memory
提示:給 rabbitmqctl
命令添加 -q
選項使得輸出更容易被解析。
Redis
如果你使用 Redis 作為消息中間件,你可以使用 redis-cli(1)
命令監控 Celery 集群。
Inspecting queues
查找隊列中的任務數量:
$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
默認隊列的名稱是 celery。獲取所有的可用隊列,調用:
$ redis-cli -h HOST -p PORT -n DATABASE_NUMBER keys \*
注意:
只有在隊列中有任務時隊列鍵才會存在,所以如果一個隊列鍵不存在,這意味着這個隊列中沒有任何消息。這是因為在Redis中,如果一個列表中沒有任何元素,這個列表將自動被刪除,因此在命令行輸出中不會有相應的鍵,並且這個列表的 llen
值返回 0。
另外,如果你還出於其他原因使用 Redis,keys
命令的輸出將包含存儲在數據庫中的不相關的值。推薦的方式是為 Celery 指定一個專用的DATABASE_NUMBER
,你還可以使用數據庫編號隔離不同的 celery 應用(虛擬主機),但是這不會影響到例如被Flower使用的監控事件,因為Redis pub/sub 命令是全局的而不是基於數據庫的。
Munin
下列是一些知名的能用來監控Celery集群的Munin插件:
-
rabbitmq-munin: RabbitMQ 的 Munin 插件
https://github.com/ask/rabbitmq-munin -
celery_tasks: 監控每種類型的任務被執行的次數(需要 celerymon)
http://exchange.munin-monitoring.org/plugins/celery_tasks-2/details -
celery_task_states:監控在每個狀態的任務的數量
http://exchange.munin-monitoring.org/plugins/celery_tasks/details
事件
工作單元可以在一些事件發生時發送一個消息。這些事件之后被類似 Flower、celery events 的工具捕獲到用來監控集群。
快照
2.1版本新特性。
即使是一個工作單元也能產生大量的事件,所以在硬盤上存儲所有事件的歷史是非常昂貴的。
快照是在一個時間段的一系列描述集群狀態的事件,通過周期性的執行快照你可以保存所有的歷史,但是只需要周期性的寫到硬盤。
你需要一個照相機類來執行快照,在這個類中你可以定義每次狀態被捕獲時將發生什么;你可以將它寫道一個數據庫,通過郵件發送或者其他操作。
celery events 用來使用照相機進行快照,例如你想使用 myapp.Camera
照相機每2秒中獲取一個狀態快照,你可以使用如下參數運行 celery events:
$ celery -A proj events -c myapp.Camera --frequency=2.0
Custom Camera
如果你想捕獲事件並且定時針對這些事件作一些操作,照相機將非常有用。對於實時事件處理,你可以直接使用 app.events.Receiver
,就像實時處理這一節。
下面是一個照相機的示例,將快照dump到屏幕:
from pprint import pformat from celery.events.snapshot import Polaroid class DumpCam(Polaroid): clear_after = True # clear after flush (incl, state.event_count). def on_shutter(self, state): if not state.event_count: # No new events since last snapshot. return print('Workers: {0}'.format(pformat(state.workers, indent=4))) print('Tasks: {0}'.format(pformat(state.tasks, indent=4))) print('Total: {0.event_count} events, {0.task_count} tasks'.format(state))
查看 celery.events.state
的API獲取更多關於狀態對象的信息。
現在,你可以通過給 celery events 命令聲明 -c
選項使用這個照相機:
$ celery -A proj events -c myapp.DumpCam --frequency=2.0
或者你可以在程序中如下使用:
from celery import Celery from myapp import DumpCam def main(app, freq=1.0): state = app.events.State() with app.connection() as connection: recv = app.events.Receiver(connection, handlers={'*': state.event}) with DumpCam(state, freq=freq): recv.capture(limit=None, timeout=None) if __name__ == '__main__': app = Celery(broker='amqp://guest@localhost//') main(app)
實時處理
實時處理事件,你需要:
-
一個事件消費者(這里是
Receiver
) -
當事件發生時將被調用的一個處理函數的集合
對每個事件類型,你可以有不同的處理函數,后者捕獲所有事件可以寫成*
-
狀態(可選)
app.events.State
是任務和工作單元內存中表示的一種方便的手段,它隨着事件到來不斷更新。
它為很多通用的東西包裝了解決方案,如檢查一個工作單元是否還活着(通過驗證心跳),隨着事件到來將事件字段合並,確保時間戳同步等等。
組合這些,你可以很容易實時處理事件:
from celery import Celery def my_monitor(app): state = app.events.State() def announce_failed_tasks(event): state.event(event) # task name is sent only with -received event, and state # will keep track of this for us. task = state.tasks.get(event['uuid']) print('TASK FAILED: %s[%s] %s' % ( task.name, task.uuid, task.info(),)) with app.connection() as connection: recv = app.events.Receiver(connection, handlers={ 'task-failed': announce_failed_tasks, '*': state.event, }) recv.capture(limit=None, timeout=None, wakeup=True) if __name__ == '__main__': app = Celery(broker='amqp://guest@localhost//') my_monitor(app)
注意:
capture
函數的wakeup
參數使得強制讓所有的工作單元發送一個心跳。當這個監控開始,你可以立馬看到工作單元。
你可以通過聲明處理函數來監聽指定的事件:
from celery import Celery def my_monitor(app): state = app.events.State() def announce_failed_tasks(event): state.event(event) # task name is sent only with -received event, and state # will keep track of this for us. task = state.tasks.get(event['uuid']) print('TASK FAILED: %s[%s] %s' % ( task.name, task.uuid, task.info(),)) with app.connection() as connection: recv = app.events.Receiver(connection, handlers={ 'task-failed': announce_failed_tasks, }) recv.capture(limit=None, timeout=None, wakeup=True) if __name__ == '__main__': app = Celery(broker='amqp://guest@localhost//') my_monitor(app)
Event Reference
這個列表包含工作單元發送的事件以及事件的參數
任務事件
task-sent
signature: task-sent(uuid, name, args, kwargs, retries, eta, expires, queue, exchange, routing_key, root_id, parent_id)
如果使能了 task_send_sent_event 設置,那么當有任務消息發布時發送該事件。
task-received
signature: task-received(uuid, name, args, kwargs, retries, eta, hostname, timestamp, root_id, parent_id)
當工作單元接收到一個任務時發送該事件。
task-started
signature: task-started(uuid, hostname, timestamp, pid)
工作單元將要執行任務之前
task-succeeded
signature: task-succeeded(uuid, result, runtime, hostname, timestamp)
任務執行成功
Run-time
是指使用池運行使用所花費的時間。(從任務發送到工作單元池,到池結果處理回調函數被調用)
task-failed
signature: task-failed(uuid, exception, traceback, hostname, timestamp)
任務執行失敗時
task-rejected
signature: task-rejected(uuid, requeued)
任務被工作單元拒絕,可能被重新入隊或者移除到死信隊列
task-revoked
signature: task-revoked(uuid, terminated, signum, expired)
任務被取消時(注意這可能被多個工作單元發送).
如果任務進程被中止,將terminated
設置為真,並且 signum
字段設置成使用的信號。如果任務過期, expired
設置成真。
task-retried
signature: task-retried(uuid, exception, traceback, hostname, timestamp)
任務失敗,但是將在將來重試
工作單元事件
worker-online
signature: worker-online(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)
工作單元已經連接到消息中間件,並且在線
hostname: 工作單元的節點名稱
timestamp: 事件時間戳
freq: 心跳頻率,以秒為單位 (float)
sw_ident: 工作單元軟件名稱 (例如, py-celery)
sw_ver: 軟件版本 (例如, 2.2.0)
sw_sys: 操作系統 (例如, Linux/Darwin)
woker-heartbeat
signature: worker-heartbeat(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys, active, processed)
每分鍾發送一次,如果工作單元在兩分鍾內沒有發送心跳,它將被認為離線。
hostname: 工作單元的節點名稱
timestamp: 事件時間戳
freq: 心跳頻率,以秒為單位 (float)
sw_ident: 工作單元軟件名稱 (例如, py-celery)
sw_ver: 軟件版本 (例如, 2.2.0)
sw_sys: 操作系統 (例如, Linux/Darwin)
active: 當前執行的任務數量
processed: 被工作單元處理的總任務數量
woker-offline
signature: worker-offline(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)
工作單元與消息中間件斷開連接。
轉自:https://blog.csdn.net/libing_thinking/article/details/78592801