Kafka監控工具KafkaOffsetMonitor配置及使用


 一、KafkaOffsetMonitor簡述

KafkaOffsetMonitor是Kafka的一款客戶端消費監控工具,用來實時監控Kafka服務的Consumer以及它們所在的Partition中的Offset,我們可以瀏覽當前的消費者組,並且每個Topic的所有Partition的消費情況都可以一目了然。

二、KafkaOffsetMonitor下載

KafkaOffsetMonitor托管在Github上,可以通過Github下載。
下載地址:https://github.com/quantifind/KafkaOffsetMonitor/releases

或者下載百度網盤:鏈接:https://pan.baidu.com/s/1geEBEvT 密碼:jaeu

 

三、KafkaOffsetMonitor啟動

將下載下來的KafkaOffsetMonitor jar包上傳到linux上,可以新建一個目錄KafkaMonitor,用於存放KafkaOffsetMonitor-assembly-0.2.0.jar進入到KafkaMonitor目錄下,通過java編譯命令來運行這個jar包:

[root@kafka50 KafkaMonitor]# java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 --port 8088 --refresh 5.seconds --retain 1.days 按回車后,可以看到控制台輸出:

serving resources from: jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2018-01-05 21:17:36.267:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
2018-01-05 21:17:36.630:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp}
2018-01-05 21:17:36.662:INFO:oejs.AbstractConnector:Started SocketConnector@0.0.0.0:8088

如果沒有指定端口,則默認會開啟一個隨機端口。

參數說明:

zk :zookeeper主機地址,如果有多個,用逗號隔開
port :應用程序端口
refresh :應用程序在數據庫中刷新和存儲點的頻率
retain :在db中保留多長時間
dbName :保存的數據庫文件名,默認為offsetapp

為了更方便的啟動KafkaOffsetMonitor,可以寫一個啟動腳本來直接運行,我這里新建一個名為:kafka-monitor-start.sh的腳本,然后編輯這個腳本:

[root@kafka50 KafkaMonitor]# vim kafka-monitor-start.sh java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --port 8088 \ --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 \ --refresh 5.minutes \ --retain 1.day >/dev/null 2>&1;

然后退出保存即可,接下來修改一下kafka-monitor-start.sh的權限

[root@kafka50 KafkaMonitor]# chmod +x kafka-monitor-start.sh 

啟動KafkaOffsetMonitor:

[root@kafka50 KafkaMonitor]# nohup /data/KafkaMonitor/kafka-monitor-start.sh & [1] 6551 [root@kafka50 KafkaMonitor]# lsof -i:8088 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 6552 root   16u  IPv6  26047      0t0  TCP *:radan-http (LISTEN)

四、KafkaOffsetMonitor Web UI

在游覽器中輸入:http://ip:port即可以查看KafkaOffsetMonitor Web UI,如下圖:

 

在下圖中有一個Visualizations選項卡,點擊其中的Cluster Overview可以查看當前Kafka集群的Broker情況

 

五、簡單的Producer

1、新建一個Topic

  首先為本次試驗新建一個Topic,命令如下:

bin/kafka-topics.sh \ --create \ --zookeeper 10.0.0.50:12181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-simpleproducer

 

2、新建SimpleProducer代碼

  在上一篇文章中提到的Producer封裝Github代碼的基礎上,寫了一個往kafkamonitor-simpleproducer發送message的java代碼。

import com.ckm.kafka.producer.impl.KafkaProducerToolImpl; import com.ckm.kafka.producer.inter.KafkaProducerTool; /** * Created by ckm on 2016/8/30. */
public class SimpleProducer { public static void main(String[] args) { KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); int i = 0; String message = ""; while (true) { message = "test-simple-producer : " + i ++; kafkaProducerTool.publishMessage("kafkamonitor-simpleproducer", message); } } }

  程序運行效果: 
  這里寫圖片描述

3、ConsoleConsumer消費該topic

  用kafka自帶的ConsoleConsumer消費kafkamonitor-simpleproducer中的message。

bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-simpleproducer

  消費截圖如下: 
  這里寫圖片描述

4、KafkaOffsetMonitor頁面

(1)在Topic List選項卡中,我們可以看到剛才新建的kafkamonitor-simpleproducer 
  這里寫圖片描述
(2)點開后,能看到有一個console-consumer正在消費該topic 
  這里寫圖片描述
(3)繼續進入該Consumer,可以查看該Consumer當前的消費狀況 
  這里寫圖片描述
  這張圖片的左上角顯示了當前Topic的生產速率,右上角顯示了當前Consumer的消費速率。 
  圖片中還有三種顏色的線條,藍色的表示當前Topic中的Message數目,灰色的表示當前Consumer消費的offset位置,紅色的表示藍色灰色的差值,即當前Consumer滯后於Producer的message數目。 
(4)看一眼各partition中的message消費情況 
  這里寫圖片描述
  從上圖可以看到,當前有3個Partition,每個Partition中的message數目分布很不均勻。這里可以與接下來的自定義Producer的情況進行一個對比。

六、自定義Partitioner的Producer

1、新建一個Topic

bin/kafka-topics.sh \ --create \ --zookeeper 10.0.0.50:12181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-partitionedproducer

2、Partitioner代碼

  邏輯很簡單,循環依次往各Partition中發送message。

import kafka.producer.Partitioner; /** * Created by ckm on 2018/1/8. */
public class TestPartitioner implements Partitioner { public TestPartitioner() { } @Override public int partition(Object key, int numPartitions) { int intKey = (int) key; return intKey % numPartitions; } }

3、Producer代碼

  將自定義的Partitioner設置到Producer,其他調用過程和二中類似。

import com.ckm.kafka.producer.impl.KafkaProducerToolImpl; import com.ckm.kafka.producer.inter.KafkaProducerTool; /** * Created by ckm on 2016/8/30. */
public class PartitionedProducer { public static void main(String[] args) { KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); kafkaProducerTool.getProducerProperties().put("partitioner.class", "TestPartitioner"); int i = 0; String message = ""; while (true) { message = "test-partitioner-producer : " + i; System.out.println(message); kafkaProducerTool.publishPartitionedMessage("kafkamonitor-partitionedproducer", i + "", message); i ++; } } }

  代碼運行效果如下圖: 
  這里寫圖片描述

4、ConsoleConsumer消費Message

bin/kafka-console-consumer.sh --zookeeper 10.0.0.50:12181 --from-beginning --topic kafkamonitor-partitionedproducer

 

  消費效果如下圖: 
  這里寫圖片描述

5、KafkaOffsetMonitor頁面

  其他頁面與上面的類似,這里只觀察一下每個partition中的message數目與第二節中的對比。可以看到這里每個Partition中message分別是很均勻的。 
  這里寫圖片描述

注意事項: 
  注意這里有一個坑,默認情況下Producer往一個不存在的Topic發送message時會自動創建這個Topic。由於在這個封裝中,有同時傳遞message和topic的情況,如果調用方法時傳入的參數反了,將會在Kafka集群中自動創建Topic。在正常情況下,應該是先把Topic根據需要創建好,然后Producer往該Topic發送Message,最好把Kafka這個默認自動創建Topic的功能關掉。 
  那么,假設真的不小心創建了多余的Topic,在刪除時,會出現“marked for deletion”提示,只是將該topic標記為刪除,使用list命令仍然能看到。如果需要調整這兩個功能的話,在server.properties中配置如下兩個參數:

 

參數 默認值 作用
auto.create.topics.enable true Enable auto creation of topic on the server
delete.topic.enable false Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off

 

七,KafkaOffsetMonitor 總結

KafkaOffsetMonitor:程序一個jar包的形式運行,部署較為方便。只有監控功能,使用起來也較為安全。
除了KafkaOffsetMonitor,Kafka監控工具還有另外兩款:
Kafka Web Console:監控功能較為全面,可以預覽消息,監控Offset、Lag等信息,但存在bug,不建議在生產環境中使用。
Kafka Manager:偏向Kafka集群管理,若操作不當,容易導致集群出現故障。對Kafka實時生產和消費消息是通過JMX實現的。沒有記錄Offset、Lag等信息。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM