kafka 的offset的重置


最近在spark讀取kafka消息時,每次讀取都會從kafka最新的offset讀取。但是如果數據丟失,如果在使用Kafka來分發消息,在數據處理的過程中可能會出現處理程序出異常或者是其它的錯誤,會造成數據丟失或不一致。這個時候你也許會想要通過kafka把數據從新處理一遍,或者指定kafka的offset讀取。kafka默認會在磁盤上保存到7天的數據,你只需要把kafka的某個topic的consumer的offset設置為某個值或者是最小值,就可以使該consumer從你設置的那個點開始消費。這就需要從zk里面修改offset的值。

 

查詢topic的offset的范圍

用下面命令可以查詢到topic:DynamicRange broker:SparkMaster:9092的offset的最小值:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slave6:9092 -topic videoplay --time -2

輸出

DynamicRange:0:1288

查詢offset的最大值:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slave6:9092 -topic videoplay --time -1

輸出

DynamicRange:0:7885

從上面的輸出可以看出topic:DynamicRange只有一個partition:0 offset范圍為:[1288,7885]

 

設置consumer group的offset

啟動zookeeper client

/zookeeper/bin/zkCli.sh

通過下面命令設置consumer group:DynamicRangeGroup topic:DynamicRange partition:0的offset為1288:

set /consumers/DynamicRangeGroup/offsets/DynamicRange/0 1288

注意如果你的kafka設置了zookeeper root,比如為/kafka,那么命令應該改為:

set /kafka/consumers/DynamicRangeGroup/offsets/DynamicRange/0 1288

 

生效 

重啟相關的應用程序,就可以從設置的offset開始讀數據了。 

 

參考:https://metabroadcast.com/blog/resetting-kafka-offsets


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM