1、查看kafka的topic
kafka-topics.sh --zookeeper localhost:2181 --list
2、創建topic
kafka-topics.sh --zookeeper localhost:2181 -create --topic topic1--partitions 1 --replication-factor 1
3、生產數據
kafka-console-producer.sh --broker-list localhost:9092 --topic topic1
向topic1發送一條包含key的消息:
echo '00000,{"name":"Steve", "title":"Captain America"}' | kafka-console-producer.sh --broker-list localhost:9092 --topic topic1 --property parse.key=true --property key.separator=,
4、消費數據
kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic1 --from-beginning 或者 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --from-beginning
其中 --from-beginning 表示從頭開始消費。
將消息的key也輸出:
kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic1 --from-beginning --property print.key=true --property key.separator=,
5、消費記錄offest的topic
消費__consumer_offsets
中的內容,首先需要在consumer.properties
中配置exclude.internal.topics=false
,並執行下面命令:
kafka-console-consumer.sh --zookeeper localhost:2181 \ --topic __consumer_offsets \ --from-beginning \ --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \ --consumer.config ~/consumer.properties
6、topic的offest統計
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topic1 --time -1
其中-1表示顯示獲取當前offset最大值,-2表示offset的最小值
7、查看指定partition的數據
kafka-simple-consumer-shell.sh --broker-list localhost:9092 --topic topic1 --partition 1 --print-offsets --offset 18 --clientId test --property print.key=true
主要是用於遇到數據傾斜的情況,可以通過kafka-simple-consumer-shell.sh
查看具體某個partition數據內容
8、查看topic的詳細信息
kafka-topics.sh -zookeeper localhost:2181 -describe -topic topic1