最近在spark讀取kafka消息時,每次讀取都會從kafka最新的offset讀取。但是如果數據丟失,如果在使用Kafka來分發消息,在數據處理的過程中可能會出現處理程序出異常或者是其它的錯誤,會造成數據丟失或不一致。這個時候你也許會想要通過kafka把數據從新處理一遍,或者指定kafka的offset讀取。kafka默認會在磁盤上保存到7天的數據,你只需要把kafka的某個topic的consumer的offset設置為某個值或者是最小值,就可以使該consumer從你設置的那個點開始消費。這就需要從zk里面修改offset的值。
查詢topic的offset的范圍
用下面命令可以查詢到topic:DynamicRange broker:SparkMaster:9092的offset的最小值:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slave6:9092 -topic videoplay --time -2
輸出
DynamicRange:0:1288
查詢offset的最大值:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slave6:9092 -topic videoplay --time -1
輸出
DynamicRange:0:7885
從上面的輸出可以看出topic:DynamicRange只有一個partition:0 offset范圍為:[1288,7885]
設置consumer group的offset
啟動zookeeper client
/zookeeper/bin/zkCli.sh
通過下面命令設置consumer group:DynamicRangeGroup topic:DynamicRange partition:0的offset為1288:
set /consumers/DynamicRangeGroup/offsets/DynamicRange/0 1288
注意如果你的kafka設置了zookeeper root,比如為/kafka,那么命令應該改為:
set /kafka/consumers/DynamicRangeGroup/offsets/DynamicRange/0 1288
生效
重啟相關的應用程序,就可以從設置的offset開始讀數據了。