Kafka consumer group rebalance mystery


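First of all, Kafka's consumer group mechanism has long been widely criticized.

It is over-engineered and not very practical, and it keeps doing unnecessary rebalances. In many cases the same job could be handled in the architecture or in the client.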

 

Second, what do you do if you use it anyway?

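When does a rebalance happen?

As mentioned earlier, a rebalance is simply a reassignment of partitions. So when does one happen? A rebalance is triggered in three cases:

  • The number of partitions of a subscribed topic changes.
  • The number of subscribed topics changes.
  • The number of members in the consumer group changes, for example a new consumer instance joins the group or an existing one leaves. "The number of members changes" covers:

    • a new member joining
    • a member leaving voluntarily
    • a member crashing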
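To make the three triggers concrete, here is a minimal Go sketch. It compares two snapshots of a group and reports which trigger fired. The `groupSnapshot` type and the `rebalanceReasons` function are hypothetical illustrations, not sarama or Kafka APIs. In a real cluster, the coordinator and the clients detect these changes; no single function like this exists.

```go
package main

import (
	"fmt"
	"sort"
)

// groupSnapshot is a hypothetical, simplified view of the inputs that decide
// a consumer group's partition assignment (not a real Kafka or sarama type).
type groupSnapshot struct {
	Partitions map[string]int  // subscribed topic -> partition count
	Members    map[string]bool // member IDs currently in the group
}

// rebalanceReasons reports which of the three triggers fired between two snapshots.
func rebalanceReasons(prev, cur groupSnapshot) []string {
	var reasons []string
	for topic, n := range cur.Partitions {
		old, ok := prev.Partitions[topic]
		switch {
		case !ok:
			reasons = append(reasons, "subscribed topic added: "+topic)
		case old != n:
			reasons = append(reasons, fmt.Sprintf("partition count of %s changed: %d -> %d", topic, old, n))
		}
	}
	for topic := range prev.Partitions {
		if _, ok := cur.Partitions[topic]; !ok {
			reasons = append(reasons, "subscribed topic removed: "+topic)
		}
	}
	for id := range cur.Members {
		if !prev.Members[id] {
			reasons = append(reasons, "member joined: "+id)
		}
	}
	for id := range prev.Members {
		if !cur.Members[id] {
			reasons = append(reasons, "member left or crashed: "+id)
		}
	}
	sort.Strings(reasons) // map iteration order is random; sort for stable output
	return reasons
}

func main() {
	// Illustrative values only; the partition count is made up.
	prev := groupSnapshot{Partitions: map[string]int{"vsdir": 24}, Members: map[string]bool{"a": true, "b": true}}
	cur := groupSnapshot{Partitions: map[string]int{"vsdir": 24}, Members: map[string]bool{"a": true, "c": true}}
	fmt.Println(rebalanceReasons(prev, cur)) // [member joined: c member left or crashed: b]
}
```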

 --------------------------------------------------------------------

20220421

A rebalance reproduction experiment: we restarted every consumer group member on one machine. The rebalance took 4 minutes to complete.

 

First alert:

{"Level":"error","Time":"2022-04-21T20:38:02.9884226+08:00","LoggerName":"","Message":"Cleanup session MemberID :[vwin04_hunter-69c09b1d-a46b-40f8-8e55-e7c07580b46b] , GenerationID :[3980] , Claims :[map[vsdir:[12 13 14 15 16 17]]]","Caller":{"Defined":true,"PC":23615323,"File":"/groupconsumer.go","Line":67,"Function":"cloudscan/pubsub.(*GroupConsumer).Cleanup"},"Stack":"","Subject":"csscand告警 engine: [hunter] , version:[1.7.0]","IP":"","HostName":"vwin04"}

 

Last alert:

{"Level":"error","Time":"2022-04-21T20:42:08.2430919+08:00","LoggerName":"","Message":"Cleanup session MemberID :[vwin04_comodo-724c94cf-551f-4cd4-abb5-389c47f63a07] , GenerationID :[11674] , Claims :[map[vsdir:[16 17 18 19 20 21 22 23]]]","Caller":{"Defined":true,"PC":23615323,"File":"/groupconsumer.go","Line":67,"Function":"cloudscan/pubsub.(*GroupConsumer).Cleanup"},"Stack":"","Subject":"csscand告警 engine: [comodo] , version:[1.7.0]","IP":"","HostName":"vwin04"}

------------------------------------------------------------------------------

20220419

Changed:

config.Consumer.Group.Session.Timeout = time.Second * 120

Added this line: config.Consumer.Group.Rebalance.Timeout = config.Consumer.Group.Session.Timeout

This sets config.Consumer.Group.Rebalance.Timeout, the rebalance timeout, to 2 minutes, equal to Session.Timeout.

 

20220421

Group vsarcabit remove dynamic members who haven't joined: Set(10-52-6-136_arcabit-590f2304-7918-44f7-9c79-753d548bd41e) (kafka.coordinator.group.GroupCoordinator)

Reference: https://zhuanlan.zhihu.com/p/109574627

RPC

Rebalancing involves two APIs, JoinGroup and SyncGroup, plus the Heartbeat API.

The JoinGroup API has evolved to version 6:

JoinGroup Request (Version: 6) => group_id session_timeout_ms rebalance_timeout_ms member_id group_instance_id protocol_type [protocols] TAG_BUFFER 
  group_id => COMPACT_STRING
  session_timeout_ms => INT32
  rebalance_timeout_ms => INT32
  member_id => COMPACT_STRING
  group_instance_id => COMPACT_NULLABLE_STRING
  protocol_type => COMPACT_STRING
  protocols => name metadata TAG_BUFFER 
    name => COMPACT_STRING
    metadata => COMPACT_BYTES

JoinGroup Response (Version: 6) => throttle_time_ms error_code generation_id protocol_name leader member_id [members] TAG_BUFFER 
  throttle_time_ms => INT32
  error_code => INT16
  generation_id => INT32
  protocol_name => COMPACT_STRING
  leader => COMPACT_STRING
  member_id => COMPACT_STRING
  members => member_id group_instance_id metadata TAG_BUFFER 
    member_id => COMPACT_STRING
    group_instance_id => COMPACT_NULLABLE_STRING
    metadata => COMPACT_BYTES

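When any new consumer sends JoinGroup, the coordinator moves the group into the PreparingRebalance state. It then waits for all active consumers to rejoin the group, for at most rebalance_timeout_ms. The default is 60 s in sarama; the Java client sends its max.poll.interval.ms, 300 s by default. The generation ID is incremented when this join phase completes, which is why the broker logs "old generation N" on entering PreparingRebalance and "generation N+1" once the group stabilizes.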

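Two timeouts in this request deserve attention:

  • session_timeout_ms: if the coordinator receives no heartbeat within this time, it considers the session expired;
  • rebalance_timeout_ms: the maximum time the coordinator waits for all consumers to rejoin.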

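Suppose I scale out by 100 consumers: how long until that takes effect? Clearly it should not take 60 s. So how does the coordinator know when to stop waiting? It only needs to wait until every consumer with a live session has sent JoinGroup.

Existing members only learn that a rebalance has started from the response to their next heartbeat. Each consumer sends a heartbeat to the coordinator every 3 s by default, so a rebalance involves roughly 3 s of waiting. Reducing the heartbeat interval should shorten the time a rebalance takes to take effect accordingly.

In other words, every client's JoinGroup blocks until all consumers with live sessions have sent JoinGroup. The group then enters the CompletingRebalance state.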

 

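"haven't joined" means the member did not make it through the join phase. Why not?

Recall the mechanism above. The coordinator waits until every consumer with a live session has sent JoinGroup, and existing consumers only discover a rebalance through their heartbeats (every 3 s by default). The heartbeat interval therefore bounds how quickly they can rejoin.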

 

 

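Our configuration had raised config.Consumer.Group.Heartbeat.Interval = time.Second * 20,

that is, from 3 s to 20 s. Compare:

Originally, with the defaults:

rebalance_timeout_ms = 60 s
one heartbeat every 3 s: 20 heartbeats fit in the window.
Now, one heartbeat every 20 s: only 3 fit, a drastic reduction. Even after raising
rebalance_timeout_ms to 120 s, only 6 fit.
So Heartbeat.Interval should be reduced to 5 s, or simply left at 3 s.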

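As in the Raft protocol, heartbeats carry protocol state (here, whether a rebalance is in progress), so the heartbeat interval must not be too large.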


-------------------------------------------------------------------------------------------------------------

20220418:

Generation 3258 is healthy.

A normal server-side rebalance log from Kafka:

[2022-04-18 12:35:34,586] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in Stable state. Created a new member id _arcabit-a988337d-c2e7-436f-a556-7e7f34576a83 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:35:34,586] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3257 (__consumer_offsets-11) (reason: Adding new member _arcabit-a988337d-c2e7-436f-a556-7e7f34576a83 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:35:35,047] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in PreparingRebalance state. Created a new member id _arcabit-dce0a743-6b08-4ab7-879c-7518c29f15c8 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:35:43,245] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in PreparingRebalance state. Created a new member id _arcabit-ae61f5ce-c88f-44e7-9475-95e150c39741 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:35:48,499] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in PreparingRebalance state. Created a new member id_arcabit-70411222-99d0-453c-aa53-e7c5262569e4 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:36:14,537] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3258 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2022-04-18 12:36:14,540] INFO [GroupCoordinator 1]: Assignment received from leader for group vsarcabit for generation 3258 (kafka.coordinator.group.GroupCoordinator)

 

A normal client-side log:

Apr 18 12:36:14  csscand[24350]: {"level":"info","time":"2022-04-18T12:36:14.578+0800","caller":"pubsub/groupconsumer.go:57","msg":"Setup session MemberID :[_arcabit-1b56a50a-ec96-4a48-986b-d90afc70cb0d] , GenerationID :[3258] , Claims :[map[vsdir:[6 7 8]]]",,"version":"v1.6.4","author":"donghongchen","buildDate":"2022-04-18T10:51:05","engine":"arcabit","app":"csscand"}

-----------------------------------------------------------------------------------

So a "Group vsarcabit remove dynamic members who haven't joined" entry indicates an abnormal rebalance.

 

20220418: a complete server-side rebalance log for one consumer group:

[2022-04-17 02:35:12,783] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in Stable state. Created a new member id 10-52-6-136_arcabit-590f2304-7918-44f7-9c79-753d548bd41e for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 02:35:12,783] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3105 (__consumer_offsets-11) (reason: Adding new member 10-52-6-136_arcabit-590f2304-7918-44f7-9c79-753d548bd41e with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 02:36:12,783] INFO [GroupCoordinator 1]: Group vsarcabit remove dynamic members who haven't joined: Set(10-52-6-28_arcabit-64bcc707-3180-46ac-ada5-06a1d117b268) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 02:36:12,783] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3106 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 02:36:12,789] INFO [GroupCoordinator 1]: Assignment received from leader for group vsarcabit for generation 3106 (kafka.coordinator.group.GroupCoordinator)

 

Although there are time gaps, these are consecutive log lines for one consumer group (the vstws lines belong to another group):

[2022-04-17 03:17:16,370] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in Stable state. Created a new member id 10-52-6-224_arcabit-ade8d685-c136-4edc-bbef-9ed297b9a8a0 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:17:16,370] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3106 (__consumer_offsets-11) (reason: Adding new member 10-52-6-224_arcabit-ade8d685-c136-4edc-bbef-9ed297b9a8a0 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:17:20,200] INFO [ProducerStateManager partition=tb_asset-9] Writing producer snapshot at offset 380969561 (kafka.log.ProducerStateManager)
[2022-04-17 03:17:20,201] INFO [Log partition=tb_asset-9, dir=/data/kafka-data] Rolled new log segment at offset 380969561 in 1 ms. (kafka.log.Log)
[2022-04-17 03:17:58,606] INFO [GroupCoordinator 1]: Group vstws remove dynamic members who haven't joined: Set(vwin01_tws-ad4fae20-babf-4897-90c3-7ccbb46b04af) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:17:58,606] INFO [GroupCoordinator 1]: Stabilized group vstws generation 10806 (__consumer_offsets-23) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:17:58,609] INFO [GroupCoordinator 1]: Assignment received from leader for group vstws for generation 10806 (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:18:16,371] INFO [GroupCoordinator 1]: Group vsarcabit remove dynamic members who haven't joined: Set(10-52-6-136_arcabit-590f2304-7918-44f7-9c79-753d548bd41e) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:18:16,371] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3107 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:18:16,375] INFO [GroupCoordinator 1]: Assignment received from leader for group vsarcabit for generation 3107 (kafka.coordinator.group.GroupCoordinator)

[2022-04-17 03:24:20,903] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in Stable state. Created a new member id 10-52-6-249_arcabit-5b1cefdc-9755-44a0-9806-521eea353142 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:24:20,903] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3107 (__consumer_offsets-11) (reason: Adding new member 10-52-6-249_arcabit-5b1cefdc-9755-44a0-9806-521eea353142 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:25:20,902] INFO [GroupCoordinator 1]: Group vsarcabit remove dynamic members who haven't joined: Set(10-52-6-224_arcabit-ade8d685-c136-4edc-bbef-9ed297b9a8a0) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:25:20,902] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3108 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:25:20,903] INFO [GroupCoordinator 1]: Assignment received from leader for group vsarcabit for generation 3108 (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:55:30,435] INFO [GroupCoordinator 1]: Dynamic Member with unknown member id joins group vsarcabit in Stable state. Created a new member id 10-52-6-131_arcabit-02916ed4-6ada-416c-8b41-a78829ee0488 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:55:30,436] INFO [GroupCoordinator 1]: Preparing to rebalance group vsarcabit in state PreparingRebalance with old generation 3108 (__consumer_offsets-11) (reason: Adding new member 10-52-6-131_arcabit-02916ed4-6ada-416c-8b41-a78829ee0488 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:56:30,436] INFO [GroupCoordinator 1]: Group vsarcabit remove dynamic members who haven't joined: Set(10-52-6-249_arcabit-5b1cefdc-9755-44a0-9806-521eea353142) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:56:30,436] INFO [GroupCoordinator 1]: Stabilized group vsarcabit generation 3109 (__consumer_offsets-11) (kafka.coordinator.group.GroupCoordinator)
[2022-04-17 03:56:30,451] INFO [GroupCoordinator 1]: Assignment received from leader for group vsarcabit for generation 3109 (kafka.coordinator.group.GroupCoordinator)

 

-------------------------------------------------------------------------------------------------------------------- 

 

Server-side log:

[2022-04-04 16:07:10,255] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in Stable state. Created a new member id sarama-a1086e50-96c5-4395-9cc5-c2f5a72b68e1 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:07:10,256] INFO [GroupCoordinator 2]: Preparing to rebalance group vskingsoft in state PreparingRebalance with old generation 7259 (__consumer_offsets-0) (reason: Adding new member sarama-a1086e50-96c5-4395-9cc5-c2f5a72b68e1 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:08:03,853] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in PreparingRebalance state. Created a new member id sarama-42fb1447-ff68-48b3-91d4-2d2a598e0755 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:08:04,114] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in PreparingRebalance state. Created a new member id sarama-cfdf21c8-ef99-4382-85be-d3518dd9b1ff for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:08:10,256] INFO [GroupCoordinator 2]: Group vskingsoft remove dynamic members who haven't joined: Set(sarama-b4c83a70-f6bb-430f-805f-176023f24d25, sarama-32d526c5-5766-49c1-9423-2bbec4a683a4, sarama-a0d6e6b6-4b8d-43e3-9be7-a51c03929f40, sarama-88995424-4fbe-4512-8c7f-1637998310f8, sarama-9d889287-5823-4e59-8b8d-ed016ef44180) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:08:10,256] INFO [GroupCoordinator 2]: Stabilized group vskingsoft generation 7260 (__consumer_offsets-0) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:08:10,260] INFO [GroupCoordinator 2]: Assignment received from leader for group vskingsoft for generation 7260 (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:09:19,166] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in Stable state. Created a new member id sarama-668f4500-b31b-49d7-b3a6-4194b9dbae89 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:09:19,166] INFO [GroupCoordinator 2]: Preparing to rebalance group vskingsoft in state PreparingRebalance with old generation 7260 (__consumer_offsets-0) (reason: Adding new member sarama-668f4500-b31b-49d7-b3a6-4194b9dbae89 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:09:28,431] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in PreparingRebalance state. Created a new member id sarama-92092c42-3cb3-4ccd-bae9-92ffc53348dd for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:09:34,467] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in PreparingRebalance state. Created a new member id sarama-7f49d3d4-1ad2-461b-b5ce-1a4cd12dca66 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:10:06,090] INFO [GroupCoordinator 2]: Dynamic Member with unknown member id joins group vskingsoft in PreparingRebalance state. Created a new member id sarama-d7cf7efd-ba01-48b5-a6ff-ecee6ed161ba for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:10:19,165] INFO [GroupCoordinator 2]: Group vskingsoft remove dynamic members who haven't joined: Set(sarama-cfdf21c8-ef99-4382-85be-d3518dd9b1ff, sarama-a1086e50-96c5-4395-9cc5-c2f5a72b68e1, sarama-42fb1447-ff68-48b3-91d4-2d2a598e0755) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:10:19,165] INFO [GroupCoordinator 2]: Stabilized group vskingsoft generation 7261 (__consumer_offsets-0) (kafka.coordinator.group.GroupCoordinator)
[2022-04-04 16:10:19,182] INFO [GroupCoordinator 2]: Assignment received from leader for group vskingsoft for generation 7261 (kafka.coordinator.group.GroupCoordinator)

 

Symptom:

Constant rebalancing, which slows down business consumption.

 

Nothing useful was found in the client logs.

 

What we had run into before:

Kafka troubleshooting: "The provided member is not known in the current generation", sometimes accompanied by i/o timeout in the logs.

Possible cause 1, fixed in the code:

case <-sess.Context().Done()

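Possible cause 2: one consumer group consuming multiple topics. This has also been fixed.

We still have not pinned down why dynamic members keep joining the consumer group.

Possible cause 3:

A problem in the watermill library. We no longer use this library.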

 

read tcp :49560->:9092: i/o timeout under sarama kafka golang panic

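We changed this too. config.Consumer.Group.Session.Timeout = time.Second * 120 is the heartbeat (session) timeout. Our network read timeout, however, was small: sarama's Net.ReadTimeout defaults to 30 s. In our scenario, file scanning makes consumption slow, and 30 s may not be enough to finish processing the data. Final configuration:

// https://github.com/Shopify/sarama/issues/1422
config.Net.ReadTimeout = config.Consumer.Group.Session.Timeout + 30*time.Second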



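Conclusion from the server-side logs:
So far, the conclusion is this. We use dynamic member IDs, and new members keep asking to join the group, which triggers rebalances. These new members also often fail to join.


Current handling:
If the problem keeps occurring, find out what triggers the new dynamic members to join. If it stops occurring, it can be ignored.
If we cannot find what triggers the new dynamic members, try running with static members instead.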
---------------------------------
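20220411, follow-up: after restarting the program hosting the Kafka client, the problem no longer occurred, which suggests the client had been stuck in a bug.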

-------------------------------
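20220415: we found the cause of one occurrence. The process ran out of memory (OOM), was killed, and then reconnected to Kafka as a new dynamic member. That produced the log entry "(reason: Adding new member sarama-a1086e50-96c5-4395-9cc5-c2f5a72b68e1 with group instance id None) (kafka.coordinator.group.GroupCoordinator)" and the resulting rebalance.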
----------------------------------
Material on static consumer group membership:

Kafka static consumer group membership

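Kafka's consumer group mechanism has long been criticized. Its design looks elegant, but in practice consumer groups often fall into endless rebalancing, because consumption logic runs long or because of how users use the API. Since a rebalance is stop-the-world for the group, the other consumers in the same group stop consuming as well. The larger the production cluster, the more likely this is. To improve this, Kafka introduced static consumer group membership in version 2.3. (On the cloud, CKafka Professional Edition 2.4 also supports this feature.)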

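Why it is needed

Kafka's consumer group mechanism lets the remaining consumers quickly take over the partitions of a consumer that exited on failure and keep consuming. But several problems make consumer groups behave poorly in practice. In modern architectures, Kafka's own group mechanism is also not needed for failure recovery:

  1. The consumers are already at full capacity. If they take over the departed consumer's partitions, they cannot keep up, rebalances keep being triggered, and the whole group stops making progress.
  2. A consumer exits, but because modern deployments commonly use a supervisor or run as pods on k8s, it usually comes back quickly. By then a rebalance has already been triggered, and its return triggers another one. Every exit-and-recover cycle therefore costs two rebalances, which is hard to accept in a large consumer cluster.
  3. Fast rolling upgrades: in normal iteration, every release restarts the service and triggers a rebalance of the whole group, which in modern architectures also looks unnecessary.
  4. A group should not have more consumers than partitions. On the surface, the extra consumers simply get no partitions. In production, however, several consumers may intermittently own some partitions during a rebalance. When consumption capacity is insufficient and the consumption logic is slow, this leads to repeated rebalances.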

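Basic principle

When certain membership changes happen, static membership tries to keep the group from moving from the Stable state to PreparingRebalance.

To achieve this, Kafka 2.3 modified several group APIs and changed the shutdown behavior of clients that have static membership enabled:

  1. Added the group.instance.id parameter to identify static members; once it is set, the consumer is treated as a static member.
  2. A static member no longer sends a LeaveGroup request when it exits; it is removed from the group only when its session times out.
  3. Raised the broker's maximum session timeout (group.max.session.timeout.ms); with broker support, the client session timeout can be set as high as 30 minutes.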
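Here is a toy Go model of the difference these changes make. The `coordinator` type is hypothetical, not Kafka's implementation. A JoinGroup without group.instance.id always creates a new member and triggers a rebalance. A known group.instance.id rejoining, for example after a restart within the session timeout, just gets its member ID replaced and keeps its assignment. Real brokers handle more cases than this model, such as session expiry and fencing of duplicate instances.

```go
package main

import "fmt"

// coordinator is a toy model of how a stable group reacts to JoinGroup,
// with and without group.instance.id.
type coordinator struct {
	byInstance map[string]string // group.instance.id -> current member ID
	nextID     int
}

// join returns the member ID handed out and whether a rebalance is triggered.
func (c *coordinator) join(instanceID string) (memberID string, rebalance bool) {
	c.nextID++
	memberID = fmt.Sprintf("member-%d", c.nextID)
	if instanceID == "" {
		return memberID, true // dynamic member: always a new member, so rebalance
	}
	if _, known := c.byInstance[instanceID]; known {
		c.byInstance[instanceID] = memberID // same static member came back: swap IDs, no rebalance
		return memberID, false
	}
	c.byInstance[instanceID] = memberID // first time this static member joins: rebalance
	return memberID, true
}

func main() {
	c := &coordinator{byInstance: map[string]string{}}
	fmt.Println(c.join("consumer-450")) // member-1 true  (first join)
	fmt.Println(c.join("consumer-450")) // member-2 false (restart of the same instance)
	fmt.Println(c.join(""))             // member-3 true  (dynamic member)
}
```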

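Rebalance behavior and caveats with static membership

  1. Adding a member to the group still triggers a rebalance.
  2. A session timeout triggers a rebalance. Set the session timeout according to how long you can tolerate unavailability, and make it as long as possible to leave time for restarting or slow programs.
  3. max.poll.interval.ms must always be greater than session.timeout.ms: if the session timeout is 5 min, max.poll.interval.ms must also exceed 5 min.
  4. The client program must ensure that group.instance.id is unique; a duplicate group.instance.id joining the same group results in an error.
  5. Known to support this feature: the official Java client (2.3 and later) and librdkafka (1.4.0 and later). sarama did not support it at the time of writing.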

Appendix: code

#include <cstdlib>
#include <iostream>
#include <string>
#include <vector>
#include <librdkafka/rdkafkacpp.h>

int main() {
    std::string err;
    std::vector<std::string> topics;
    auto conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", "vip:port", err);
    conf->set("enable.partition.eof", "false", err);
    conf->set("group.id", "markstatic", err);
    conf->set("group.instance.id", "consumer-450", err);  // group.instance.id must be unique
    conf->set("session.timeout.ms", "600000", err);       // session timeout = longest partition unavailability you can tolerate
    conf->set("max.poll.interval.ms", "600500", err);     // max.poll.interval.ms must be greater than the session timeout
    auto consumer = RdKafka::KafkaConsumer::create(conf, err);
    delete conf;  // create() keeps its own copy of the configuration
    if (!consumer) {
        std::cerr << "Failed to create consumer: " << err << std::endl;
        std::exit(1);
    }
    topics.push_back("mytest1");
    auto suberr = consumer->subscribe(topics);
    if (suberr) {
        std::cerr << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(suberr) << std::endl;
        std::exit(1);
    }
    while (true) {
        auto msg = consumer->consume(1000);
        // consume() also returns timeouts and errors as messages; only print real ones
        if (msg->err() == RdKafka::ERR_NO_ERROR) {
            std::cout << " Message in " << msg->topic_name() << " [" << msg->partition() << "] at offset " << msg->offset() << std::endl;
        } else if (msg->err() != RdKafka::ERR__TIMED_OUT) {
            std::cerr << "Consume error: " << msg->errstr() << std::endl;
        }
        delete msg;
    }
}



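1. The group.instance.id parameter:

Setting this parameter makes the consumer a static member.

A static member combined with a large session timeout avoids rebalances caused by a member being temporarily unavailable, for example during a restart.

2. The session.timeout.ms parameter:

Used together with group.instance.id: if the consumer has not come back online within this time, a rebalance is triggered.

If it is not set, the client default applies: 10000 ms in older Java and librdkafka clients, 45000 ms in newer ones. The often-quoted 6000 ms is the broker-side minimum, group.min.session.timeout.ms.

In production, restarting a program or container can take several minutes, so set it somewhat larger.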





References:
The same question on StackOverflow, still unanswered:
https://stackoverflow.com/questions/62878735/kafka-one-particular-consumers-group-is-rebalancing


https://cloud.tencent.com/developer/article/1786605
https://blog.csdn.net/weixin_33970380/article/details/113316660 (sarama source code for adding a new member)
https://olnrao.wordpress.com/2015/05/15/apache-kafka-case-of-mysterious-rebalances/
https://www.cnblogs.com/chanshuyi/p/kafka_rebalance_quick_guide.html
https://www.cnblogs.com/zstiancai/p/15708612.html
https://www.icode9.com/content-1-956448.html

