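First of all, Kafka's consumer group mechanism has long been heavily criticized.
It is over-engineered and impractical, and it keeps performing unnecessary rebalances. In many cases the same goals can be achieved in the architecture or in the client itself.
Second: what should you do if you use it anyway?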
When does a rebalance happen?
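A rebalance is essentially a reassignment of partitions among the members of a consumer group. It is triggered in the following three cases:
- The number of partitions of a subscribed topic changes.
- The number of subscribed topics changes.
- The number of members in the consumer group changes, for example when a new consumer instance joins the group or an existing one leaves it.
The "number of group members changes" case covers:
- a new member joins;
- a member leaves voluntarily;
- a member crashes.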
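The three triggers can be expressed as a comparison between two snapshots of a group. This is only an illustration under assumed types: GroupSnapshot, RebalanceReasons and sameMembers are hypothetical names, not part of Kafka or sarama, and a real coordinator learns about crashed members through session expiry rather than by diffing member lists.

```go
package main

import "sort"

// GroupSnapshot is a hypothetical, simplified view of a consumer group at one
// moment: the partition count of every subscribed topic and the member IDs.
type GroupSnapshot struct {
	Partitions map[string]int // topic -> number of partitions
	Members    []string       // member IDs currently in the group
}

// RebalanceReasons reports which of the three triggers fired between two snapshots.
func RebalanceReasons(prev, cur GroupSnapshot) []string {
	topicsChanged := len(prev.Partitions) != len(cur.Partitions)
	partitionsChanged := false
	for topic, n := range cur.Partitions {
		old, ok := prev.Partitions[topic]
		switch {
		case !ok:
			topicsChanged = true // newly subscribed topic
		case old != n:
			partitionsChanged = true // existing topic gained or lost partitions
		}
	}
	var reasons []string
	if partitionsChanged {
		reasons = append(reasons, "partition count changed")
	}
	if topicsChanged {
		reasons = append(reasons, "subscribed topics changed")
	}
	if !sameMembers(prev.Members, cur.Members) {
		reasons = append(reasons, "group membership changed")
	}
	return reasons
}

// sameMembers compares two member lists, ignoring order.
func sameMembers(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	x := append([]string(nil), a...)
	y := append([]string(nil), b...)
	sort.Strings(x)
	sort.Strings(y)
	for i := range x {
		if x[i] != y[i] {
			return false
		}
	}
	return true
}
```

Only the membership trigger is under the application's direct control at runtime, which is why the rest of these notes focus on members joining and leaving.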
--------------------------------------------------------------------
20220421
A rebalance reproduction experiment: restarted every consumer group member on one machine. The rebalance took 4 minutes to finally complete.
First alert:
{"Level":"error","Time":"2022-04-21T20:38:02.9884226+08:00","LoggerName":"","Message":"Cleanup session MemberID :[vwin04_hunter-69c09b1d-a46b-40f8-8e55-e7c07580b46b] , GenerationID :[3980] , Claims :[map[vsdir:[12 13 14 15 16 17]]]","Caller":{"Defined":true,"PC":23615323,"File":"/groupconsumer.go","Line":67,"Function":"cloudscan/pubsub.(*GroupConsumer).Cleanup"},"Stack":"","Subject":"csscand告警 engine: [hunter] , version:[1.7.0]","IP":"","HostName":"vwin04"}
Last alert:
{"Level":"error","Time":"2022-04-21T20:42:08.2430919+08:00","LoggerName":"","Message":"Cleanup session MemberID :[vwin04_comodo-724c94cf-551f-4cd4-abb5-389c47f63a07] , GenerationID :[11674] , Claims :[map[vsdir:[16 17 18 19 20 21 22 23]]]","Caller":{"Defined":true,"PC":23615323,"File":"/groupconsumer.go","Line":67,"Function":"cloudscan/pubsub.(*GroupConsumer).Cleanup"},"Stack":"","Subject":"csscand告警 engine: [comodo] , version:[1.7.0]","IP":"","HostName":"vwin04"}
------------------------------------------------------------------------------
20220419
Changed:
config.Consumer.Group.Session.Timeout = time.Second * 120
and added this line:
config.Consumer.Group.Rebalance.Timeout = config.Consumer.Group.Session.Timeout
This sets the rebalance timeout (config.Consumer.Group.Rebalance.Timeout) to 2 minutes, equal to Session.Timeout.
20220421
Group vsarcabit remove dynamic members who haven't joined: Set(10-52-6-136_arcabit-590f2304-7918-44f7-9c79-753d548bd41e) (kafka.coordinator.group.GroupCoordinator)
Reference: https://zhuanlan.zhihu.com/p/109574627
RPC
Rebalancing involves two APIs, JoinGroup and SyncGroup, plus the Heartbeat API.
The JoinGroup API has evolved to version 6:
JoinGroup Request (Version: 6) => group_id session_timeout_ms rebalance_timeout_ms member_id group_instance_id protocol_type [protocols] TAG_BUFFER
  group_id => COMPACT_STRING
  session_timeout_ms => INT32
  rebalance_timeout_ms => INT32
  member_id => COMPACT_STRING
  group_instance_id => COMPACT_NULLABLE_STRING
  protocol_type => COMPACT_STRING
  protocols => name metadata TAG_BUFFER
    name => COMPACT_STRING
    metadata => COMPACT_BYTES
JoinGroup Response (Version: 6) => throttle_time_ms error_code generation_id protocol_name leader member_id [members] TAG_BUFFER
  throttle_time_ms => INT32
  error_code => INT16
  generation_id => INT32
  protocol_name => COMPACT_STRING
  leader => COMPACT_STRING
  member_id => COMPACT_STRING
  members => member_id group_instance_id metadata TAG_BUFFER
    member_id => COMPACT_STRING
    group_instance_id => COMPACT_NULLABLE_STRING
    metadata => COMPACT_BYTES
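JoinGroup v6 is a "flexible" request version (KIP-482), which is why its strings are COMPACT_*: the length plus one is written as an unsigned varint, followed by the bytes, and for COMPACT_NULLABLE_STRING a length field of 0 means null (group_instance_id is null for dynamic members, which is the "group instance id None" seen in the server logs). COMPACT_BYTES uses the same length-plus-one prefix, and an empty TAG_BUFFER is a single 0x00 byte. A minimal sketch of these encodings using only encoding/binary; the function names are hypothetical and this is not sarama's encoder.

```go
package main

import "encoding/binary"

// appendUvarint appends v as an unsigned varint (7 bits per byte, low bits
// first), the encoding flexible Kafka versions use for COMPACT_* lengths.
func appendUvarint(dst []byte, v uint64) []byte {
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(buf[:], v)
	return append(dst, buf[:n]...)
}

// AppendCompactString encodes a COMPACT_STRING: uvarint(len+1), then the bytes.
func AppendCompactString(dst []byte, s string) []byte {
	dst = appendUvarint(dst, uint64(len(s))+1)
	return append(dst, s...)
}

// AppendCompactNullableString encodes a COMPACT_NULLABLE_STRING; nil becomes
// a single 0, which is how a dynamic member sends group_instance_id.
func AppendCompactNullableString(dst []byte, s *string) []byte {
	if s == nil {
		return appendUvarint(dst, 0)
	}
	return AppendCompactString(dst, *s)
}
```

For example, AppendCompactString(nil, "vsarcabit") yields one length byte with value 10 followed by the nine bytes of the group name.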
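Whenever a new consumer sends JoinGroup, the Coordinator moves the group into the PreparingRebalance state and waits for all active consumers to rejoin the group, for at most rebalance_timeout_ms (60s by default in sarama). When this join phase completes, the generation ID is incremented (in the server logs: "Preparing to rebalance ... with old generation 3257", then "Stabilized group ... generation 3258").
Two timeouts deserve attention:
- session_timeout_ms: if the Coordinator receives no heartbeat within this period, it considers the session expired;
- rebalance_timeout_ms: the maximum time the Coordinator waits for all consumers to re-send JoinGroup.
Suppose I scale out by 100 consumers: how long until that takes effect? Clearly it should not always take the full 60s. So how does the Coordinator know when to stop waiting? It is enough that every consumer with an active session has sent JoinGroup. Each consumer sends a heartbeat to the Coordinator every 3s (the default), and the heartbeat response is how a member learns that a rebalance has started, so a rebalance typically involves about 3s of waiting. Reducing the heartbeat interval should shorten the time a rebalance takes accordingly.
In other words, every client's JoinGroup blocks until all consumers with active sessions have sent JoinGroup. The group then enters the CompletingRebalance state.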
"haven't joined" means the member did not get through the join phase. So why did it fail?
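Our configuration had increased config.Consumer.Group.Heartbeat.Interval = time.Second * 20, from 3 seconds to 20 seconds. That means:
with the default rebalance_timeout_ms of 60 s (60000 ms), a heartbeat every 3 seconds gives 20 heartbeats within the window;
a heartbeat every 20 seconds gives only 3, a drastic reduction. After raising rebalance_timeout_ms to 120 seconds, it only becomes 6.
So Heartbeat.Interval should be lowered to 5 seconds, or left at the default 3 seconds.
As in the Raft protocol, heartbeats carry protocol state (here, the signal that a rebalance is in progress), so the heartbeat interval must not be too large.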
-------------------------------------------------------------------------------------------------------------
20220418:
Generation 3258 is a healthy one.
A normal server-side Kafka rebalance log:
[2022-04-18 12:35:34,586] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in Stable state. Created a new member id _arcabit-a988337d-c2e7-436f-a556-7e7f34576a83 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:35:34,586] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3257 (__consumer_offsets-11) (reason: Adding new member _arcabit-a988337d-c2e7-436f-a556-7e7f34576a83 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:35:35,047] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in PreparingRebalance state. Created a new member id _arcabit-dce0a743-6b08-4ab7-879c-7518c29f15c8 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:35:43,245] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in PreparingRebalance state. Created a new member id _arcabit-ae61f5ce-c88f-44e7-9475-95e150c39741 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:35:48,499] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in PreparingRebalance state. Created a new member id_arcabit-70411222-99d0-453c-aa53-e7c5262569e4 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:36:14,537] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3258 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:36:14,540] INFO [GroupCoordinator 1]: Assignment received from leader for group vsarcabit for generation 3258 (kafka.coordinator.group.GroupCoordinator)
A normal client-side log:
Apr 18 12:36:14 csscand[24350]: {"level":"info","time":"2022-04-18T12:36:14.578+0800","caller":"pubsub/groupconsumer.go:57","msg":"Setup session MemberID :[_arcabit-1b56a50a-ec96-4a48-986b-d90afc70cb0d] , GenerationID :[3258] , Claims :[map[vsdir:[6 7 8]]]",,"version":"v1.6.4","author":"donghongchen","buildDate":"2022-04-18T10:51:05","engine":"arcabit","app":"csscand"}
-----------------------------------------------------------------------------------
So a line such as "Group vsarcabit remove dynamic members who haven't joined" indicates an abnormal rebalance.
20220418: a complete server-side rebalance log for one consumer group:
[2022-04-17 02:35:12,783] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in Stable state. Created a new member id 10-52-6-136_arcabit-590f2304-7918-44f7-9c79-753d548bd41e for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 02:35:12,783] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3105 (__consumer_offsets-11) (reason: Adding new member 10-52-6-136_arcabit-590f2304-7918-44f7-9c79-753d548bd41e with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 02:36:12,783] INFO [GroupCoordinator 1]: Group vsarcabit remove dynamic members who haven't joined: Set(10-52-6-28_arcabit-64bcc707-3180-46ac-ada5-06a1d117b268) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 02:36:12,783] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3106 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 02:36:12,789] INFO [GroupCoordinator 1]: Assignment received from leader for group vsarcabit for generation 3106 (kafka.coordinator.group.GroupCoordinator)
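Despite the time gaps, the following are consecutive log lines for one consumer group (vsarcabit; a few unrelated lines from other groups and partitions are interleaved):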
[2022-04-17 03:17:16,370] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in Stable state. Created a new member id 10-52-6-224_arcabit-ade8d685-c136-4edc-bbef-9ed297b9a8a0 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:17:16,370] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3106 (__consumer_offsets-11) (reason: Adding new member 10-52-6-224_arcabit-ade8d685-c136-4edc-bbef-9ed297b9a8a0 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:17:20,200] INFO [ProducerStateManager partition=tb_asset-9] Writing producer snapshot at offset 380969561 (kafka.log.ProducerStateManager)
[2022-04-17 03:17:20,201] INFO [Log partition=tb_asset-9, dir=/data/kafka-data] Rolled new log segment at offset 380969561 in 1 ms. (kafka.log.Log)
[2022-04-17 03:17:58,606] INFO [GroupCoordinator 1]: Group vstws remove dynamic members who haven't joined: Set(vwin01_tws-ad4fae20-babf-4897-90c3-7ccbb46b04af) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:17:58,606] INFO [GroupCoordinator 1]: Stabilized group vstws generation 10806 (__consumer_offsets-23) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:17:58,609] INFO [GroupCoordinator 1]: Assignment received from leader for group vstws for generation 10806 (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:18:16,371] INFO [GroupCoordinator 1]: Group vsarcabit remove dynamic members who haven't joined: Set(10-52-6-136_arcabit-590f2304-7918-44f7-9c79-753d548bd41e) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:18:16,371] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3107 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:18:16,375] INFO [GroupCoordinator 1]: Assignment received from leader for group vsarcabit for generation 3107 (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:24:20,903] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in Stable state. Created a new member id 10-52-6-249_arcabit-5b1cefdc-9755-44a0-9806-521eea353142 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:24:20,903] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3107 (__consumer_offsets-11) (reason: Adding new member 10-52-6-249_arcabit-5b1cefdc-9755-44a0-9806-521eea353142 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:25:20,902] INFO [GroupCoordinator 1]: Group vsarcabit remove dynamic members who haven't joined: Set(10-52-6-224_arcabit-ade8d685-c136-4edc-bbef-9ed297b9a8a0) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:25:20,902] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3108 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:25:20,903] INFO [GroupCoordinator 1]: Assignment received from leader for group vsarcabit for generation 3108 (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:55:30,435] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in Stable state. Created a new member id 10-52-6-131_arcabit-02916ed4-6ada-416c-8b41-a78829ee0488 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:55:30,436] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3108 (__consumer_offsets-11) (reason: Adding new member 10-52-6-131_arcabit-02916ed4-6ada-416c-8b41-a78829ee0488 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:56:30,436] INFO [GroupCoordinator 1]: Group vsarcabit remove dynamic members who haven't joined: Set(10-52-6-249_arcabit-5b1cefdc-9755-44a0-9806-521eea353142) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:56:30,436] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3109 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:56:30,451] INFO [GroupCoordinator 1]: Assignment received from leader for group vsarcabit for generation 3109 (kafka.coordinator.group.GroupCoordinator)
--------------------------------------------------------------------------------------------------------------------
Server-side log:
[2022-04-04 16:07:10,255] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in Stable state. Created a new member id sarama-a1086e50-96c5-4395-9cc5-c2f5a72b68e1 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:07:10,256] INFO [GroupCoordinator 2]: Preparing to rebalance group vskingsoft in state PreparingRebalance with old generation 7259 (__consumer_offsets-0) (reason: Adding new member sarama-a1086e50-96c5-4395-9cc5-c2f5a72b68e1 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:08:03,853] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in PreparingRebalance state. Created a new member id sarama-42fb1447-ff68-48b3-91d4-2d2a598e0755 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:08:04,114] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in PreparingRebalance state. Created a new member id sarama-cfdf21c8-ef99-4382-85be-d3518dd9b1ff for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:08:10,256] INFO [GroupCoordinator 2]: Group vskingsoft remove dynamic members who haven't joined: Set(sarama-b4c83a70-f6bb-430f-805f-176023f24d25, sarama-32d526c5-5766-49c1-9423-2bbec4a683a4, sarama-a0d6e6b6-4b8d-43e3-9be7-a51c03929f40, sarama-88995424-4fbe-4512-8c7f-1637998310f8, sarama-9d889287-5823-4e59-8b8d-ed016ef44180) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:08:10,256] INFO [GroupCoordinator 2]: Stabilized group vskingsoft generation 7260 (__consumer_offsets-0) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:08:10,260] INFO [GroupCoordinator 2]: Assignment received from leader for group vskingsoft for generation 7260 (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:09:19,166] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in Stable state. Created a new member id sarama-668f4500-b31b-49d7-b3a6-4194b9dbae89 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:09:19,166] INFO [GroupCoordinator 2]: Preparing to rebalance group vskingsoft in state PreparingRebalance with old generation 7260 (__consumer_offsets-0) (reason: Adding new member sarama-668f4500-b31b-49d7-b3a6-4194b9dbae89 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:09:28,431] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in PreparingRebalance state. Created a new member id sarama-92092c42-3cb3-4ccd-bae9-92ffc53348dd for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:09:34,467] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in PreparingRebalance state. Created a new member id sarama-7f49d3d4-1ad2-461b-b5ce-1a4cd12dca66 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:10:06,090] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in PreparingRebalance state. Created a new member id sarama-d7cf7efd-ba01-48b5-a6ff-ecee6ed161ba for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:10:19,165] INFO [GroupCoordinator 2]: Group vskingsoft remove dynamic members who haven't joined: Set(sarama-cfdf21c8-ef99-4382-85be-d3518dd9b1ff, sarama-a1086e50-96c5-4395-9cc5-c2f5a72b68e1, sarama-42fb1447-ff68-48b3-91d4-2d2a598e0755) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:10:19,165] INFO [GroupCoordinator 2]: Stabilized group vskingsoft generation 7261 (__consumer_offsets-0) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:10:19,182] INFO [GroupCoordinator 2]: Assignment received from leader for group vskingsoft for generation 7261 (kafka.coordinator.group.GroupCoordinator)
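Symptoms:
Constant rebalancing, which slows down business consumption.
No useful messages were found in the client logs.
Previously encountered:
kafka troubleshooting: "The provided member is not known in the current generation", sometimes accompanied by i/o timeout in the logs.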
Possible cause 1, fixed in the code: the consume loop now also handles the end of the session:
case <-sess.Context().Done():
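Possible cause 2: one consumer group consuming multiple topics. Also fixed.
It is still not pinned down why dynamic members keep joining the consumer group.
Possible cause 3:
A problem in the watermill library. This library is no longer used.
read tcp :49560->:9092: i/o timeout under sarama kafka golang panic
This was changed as well. config.Consumer.Group.Session.Timeout = time.Second * 120 is the session (heartbeat) timeout, but our network read timeout was small, 30 seconds by default. In our scenario each message means scanning files, which takes a long time, and 30 seconds may not be enough to finish processing. Final configuration:
// https://github.com/Shopify/sarama/issues/1422
config.Net.ReadTimeout = config.Consumer.Group.Session.Timeout + 30*time.Second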
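Conclusion from the server-side logs:
So far: we use dynamic member IDs, new members keep asking to join the group, which triggers rebalances, and these new members often fail to join.
Current handling:
If the problem persists, look into what triggers the new dynamic members joining. If it does not recur, it can be ignored.
If the trigger cannot be found, try switching to static members.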
---------------------------------
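20220411, follow-up: after restarting the programs hosting the Kafka clients, the problem no longer appeared, which suggests the client had got stuck in a bug.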
-------------------------------
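20220415: one cause was found. A process ran out of memory and was killed, then reconnected to Kafka as a new member, which is why the log shows (reason: Adding new member sarama-a1086e50-96c5-4395-9cc5-c2f5a72b68e1 with group instance id None) (kafka.coordinator.group.GroupCoordinator) and the group rebalances.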
----------------------------------
Materials on static consumer groups:
Kafka static consumer group members
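Kafka's consumer group mechanism has long been criticized. Its design looks elegant, but in practice, because business consumption logic is often slow or clients are used improperly, consumer groups frequently fall into endless rebalancing, and because a rebalance is stop-the-world for the group, the other consumers in the same group stop consuming as well. The larger the production cluster, the more likely this becomes. To improve the situation, Kafka introduced static consumer group membership in version 2.3. (In the cloud, the CKafka Professional Edition 2.4 also supports this feature.)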
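Why it is needed
Kafka's consumer group mechanism lets the remaining consumers quickly take over the partitions of a consumer that exited because of a failure and keep consuming. However, several problems make consumer groups behave poorly in practice, and modern program architectures do not need Kafka's own group mechanism to recover from failures.
- A consumer is already at its capacity limit. If it also takes over the departed consumer's partitions, its insufficient throughput keeps triggering rebalances, and the whole consumer group can no longer make progress.
- A consumer exits, but since modern architectures commonly use a supervisor or run consumers as Kubernetes pods, it may come back quickly. By then a rebalance has already been triggered, and its return triggers another one. Each exit and recovery therefore causes two rebalances, which is hard to accept in a large consumer cluster.
- Fast rolling upgrades: in normal development, every release restarts the service and triggers a rebalance of the whole consumer group, which also looks unnecessary in a modern architecture.
- Kafka consumers should not outnumber the partitions. On the surface, surplus consumers simply receive no partitions, but in production, several consumers may intermittently own certain partitions during rebalances, and when consumption capacity is insufficient and processing is slow, rebalances keep recurring.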
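Basic principle
Static membership tries, as far as possible, to keep the consumer group from moving from STABLE to PREPARE_REBALANCE when group members change.
To achieve this, Kafka 2.3 modified several Group APIs and changed the shutdown logic of clients with static membership enabled:
- Added the group.instance.id parameter to identify static members; once it is set, the consumer is treated as a static member.
- A static member no longer sends a LeaveGroup request to the server when it exits; it is removed from the group only when its session times out.
- Raised the server-side maximum session timeout; with server support, the client session timeout can be set as high as 30 minutes.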
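Rebalance logic and caveats with static membership
- Adding members to the consumer group triggers a rebalance.
- A session timeout triggers a rebalance. (Base the session timeout on how long you can tolerate unavailability, and make it as long as reasonable to give restarting and slow consumers time.)
- max.poll.interval.ms must always be greater than session.timeout.ms: if the session timeout is 5 min, max.poll.interval.ms must also exceed 5 min.
- The client must guarantee that group.instance.id is unique; a duplicate group.instance.id joining the same consumer group results in an error.
- At the time of writing, the official Java client (2.3+) and librdkafka (1.4.0+) are known to support this feature; sarama does not support it yet.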
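Appendix: parameters
1. The group.instance.id parameter:
Setting this parameter marks the consumer as a static member.
A static member combined with a larger session timeout avoids rebalances caused by a member being temporarily unavailable (for example, during a restart).
2. The session.timeout.ms parameter:
Used together with group.instance.id: if the consumer has not come back online within this time, a rebalance is triggered.
If it is not set, the Java client's default is 10000 ms (45000 ms since Kafka 3.0); 6000 ms is the broker's default group.min.session.timeout.ms, the lowest value a client may request.
In production, restarting a program or container may take several minutes, so this value can be set larger.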
References:
The same question is on StackOverflow, but it has no answer yet:
https://stackoverflow.com/questions/62878735/kafka-one-particular-consumers-group-is-rebalancing
https://cloud.tencent.com/developer/article/1786605
https://blog.csdn.net/weixin_33970380/article/details/113316660 (sarama source code for how a new member joins the group)
https://olnrao.wordpress.com/2015/05/15/apache-kafka-case-of-mysterious-rebalances/
https://www.cnblogs.com/chanshuyi/p/kafka_rebalance_quick_guide.html
https://www.cnblogs.com/zstiancai/p/15708612.html
https://www.icode9.com/content-1-956448.html