kafka-Reblance


誰來執行Rebalance以及管理consumer的group呢

coordinator來執行對於consumer group的管理,當consumer group的第一個consumer啟動的時候,它會去和kafka server確定誰是它們組的coordinator。之后該group內的所有成員都會和該coordinator進行協調通信

如何確定coordinator?

consumer group如何確定自己的coordinator是誰呢, 消費者向kafka集群中的任意一個broker發送一個GroupCoordinatorRequest請求,服務端會返回一個負載最小的broker節點的id,並將該broker設置為coordinator.

JoinGroup的過程

整個rebalance的過程分為兩個步驟,Join和Sync

join: 表示加入到consumer group中,在這一步中,所有的成員都會向coordinator發送joinGroup的請求。一旦所有成員都發送了joinGroup請求,那么coordinator會選擇一個consumer擔任leader角色,並把組成員信息和訂閱信息發送消費者
leader選舉算法比較簡單,如果消費組內沒有leader,那么第一個加入消費組的消費者就是消費者leader,如果這個時候leader消費者退出了消費組,那么重新選舉一個leader,這個選舉很隨意,類似於隨機算法

 

protocol_metadata: 序列化后的消費者的訂閱信息
leader_id: 消費組中的消費者,coordinator會選擇一個座位leader,對應的就是member_id
member_metadata 對應消費者的訂閱信息
members:consumer group中全部的消費者的訂閱信息
generation_id: 年代信息,類似於之前講解zookeeper的時候的epoch是一樣的,對於每一輪rebalance,generation_id都會遞增。主要用來保護consumer group。隔離無效的offset提交。也就是上一輪的consumer成員無法提交offset到新的consumer group中。

 

確定分區分配策略

每個消費者都可以設置自己的分區分配策略,對於消費組而言,會從各個消費者上報過來的分區分配策略中選舉一個彼此都贊同的策略來實現整體的分區分配,這個"贊同"的規則是,消費組內的各個消費者會通過投票來決定.

在joingroup階段,每個consumer都會把自己支持的分區分配策略發送到coordinator,coordinator手機到所有消費者的分配策略,組成一個候選集,每個消費者需要從候選集里找出一個自己支持的策略,並且為這個策略投票
最終計算候選集中各個策略的選票數,票數最多的就是當前消費組的分配策略

 

Synchronizing Group State階段

完成分區分配之后,就進入了Synchronizing Group State階段,主要邏輯是向GroupCoordinator發送SyncGroupRequest請求,並且處理SyncGroupResponse響應,簡單來說,就是leader將消費者對應的partition分配方案同步給consumer group 中的所有consumer

 

 

 

 

每個消費者都會向coordinator發送syncgroup請求,不過只有leader節點會發送分配方案,其他消費者只是打打醬油而已。當leader把方案發給coordinator以后,coordinator會把結果設置到SyncGroupResponse中。這樣所有成員都知道自己應該消費哪個分區。
Ø consumer group的分區分配方案是在客戶端執行的!Kafka將這個權利下放給客戶端主要是因為這樣做可以有更好的靈活性.

 

總結


我們再來總結一下consumer group rebalance的過程

 

  1. Ø 對於每個consumer group子集,都會在服務端對應一個GroupCoordinator進行管理,GroupCoordinator會在zookeeper上添加watcher,當消費者加入或者退出consumer group時,會修改zookeeper上保存的數據,從而觸發GroupCoordinator開始Rebalance操作
  2. Ø 當消費者准備加入某個Consumer group或者GroupCoordinator發生故障轉移時,消費者並不知道GroupCoordinator的在網絡中的位置,這個時候就需要確定GroupCoordinator,消費者會向集群中的任意一個Broker節點發送ConsumerMetadataRequest請求,收到請求的broker會返回一個response作為響應,其中包含管理當前ConsumerGroup的GroupCoordinator,
  3. Ø 消費者會根據broker的返回信息,連接到groupCoordinator,並且發送HeartbeatRequest,發送心跳的目的是要要奧噶蘇GroupCoordinator這個消費者是正常在線的。當消費者在指定時間內沒有發送心跳請求,則GroupCoordinator會觸發Rebalance操作。

Ø 發起join group請求,兩種情況

  1. 如果GroupCoordinator返回的心跳包數據包含異常,說明GroupCoordinator因為前面說的幾種情況導致了Rebalance操作,那這個時候,consumer會發起join group請求
  2. 新加入到consumer group的consumer確定好了GroupCoordinator以后,消費者會向GroupCoordinator發起join group請求,
  3. GroupCoordinator會收集全部消費者信息之后,來確認可用的消費者,並從中選取一個消費者成為group_leader。並把相應的信息(分區分配策略、leader_id、…)封裝成response返回給所有消費者,但是只有group leader會收到當前consumer group中的所有消費者信息
  4. 當消費者確定自己是group leader以后,會根據消費者的信息以及選定分區分配策略進行分區分配接着進入Synchronizing Group State階段,
  5. 每個消費者會發送SyncGroupRequest請求到GroupCoordinator,但是只有Group Leader的請求會存在分區分配結果(Leader負責根據分區分配規則進行分區分配),GroupCoordinator會根據Group Leader的分區分配結果形成SyncGroupResponse返回給所有的Consumer。
  6. consumer根據分配結果,執行相應的操作

 

注: 參照自咕泡mic 

 

 

kafka集群中的一個broker中最多只能有一個副本,leader副本所在的broker節點的 分區叫leader節點,follower副本所在的broker節點的分區叫follower節點, follow節點不支持client端的請求.

 ISR副本:包含了leader副本和所有與leader副本保持同步的follower副本,注意是所有的副本,而不只是 leader副本。

LEO:即日志末端位移(log end offset),記錄了該副本底層日志(log)中下一條消息的位移值。注意是下 一條消息!也就是說,如果LEO=10,那么表示該副本保存了10條消息,位移值范圍是[0, 9]。另外, leader LEO和follower LEO的更新是有區別的。

HW:即上面提到的水位值。對於同一個副本對象而言,其HW值不會大於LEO值。小於等於HW值的所 有消息都被認為是“已備份”的(replicated)。同理,leader副本和follower副本的HW更新是有區別的 從生產者發出的 一 條消息首先會被寫入分區的leader 副本,不過還需要等待ISR集合中的所有 follower副本都同步完之后才能被認為已經提交,之后才會更新分區的HW, 進而消費者可以消費 到這條消息。

 


ISR

 

ISR表示目前“可用且消息量與leader相差不多的副本集合,這是整個副本集合的一個子集”

 一個新leader被選舉並被接受客戶端的消息成功寫入。Kafka確保從同步副本列表中選舉一個副本為 leader;leader負責維護和跟蹤ISR(in-Sync replicas , 副本同步隊列)中所有follower滯后的狀態。當 producer發送一條消息到broker后,leader寫入消息並復制到所有follower。消息提交之后才被成功復 制到所有的同步副本。

具體來說,ISR集合中的副本必須滿足兩個條件

1. 副本所在節點必須維持着與zookeeper的連接

2. 副本最后一條消息的offset與leader副本的最后一條消息的offset之間的差值不能超過指定的閾值 (replica.lag.time.max.ms) replica.lag.time.max.ms:如果該follower在此時間間隔內一直沒有追 上過leader的所有消息,則該follower就會被剔除isr列表

3. ISR數據保存在Zookeeper的 /brokers/topics//partitions//state 節點中

follower副本把leader副本LEO之前的日志全部同步完成時,則認為follower副本已經追趕上了leader 副本,這個時候會更新這個副本的lastCaughtUpTimeMs標識,

kafk副本管理器會啟動一個副本過期檢 查的定時任務,這個任務會定期檢查當前時間與副本的lastCaughtUpTimeMs的差值是否大於參數 replica.lag.time.max.ms 的值,如果大於,則會把這個副本踢出ISR集合

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM