KafkaOffsetMonitor是一個可以用於監控Kafka的Topic及Consumer消費狀況的工具,其配置和使用特別的方便。源項目Github地址為:https://github.com/quantifind/KafkaOffsetMonitor。
最簡單的使用方式是從Github上下載一個最新的KafkaOffsetMonitor-assembly-0.2.1.jar,上傳到某服務器上,然后執行一句命令就可以運行起來。但是在使用過程中有可能會發現頁面反應緩慢或者無法顯示相應內容的情況。據說這是由於jar包中的某些js等文件需要連接到網絡,或者需要翻牆導致的。網上找的一個修改版的KafkaOffsetMonitor對應jar包,可以完全在本地運行,經過測試效果不錯。下載地址是:http://pan.baidu.com/s/1ntzIUPN,在此感謝一下貢獻該修改版的原作者。鏈接失效的話,可以博客下方留言聯系我。
一、KafkaOffsetMonitor的使用
因為完全沒有安裝配置的過程,所以直接從KafkaOffsetMonitor的使用開始。
將KafkaOffsetMonitor-assembly-0.2.0.jar上傳到服務器后,可以新建一個腳本用於啟動該應用。腳本內容如下:
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk m000:2181,m001:2181,m002:2181 \ --port 8088 \ --refresh 10.seconds \ --retain 2.days
各參數的作用可以參考一下Github上的描述:
- offsetStorage valid options are ”zookeeper”, ”kafka” or ”storm”. Anything else falls back to ”zookeeper”
- zk the ZooKeeper hosts
- port on what port will the app be available
- refresh how often should the app refresh and store a point in the DB
- retain how long should points be kept in the DB
- dbName where to store the history (default ‘offsetapp’)
- kafkaOffsetForceFromStart only applies to ”kafka” format. Force KafkaOffsetMonitor to scan the commit messages from start (see notes below)
- stormZKOffsetBase only applies to ”storm” format. Change the offset storage base in zookeeper, default to ”/stormconsumers” (see notes below)
pluginsArgs additional arguments used by extensions (see below)
啟動后,訪問
m000:8088
端口,可以看到如下頁面:
在這個頁面上,可以看到當前Kafka集群中現有的Consumer Groups。在上圖中有一個Visualizations選項卡,點擊其中的Cluster Overview可以查看當前Kafka集群的Broker情況
接下來將繼續上一篇Kafka相關的文章Kafka系列之-自定義Producer,在最后對Producer進行包裝的基礎上,分別實現一個簡單的往隨機Partition寫messge,以及自定義Partitioner的Producer,對KafkaOffsetMonitor其他頁面進行展示。
二、簡單的Producer
1、新建一個Topic
首先為本次試驗新建一個Topic,命令如下:
bin/kafka-topics.sh \
--create \ --zookeeper m000:2181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-simpleproducer
2、新建SimpleProducer代碼
在上一篇文章中提到的Producer封裝Github代碼的基礎上,寫了一個往kafkamonitor-simpleproducer發送message的java代碼。
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl;
import com.ckm.kafka.producer.inter.KafkaProducerTool;
/** * Created by ckm on 2016/8/30. */
public class SimpleProducer {
public static void main(String[] args) {
KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
int i = 0;
String message = "";
while (true) {
message = "test-simple-producer : " + i ++;
kafkaProducerTool.publishMessage("kafkamonitor-simpleproducer", message);
}
}
}
程序運行效果:
3、ConsoleConsumer消費該topic
用kafka自帶的ConsoleConsumer消費kafkamonitor-simpleproducer中的message。
bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-simpleproducer
消費截圖如下:
4、KafkaOffsetMonitor頁面
(1)在Topic List選項卡中,我們可以看到剛才新建的kafkamonitor-simpleproducer
(2)點開后,能看到有一個console-consumer正在消費該topic
(3)繼續進入該Consumer,可以查看該Consumer當前的消費狀況
這張圖片的左上角顯示了當前Topic的生產速率,右上角顯示了當前Consumer的消費速率。
圖片中還有三種顏色的線條,藍色的表示當前Topic中的Message數目,灰色的表示當前Consumer消費的offset位置,紅色的表示藍色灰色的差值,即當前Consumer滯后於Producer的message數目。
(4)看一眼各partition中的message消費情況
從上圖可以看到,當前有3個Partition,每個Partition中的message數目分布很不均勻。這里可以與接下來的自定義Producer的情況進行一個對比。
三、自定義Partitioner的Producer
1、新建一個Topic
bin/kafka-topics.sh \
--create \ --zookeeper m000:2181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-partitionedproducer
2、Partitioner代碼
邏輯很簡單,循環依次往各Partition中發送message。
import kafka.producer.Partitioner;
/** * Created by ckm on 2016/8/30. */
public class TestPartitioner implements Partitioner {
public TestPartitioner() {
}
@Override
public int partition(Object key, int numPartitions) {
int intKey = (int) key;
return intKey % numPartitions;
}
}
3、Producer代碼
將自定義的Partitioner設置到Producer,其他調用過程和二中類似。
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl;
import com.ckm.kafka.producer.inter.KafkaProducerTool;
/** * Created by ckm on 2016/8/30. */
public class PartitionedProducer {
public static void main(String[] args) {
KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
kafkaProducerTool.getProducerProperties().put("partitioner.class", "TestPartitioner");
int i = 0;
String message = "";
while (true) {
message = "test-partitioner-producer : " + i;
System.out.println(message);
kafkaProducerTool.publishPartitionedMessage("kafkamonitor-partitionedproducer", i + "", message);
i ++;
}
}
}
代碼運行效果如下圖:
4、ConsoleConsumer消費Message
bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-partitionedproducer
消費效果如下圖:
5、KafkaOffsetMonitor頁面
其他頁面與上面的類似,這里只觀察一下每個partition中的message數目與第二節中的對比。可以看到這里每個Partition中message分別是很均勻的。
注意事項:
注意這里有一個坑,默認情況下Producer往一個不存在的Topic發送message時會自動創建這個Topic。由於在這個封裝中,有同時傳遞message和topic的情況,如果調用方法時傳入的參數反了,將會在Kafka集群中自動創建Topic。在正常情況下,應該是先把Topic根據需要創建好,然后Producer往該Topic發送Message,最好把Kafka這個默認自動創建Topic的功能關掉。
那么,假設真的不小心創建了多余的Topic,在刪除時,會出現“marked for deletion”提示,只是將該topic標記為刪除,使用list命令仍然能看到。如果需要調整這兩個功能的話,在server.properties中配置如下兩個參數:
參數 | 默認值 | 作用 |
---|---|---|
auto.create.topics.enable | true | Enable auto creation of topic on the server |
delete.topic.enable | false | Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off |