kafka集群搭建:

1 查看有哪些group:
1 [root@slave3 bin]# kafka-consumer-groups.sh --bootstrap-server master:9092 --list 2 Note: This will not show information about old Zookeeper-based consumers. 3 4 sys 5 console-consumer-5032 6 console-consumer-43565 7 console-consumer-72223 8 UDISKMONITORLoggroup
2 查看都有哪些topic:
[root@slave2 bin]# kafka-topics.sh --zookeeper master:2181,slave1 --list mytest mytest1 mytest2 mytest3 event app
3 查看某個group的消費信息(0.10.1.1+版本)
[root@master ~]# kafka-consumer-groups.sh --bootstrap-server master:9092 --describe --group sys Consumer group 'sys' has no active members. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID event 0 709 709 0 - - - app 0 820 838 18 -
4 把某個group的offset設置到最初或指定位置:
# 把group sys在topic:event上的offcet恢復到最初 [root@slave3 ~]# kafka-consumer-groups.sh --bootstrap-server master:9092 --group sys --topic event --reset-offsets --to-earliest –execute TOPIC PARTITION NEW-OFFSET event 0 519 # 【注意】最初的offset不一定是0,比如本例,519位置以前的數據已經過期,所以offset的最初位置就是519 # 恢復offcet到指定位置 [root@slave3 ~]# kafka-consumer-groups.sh --bootstrap-server master:9092 --group sys --topic event --reset-offsets --to-offset 1000 --execute [2020-06-17 16:26:06,074] WARN New offset (518) is lower than earliest offset. Value will be set to 519 (kafka.admin.ConsumerGroupCommand$) TOPIC PARTITION NEW-OFFSET event 0 1000 # 把offcet從當前位置往前移動100個,如果是正數就是往后移動。 [root@slave3 ~]# kafka-consumer-groups.sh --bootstrap-server master:9092 --group sys --topic event --reset-offsets --shift-by -100 --execute
5 創建topic:
[root@slave3 bin]# kafka-topics.sh --zookeeper slave1:2181,slave2,slave3 --create --topic mytest --partitions 3 --replication-factor 3 # 默認端口2181,可寫可不寫。最好節點都寫上,如果某個節點掛掉會自動尋找下一個節點。 # topic后面的參數是topic名字,partitions后面是分區數量,replication-factor后面是副本數量 # 增加分區,原來只有1個parttions,現在增加到3個。 [root@slave3 bin]# kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --partitions 3 --topic mytest
6 在控制台創建生產者:
[root@slave3 bin]# ./kafka-console-producer.sh --broker-list slave1:9092,slave2,slave3 --topic mytest zookeeper is not a recognized option # 如果出錯就用下面的語句,看好端口 [root@slave3 bin]# ./kafka-console-producer.sh --broker-list 10.18.1.103:9092 --topic mytest >123
7 在控制台創建消費者:
[root@slave3 bin]# ./kafka-console-consumer.sh --zookeeper slave2:2181,slave3 --topic mytest Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. # 如果出現上面的告警提示,也可以用。如果不想出現這個提示可以使用下面的命令,注意端口變化 [root@slave3 bin]# ./kafka-console-consumer.sh --bootstrap-server 10.18.1.102:9092 --topic test # 如果想從頭開始讀,可以在最后加上--from-beginning [root@slave3 bin]# ./kafka-console-consumer.sh --bootstrap-server 10.18.1.102:9092 --topic test --from-beginning
8 查看topic的描述信息:
[root@slave2 bin]# kafka-topics.sh --zookeeper slave1,slave2 --describe --topic mytest1 Topic:mytest1 PartitionCount:5 ReplicationFactor:1 Configs: Topic: mytest1 Partition: 0 Leader: 116 Replicas: 116 Isr: 116 Topic: mytest1 Partition: 1 Leader: 117 Replicas: 117 Isr: 117 Topic: mytest1 Partition: 2 Leader: 118 Replicas: 118 Isr: 118 Topic: mytest1 Partition: 3 Leader: 119 Replicas: 119 Isr: 119 Topic: mytest1 Partition: 4 Leader: 116 Replicas: 116 Isr: 116
9 查看topic在各partition的offset最小最大值:
[root@slave2 data]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slave1:9092,slave2:9092 --topic mytest1 --time -1 mytest1:1:932 mytest1:2:893 mytest1:0:868
10 刪除topic:
[root@slave2 data]# kafka-topics.sh --zookeeper master,slave1,slave2 --delete --topic mytest1 # 此時mytest1被標記刪除,但還可以使用,默認一周后會刪除 # 然后去kafka存數據的目錄,把有關mytest1的相關數據刪除掉 [root@slave2 data]# zkCli.sh # 或者zookeeper-client [zk: localhost:2181(CONNECTED) 2] rmr /brokers/topics/mytest1 # 去zookeeper中刪除當前topic的原數據 [zk: localhost:2181(CONNECTED) 2] rmr /admin/delete_topics/mytest1 # 刪除‘被標記已刪除’的topic [zk: localhost:2181(CONNECTED) 2]ls /config/topics/mytest1 # 這里也有,最好也刪除吧 # 至此刪除完畢