一、KafkaOffsetMonitor簡述
KafkaOffsetMonitor是Kafka的一款客戶端消費監控工具,用來實時監控Kafka服務的Consumer以及它們所在的Partition中的Offset,我們可以瀏覽當前的消費者組,並且每個Topic的所有Partition的消費情況都可以一目了然。
二、KafkaOffsetMonitor下載
KafkaOffsetMonitor托管在Github上,可以通過Github下載。
下載地址:https://github.com/quantifind/KafkaOffsetMonitor/releases
或者下載百度網盤:鏈接:https://pan.baidu.com/s/1geEBEvT 密碼:jaeu

三、KafkaOffsetMonitor啟動
將下載下來的KafkaOffsetMonitor jar包上傳到linux上,可以新建一個目錄KafkaMonitor,用於存放KafkaOffsetMonitor-assembly-0.2.0.jar進入到KafkaMonitor目錄下,通過java編譯命令來運行這個jar包:
[root@kafka50 KafkaMonitor]# java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 --port 8088 --refresh 5.seconds --retain 1.days 按回車后,可以看到控制台輸出: serving resources from: jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 2018-01-05 21:17:36.267:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 2018-01-05 21:17:36.630:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp} 2018-01-05 21:17:36.662:INFO:oejs.AbstractConnector:Started SocketConnector@0.0.0.0:8088
如果沒有指定端口,則默認會開啟一個隨機端口。
參數說明:
zk :zookeeper主機地址,如果有多個,用逗號隔開
port :應用程序端口
refresh :應用程序在數據庫中刷新和存儲點的頻率
retain :在db中保留多長時間
dbName :保存的數據庫文件名,默認為offsetapp
為了更方便的啟動KafkaOffsetMonitor,可以寫一個啟動腳本來直接運行,我這里新建一個名為:kafka-monitor-start.sh的腳本,然后編輯這個腳本:
[root@kafka50 KafkaMonitor]# vim kafka-monitor-start.sh java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --port 8088 \ --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 \ --refresh 5.minutes \ --retain 1.day >/dev/null 2>&1;
然后退出保存即可,接下來修改一下kafka-monitor-start.sh的權限
[root@kafka50 KafkaMonitor]# chmod +x kafka-monitor-start.sh
啟動KafkaOffsetMonitor:
[root@kafka50 KafkaMonitor]# nohup /data/KafkaMonitor/kafka-monitor-start.sh & [1] 6551 [root@kafka50 KafkaMonitor]# lsof -i:8088 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 6552 root 16u IPv6 26047 0t0 TCP *:radan-http (LISTEN)
四、KafkaOffsetMonitor Web UI
在游覽器中輸入:http://ip:port即可以查看KafkaOffsetMonitor Web UI,如下圖:

在下圖中有一個Visualizations選項卡,點擊其中的Cluster Overview可以查看當前Kafka集群的Broker情況

五、簡單的Producer
1、新建一個Topic
首先為本次試驗新建一個Topic,命令如下:
bin/kafka-topics.sh \ --create \ --zookeeper 10.0.0.50:12181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-simpleproducer
2、新建SimpleProducer代碼
在上一篇文章中提到的Producer封裝Github代碼的基礎上,寫了一個往kafkamonitor-simpleproducer發送message的java代碼。
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl; import com.ckm.kafka.producer.inter.KafkaProducerTool; /** * Created by ckm on 2016/8/30. */
public class SimpleProducer { public static void main(String[] args) { KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); int i = 0; String message = ""; while (true) { message = "test-simple-producer : " + i ++; kafkaProducerTool.publishMessage("kafkamonitor-simpleproducer", message); } } }
程序運行效果:

3、ConsoleConsumer消費該topic
用kafka自帶的ConsoleConsumer消費kafkamonitor-simpleproducer中的message。
bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-simpleproducer
消費截圖如下:

4、KafkaOffsetMonitor頁面
(1)在Topic List選項卡中,我們可以看到剛才新建的kafkamonitor-simpleproducer

(2)點開后,能看到有一個console-consumer正在消費該topic

(3)繼續進入該Consumer,可以查看該Consumer當前的消費狀況

這張圖片的左上角顯示了當前Topic的生產速率,右上角顯示了當前Consumer的消費速率。
圖片中還有三種顏色的線條,藍色的表示當前Topic中的Message數目,灰色的表示當前Consumer消費的offset位置,紅色的表示藍色灰色的差值,即當前Consumer滯后於Producer的message數目。
(4)看一眼各partition中的message消費情況

從上圖可以看到,當前有3個Partition,每個Partition中的message數目分布很不均勻。這里可以與接下來的自定義Producer的情況進行一個對比。
六、自定義Partitioner的Producer
1、新建一個Topic
bin/kafka-topics.sh \ --create \ --zookeeper 10.0.0.50:12181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-partitionedproducer
2、Partitioner代碼
邏輯很簡單,循環依次往各Partition中發送message。
import kafka.producer.Partitioner; /** * Created by ckm on 2018/1/8. */
public class TestPartitioner implements Partitioner { public TestPartitioner() { } @Override public int partition(Object key, int numPartitions) { int intKey = (int) key; return intKey % numPartitions; } }
3、Producer代碼
將自定義的Partitioner設置到Producer,其他調用過程和二中類似。
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl; import com.ckm.kafka.producer.inter.KafkaProducerTool; /** * Created by ckm on 2016/8/30. */
public class PartitionedProducer { public static void main(String[] args) { KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); kafkaProducerTool.getProducerProperties().put("partitioner.class", "TestPartitioner"); int i = 0; String message = ""; while (true) { message = "test-partitioner-producer : " + i; System.out.println(message); kafkaProducerTool.publishPartitionedMessage("kafkamonitor-partitionedproducer", i + "", message); i ++; } } }
代碼運行效果如下圖:

4、ConsoleConsumer消費Message
bin/kafka-console-consumer.sh --zookeeper 10.0.0.50:12181 --from-beginning --topic kafkamonitor-partitionedproducer
消費效果如下圖:

5、KafkaOffsetMonitor頁面
其他頁面與上面的類似,這里只觀察一下每個partition中的message數目與第二節中的對比。可以看到這里每個Partition中message分別是很均勻的。

注意事項:
注意這里有一個坑,默認情況下Producer往一個不存在的Topic發送message時會自動創建這個Topic。由於在這個封裝中,有同時傳遞message和topic的情況,如果調用方法時傳入的參數反了,將會在Kafka集群中自動創建Topic。在正常情況下,應該是先把Topic根據需要創建好,然后Producer往該Topic發送Message,最好把Kafka這個默認自動創建Topic的功能關掉。
那么,假設真的不小心創建了多余的Topic,在刪除時,會出現“marked for deletion”提示,只是將該topic標記為刪除,使用list命令仍然能看到。如果需要調整這兩個功能的話,在server.properties中配置如下兩個參數:
| 參數 | 默認值 | 作用 |
|---|---|---|
| auto.create.topics.enable | true | Enable auto creation of topic on the server |
| delete.topic.enable | false | Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off |
七,KafkaOffsetMonitor 總結
KafkaOffsetMonitor:程序一個jar包的形式運行,部署較為方便。只有監控功能,使用起來也較為安全。
除了KafkaOffsetMonitor,Kafka監控工具還有另外兩款:
Kafka Web Console:監控功能較為全面,可以預覽消息,監控Offset、Lag等信息,但存在bug,不建議在生產環境中使用。
Kafka Manager:偏向Kafka集群管理,若操作不當,容易導致集群出現故障。對Kafka實時生產和消費消息是通過JMX實現的。沒有記錄Offset、Lag等信息。
