Kafka 2.3發布后官網的Consumer參數中增加了一個新的參數:group.instance.id。下面是這個參數的解釋:
A unique identifier of the consumer instance provided by end user. Only non-empty strings are permitted. If set, the consumer is treated as a static member, which means that only one instance with this ID is allowed in the consumer group at any time. This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability (e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.
大致意思是:它是用戶指定的一個consumer成員ID。每個消費者組下這些ID必須是唯一的。一旦設置了該ID,該消費者就會被視為是一個靜態成員(Static Member)。靜態成員配以較大的session超時設置能夠避免因成員臨時不可用(比如重啟)而引發的Rebalance。由此可見,消費者組靜態成員是2.3版本新引入的一個概念,主要是為了避免不必要的Rebalance。
Rebalance Recap
之前我們在Kafka消費者組一文中討論過Rebalance機制。它的主要作用是為消費者組下所有成員分配分區。Client端和Broker端需要同時參與到Rebalance過程。在Broker端,Coordinator組件負責處理成員管理,比如處理組成員發送的JoinGroup請求、SyncGroup請求、Heartbeat請求和LeaveGroup請求;在Client端,Leader Consumer成員接收Coordinator發送的成員訂閱信息,然后根據一定的策略(Range/Round-Robin/Sticky/自定義)制定分配方案。
Rebalance發生的條件有三個:
- 成員數量發生變化,即有新成員加入或現有成員離組(包括主動離組和崩潰被動離組)
- 訂閱主題數量發生變化
- 訂閱主題分區數量發生變化
其實,后兩個條件可以合並成一個,即Rebalance觸發條件只有兩個:1. 成員數量發生變化;2. 訂閱信息發生變化。
Rebalance的流程在那篇文章中也談到了:首先,各個成員發送JoinGroup請求入組,Coordinator會等待一段時間等它們加入——這段時間由所有成員中max.poll.interval.ms的最大值來決定(在Kafka Connect中則是有專屬的參數rebalance.timeout.ms來指定)。之后各成員發送SyncGroup請求等待Coordinator發送分配方案,然后開始正常消費。在消費的同時,各個consumer還會定期(heartbeat.interval.ms)上報心跳,告訴Coordinator組件它還活着。
Issues for Rebalance
在實際場景中,因為成員離組而發生的Rebalance應該算是最多的,但有些場景下的Rebalance是非常不合理的。比如我們公司就有這樣的痛點:Consumer的處理邏輯發生變更,必須要更新代碼重新上線,此時就要引發Rebalance,但其實重啟Consumer也許只需要幾分鍾而已,也就是說我的消費只要中斷幾分鍾就可以了,Kafka完全沒必要為這個就觸發一輪Rebalance,更沒有必要重新分配分區,維持之前的分配方案足矣。雖然社區提供的Sticky分配方案在一定程度上能夠緩解此問題,但Rebalance的Stop The World(STW)的特性還是決定了生產環境中Rebalance越少越好。
Static Member
在目前的Rebalance設計中,消費者組下的每個實例都會被Coordinator分配一個成員ID,即member.id。很多Kafka用戶都有過這樣的疑問:我能手動設置這個member.id嗎?很遺憾,這個memberID是Kafka自動生成的,在靜態成員被引入前,規則是client.id-UUID,這里的client.id就是Consumer端參數client.id的值,而且這個ID會隨着每輪Rebalance發生變化的。換句話說,Coordinator無法持久化地保存某個consumer實例的member.id。我想這可能是制約Rebalance時所有成員必須強制重新加入的部分原因,因為Coordinator無法記住每個成員都是誰。如果你看源代碼,可以發現在每次Client重啟回來發送JoinGroup時,它會封裝一個UNKNOWN_MEMBER_ID的空串,沒有任何有意義的信息給到Broker端。Coordinator接收到后只能把它當做是一個全新的成員。相反地,如果member.id能夠被記住,那么Coordinator就可以容忍它短暫的離線而不開啟Rebalance,從而縮短消費者組整體不可用的時間窗口。
為此,社區於2.3和2.4版本引入了靜態成員(Static Member)的概念以及一個新的Consumer端參數:group.instance.id。一旦配置了該參數,成員將自動成為靜態成員,否則的話和以前一樣依然被視為是動態成員。你可以認為這個新參數是一個要被持久化的新member.id。它依然不能由用戶指定,構建規則是`group.instsance.id`-UUID。和member.id不同的是,每次成員重啟回來后,其靜態成員ID值是不變的,因此之前分配給該成員的所有分區也是不變的,而且在沒有超時前靜態成員重啟回來是不會觸發Rebalance的。
靜態成員Rebalance條件
顯然,靜態成員觸發Rebalance的難度要小於動態成員。如果使用了靜態成員,現在觸發Rebalance的條件變更為:
- 新成員加入組:這個條件依然不變。當有新成員加入時肯定會觸發Rebalance重新分配分區
- Leader成員重新加入組:比如主題分配方案發生變更
- 現有成員離組時間超過了session超時時間:即使它是靜態成員,Coordinator也不會無限期地等待它。一旦超過了session超時時間依然會觸發Rebalance
- Coordinator接收到LeaveGroup請求:成員主動通知Coordinator永久離組。畢竟Kafka還是要提供方法讓一個成員能夠永遠地退出組,此時重啟Rebalance還是必要的
請求協議變更
為了支持group.instance.id,與消費者組相關的協議格式也要做對應的變化。我看了下官網,JoinGroup、SyncGroup、LeaveGroup和OffsetCommit請求的協議格式都做了相應的變更。比如JoinGroup請求的Request和Response格式都增加了group-instance-id字段,如下所示:
JoinGroup Request (Version: 5) => group_id session_timeout_ms rebalance_timeout_ms member_id group_instance_id protocol_type [protocols]
group_id => STRING
session_timeout_ms => INT32
rebalance_timeout_ms => INT32
member_id => STRING
group_instance_id => NULLABLE_STRING
protocol_type => STRING
protocols => name metadata
name => STRING
metadata => BYTESJoinGroup Response (Version: 5) => throttle_time_ms error_code generation_id protocol_name leader member_id [members]
throttle_time_ms => INT32
error_code => INT16
generation_id => INT32
protocol_name => STRING
leader => STRING
member_id => STRING
members => member_id group_instance_id metadata
member_id => STRING
group_instance_id => NULLABLE_STRING
metadata => BYTES
其他請求格式的變更也是類似的,這里就不貼了。
其他變更
鑒於目前靜態成員短暫重啟或不可用不會觸發Rebalance的改動,社區對消費者組最大session過期時間也做了修改。之前Consumer端參數group.min.session.timeout.ms值是6秒——要想在這個時間內重啟完一個應用通常都是很困難的,因此社區現在將該值默認值改為30分鍾。這就是說,只要配置有靜態成員的Consumer程序代碼更新及重啟在30分鍾之內完成,Consumer Group就不會發生Rebalance。當然在這段時間內,該Consumer的消費進度會中斷,但是分區分配方案不會發生變化。
總結
目前靜態成員的部分功能已經集成進Kafka 2.3版本,還有一部分功能正在開發中,未來會進到2.4版本中。從目前的設計來看,靜態成員機制能夠幫助我們規避很多線上環境中本不必要的Rebalance,應該說是個很令人期待的新特性。同時,社區針對Rebalance的Stop The World醞釀一次大的修正,即所謂的增量協同式Rebalance(Incremental Cooperative Rebalance)。大致思想是允許單個consumer實例自行采用增量或漸進式的方式進行Rebalance,避免全局的STW。相關的代碼正在開發中,后續我也會帶來這方面的功能介紹。