Kafka consumer group位移重設


  本文闡述如何使用Kafka自帶的kafka-consumer-groups.sh腳本隨意設置消費者組(consumer group)的位移。需要特別強調的是, 這是0.11.0.0版本提供的新功能且只適用於新版本consumer。

  在新版本之前,如果要為已有的consumer group調整位移必須要手動編寫Java程序調用KafkaConsumer#seek方法,費時費力不說還容易出錯。0.11.0.0版本豐富了kafka-consumer-groups腳本的功能,用戶可以直接使用該腳本很方便地為已有的consumer group重新設置位移,但前提是:consumer group狀態必須是inactive的,即不能是處於正在工作中的狀態。

   先務虛一下。總體來說,重設位移的流程由3步組成,如下圖所示:

  • 確定topic作用域——當前有3種作用域指定方式:--all-topics(為consumer group下所有topic的所有分區調整位移),--topic t1 --topic t2(為指定的若干個topic的所有分區調整位移),--topic t1:0,1,2(為指定的topic分區調整位移)
  • 確定位移重設策略——當前支持8種設置規則:
    • --to-earliest:把位移調整到分區當前最小位移
    • --to-latest:把位移調整到分區當前最新位移
    • --to-current:把位移調整到分區當前位移
    • --to-offset <offset>: 把位移調整到指定位移處
    • --shift-by N: 把位移調整到當前位移 + N處,注意N可以是負數,表示向前移動
    • --to-datetime <datetime>:把位移調整到大於給定時間的最早位移處,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
    • --by-duration <duration>:把位移調整到距離當前時間指定間隔的位移處,duration格式是PnDTnHnMnS,比如PT0H5M0S
    • --from-file <file>:從CSV文件中讀取調整策略
  • 確定執行方案——當前支持3種方案:
    • 什么參數都不加:只是打印出位移調整方案,不具體執行
    • --execute:執行真正的位移調整
    • --export:把位移調整方案按照CSV格式打印,方便用戶成csv文件,供后續直接使用

針對上面的8種策略,本文重點演示前面7種策略。

首先,我們創建一個測試topic,5個分區,並發送5,000,000條測試消息:

> bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 5 --replication-factor 1 --topic test

Created topic "test".

> bin/kafka-producer-perf-test.sh --topic test --num-records 5000000 --throughput -1 --record-size 100 --producer-props bootstrap.servers=localhost:9092 acks=-1

 

1439666 records sent, 287760.5 records/sec (27.44 MB/sec), 75.7 ms avg latency, 317.0 max latency.
1541123 records sent, 308163.0 records/sec (29.39 MB/sec), 136.4 ms avg latency, 480.0 max latency.
1878025 records sent, 375529.9 records/sec (35.81 MB/sec), 58.2 ms avg latency, 600.0 max latency.
5000000 records sent, 319529.652352 records/sec (30.47 MB/sec), 86.33 ms avg latency, 600.00 ms max latency, 38 ms 50th, 319 ms 95th, 516 ms 99th, 591 ms 99.9th.

 

 然后,啟動一個console consumer程序,組名設置為test-group:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer-property group.id=test-group

..............

待運行一段時間后關閉consumer程序將group設置為inactive。現在運行kafka-consumer-groups.sh腳本首先確定當前group的消費進度:

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --describe
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).


TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 1000000 1000000 0 consumer-1-8688633a-2f88-4c41-89ca-fd0cd6d19ec7 /127.0.0.1 consumer-1
test 1 1000000 1000000 0 consumer-1-8688633a-2f88-4c41-89ca-fd0cd6d19ec7 /127.0.0.1 consumer-1
test 2 1000000 1000000 0 consumer-1-8688633a-2f88-4c41-89ca-fd0cd6d19ec7 /127.0.0.1 consumer-1
test 3 1000000 1000000 0 consumer-1-8688633a-2f88-4c41-89ca-fd0cd6d19ec7 /127.0.0.1 consumer-1
test 4 1000000 1000000 0 consumer-1-8688633a-2f88-4c41-89ca-fd0cd6d19ec7 /127.0.0.1 consumer-1

由上面輸出可知,當前5個分區LAG列的值都是0,表示全部消費完畢。現在我們演示下如何重設位移。

1. --to-earliest

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).


TOPIC PARTITION NEW-OFFSET
test 0 0
test 1 0
test 4 0
test 3 0
test 2 0

上面輸出表明,所有分區的位移都已經被重設為0

2. --to-latest

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-latest --execute
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).


TOPIC PARTITION NEW-OFFSET
test 0 1000000
test 1 1000000
test 4 1000000
test 3 1000000
test 2 1000000

上面輸出表明,所有分區的位移都已經被重設為最新位移,即1,000,000

3.  --to-offset <offset>

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).


TOPIC PARTITION NEW-OFFSET
test 0 500000
test 1 500000
test 4 500000
test 3 500000
test 2 500000

上面輸出表明,所有分區的位移都已經調整為給定的500000

4.  --to-current

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).


TOPIC PARTITION NEW-OFFSET
test 0 500000
test 1 500000
test 4 500000
test 3 500000
test 2 500000

輸出表明所有分區的位移都已經被移動到當前位移(這個有點傻,因為位移距上一步沒有變動)

5. --shift-by N

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).


TOPIC PARTITION NEW-OFFSET
test 0 400000
test 1 400000
test 4 400000
test 3 400000
test 2 400000

輸出表明所有分區的位移被移動到(500000 - 100000) = 400000處

6. --to-datetime

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).


TOPIC PARTITION NEW-OFFSET
test 0 1000000
test 1 1000000
test 4 1000000
test 3 1000000
test 2 1000000

將所有分區的位移調整為2017年8月4日14:30之后的最早位移

7. --by-duration

bogon:kafka_0.11 huxi$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --by-duration PT0H30M0S
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).


TOPIC PARTITION NEW-OFFSET
test 0 0
test 1 0
test 4 0
test 3 0
test 2 0

將所有分區位移調整為30分鍾之前的最早位移


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM