一、什么是kafka的Rebalance
kafka集群模式下,一個topic有多個partition,對於消費端,可以有多個consumer同時消費這些partition。為了保證大體上partition和consumer的均衡性,提升topic的並發消費能力,所以會有Rebalance。Rebalance 本質上是一種協議,規定了一個 Consumer Group 下的所有 consumer 如何達成一致,來分配訂閱 Topic 的每個分區。
二、什么時機觸發Rebalance
0.10kafka的rebalance條件
- 條件1:有新的consumer加入
- 條件2:舊的consumer掛了
- 條件3:coordinator掛了,集群選舉出新的coordinator(0.10 特有的)
- 條件4:topic的partition新加
- 條件5:consumer調用unsubscrible(),取消topic的訂閱
當一個group中,有consumer加入或者離開時,會觸發partitions均衡。Kafka的Consumer Rebalance方案是基於Zookeeper的Watcher來實現的。consumer啟動的時候,在zk下都維護一個”/consumers/[group_name]/ids”路徑,在此路徑下,使用臨時節點記錄屬於此cg的消費者的Id,該Id信息由對應的consumer在啟動時創建。每個consumer都會在此路徑下簡歷一個watcher,當有節點發生變化時,就會觸發watcher,然后觸發Rebalance過程。
三、0.9之前kafka的Rebalance算法
Consumer rebalacne算法:
1. 將目標 topic 下的所有 partirtion 排序,存於PT
2. 對某 consumer group 下所有 consumer 排序,存於 CG,第 i 個consumer 記為 Ci
3. N=size(PT)/size(CG),向上取整
4. 解除 Ci 對原來分配的 partition 的消費權(i從0開始)
5. 將第i*N到(i+1)*N-1個 partition 分配給 Ci
在Rebalance期間,consumer不能正常消費,並且這種Rebalance過程強依賴zk,存在以下問題:
- herd effect(羊群效應):一個被Watch的zk節點變化,導致大量的watcher通知需要被發送給客戶端,這會導致在通知期間其他操作的延遲。
- split brain:每個Consumer都是通過zk中保存的元數據來判斷group中各其他成員的狀態,以及broker的狀態,進而分別進入各自的Rebalance,執行各自的Rebalance邏輯。不同的Consumer在同一時刻可能連接在不同的zk服務器上,看到的元數據就可能不一樣,基於不一樣的元數據,執行Rebalance就會產生不一致(沖突)的Rebalance結果,Rebalance的沖突,會到導致consumer的rebalance失敗。
- 重復消費問題:因為Rebalance時,很有可能導致offset commit不成功,所以可能造成重復消費問題。
解決辦法:
- 加大Rebalance的重試時間:"rebalance.backoff.ms=5000"
- 加大Rebalance失敗的retry次數: "rebalance.max.retries=10"
- 捕獲"ConsumerRebalanceFailedException",退出程序。
四、0.9后kafka對Rebalance過程進行了改進
Group Coordinator是一個服務,每個Broker在啟動的時候都會啟動一個該服務。Group Coordinator的作用是用來存儲Group的相關Meta信息,並將對應Partition的Offset信息記錄到Kafka內置Topic(__consumer_offsets)中。Kafka在0.9之前是基於Zookeeper來存儲Partition的Offset信息(consumers/{group}/offsets/{topic}/{partition}),因為ZK並不適用於頻繁的寫操作,所以在0.9之后通過內置Topic的方式來記錄對應Partition的Offset。
每個Group都會選擇一個Coordinator來完成自己組內各Partition的Offset信息。那么consumer group如何確定自己的coordinator是誰呢? 簡單來說分為兩步:
- 確定consumer group位移信息寫入__consumers_offsets的哪個分區。具體計算公式:
- __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) 注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默認是50個分區。
- 該分區leader所在的broker就是被選定的coordinator。
前面說過, rebalance本質上是一組協議。group與coordinator共同使用它來完成group的rebalance。目前kafka提供了5個協議來處理與consumer group coordination相關的問題:
- Heartbeat請求:consumer需要定期給coordinator發送心跳來表明自己還活着
- LeaveGroup請求:主動告訴coordinator我要離開consumer group
- SyncGroup請求:group leader把分配方案告訴組內所有成員
- JoinGroup請求:成員請求加入組
- DescribeGroup請求:顯示組的所有信息,包括成員信息,協議名稱,分配方案,訂閱信息等
rebalance過程分為2步:Join和Sync
1 Join, 顧名思義就是加入組。這一步中,所有成員都向coordinator發送JoinGroup請求,請求入組。一旦所有成員都發送了JoinGroup請求,coordinator會從中選擇一個consumer擔任leader的角色,並把組成員信息以及訂閱信息發給leader——注意leader和coordinator不是一個概念。leader負責消費分配方案的制定。
2 Sync,這一步leader開始分配消費方案,即哪個consumer負責消費哪些topic的哪些partition。一旦完成分配,leader會將這個方案封裝進SyncGroup請求中發給coordinator,非leader也會發SyncGroup請求,只是內容為空。coordinator接收到分配方案之后會把方案塞進SyncGroup的response中發給各個consumer。這樣組內的所有成員就都知道自己應該消費哪些分區了。
五、如何避免不必要的Rebalance
除開consumer正常的添加和停掉導致rebalance外,在某些情況下,Consumer 實例會被 Coordinator 錯誤地認為 “已停止” 從而被“踢出”Group,導致rebalance,這種情況應該避免。
第一類非必要 Rebalance 是因為未能及時發送心跳,導致 Consumer 被 “踢出”Group 而引發的。這種情況下我們可以設置 session.timeout.ms 和 heartbeat.interval.ms 的值,來盡量避免rebalance的出現。(以下的配置是在網上找到的最佳實踐,暫時還沒測試過)
- 設置 session.timeout.ms = 6s。
- 設置 heartbeat.interval.ms = 2s。
- 要保證 Consumer 實例在被判定為 “dead” 之前,能夠發送至少 3 輪的心跳請求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
- 這兩個參數的區別 https://stackoverflow.com/questions/43881877/difference-between-heartbeat-interval-ms-and-session-timeout-ms-in-kafka-consume
將 session.timeout.ms 設置成 6s 主要是為了讓 Coordinator 能夠更快地定位已經掛掉的 Consumer,早日把它們踢出 Group。
第二類非必要 Rebalance 是 Consumer 消費時間過長導致的。此時,max.poll.interval.ms 參數值的設置顯得尤為關鍵。如果要避免非預期的 Rebalance,你最好將該參數值設置得大一點,比你的下游最大處理時間稍長一點。