1. 查看所有topic
kafka-topics.sh --zookeeper hadoop3 --list
2. 創建tooic及topic的partitioner
./kafka-topics.sh --zookeeper hadoop3:2181,hadoop4:2181,hadoop5:2181,hadoop6:2181,hadoop7:2181,hadoop8:2181,hadoop9:2181 --create --topic check-data --partitions 21 --replication-factor 2
說明:在集群模式中,partitioner可以根據集群節點的磁盤空間大小和kafka server這個配置log-dir=/data/kafka-log,/data01/kafka-log,/data02/kafka-log
兩個結合來控制partitions個數,這樣做可以提高性能和避免一個partitions被撐爆
3. 查看kafka中的偏移量
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ynjz003:9092,ynjz004:9092,ynjz005:9092,ynjz006:9092,ynjz007:9092,ynjz008:9092,ynjz009:9092 --topic ynjz-data --time -1
說明:該條命令可以查看到kafka的每個partitioner的位置的偏移量,通過這個可以看出kafka的數據攝入能力和大概的數據容量
4. 設置kafka topic 創建數據保存時間:
kafka-configs.sh --zookeeper ynjz003:2181,ynjz004:2181,ynjz005:2181,ynjz006:2181,ynjz007:2181,ynjz008:2181,ynjz009:2181 --entity-type topics --entity-name statistics-data --alter --add-config retention.ms=259200
說明:如果不設置的話,卡夫卡默認保存時間是7天,但在數據量過大,實時處理過程中為了減少數據的積壓沒必要保存7天,可以根據以上命令設置某個topic數據保存的時間,最后一個參數的單位是秒
5. 查看某個topic在某個消費者的狀態
kafka-consumer-offset-checker --zookeeper gawh220:2181,gawh221:2181,gawh222:2181/kafka --topic ori_31_jn_jt_hcpgpxx_zdr --group to_hive_original_new_513
說明:ori_31_jn_jt_hcpgpxx_zdr在消費者to_hive_original_new_513消費狀態
這里代表還有1000沒有被消費
6. 刪除topic及topic的數據
這塊較為復雜,可以參考這篇博客 https://blog.csdn.net/belalds/article/details/80575751