一、相關基礎內容
然后正常kafka的指令是 : ./bin/kafka-topics.sh --zookeeper hadoop300:2181 .......
但是使用CDH安裝的kafka則不需要全寫出此 ./bin/kafka-topics.sh 部分。只許直接寫 kafka-topics 即可,這是很重要的一個區別,使用CDH安裝的kafka時候要特別注意一下。
具體有哪些指令可以看此路徑下:
/opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/bin
二、topic主題使用
所以我們使用topic的指令格式應該都類似:
kafka-topics --zookeeper hadoop300:2181/kafka ......
- 創建一個名為test 的主題(Topic):
kafka-topics --zookeeper hadoop300:2181/kafka --create -replication-factor 1 --partitions 3 --topic test
- 若是上述中的 ZooKeeper Root 的Kafka服務范圍為: " / "。則這里的創建主題指令改為:
kafka-topics --zookeeper hadoop300:2181--create -replication-factor 1 --partitions 3 --topic test
- 刪除創建的topic
kafka-topics --zookeeper hadoop300:2181/kafka --delete --topic test
這里如果直接刪除,則會輸出 Topic *** is marked for deletion
如果我們topic中消息堆積的太多,或者kafka所在磁盤空間滿了等等,則會需要徹底清理一下kafka topic。
方法一:修改kafaka配置文件server.properties
, 添加 delete.topic.enable=true
,重啟kafka,之后通過kafka命令行就可以直接刪除topic。
方法二:通過命令行刪除topic: ./bin/kafka-topics.sh --delete --zookeeper {zookeeper server} --topic {topic name}
因為kafaka配置文件中server.properties沒有配delete.topic.enable=true
,此時的刪除並不是真正的刪除,只是把topic標記為:marked for deletion
你可以通過命令:./bin/kafka-topics --zookeeper {zookeeper server} --list
來查看所有topic
方法三:若需要真正刪除它
需要登錄zookeeper客戶端:zookeeper-client
找到topic所在的目錄:ls /kafka/brokers/topics
執行命令,即可,此時topic被徹底刪除:rmr /kafka/brokers/topics/{topic name}
- 修改topic的分區數
kafka-topics --zookeeper hadoop300:2181/kafka --alter --topic test \ partitions 5
- 接着啟動消費者
kafka-console-consumer --bootstrap-server hadoop300:9092 --topic test --from-beginning
- 查看消費數據后的偏移量 kafka-run-class
(1)查看每個Partition的最新偏移量 kafka-run-class kafka.tools.GetOffsetShell --broker-list hadoop:9092 --topic yourTopic --time -1 (2)查看每個Partition的最早的偏移量 kafka-run-class kafka.tools.GetOffsetShell --broker-list hadoop:9092 --topic yourTopic --time -2 (3)查看consumer組內消費的offset kafka-run-class kafka.tools.ConsumerOffsetChecker --zookeeper hadoop:2181 --topic yourTopic
- 獲取topic消費組的偏移量
kafka-consumer-offset-checker --zookeeper=hadoop300:2181 --topic=mytopic --group=my_consumer_group