在spark JOB中消費kafka隊列數據時,通過zookeeper記錄了kafka的偏移量,有時數據量較大,JOB處理不過來,這事需要kafka修改偏移量offset,如:
開始嘗試調用kafka內置的類kafka.tools.UpdateOffsetsInZK,修改offset,如下:
[bsauser@bsa222 kafka]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest config/consumer.properties tam_format_alarm
updating partition 0 with new offset: 6776033
updating partition 1 with new offset: 6782580
updating partition 2 with new offset: 6778624
updating partition 3 with new offset: 6786418
updating partition 4 with new offset: 6780299
updated the offset for 5 partitions
但是重啟spark JOB之后,發現並不成功。突然想到應該跟新zookeeper中該消費group id的偏移量:
操作之前先查看下topic offset的最大值和最小值,進入kafka目錄:
查看最小值:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list bsa222:9092,bsa221:9092,bsa220:9092 -topic tam_format_alarm --time -2
結果:
查看最大值:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list bsa222:9092,bsa221:9092,bsa220:9092 -topic tam_format_alarm --time -1
結果:
根據最大值最小值區間,設置kafka的offset。
先進入zookeeper安裝目錄,進入bin目錄,執行./zkCli.sh命令,進入終端:
通過下面命令設置consumer group:bsatam.enhance_alarm topic:tam_format_alarm partition:1 offset 為 6776033:
set /consumers/enhance_alarm/offsets/tam_format_alarm/0 6776033
同樣,設置其余的partition,partition 1-4 設置命令一樣,需要修改partiton修改下最后面兩個參數的值:
如partition 4的最大值是6780299,現在需要將offset 調為最大,即命令為:
set /consumers/enhance_alarm/offsets/tam_format_alarm/4 6780299
調試完5個partition后,重啟JOB,運行正常: