背景
Celery 是一個強大的分布式異步任務處理和調度框架。基本上 Python 項目的異步任務,定時任務首先處理框架就是 Celery. 正因為 Celery 的處理時異步的並且是分布式的,當任務出現問題時追蹤和調查就不是很容易。官方提供了 flower 可以用來查看任務的執行情況和執行時間。flower 帶來的任務監控能力十分有限,最主要的是沒有全局性的統計功能,只能查看單個任務;再者支持查找的時間段很短。隨着我們異步任務量的和重要性的增加,增強對 celery 的監控變得十分必要。
監控方案
celery signal + statsd + graphite + grafana
- 優勢 指標采集比較角度,利用 celery 提供的 hook就能做到
- 劣勢
需要部署新的組件 graphite, 具有一定的學習成本
原理介紹
- Graphite Graphite是一個開源實時的、顯示時間序列度量數據的圖形系統。Graphite並不收集度量數據本身,而是像一個數據庫,通過其后端接收度量數據,然后以實時方式查詢、轉換、組合這些度量數據。Graphite支持內建的Web界面,它允許用戶瀏覽度量數據和圖。 三個主要模塊
- Carbon (監控數據的 Twisted 守護進程)
Carbon是基於Twisted實現,是Graphite的后端實現。 Carbon的主要作用,是接收被監控節點的連接,收集各個指標的數據,將這些數據寫入緩存並最終持久化到whisper存儲文件中去。Carbonr能保證Graphite web 繪制出實時接到的指標更新,其原理也很簡單位,有點類似lucence,carbon接收到的數據會先存在緩存中,然后再一起寫入whisper的硬盤存儲。Graphite web通過向carbon-cache發起請求,會同時查詢位於緩存與硬盤中的數據。 - Whisper
whisper 是一個固定大小的數據庫,在設計上類似於RRD(round-robin-database)。它可以為隨時間不斷變化的數值型數據提供快速,可靠的存儲。Whisper還可以把高精度的指標數據轉換成低精度的指標數據以滿足存儲長期的歷史數據的需求。比如說把按秒采集的指標轉換成按分鍾采集的指標,以減少數據量,進行長期存儲。 - Graphite-web Graphite web是基於Django實現的webapp,其主要功能自然是繪制報表與展示。但界面真的不好看(很少使用)
- Carbon (監控數據的 Twisted 守護進程)
- Statsd statsd 其實就是一個監聽UDP(默認)或者TCP的守護程序,根據簡單的協議收集statsd客戶端發送來的數據,聚合之后,定時推送給后端, 我們這里指的就是 graphite。 statsd 有多種語言的客戶端實現,這里我們使用 python實現的 statsd 來采集 celery 指標。
下圖是我們 celery 監控的架構
搭建
-
celery hook 下面示例代碼,使用 statsd 接收 celery hook設置的指標並發送到 graphite
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import time import statsd from celery.signals import ( task_failure, task_postrun, task_prerun, task_success, ) from django.conf import settings statsd_conn = statsd.StatsClient( host=settings.GRAPHITE_HOST, port=8125, ) task_time = {} @task_prerun.connect def task_prerun_handler(task_id, task, *args, **kwargs): task_time[task_id] = time.time() statsd_conn.incr('{}.prerun'.format(task.name)) @task_postrun.connect def task_postrun_handler(task_id, task, *args, **kwargs): statsd_conn.incr('{}.postrun'.format(task.name)) delta_seconds = time.time() - task_time[task_id] # 任務執行時間 statsd_conn.timing('{}.runtime'.format(task.name), int(1000 * delta_seconds)) @task_success.connect def task_success_handler(sender, result, **kwargs): # 成功任務數 statsd_conn.incr('{}.success'.format(sender.name)) @task_failure.connect def task_failure_handler(sender, result, **kwargs): # 失敗任務數 statsd_conn.incr('{}.failure'.format(sender.name))
-
graphite 部署
使用 docker 部署, Dockerfile 在這里1 2 3 4 5 6 7 8 9
docker run -d\ --name graphite\ --restart=always\ -p 8888:80\ -p 2003-2004:2003-2004\ -p 2023-2024:2023-2024\ -p 8125:8125/udp\ -p 8126:8126\ graphiteapp/graphite-statsd
-
使用 nginx 反向代理
nginx 配置就非常簡單了
如果以上步驟全部完成, 訪問 graphite的地址,應該已經能看到收集到的指標。
但就像我們說的還必須有數據可視化,所以下一步就是使用 Grafana 從 Graphite 獲取指標並在 dashboard 上展示
- Grafana 配置
-
添加 Graphite 數據源
-
dashoboard 配置 這里 dashboard 的配置我們不具體展開,說明一些比較重要的步驟。我們的異步任務目錄層級比較多,為了更好的篩選必須配置各層的 Variables
如下圖所示 -
指標配置
這里指標的配置比較多樣,可以有各種aggregate,比如 min, sum, max, mean, mean90
下面展示一張示例的效果圖
-