Kafka的rebalance


Rebalance 本質上是一種協議,規定了一個 Consumer Group 下的所有 Consumer 如何達成一致,來分配訂閱 Topic 的每個分區。

在 Rebalance 過程中,所有 Consumer 實例都會停止消費,等待 Rebalance 完成。

Rebalance 的弊端:

  1.Rebalance 影響 Consumer 端 TPS。(因為rebalance過程中,kafka會停止消費)

  2.Rebalance 要完成需要比較久的時間。

      3.Rebalance 效率不高,每次都要全部consumer參加。

Rebalance 的觸發條件有 3 個。

  1. 組成員數發生變更。比如有新的 Consumer 實例加入組或者離開組,抑或是有 Consumer 實例崩潰被“踢出”組。99% 都是這個原因
  2. 訂閱主題數發生變更。Consumer Group 可以使用正則表達式的方式訂閱主題,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明該 Group 訂閱所有以字母 t 開頭、字母 c 結尾的主題。在 Consumer Group 的運行過程中,你新創建了一個滿足這樣條件的主題,那么該 Group 就會發生 Rebalance。
  3. 訂閱主題的分區數發生變更。Kafka 當前只能允許增加一個主題的分區數。當分區數增加時,就會觸發訂閱該主題的所有 Group 開啟 Rebalance。

如何避免這99%的情況發生的rebalance?

可以從consumer組員變化的原因分析起:

  第一類非必要 Rebalance 是因為未能及時發送心跳,導致 Consumer 被“踢出”Group 而引發的。

      當 Consumer Group 完成 Rebalance 之后,每個 Consumer 實例都會定期地向 Coordinator 發送心跳請求,表明它還存活着。如果某個 Consumer 實例不能及時地發送這些心跳請求,Coordinator 就會認為該 Consumer 已經“死”了,從而將其從 Group 中移除,然后開啟新一輪 Rebalance。

      這個發送心跳的間隔在Consumer 端有個參數,叫 session.timeout.ms,默認值是 10 秒,即如果 Coordinator 在 10 秒之內沒有收到 Group 下某 Consumer 實例的心跳,它就會認為這個 Consumer 實例已經掛了。session.timout.ms 決定了 Consumer 存活性的時間間隔。

      除了這個參數,Consumer 還提供了一個允許你控制發送心跳請求頻率的參數,就是 heartbeat.interval.ms。這個值設置得越小,Consumer 實例發送心跳請求的頻率就越高。頻繁地發送心跳請求會額外消耗帶寬資源,但好處是能夠更加快速地知曉當前是否開啟 Rebalance,因為,目前 Coordinator 通知各個 Consumer 實例開啟 Rebalance 的方法,就是將 REBALANCE_NEEDED 標志封裝進心跳請求的響應體中。

  Consumer 端還有一個參數,用於控制 Consumer 實際消費能力對 Rebalance 的影響,即 max.poll.interval.ms 參數,默認5min,Consumer 端應用程序兩次調用 poll 方法的最大時間間隔,表示你的 Consumer 程序如果在 5 分鍾之內無法消費完 poll 方法返回的消息,那么 Consumer 會主動發起“離開組”的請求,Coordinator 也會開啟新一輪 Rebalance。

     所以可以修改為以下經驗推薦值:

  • 設置 session.timeout.ms = 6s。
  • 設置 heartbeat.interval.ms = 2s。
  • 要保證 Consumer 實例在被判定為“dead”之前,能夠發送至少 3 輪的心跳請求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

  將 session.timeout.ms 設置成 6s 主要是為了讓 Coordinator 能夠更快地定位已經掛掉的 Consumer。

 

  第二類非必要 Rebalance 是 Consumer 消費時間過長導致的。所以可以估算消費這條消息后,處理的時間,設置max.poll.interval.ms大於等於這個時長多1min。

如果這些都設置好,還是出現rebalance,可以排查一下Consumer 端的 GC 表現,比如是否出現了頻繁的 Full GC 導致的長時間停頓,從而引發了 Rebalance。

 

Rebalance流程

重平衡過程是如何通知到其他消費者實例的?答案就是,靠消費者端的心跳線程(Heartbeat Thread)。

當協調者決定開啟新一輪重平衡后,它會將“REBALANCE_IN_PROGRESS”封裝進心跳請求的響應中,發還給消費者實例。當消費者實例發現心跳響應中包含了“REBALANCE_IN_PROGRESS”,就能立馬知道重平衡又開始了,這就是重平衡的通知機制。

 消費者組狀態機

 

 

一個消費者組最開始是 Empty 狀態,當重平衡過程開啟后,它會被置於 PreparingRebalance 狀態等待成員加入,之后變更到 CompletingRebalance 狀態等待分配方案,最后流轉到 Stable 狀態完成重平衡。

當有新成員加入或已有成員退出時,消費者組的狀態從 Stable 直接跳到 PreparingRebalance 狀態,此時,所有現存成員就必須重新申請加入組。當所有成員都退出組后,消費者組狀態變更為 Empty。Kafka 定期自動刪除過期位移的條件就是,組要處於 Empty 狀態。因此,如果你的消費者組停掉了很長時間(超過 7 天),那么 Kafka 很可能就把該組的位移數據刪除了。我相信,你在 Kafka 的日志中一定經常看到下面這個輸出:

Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.

這就是 Kafka 在嘗試定期刪除過期位移。現在你知道了,只有 Empty 狀態下的組,才會執行過期位移刪除的操作。

消費者端重平衡流程

在消費者端,重平衡分為兩個步驟:分別是加入組和等待領導者消費者(Leader Consumer)分配方案。這兩個步驟分別對應兩類特定的請求:JoinGroup 請求和 SyncGroup 請求。

 

當組內成員加入組時,它會向協調者發送 JoinGroup 請求。在該請求中,每個成員都要將自己訂閱的主題上報,這樣協調者就能收集到所有成員的訂閱信息。一旦收集了全部成員的 JoinGroup 請求后,協調者會從這些成員中選擇一個擔任這個消費者組的領導者。

通常情況下,第一個發送 JoinGroup 請求的成員自動成為領導者。你一定要注意區分這里的領導者和之前我們介紹的領導者副本,它們不是一個概念。這里的領導者是具體的消費者實例,它既不是副本,也不是協調者。領導者消費者的任務是收集所有成員的訂閱信息,然后根據這些信息,制定具體的分區消費分配方案。

選出領導者之后,協調者會把消費者組訂閱信息封裝進 JoinGroup 請求的響應體中,然后發給領導者,由領導者統一做出分配方案后,進入到下一步:發送 SyncGroup 請求。

在這一步中,領導者向協調者發送 SyncGroup 請求,將剛剛做出的分配方案發給協調者。值得注意的是,其他成員也會向協調者發送 SyncGroup 請求,只不過請求體中並沒有實際的內容。這一步的主要目的是讓協調者接收分配方案,然后統一以 SyncGroup 響應的方式分發給所有成員,這樣組內所有成員就都知道自己該消費哪些分區了。

JoinGroup處理流程

 

 

 SyncGroup 請求的處理流程

 

 

 

Broker 端重平衡流程

場景一:新成員入組。

新成員入組是指組處於 Stable 狀態后,有新成員加入。如果是全新啟動一個消費者組,Kafka 是有一些自己的小優化的,流程上會有些許的不同。我們這里討論的是,組穩定了之后有新成員加入的情形。

當協調者收到新的 JoinGroup 請求后,它會通過心跳請求響應的方式通知組內現有的所有成員,強制它們開啟新一輪的重平衡。具體的過程和之前的客戶端重平衡流程是一樣的。現在,我用一張時序圖來說明協調者一端是如何處理新成員入組的。

 場景二:組成員主動離組。

消費者實例所在線程或進程調用 close() 方法主動通知協調者它要退出。這個場景就涉及到了第三類請求:LeaveGroup 請求。協調者收到 LeaveGroup 請求后,依然會以心跳響應的方式通知其他成員。

 

 場景三:組成員崩潰離組。

崩潰離組是指消費者實例出現嚴重故障,突然宕機導致的離組。它和主動離組是有區別的,因為后者是主動發起的離組,協調者能馬上感知並處理。但崩潰離組是被動的,協調者通常需要等待一段時間才能感知到,這段時間一般是由消費者端參數 session.timeout.ms 控制的。也就是說,Kafka 一般不會超過 session.timeout.ms 就能感知到這個崩潰。

 

 場景四:重平衡時協調者對組內成員提交位移的處理。

正常情況下,每個組內成員都會定期匯報位移給協調者。當重平衡開啟時,協調者會給予成員一段緩沖時間,要求每個成員必須在這段時間內快速地上報自己的位移信息,然后再開啟正常的 JoinGroup/SyncGroup 請求發送。

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM