使用Kafka自帶的kafka-consumer-groups.sh腳本可隨意設置消費者組(consumer group),這是0.11.0.0版本提供的新功能,設置的前提是:consumer group狀態是inactive的,即不能是處於正在工作中的狀態。
重設位移的流程由下面3步組成:
1、確定位移重設策略——當前支持8種設置規則:
--to-earliest:把位移調整到分區當前最小位移
--to-latest:把位移調整到分區當前最新位移
--to-current:把位移調整到分區當前位移
--to-offset <offset>: 把位移調整到指定位移處
--shift-by N: 把位移調整到當前位移 + N處,注意N可以是負數,表示向前移動
--to-datetime <datetime>:把位移調整到大於給定時間的最早位移處,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx
--by-duration <duration>:把位移調整到距離當前時間指定間隔的位移處,duration格式是PnDTnHnMnS
--from-file <file>:從CSV文件中讀取調整策略
2、確定執行方案——當前支持3種方案:
什么參數都不加:只是打印出位移調整方案,不具體執行
--execute:執行真正的位移調整
--export:把位移調整方案按照CSV格式打印,方便用戶成csv文件,供后續直接使用
3、確定topic作用域——當前有3種作用域指定方式:--all-topics(為consumer group下所有topic的所有分區調整位移),--topic t1 --topic t2(為指定的若干個topic的所有分區調整位移),--topic t1:0,1,2(為指定的topic分區調整位移)
下面實例設置如何重設位移:
1.--to-earliest : 有分區的位移都被重設為0
bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute
2.--to-latest :所有分區的位移都被重設為最新位移,即1,000,000
bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-latest --execute
3.--to-offset <offset> : 所有分區的位移都調整為給定的500000
bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute
4.--to-current: 所有分區的位移都被移動到當前位移,位移距上一步沒有變動
bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute
5.--shift-by N :所有分區的位移被移動到(500000 - 100000) = 400000處
bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute
6.--to-datetime: 所有分區的位移調整為2019年3月3日3:30之后的最早位移
bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2019-03-03T3:30:00.000
7.--by-duration:所有分區位移調整為30分鍾之前的最早位移
bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --by-duration PT0H30M0S
