Monitoring Kafka cluster message queues with Zabbix


Use Zabbix to monitor Kafka at runtime: topics, partitions, offsets, and message backlog and consumption.

 

1. Configure Zabbix

Define the Zabbix user parameters below, where 192.168.2.3:8088 is the IP and port of the KafkaOffsetMonitor service:

[root@prod-datakafka zabbix_agentd.d]# cat userparameter_kafkaconsumer.conf
UserParameter=kafka.discover,/usr/bin/python /etc/zabbix/externalscripts/discover_kafka.py 192.168.2.3:8088 logstash
UserParameter=kafka.info[*],/usr/bin/python /etc/zabbix/externalscripts/kafka.py 192.168.2.3:8088 logstash $1 $2
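
Before wiring these into a template, you can sanity-check them from the shell. A quick sketch, reusing the address and consumer group from the config above; the agent IP and topic name are placeholders:

# Run the discovery script by hand; it should print Zabbix LLD JSON
/usr/bin/python /etc/zabbix/externalscripts/discover_kafka.py 192.168.2.3:8088 logstash

# Query through the agent from the Zabbix server
# (192.168.2.4 and "mytopic" are hypothetical)
zabbix_get -s 192.168.2.4 -k kafka.discover
zabbix_get -s 192.168.2.4 -k 'kafka.info[mytopic,lag]'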
 
# discover_kafka.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import os
import sys


def kafkaConsumerDiscovery():
    # Zabbix low-level discovery of the topics consumed by one group,
    # read from KafkaOffsetMonitor's REST API
    zabbixDiscovery = {"data": []}
    url = sys.argv[1]    # KafkaOffsetMonitor address, e.g. 192.168.2.3:8088
    group = sys.argv[2]  # consumer group name, e.g. logstash
    try:
        info = json.loads(os.popen(
            "/usr/bin/curl -s '%s/group/%s'" % (url, group)).read().strip())['offsets']
        for partition in info:
            # Skip partitions that have no subscriber
            if len(partition) <= 7:
                continue
            zabbixDiscovery['data'].append(
                {"{#TOPIC}": str(partition['topic']),
                 "{#OWNER}": str(partition['owner'])})
    except Exception:
        pass
    print(json.dumps(zabbixDiscovery))
    return json.dumps(zabbixDiscovery)


kafkaConsumerDiscovery()
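
For reference, the script prints standard Zabbix low-level-discovery JSON; with the code above the output looks roughly like this (topic and owner values are illustrative):

{"data": [{"{#TOPIC}": "mytopic", "{#OWNER}": "mytopic_consumer-1"}]}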

 

# kafka.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import os
import sys
import time


def kafkaLag():
    # Sum a metric over all partitions of a topic:
    # 'lag' = logSize - offset; otherwise the raw field ('offset', 'logSize')
    url = sys.argv[1]
    group = sys.argv[2]
    topic = sys.argv[3]
    metric = sys.argv[4]
    count = 0
    info = json.loads(os.popen(
        "/usr/bin/curl -s '%s/group/%s'" % (url, group)).read().strip())['offsets']
    for partition in info:
        if partition['topic'] == topic:
            if metric == 'lag':
                count += partition['logSize'] - partition['offset']
            else:
                count += partition[metric]
    print(count)


def kafkaLastSeen():
    # Seconds since the topic's messages were last consumed; 0 if no lag
    url = sys.argv[1]
    group = sys.argv[2]
    topic = sys.argv[3]
    difftime = 0
    info = json.loads(os.popen(
        "/usr/bin/curl -s '%s/group/%s'" % (url, group)).read().strip())['offsets']
    for partition in info:
        if partition['topic'] == topic:
            if partition['logSize'] - partition['offset'] > 0:
                # 'modified' is a millisecond timestamp
                difftime = time.time() - float(partition['modified']) / 1000
                break
    print(float(difftime))


def kafkaStatus():
    # Broker liveness: print 1 if the given IP is in the broker list, else 0
    url = sys.argv[1]
    group = sys.argv[2]
    ip = sys.argv[3]
    info = json.loads(os.popen(
        "/usr/bin/curl -s '%s/group/%s'" % (url, group)).read().strip())['brokers']
    ip_list = [broker['host'] for broker in info]
    if ip in ip_list:
        print(1)
    else:
        print(0)


def kafkaCount():
    # Number of brokers reported by KafkaOffsetMonitor
    url = sys.argv[1]
    group = sys.argv[2]
    info = json.loads(os.popen(
        "/usr/bin/curl -s '%s/group/%s'" % (url, group)).read().strip())['brokers']
    print(len(info))


if __name__ == '__main__':
    if sys.argv[3] == "count":
        kafkaCount()
    elif sys.argv[4] == "number":
        kafkaLastSeen()
    elif sys.argv[4] == "status":
        kafkaStatus()
    else:
        kafkaLag()
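
Given the dispatch at the bottom of the script, its four modes are invoked as follows. This is just a sketch, with the topic name ("mytopic") as a placeholder and the broker IP taken from the cluster addresses used later in this post:

# Broker count
/usr/bin/python /etc/zabbix/externalscripts/kafka.py 192.168.2.3:8088 logstash count

# Per-topic metrics, summed across partitions
/usr/bin/python /etc/zabbix/externalscripts/kafka.py 192.168.2.3:8088 logstash mytopic lag
/usr/bin/python /etc/zabbix/externalscripts/kafka.py 192.168.2.3:8088 logstash mytopic logSize
/usr/bin/python /etc/zabbix/externalscripts/kafka.py 192.168.2.3:8088 logstash mytopic offset

# Seconds since a lagging topic was last consumed
/usr/bin/python /etc/zabbix/externalscripts/kafka.py 192.168.2.3:8088 logstash mytopic number

# Broker liveness (prints 1 or 0)
/usr/bin/python /etc/zabbix/externalscripts/kafka.py 192.168.2.3:8088 logstash 172.16.16.12 status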

 

2. Install the KafkaOffsetMonitor service

Download: https://pan.baidu.com/s/1C-cpX8K8ZN-WLp48kwMBgw (extraction code: 8l0e)

Put the downloaded jar into a directory of your choice and start it with the following script:

kafka_monitor_start.sh
#!/bin/bash
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 172.16.16.12:2181,172.16.16.13:2181,172.16.16.14:2181 --port 8088 --refresh 5.seconds --retain 1.days &

The three --zk addresses above are the three nodes of the Kafka cluster; specify the IPs of the Kafka nodes you want to monitor here.
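
Once it is up, you can verify the REST endpoints that the Zabbix scripts depend on with curl; the group name here follows the earlier examples:

# List the consumer groups KafkaOffsetMonitor knows about
curl -s 'http://192.168.2.3:8088/group'

# Offsets and brokers for one group; this is the JSON the scripts
# parse (the 'offsets' and 'brokers' keys)
curl -s 'http://192.168.2.3:8088/group/logstash'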

 

That completes the configuration on the Kafka side. All that remains is to set up low-level discovery in the Zabbix frontend, which is fairly routine; a minimal sketch follows.
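
A minimal sketch of the frontend side, assuming the user parameters defined earlier (the metric names come from kafka.py):

Discovery rule key:   kafka.discover
Item prototypes:
  kafka.info[{#TOPIC},lag]       # unconsumed messages (logSize - offset)
  kafka.info[{#TOPIC},logSize]   # total messages written to the topic
  kafka.info[{#TOPIC},offset]    # messages already consumed
  kafka.info[{#TOPIC},number]    # seconds since last consumption while lagging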

