1)創建Topic
kafka-topics.sh --create --bootstrap-server hadoop102:9092 --topic second --partitions 2 --replication-factor 3
kafka-topics.sh --create --zookeeper hadoop102:2181 --topic second --partitions 2 --replication-factor 3
--create 要進行的操作,這里表示創建
--bootstrap-server 把broker注冊信息存儲到kafka集群
--zookeeper 把broker注冊信息存儲到zookeeper集群
--topic 給主題一個名字
--partitions 設置主題分區數
--replication-factor 副本存儲數量,不能大於kafka集群數量
2)查看所有的Topic
kafka-topics.sh --list --bootstrap-server hadoop102:9092
3)查看topic的詳細信息
kafka-topics.sh --describe --bootstrap-server hadoop102:9092 --topic second
#功能一樣,下面的命令更加詳細
kafka-topics.sh --describe --bootstrap-server hadoop102:9092
#ist屬性表示節點數
4)修改topic
#把分區數量調大
kafka-topics.sh --alter --bootstrap-server hadoop102:9092 --topic first --partitions 2
5)刪除topic
當kafka是舊版本時,需要先更改server.propertites里的文件,再刪除
delete.topic.enable=true
kafka-topics.sh --delete --bootstrap-server hadoop102:9092 --topic first
6)生產者
kafka-console-producer.sh --topic second --broker-list hadoop102:9092
--topic 指定生產的數據流向的topic
--broker-list 指定生產者的數據流向的broker
7)消費者
kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092
--topic 執行要消費的分區
--boostrap-server 要消費的broker
#上面的寫法只能檢測當前消費者啟動后的數據,如果想檢測所有數據,使用如下寫法
kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092 --from-beginning
8)疑問:為什么啟動一個新的消費者消費不到topic中已有的消息?
#下面寫法會讀取所有分區的數據,所以數據顯示的順序有可能會變
kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092 --from-beginning
9)消費者組:
1)啟動消費者指定配置文件,讀取配置文件中的group.id配置的組名
這里的配置文件,指的是kafka-2.4.1/config/consumer.properties
進入配置文件后,可以通過group id配置組名
2)啟動消費者時通過--group來指定組名
kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092 --group testgroup
10)從目錄的角度看topic
1)通過存儲的角度觀察
offset文件存儲在每台kafka節點的
datas目錄下,
格式為__consumer_offsets-xxx