kafka監控之topic的lag情況監控


需求描述:lag(滯后)是kafka消費隊列性能監控的重要指標,lag的值越大,表示kafka的堆積越嚴重。本篇文章將使用python腳本+influxdb+grafana的方式對kafka的offset、logsiz和lag這三個參數進行監控,並以圖形化的方式進行展現。

 架構描述:使用python收集kafka的相關信息並存儲到influxdb里;配置grafana,將influxdb里的數據以圖形化的方式展現出來。

一,准備工作

1,kafka,influxdb,grafana的安裝(在此不詳細描述,默認為閱讀文章的各位對這三樣工具的使用是熟悉的)

2,查詢kafka消費狀態的命令/kafka_2.11-0.10.1.0/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group1 --topic topicname1 --zookeeper zoo1:2181,zoo2:2181,zoo3:2181,zoo4:2181,zoo5:2181。本篇文章也將以此條命令輸出的信息作為基礎編寫腳本。

#/kafka_2.11-0.10.1.0/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group1 --topic topicname1 --zookeeper zoo1:2181,zoo2:2181,zoo3:2181,zoo4:2181,zoo5:2181
Group Topic Pid Offset logSize Lag Owner
group1 topicname1 0 978337806 978390228 52422 none
group1 topicname1 1 978337840 978390295 52455 none
group1 topicname1 2 978263557 978316052 52495 none
group1 topicname1 3 978307075 978359597 52522 none
group1 topicname1 4 978337803 978390358 52555 none
group1 topicname1 5 978337812 978390394 52582 none
說明:
group1 組名
topicname1 topic名

我們要用腳本取的,就是輸出的這段內容的Offset logSize Lag這三個值,並將所有分片的這些值相加,從而獲取單個topic的Offset logSize Lag的值,並將值輸出到一個txt文件暫存。我這里使用一個shell腳本來取數據和一個python腳本來講數據存儲到influxdb中的方式來實現。

二,編寫腳本提取Offset logSize Lag這三個值
1,給腳本創建一個獨立的目錄,里面會存放腳本和臨時文件。
mkdir /usr/monitor
cd /usr/monitor
mkdir tmp

2,vim topic-collect.sh
#!/bin/bash
#txt文件命名規則:組-topic名字-檢查項名字
source /etc/profile
/kafka_2.11-0.10.1.0/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group1 --topic topicname1 --zookeeper zoo1:2181,zoo2:2181,zoo3:2181,zoo4:2181,zoo5:2181  | awk '{print $4}' | grep -v Offset |  awk '{sum+=$1}END{print sum}'  > /usr/monitor/tmp/topic-group1-topicname1-Offset.txt
/kafka_2.11-0.10.1.0/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group1 --topic topicname1 --zookeeper zoo1:2181,zoo2:2181,zoo3:2181,zoo4:2181,zoo5:2181  | awk '{print $5}' | grep -v logSize |  awk '{sum+=$1}END{print sum}'  > /usr/monitor/tmp/topic-group1-topicname1-logSize.txt
/kafka_2.11-0.10.1.0/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group1 --topic topicname1 --zookeeper zoo1:2181,zoo2:2181,zoo3:2181,zoo4:2181,zoo5:2181  | awk '{print $6}' | grep -v Lag |  awk '{sum+=$1}END{print sum}'  > /usr/monitor/tmp/topic-group1-topicname1-Lag.txt

其中txt是用來存儲計算各分片之和的值的文件。對TXT文件名進行規范化管理會讓后期增加監控十分方便清晰。

 

3,vim kafka-lag-collect.py #這是一個python寫的腳本,用來將數據存儲到influxdb中,在此之前在influxdb中建立對應的庫,在這里用到的庫的名稱是elkDB

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import time
import urllib2
import urllib
import json

#Read the file
f = open('/usr/monitor/tmp/topic-group1-topicname1-Offset.txt')
Offset_sum = f.read()
f.close()
f = open('/usr/monitor/tmp/topic-group1-topicname1-logSize.txt')
logSize_sum = f.read()
f.close()
f = open('/usr/monitor/tmp/topic-group1-topicname1-Lag.txt')
Lag_sum = f.read()
f.close()



dbreqdata = "group1,topic=topicname1,type=Offset value="+str(Offset_sum)+\
        "\ngroup1,topic=topicname1,type=logSize value="+str(logSize_sum)+\
        "\ngroup1,topic=topicname1,type=Lag value="+str(Lag_sum)
print dbreqdata
dbrequrl = "http://127.0.0.1:8086/write?db=elkDB"
dbreq= urllib2.Request(url = dbrequrl,data =dbreqdata)
print dbreq
urllib2.urlopen(dbreq)

  4,腳本寫完后給腳本增加一下可執行權限

chmod +x kafka-lag-collect.py

chmod +x  topic-collect.sh

5,試着執行一下topic-collect.sh看能否執行成功

./topic-collect.sh

如果能執行成功的話,可以看到/usr/monitor/tmp/topic-group1-topicname1-Offset.txt里面已經有計算出來的offset總和了

6,試着執行一下kafka-lag-collect.py看能否執行成功

./kafka-lag-collect.py

如果能執行成功的話,就可以在influxdb里看到新建的表和相關數據了。

7,讓topic-collect.sh腳本調用kafka-lag-collect.py腳本,這樣可以避免添加兩條crontab定時任務

echo "/usr/monitor/kafka-lag-collect.py" >> topic-collect.sh

8,添加定時任務,讓腳本可以每分鍾收集一次信息到influxdb

crontab -e 

* * * * * /usr/monitor/topic-collect.sh

 

三,配置grafana展現數據

1,配置grafana數據源

 

 2,新建圖表

 

至此,就可以在grafana上看到監控的lag狀態了。
 




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM