kafka相關的常用命令
后台啟動kafka
kafka-server-start.sh /opt/app/kafka_2.11-1.1.0/config/server.properties > /dev/null 2>&1 &
關閉kafka
kafka-server-stop.sh
創建topic
topic的添加和修改使用下面命令
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
副本(replication)j控制每條消息在服務器中的備份,如果有3個副本,那么允許最多有2個節點宕機才能不丟數據,集群中推薦設置2或3個副本,才不會中斷數據消費。
分區(partition)控制topic將分片成多少log。關於分區數的影響,首先每個分區必須完全安裝在獨立的服務器上。因此,如果你有20個分區的話(讀和寫的負載),那么處理完整的數據集不要超過20個服務器(不計算備份)。最后的分區數影響你的消費者的最大並行。命令行上添加的配置覆蓋服務器的默認設置。
例如,創建topic帶有4個分區,2個副本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
修改刪除topic
添加分區:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partition 40
添加分區不能改變現有的數據,如果分區被使用中,這就可能擾亂消費者。如果數據通過哈希划分,那么該分區將通過添加分區進行洗牌,但kafka不以任何方式自動分配數據。
添加配置:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y
移除配置:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x=y
刪除topic:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
主題刪除選項默認是關閉的,設置服務器配置開啟它。
delete.topic.enable=true
查看topic列表
kafka-topics.sh --list --zookeeper localhost:2181
查詢集群描述
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
生產者(向topic寫入數據)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
生產者(支持0.9-1.1.1版本)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
消費者(從topic消費數據)(0.9以下舊版寫法),間接通過zookeeper端口2181消費數據
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
消費者(支持0.9-1.1.1版本),直接通過kafka端口9092消費數據
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
消費者(支持2.0.0版本+),直接通過kafka端口9092消費數據
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer.properties
注:“--new-consumer”已經不能使用啦
消費者組列表查詢(支持0.9-1.1.1版本)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
消費者組列表查詢(支持2.0.0版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
顯示某個消費組的消費詳情(僅支持offset存儲在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test
顯示某個消費組的消費詳情(支持0.9版本)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group
可能遇到的問題
kafka消費者查詢出錯:https://blog.csdn.net/getyouwant/article/details/79000524
參考資料: