Topic相關
一、啟動
# 1 加守護進程啟動
bin/kafka-server-start.sh -daemon config/server.properties
# 2 通過后台來啟動
nohup bin/kafka-server-start.sh config/server.properties &
二、查看當前kafka集群中Topic名字
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
三、查看Topic詳情:分區和副本等情況
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic topicname
# 結果:Topic:topicname PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test0 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 1,0,2
Topic: test0 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
Topic: test0 Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 1,0,2
四、創建Topic
bin/kafka-topics.sh --create --topic test0--zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1
--topic后面的test0是topic的名稱
--zookeeper應該和server.properties文件中的zookeeper.connect一樣
--config指定當前topic上有效的參數值
--partitions指定topic的partition數量,如果不指定該數量,默認是server.properties文件中的num.partitions配置值
--replication-factor指定每個partition的副本個數,默認1個
五、刪除Topic
bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic topicname
刪除kafka中該topic相關的目錄
重啟kafka
六、查看topic消費到的offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic test0 --time -1
# 運行結果
test0:0:177496
test0:1:61414
七、修改topic的partition數量(只能增加不能減少)
通過kafka-topics.sh工具的alter命令,將replicated-topic的partitions從1增加到3
bin/kafka-topics.sh --zookeeter localhost:2181 --alter --partitions 3 --topic replicated-topic
消費者指令
- 消費信息
# 從 latest 位移位置開始消費該主題的所有分區消息,即僅消費正在寫入的消息。
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic topicName
- 從開始位置消費
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --from-beginning --topic topicName
- 顯示key消費,打印出消息體的 key 和 value
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --property print.key=true --topic topicName