Kafka 0.10 Coordinator概述


由Kafka內置實現了失敗檢測和Rebalance(ZKRebalancerListener),但是它存在羊群效應和腦裂的問題,客戶端代碼實現低級API也不能解決這個問題。如果將失敗探測和Rebalance的邏輯放到一個高可用的中心Coordinator,這兩個問題即可解決。同時還可大大減少Zookeeper的負載,有利於Kafka Broker的擴展(Broker也會作為協調節點的角色存在)。

有幾種類型:

  • GroupCoordinator: GroupCoordinator handles general group membership and offset management. Each Kafka server instantiates a coordinator which is responsible for a set of groups. Groups are assigned to coordinators based on their group names.
  • WorkerCoordinator:This class manages the coordination process with the Kafka group coordinator on the broker for managing assignments to workers.
  • ConsumerCoordinator:This class manages the coordination process with the consumer coordinator.

中文:

  • GroupCoordinator:broker端的,每個kafka server都有一個實例,管理部分的consumer group和它們的offset
  • WorkerCoordinator:broker端的,管理GroupCoordinator程序,主要管理workers的分配。
  • ConsumerCoordinator:consumer端的,管理ConsumerCoordinator程序。

1.Broker端的Coordinator

Kafka的group management protocol包括以下的動作序列:

  • Group Registration:Group的成員需要向cooridnator注冊自己,並且提供關於成員自身的元數據(比如,這個消費成員想要消費的topic)
  • Group/Leader Selection:cooridnator確定這個group包括哪些成員,並且選擇其中的一個作為leader。
  • State Assignment: leader收集所有成員的metadata,並且給它們分配狀態(state,可以理解為資源,或者任務)。
  • Group Stabilization: 每個成員收到leader分配的狀態,並且開始處理。

這里邊有三個角色:coordinator, group memeber, group leader.

有這么幾個情況:

  1. 所有的consumer線程要先向coordinator注冊,由coordinator選出leader, 然后由leader來分配state。 從group memeber里選出來一個做為leader,由leader來執行性能開銷大的協調任務, 這樣把負載分配到client端,可以減輕broker的壓力,支持更多數量的消費組。
  2. 所有group member(指的是consumer線程)都需要發心跳給coordinator,這樣coordinator才能確定group的成員。
  3. 對於Kafka consumer,它的實際上必須跟coordinator保持連接,因為它還需要提交offset給coordinator。所以coordinator實際上負責commit offset,那么,即使leader來確定狀態的分配,但是每個partition的消費起始點,還需要coordinator來確定。

問題:這就帶來了一問題,每個partition的消費開始的offset是由leader向coordinator請求,然后做為state分配,還是leader只分配partition,而follower去coordinator處請求開始消費的offset?

回答:我從合理性來思考,coordinator向leader發送了topic-partition的Offset消費情況,leader分配好partition后,回傳給leader。所有的follower同步這個狀態。

2.客戶端的ConsumerCoordinator

image
這張圖展示了Server和Client端的關系。

ConsumerCoordinator是KafkaConsumer的一個成員變量,所以每個消費者都要自己的ConsumerCoordinator,消費者的ConsumerCoordintor只是和服務端的GroupCoordinator通信的介質。

下文中提到的協調者一般指的是服務端的GroupCoordinator。

每個KafkaServer都有一個GroupCoordinator實例,服務端的GroupCoordinator管理消費組成員和offset,它可以管理多個消費組(因為Broker本身即使存儲一個topic的消息,也可以被不同的消費組訂閱)。注意:組成員的狀態管理(比如GroupMetadata)是在服務端的GroupCoordinator完成的,而不是由消費組的ConsumerCoordinator完成(因為消費者只能看到自己的,無法看到和自己同組的其他成員)。

3.Consumer消費者的工作過程:

  1. 在consumer啟動時或者coordinator節點故障轉移時,consumer發送ConsumerMetadataRequest給任意一個brokers。在ConsumerMetadataResponse中,它接收對應的Consumer Group所屬的Coordinator的位置信息。
  2. Consumer連接Coordinator節點,並發送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration錯誤碼,說明協調節點已經在初始化平衡。消費者就會停止抓取數據,提交offsets,發送JoinGroupRequest給協調節點。在JoinGroupResponse,它接收消費者應該擁有的topic-partitions列表以及當前Consumer Group的新的generation編號。這個時候Consumer Group管理已經完成,Consumer就可以開始fetch數據,並為它擁有的partitions提交offsets。
  3. 如果HeartbeatResponse沒有錯誤返回,Consumer會從它上次擁有的partitions列表繼續抓取數據,這個過程是不會被中斷的。
    image

4. Coordinator協調節點的工作過程

  1. 在穩定狀態下,Coordinator節點通過故障檢測協議跟蹤每個Consumer Group中每個Consumer的健康狀況。
  2. 在選舉和啟動時,Coordinator節點讀取它管理的Consumer Group列表,以及從ZK中讀取每個消費組的成員信息。如果之前沒有成員信息,它不會做任何動作。只有在同一個消費組的第一個消費者注冊進來時,Coordinator節點才開始工作(即開始加載Group的Consumer成員信息)。
  3. 當Coordinator節點完全加載完它所負責的Consumer Group列表的所有組成員之前,它會在以下幾種請求的響應中返回CoordinatorStartupNotComplete錯誤碼:HeartbeatRequest,OffsetCommitRequest,JoinGroupRequest。這樣消費者就會過段時間重試(直到完全加載,沒有錯誤碼返回為止)。
  4. 在選舉或啟動時,Coordinator節點會對消費組中的所有消費者進行故障檢測。根據故障檢測協議被協調節點標記為Dead的消費者會從消費組中移除,這個時候協調節點會為Dead的消費者所屬的消費組觸發一個Rebalance操作(消費者Dead之后,這個消費者擁有的partition需要平衡給其他消費者)。
  5. 當HeartbeatResponse返回IllegalGeneration錯誤碼,就會觸發平衡操作。一旦所有存活的Consumer通過JoinGroupRequests重新注冊到Coordinator節點,Coordinator節點會將最新的partition所有權信息在JoinGroupResponse的每個消費者之間通信(同步),然后就完成了Rebalance操作。
  6. Coordinator節點會跟蹤任何一個Consumer已經注冊的topics的topic-partition的變更。如果它檢測到某個topic新增的partition,就會觸發Rebalance操作。當創建一個新的topics也會觸發Rebalance操作,因為消費者可以在topic被創建之前就注冊它感興趣的topics。

Consumer狀態機

image

Coordinator狀態機

image

當Coordinator發生故障時,Consumer發現new Coordinator的順序可能發生在新的協調者完成故障處理(包括從zk中加載消費組元數據等)之前或之后。如果在完成故障處理之后才發現new Coordinator,new Coordinator就會像之前一樣接收消費者的心跳請求。而如果是在之前,新的協調者則會拒絕消費者的心跳請求,會導致消費者重新發現協調者,並重新連接協調者。如果消費者太晚連接新的協調者,協調者可能會標記消費者掛掉了,消費者再次加入時,會認為這是一個新的消費者,並觸發rebalance。

消費者發現新的協調者(co-ordinator re-discovery),包括兩個步驟,首先確定新的協調者,然后消費者連接協調者。如果新的協調者確定了,並且消費者成功連接上新協調者,這樣消費者發送的心跳請求就會被新的協調者正常接收。但是如果新協調者已經確定,而消費者並沒有連接上新的協調者,消費者發送的心跳請求並不會被接收:因為連接都還沒有建立!

5.Coordinator存儲的信息有什么

對於每個Consumer Group,Coordinator會存儲以下信息:

  1. 對每個存在的topic,可以有多個消費組group訂閱同一個topic(對應消息系統中的廣播)
  2. 對每個Consumer Group,元數據如下:
  • 訂閱的topics列表
  • Consumer Group配置信息,包括session timeout等
  • 組中每個Consumer的元數據。包括主機名,consumer id
  • 每個正在消費的topic partition的當前offsets
  • Partition的ownership元數據,包括consumer消費的partitions映射關系

沒看完,這里還有更多的細節, 博文


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM