如果你在使用Kafka來分發消息,在數據處理的過程中可能會出現處理程序出異常或者是其它的錯誤,會造成數據丟失或不一致。這個時候你也許會想要通過kafka把數據從新處理一遍,我們知道kafka默認會在磁盤上保存到7天的數據,你只需要把kafka的某個topic的consumer的offset設置為某個值或者是最小值,就可以使該consumer從你設置的那個點開始消費。
查詢topic的offset的范圍
用下面命令可以查詢到topic:DynamicRange broker:SparkMaster:9092的offset的最小值:
$ /opt/cloudera/parcels/KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list SparkMaster:9092 —topic DynamicRange --time -2
輸出
DynamicRange:0:1288
查詢offset的最大值:
$ /opt/cloudera/parcels/KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list SparkMaster:9092 —topic DynamicRange --time -1
輸出
DynamicRange:0:7885
從上面的輸出可以看出topic:DynamicRange只有一個partition:0 offset范圍為:[1288,7885]
設置consumer group的offset
啟動zookeeper client
$ /opt/cloudera/parcels/CDH/lib/zookeeper/bin/zkCli.sh
通過下面命令設置consumer group:DynamicRangeGroup topic:DynamicRange partition:0的offset為1288:
set /consumers/DynamicRangeGroup/offsets/DynamicRange/0 1288
注意如果你的kafka設置了zookeeper root,比如為/kafka,那么命令應該改為:
set /kafka/consumers/DynamicRangeGroup/offsets/DynamicRange/0 1288
生效
重啟相關的應用程序,就可以從設置的offset開始讀數據了。