Kafka rebalance testing and log analysis


This post combines log output with source-code analysis to look at Kafka's rebalance mechanism.

Scenarios that trigger a rebalance:

Test setup: Kafka 1.1.0, three consumers subscribed to topic test6, which has three partitions.

Initial assignment: each consumer consumes one partition.

1. The interval between two poll() calls exceeds max.poll.interval.ms

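The business logic is too heavy, so the next poll() is not called within 5 minutes (the default value of max.poll.interval.ms), which triggers a rebalance.
Server-side log: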

[2021-12-19 21:38:32,954] INFO [GroupCoordinator 0]: Member consumer-1-717725b3-b341-44de-92db-941243fdc42f in group group-1 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:38:32,954] INFO [GroupCoordinator 0]: Preparing to rebalance group group-1 with old generation 7 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:38:35,018] INFO [GroupCoordinator 0]: Stabilized group group-1 generation 8 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:38:35,019] INFO [GroupCoordinator 0]: Assignment received from leader for group group-1 for generation 8 (kafka.coordinator.group.GroupCoordinator)

Client-side log:

[2021-12-19 21:38:35,016] INFO Revoking previously assigned partitions [test6-1] for group group-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-12-19 21:38:35,016] INFO (Re-)joining group group-1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-12-19 21:38:35,020] INFO Successfully joined group group-1 with generation 8 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-12-19 21:38:35,020] INFO Setting newly assigned partitions [test6-1, test6-0] for group group-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
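To estimate in advance whether a consumer is at risk of this scenario, the sketch below compares the worst-case time to process one full poll() batch with max.poll.interval.ms. PollBudget is a hypothetical helper, not a Kafka API; it assumes a fixed per-record processing cost and uses the Kafka 1.1.0 consumer defaults max.poll.records = 500 and max.poll.interval.ms = 300000 (5 min).

```java
import java.time.Duration;

// Hypothetical helper (not a Kafka API): estimates whether one full poll()
// batch can be processed before max.poll.interval.ms runs out.
final class PollBudget {
    // Kafka 1.1.0 consumer defaults.
    static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofMillis(300_000); // max.poll.interval.ms
    static final int DEFAULT_MAX_POLL_RECORDS = 500;                              // max.poll.records

    private PollBudget() {}

    // Worst case: every record of a full batch costs perRecord to process.
    static Duration worstCaseBatchTime(int maxPollRecords, Duration perRecord) {
        return perRecord.multipliedBy(maxPollRecords);
    }

    // True if a full batch is expected to outlast the allowed poll interval.
    static boolean exceedsPollInterval(int maxPollRecords, Duration perRecord, Duration maxPollInterval) {
        return worstCaseBatchTime(maxPollRecords, perRecord).compareTo(maxPollInterval) > 0;
    }

    public static void main(String[] args) {
        Duration perRecord = Duration.ofMillis(700); // assumed processing cost per record
        System.out.println(worstCaseBatchTime(DEFAULT_MAX_POLL_RECORDS, perRecord)); // PT5M50S
        System.out.println(exceedsPollInterval(DEFAULT_MAX_POLL_RECORDS, perRecord,
                DEFAULT_MAX_POLL_INTERVAL));                                         // true
    }
}
```

If the check fails, the usual levers are lowering max.poll.records or raising max.poll.interval.ms.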

2. Heartbeat timeout

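Simulated by modifying the client source code (details to be added): in AbstractCoordinator, sendHeartbeatRequest() is changed to sleep for 10 s before it sends the heartbeat.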

synchronized RequestFuture<Void> sendHeartbeatRequest() {
    // Test modification: block for 10 s before sending the heartbeat.
    try {
        log.info("Test heartbeat thread block,sleep 10s");
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing the interruption.
        Thread.currentThread().interrupt();
    }
    log.debug("Sending Heartbeat request to coordinator {}", coordinator);
    HeartbeatRequest.Builder requestBuilder =
            new HeartbeatRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId);
    return client.send(coordinator, requestBuilder)
            .compose(new HeartbeatResponseHandler());
}

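The client and server timestamps differ by almost 10 s, which is in line with the default session.timeout.ms of 10 s in Kafka 1.1.0.
Server-side log: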

[2021-12-20 21:48:47,685] INFO [GroupCoordinator 0]: Member consumer2-dd4a191e-c537-4e3d-b5b9-81c17c5f3136 in group g1  has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-12-20 21:48:47,685] INFO [GroupCoordinator 0]: Preparing to rebalance group g1 with old generation 190 (__consumer_offsets-42) (kafka.coordinator.group.GroupCoordinator)

Client-side log:

[2021-12-20 21:48:38,681] INFO [Consumer clientId=consumer2, groupId=g1] This is consumer2:  sleep 10s (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
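The "almost 10 s" gap in this scenario can be measured exactly by parsing the client timestamp (21:48:38,681) and the broker timestamp (21:48:47,685). This minimal sketch assumes the log4j-style format yyyy-MM-dd HH:mm:ss,SSS seen in the logs here; the class name LogGap is hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: gap between two log4j-style timestamps,
// e.g. "2021-12-20 21:48:38,681" (comma before the milliseconds).
final class LogGap {
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    private LogGap() {}

    // Milliseconds from the earlier timestamp to the later one.
    static long gapMillis(String earlier, String later) {
        LocalDateTime from = LocalDateTime.parse(earlier, LOG_TS);
        LocalDateTime to = LocalDateTime.parse(later, LOG_TS);
        return Duration.between(from, to).toMillis();
    }

    public static void main(String[] args) {
        // Client "sleep 10s" line vs. broker "has failed" line from this scenario.
        long gap = gapMillis("2021-12-20 21:48:38,681", "2021-12-20 21:48:47,685");
        System.out.println(gap + " ms"); // 9004 ms
    }
}
```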

3. A new member joins

Server-side log:

[2021-12-19 21:21:50,010] INFO [GroupCoordinator 0]: Preparing to rebalance group group-1 with old generation 4 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:21:52,727] INFO [GroupCoordinator 0]: Stabilized group group-1 generation 5 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:21:52,732] INFO [GroupCoordinator 0]: Assignment received from leader for group group-1 for generation 5 (kafka.coordinator.group.GroupCoordinator)

Client-side log:

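First, the consumer discovers the coordinator:
INFO Discovered coordinator 101.100.0.***:9092
Because the consumer is new, the list of previously assigned partitions it revokes is empty ([]):
INFO Revoking previously assigned partitions [] for group group-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)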
[2021-12-19 21:21:49,998] INFO (Re-)joining group group-1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-12-19 21:21:52,737] INFO Successfully joined group group-1 with generation 5 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-12-19 21:21:52,740] INFO Setting newly assigned partitions [test6-0] for group group-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
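In the server log of this scenario, "Preparing to rebalance ... with old generation 4" is followed by "Stabilized group group-1 generation 5"; every completed rebalance in these tests advances the generation by one (7 to 8, 5 to 6, 15 to 16, 17 to 18 in the other scenarios). The hypothetical helper below pairs these two broker lines to list the transitions; its regular expressions are assumptions based only on the message text quoted in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log helper: pairs "Preparing to rebalance ... old generation N"
// with the following "Stabilized group ... generation M" for the same group.
final class GenerationCheck {
    private static final Pattern PREPARING =
            Pattern.compile("Preparing to rebalance group (\\S+) with old generation (\\d+)");
    private static final Pattern STABILIZED =
            Pattern.compile("Stabilized group (\\S+) generation (\\d+)");

    private GenerationCheck() {}

    // Returns "group: old -> new" for every completed rebalance in the lines.
    static List<String> transitions(List<String> lines) {
        List<String> out = new ArrayList<>();
        String group = null; // group currently being rebalanced, if any
        int oldGen = -1;
        for (String line : lines) {
            Matcher p = PREPARING.matcher(line);
            if (p.find()) {
                group = p.group(1);
                oldGen = Integer.parseInt(p.group(2));
                continue;
            }
            Matcher s = STABILIZED.matcher(line);
            if (s.find() && s.group(1).equals(group)) {
                out.add(group + ": " + oldGen + " -> " + s.group(2));
                group = null;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> brokerLog = Arrays.asList(
                "[2021-12-19 21:21:50,010] INFO [GroupCoordinator 0]: Preparing to rebalance group group-1 with old generation 4 (__consumer_offsets-49)",
                "[2021-12-19 21:21:52,727] INFO [GroupCoordinator 0]: Stabilized group group-1 generation 5 (__consumer_offsets-49)");
        System.out.println(transitions(brokerLog)); // [group-1: 4 -> 5]
    }
}
```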

4. A member leaves

Server-side log:

[2021-12-19 21:25:38,412] INFO [GroupCoordinator 0]: Member consumer-1-3903ffdf-b59a-4e33-bb7b-5287afd4812c in group group-1 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:25:38,412] INFO [GroupCoordinator 0]: Preparing to rebalance group group-1 with old generation 5 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:25:40,868] INFO [GroupCoordinator 0]: Stabilized group group-1 generation 6 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:25:40,869] INFO [GroupCoordinator 0]: Assignment received from leader for group group-1 for generation 6 (kafka.coordinator.group.GroupCoordinator)

Client-side log:

[2021-12-19 21:25:40,867] INFO Revoking previously assigned partitions [test6-1] for group group-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-12-19 21:25:40,867] INFO (Re-)joining group group-1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-12-19 21:25:40,870] INFO Successfully joined group group-1 with generation 6 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
This consumer now consumes one additional partition:
[2021-12-19 21:25:40,871] INFO Setting newly assigned partitions [test6-1, test6-0] for group group-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
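Why one consumer ends up with two partitions follows from the default partition.assignment.strategy in Kafka 1.1.0, RangeAssignor. The sketch below is a simplified single-topic version of its arithmetic, not the library code: member IDs are sorted, each member gets numPartitions / numMembers partitions, and the first numPartitions % numMembers members get one extra. The member IDs c-a, c-b, c-c are made up; real ones look like consumer-1-<uuid>.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified single-topic sketch of the range assignment arithmetic used by
// Kafka's default RangeAssignor (1.1.0). Not the library code itself.
final class RangeSketch {
    private RangeSketch() {}

    // Returns memberId -> list of partition numbers for one topic.
    static Map<String, List<Integer>> assign(List<String> memberIds, int numPartitions) {
        List<String> members = new ArrayList<>(memberIds);
        Collections.sort(members);                      // members are ordered by member ID
        int perMember = numPartitions / members.size(); // base share
        int withExtra = numPartitions % members.size(); // the first withExtra members get one more
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (int i = 0; i < members.size(); i++) {
            int start = perMember * i + Math.min(i, withExtra);
            int length = perMember + (i < withExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                partitions.add(p);
            }
            result.put(members.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical member IDs; three members share test6's three partitions.
        System.out.println(assign(Arrays.asList("c-a", "c-b", "c-c"), 3)); // {c-a=[0], c-b=[1], c-c=[2]}
        // After c-b leaves, the first remaining member takes the extra partition.
        System.out.println(assign(Arrays.asList("c-a", "c-c"), 3));        // {c-a=[0, 1], c-c=[2]}
    }
}
```

With 3 partitions and 2 members, 3 / 2 = 1 and 3 % 2 = 1, so the first member in ID order gets partitions 0 and 1, consistent with the [test6-1, test6-0] assignment above. Which consumer comes first depends on how the member IDs sort.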

5. unsubscribe

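Note the roughly 10 s gap between the unsubscribe call and the broker log (the heartbeat timeout, i.e. session.timeout.ms, which defaults to 10 s).
Server-side log: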

[2021-12-19 21:59:06,636] INFO [GroupCoordinator 0]: Member consumer-1-1b50470e-f950-4c37-bf95-3156321a8cbb in group group-1 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:59:06,636] INFO [GroupCoordinator 0]: Preparing to rebalance group group-1 with old generation 15 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:59:08,641] INFO [GroupCoordinator 0]: Stabilized group group-1 generation 16 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:59:08,642] INFO [GroupCoordinator 0]: Assignment received from leader for group group-1 for generation 16 (kafka.coordinator.group.GroupCoordinator)

Client-side log:

2021-12-19 21:58:57: unsubscribe
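The roughly 10 s between the unsubscribe at 21:58:57 and the broker's "has failed" line at 21:59:06,636 fits a session-timeout rule: the coordinator treats a member as failed when no heartbeat has arrived within session.timeout.ms. The toy model below illustrates that rule with the 1.1.0 default of 10000 ms. It is not broker code (the class and method names are hypothetical) and the broker's real implementation differs in detail.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of the coordinator-side session check (not broker code):
// a member whose last heartbeat is older than session.timeout.ms is failed.
final class SessionTracker {
    private final long sessionTimeoutMs;
    // memberId -> time of last heartbeat; TreeMap keeps results in a stable order
    private final Map<String, Long> lastHeartbeatMs = new TreeMap<>();

    SessionTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Records a heartbeat from memberId at time nowMs.
    void heartbeat(String memberId, long nowMs) {
        lastHeartbeatMs.put(memberId, nowMs);
    }

    // Removes and returns every member whose session has expired at nowMs.
    List<String> expire(long nowMs) {
        List<String> failed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeatMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > sessionTimeoutMs) {
                failed.add(entry.getKey());
                it.remove();
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        SessionTracker tracker = new SessionTracker(10_000); // 1.1.0 default session.timeout.ms
        tracker.heartbeat("consumer-a", 0);
        tracker.heartbeat("consumer-b", 0);
        tracker.heartbeat("consumer-a", 6_000);             // consumer-b sends nothing more
        System.out.println(tracker.expire(9_000));          // []
        System.out.println(tracker.expire(10_001));         // [consumer-b]
    }
}
```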

6. close

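Note that close and the rebalance happen at almost the same moment, so the coordinator was not informed through the heartbeat mechanism; in Kafka 1.1.0, close() sends a LeaveGroup request to the coordinator.
Server-side log: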

[2021-12-19 22:02:33,080] INFO [GroupCoordinator 0]: Preparing to rebalance group group-1 with old generation 17 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 22:02:33,272] INFO [GroupCoordinator 0]: Stabilized group group-1 generation 18 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 22:02:33,274] INFO [GroupCoordinator 0]: Assignment received from leader for group group-1 for generation 18 (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 22:02:45,544] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 8 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

Client-side log:

2021-12-19 22:02:33: close

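Summary: the most notable finding in the tests so far is that only a consumer's own close or its own join produces a "Preparing to rebalance" line without an obvious error before it. In the other scenarios (poll interval exceeded, heartbeat timeout, member leaving, unsubscribe), that line is preceded by "Member ... has failed, removing it from the group". This difference helps when troubleshooting.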
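The observation above can be turned into a simple log check. The hypothetical helper below scans broker log lines and, for each "Preparing to rebalance" line, reports whether a "has failed" line for the same group appeared since the previous rebalance of that group. The regular expressions are assumptions based only on the 1.1.0 messages quoted in this post (including the double space seen in the g1 log) and may need adjusting for other versions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical troubleshooting helper: labels each "Preparing to rebalance"
// line by whether a member failure for the same group came before it.
final class RebalanceCause {
    private static final Pattern FAILED =
            Pattern.compile("Member (\\S+) in group (\\S+) +has failed");
    private static final Pattern PREPARING =
            Pattern.compile("Preparing to rebalance group (\\S+) with old generation (\\d+)");

    private RebalanceCause() {}

    static List<String> classify(List<String> brokerLines) {
        List<String> report = new ArrayList<>();
        Set<String> groupsWithFailure = new HashSet<>(); // failures not yet tied to a rebalance
        for (String line : brokerLines) {
            Matcher f = FAILED.matcher(line);
            if (f.find()) {
                groupsWithFailure.add(f.group(2));
                continue;
            }
            Matcher p = PREPARING.matcher(line);
            if (p.find()) {
                String group = p.group(1);
                String cause = groupsWithFailure.remove(group)
                        ? "member failure"
                        : "no preceding failure (e.g. join or close)";
                report.add(group + " gen " + p.group(2) + ": " + cause);
            }
        }
        return report;
    }

    public static void main(String[] args) {
        List<String> brokerLog = Arrays.asList(
                "INFO [GroupCoordinator 0]: Member consumer-1-1b50470e-f950-4c37-bf95-3156321a8cbb in group group-1 has failed, removing it from the group",
                "INFO [GroupCoordinator 0]: Preparing to rebalance group group-1 with old generation 15 (__consumer_offsets-49)",
                "INFO [GroupCoordinator 0]: Preparing to rebalance group group-1 with old generation 17 (__consumer_offsets-49)");
        classify(brokerLog).forEach(System.out::println);
        // group-1 gen 15: member failure
        // group-1 gen 17: no preceding failure (e.g. join or close)
    }
}
```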

To be continued.

