修改kafka topic offset 的方法


 

在spark JOB中消費kafka隊列數據時,通過zookeeper記錄了kafka的偏移量,有時數據量較大,JOB處理不過來,這事需要kafka修改偏移量offset,如:

 

 

開始嘗試調用kafka內置的類kafka.tools.UpdateOffsetsInZK,修改offset,如下:

[bsauser@bsa222 kafka]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest config/consumer.properties tam_format_alarm
updating partition 0 with new offset: 6776033
updating partition 1 with new offset: 6782580
updating partition 2 with new offset: 6778624
updating partition 3 with new offset: 6786418
updating partition 4 with new offset: 6780299
updated the offset for 5 partitions

但是重啟spark JOB之后,發現並不成功。突然想到應該跟新zookeeper中該消費group id的偏移量:

操作之前先查看下topic offset的最大值和最小值,進入kafka目錄:

查看最小值:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list bsa222:9092,bsa221:9092,bsa220:9092 -topic tam_format_alarm --time -2

結果:

 

查看最大值:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list bsa222:9092,bsa221:9092,bsa220:9092 -topic tam_format_alarm --time -1

結果:

 

根據最大值最小值區間,設置kafka的offset。

先進入zookeeper安裝目錄,進入bin目錄,執行./zkCli.sh命令,進入終端:

通過下面命令設置consumer group:bsatam.enhance_alarm topic:tam_format_alarm partition:1  offset 為 6776033:

set /consumers/enhance_alarm/offsets/tam_format_alarm/0 6776033

 

 

同樣,設置其余的partition,partition 1-4 設置命令一樣,需要修改partiton修改下最后面兩個參數的值:

如partition 4的最大值是6780299,現在需要將offset 調為最大,即命令為:

set /consumers/enhance_alarm/offsets/tam_format_alarm/4 6780299

 

 調試完5個partition后,重啟JOB,運行正常:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM