Kafka Rebalance機制分析


什么是 Rebalance

Rebalance 本質上是一種協議,規定了一個 Consumer Group 下的所有 consumer 如何達成一致,來分配訂閱 Topic 的每個分區。

例如:某 Group 下有 20 個 consumer 實例,它訂閱了一個具有 100 個 partition 的 Topic 。正常情況下,kafka 會為每個 Consumer 平均的分配 5 個分區。這個分配的過程就是 Rebalance。

觸發 Rebalance 的時機

Rebalance 的觸發條件有3個。

  • 組成員個數發生變化。例如有新的 consumer 實例加入該消費組或者離開組。
  • 訂閱的 Topic 個數發生變化。
  • 訂閱 Topic 的分區數發生變化。

Rebalance 發生時,Group 下所有 consumer 實例都會協調在一起共同參與,kafka 能夠保證盡量達到最公平的分配。但是 Rebalance 過程對 consumer group 會造成比較嚴重的影響。在 Rebalance 的過程中 consumer group 下的所有消費者實例都會停止工作,等待 Rebalance 過程完成。

Rebalance 過程分析

Rebalance 過程分為兩步:Join 和 Sync。

  1. Join 顧名思義就是加入組。這一步中,所有成員都向coordinator發送JoinGroup請求,請求加入消費組。一旦所有成員都發送了JoinGroup請求,coordinator會從中選擇一個consumer擔任leader的角色,並把組成員信息以及訂閱信息發給leader——注意leader和coordinator不是一個概念。leader負責消費分配方案的制定。

img

  1. Sync,這一步leader開始分配消費方案,即哪個consumer負責消費哪些topic的哪些partition。一旦完成分配,leader會將這個方案封裝進SyncGroup請求中發給coordinator,非leader也會發SyncGroup請求,只是內容為空。coordinator接收到分配方案之后會把方案塞進SyncGroup的response中發給各個consumer。這樣組內的所有成員就都知道自己應該消費哪些分區了。

img

Rebalance 場景分析

新成員加入組

img

組成員“崩潰”

組成員崩潰和組成員主動離開是兩個不同的場景。因為在崩潰時成員並不會主動地告知coordinator此事,coordinator有可能需要一個完整的session.timeout周期(心跳周期)才能檢測到這種崩潰,這必然會造成consumer的滯后。可以說離開組是主動地發起rebalance;而崩潰則是被動地發起rebalance。

img

組成員主動離開組

img

提交位移

img

如何避免不必要的rebalance

要避免 Rebalance,還是要從 Rebalance 發生的時機入手。我們在前面說過,Rebalance 發生的時機有三個:

  • 組成員數量發生變化
  • 訂閱主題數量發生變化
  • 訂閱主題的分區數發生變化

后兩個我們大可以人為的避免,發生rebalance最常見的原因是消費組成員的變化。

消費者成員正常的添加和停掉導致rebalance,這種情況無法避免,但是時在某些情況下,Consumer 實例會被 Coordinator 錯誤地認為 “已停止” 從而被“踢出”Group。從而導致rebalance。

當 Consumer Group 完成 Rebalance 之后,每個 Consumer 實例都會定期地向 Coordinator 發送心跳請求,表明它還存活着。如果某個 Consumer 實例不能及時地發送這些心跳請求,Coordinator 就會認為該 Consumer 已經 “死” 了,從而將其從 Group 中移除,然后開啟新一輪 Rebalance。這個時間可以通過Consumer 端的參數 session.timeout.ms進行配置。默認值是 10 秒。

除了這個參數,Consumer 還提供了一個控制發送心跳請求頻率的參數,就是 heartbeat.interval.ms。這個值設置得越小,Consumer 實例發送心跳請求的頻率就越高。頻繁地發送心跳請求會額外消耗帶寬資源,但好處是能夠更加快速地知曉當前是否開啟 Rebalance,因為,目前 Coordinator 通知各個 Consumer 實例開啟 Rebalance 的方法,就是將 REBALANCE_NEEDED 標志封裝進心跳請求的響應體中。

除了以上兩個參數,Consumer 端還有一個參數,用於控制 Consumer 實際消費能力對 Rebalance 的影響,即 max.poll.interval.ms 參數。它限定了 Consumer 端應用程序兩次調用 poll 方法的最大時間間隔。它的默認值是 5 分鍾,表示你的 Consumer 程序如果在 5 分鍾之內無法消費完 poll 方法返回的消息,那么 Consumer 會主動發起 “離開組” 的請求,Coordinator 也會開啟新一輪 Rebalance。

通過上面的分析,我們可以看一下那些rebalance是可以避免的:

第一類非必要 Rebalance 是因為未能及時發送心跳,導致 Consumer 被 “踢出”Group 而引發的。這種情況下我們可以設置 session.timeout.ms 和 heartbeat.interval.ms 的值,來盡量避免rebalance的出現。(以下的配置是在網上找到的最佳實踐,暫時還沒測試過

  • 設置 session.timeout.ms = 6s。
  • 設置 heartbeat.interval.ms = 2s。
  • 要保證 Consumer 實例在被判定為 “dead” 之前,能夠發送至少 3 輪的心跳請求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

將 session.timeout.ms 設置成 6s 主要是為了讓 Coordinator 能夠更快地定位已經掛掉的 Consumer,早日把它們踢出 Group。

第二類非必要 Rebalance 是 Consumer 消費時間過長導致的。此時,max.poll.interval.ms 參數值的設置顯得尤為關鍵。如果要避免非預期的 Rebalance,你最好將該參數值設置得大一點,比你的下游最大處理時間稍長一點。

總之,要為業務處理邏輯留下充足的時間。這樣,Consumer 就不會因為處理這些消息的時間太長而引發 Rebalance 。

相關概念

coordinator

Group Coordinator是一個服務,每個Broker在啟動的時候都會啟動一個該服務。Group Coordinator的作用是用來存儲Group的相關Meta信息,並將對應Partition的Offset信息記錄到Kafka內置Topic(__consumer_offsets)中。Kafka在0.9之前是基於Zookeeper來存儲Partition的Offset信息(consumers/{group}/offsets/{topic}/{partition}),因為ZK並不適用於頻繁的寫操作,所以在0.9之后通過內置Topic的方式來記錄對應Partition的Offset。

每個Group都會選擇一個Coordinator來完成自己組內各Partition的Offset信息,選擇的規則如下:

  • 1,計算Group對應在__consumer_offsets上的Partition
  • 2,根據對應的Partition尋找該Partition的leader所對應的Broker,該Broker上的Group Coordinator即就是該Group的Coordinator

Partition計算規則:

partition-Id(__consumer_offsets) = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)

其中groupMetadataTopicPartitionCount對應offsets.topic.num.partitions參數值,默認值是50個分區

一次Rebalance所耗時間

測試環境

1個Topic,10個partition,3個consumer

在本地環境進行測試

測試結果

經過幾輪測試發現每次rebalance所消耗的時間大概在 80ms~100ms平均耗時在87ms左右。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM