kafka命令行操作


1)創建Topic

kafka-topics.sh  --create  --bootstrap-server hadoop102:9092 --topic second --partitions 2 --replication-factor 3

kafka-topics.sh  --create  --zookeeper hadoop102:2181 --topic second --partitions 2 --replication-factor 3

--create 要進行的操作,這里表示創建
--bootstrap-server 把broker注冊信息存儲到kafka集群
--zookeeper	把broker注冊信息存儲到zookeeper集群
--topic 給主題一個名字
--partitions 設置主題分區數
--replication-factor 副本存儲數量,不能大於kafka集群數量

2)查看所有的Topic

kafka-topics.sh --list --bootstrap-server hadoop102:9092

3)查看topic的詳細信息

kafka-topics.sh --describe --bootstrap-server hadoop102:9092 --topic second
#功能一樣,下面的命令更加詳細
kafka-topics.sh --describe --bootstrap-server hadoop102:9092
#ist屬性表示節點數

4)修改topic

#把分區數量調大
kafka-topics.sh --alter --bootstrap-server hadoop102:9092 --topic first --partitions 2

5)刪除topic

當kafka是舊版本時,需要先更改server.propertites里的文件,再刪除

delete.topic.enable=true
kafka-topics.sh --delete --bootstrap-server hadoop102:9092 --topic first

6)生產者

kafka-console-producer.sh --topic second  --broker-list hadoop102:9092
--topic 指定生產的數據流向的topic
--broker-list 指定生產者的數據流向的broker

7)消費者

kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092
--topic 執行要消費的分區
--boostrap-server 要消費的broker
#上面的寫法只能檢測當前消費者啟動后的數據,如果想檢測所有數據,使用如下寫法
kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092 --from-beginning 

8)疑問:為什么啟動一個新的消費者消費不到topic中已有的消息?

#下面寫法會讀取所有分區的數據,所以數據顯示的順序有可能會變
kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092 --from-beginning 

9)消費者組:

1)啟動消費者指定配置文件,讀取配置文件中的group.id配置的組名

這里的配置文件,指的是kafka-2.4.1/config/consumer.properties

進入配置文件后,可以通過group id配置組名

2)啟動消費者時通過--group來指定組名
​ kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092 --group testgroup

10)從目錄的角度看topic

1)通過存儲的角度觀察
offset文件存儲在每台kafka節點的
datas目錄下,
格式為__consumer_offsets-xxx


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM