GroupCoordinator機制


GroupCoordinator機制

1 介紹:

Kafka 的 Server 端主要有三塊內容:GroupCoordinator、Controller 和 ReplicaManager,其中,GroupCoordinator 的內容是與 Consumer 端緊密結合在一起的,簡單來說就是,GroupCoordinator 是負責進行 consumer 的 group 成員的rebalance與 offset 管理。GroupCoordinator 處理的 client 端請求類型可以看出來,它處理的請求類型主要有以下幾種:

  • l  ApiKeys.OFFSET_COMMIT;
  • l  ApiKeys.OFFSET_FETCH;
  • l  ApiKeys.JOIN_GROUP;
  • l  ApiKeys.LEAVE_GROUP;
  • l  ApiKeys.SYNC_GROUP;
  • l  ApiKeys.DESCRIBE_GROUPS;
  • l  ApiKeys.LIST_GROUPS;
  • l  ApiKeys.HEARTBEAT;

而 Kafka Server 端要處理的請求總共有21 種,其中有 8 種是由 GroupCoordinator 來完成的。

2 rebalance機制

我們知道kafka保證同一消費組中的每個consumer能夠消費一個或者多個特定的partition數據,一個partition的數據只能被一個consumer消費;因為每個partition里的消息是有序的,這樣可以保證partition中的數據被同一個消費者有序消費;同時consumer只需要和自己消費的partition的broker通信就可以,減少開銷。

在如下條件下,partition要在consumer中重新分配:

  • l  條件1:有新的consumer加入
  • l  條件2:舊的consumer掛了
  • l  條件3:coordinator掛了,集群選舉出新的coordinator
  • l  條件4:topic的partition新加
  • l  條件5:consumer調用unsubscrible(),取消topic的訂閱

在kafka中消費者的分區分配策略默認有兩種:range和RoundRobin。

給定一個topic,有4個partition: p0, p1, p2, p3, 一個group有3個consumer: c0, c1, c2。那么,如果按范圍分配策略,分配結果是:

c0: p0, c1: p1, c2: p2, p3

如果按輪詢分配策略:

c0: p1, p3, c1: p1, c2: p2

2.1基於zk的rebalance

在kafka0.9版本之前,consumer的rebalance是通過在zookeeper上注冊watch完成的。每個consumer創建的時候,會在在Zookeeper上的路徑為/consumers/[consumer group]/ids/[consumer id]下將自己的id注冊到消費組下;然后在/consumers/[consumer group]/ids 和/brokers/ids下注冊watch;最后強制自己在消費組啟動rebalance。

這種做法很容易帶來zk的羊群效應,任何Broker或者Consumer的增減都會觸發所有的Consumer的Rebalance,造成集群內大量的調整;同時由於每個consumer單獨通過zookeeper判斷Broker和consumer宕機,由於zk的腦裂特性,同一時刻不同consumer通過zk看到的表現可能是不一樣,這就可能會造成很多不正確的rebalance嘗試;除此之外,由於consumer彼此獨立,每個consumer都不知道其他consumer是否rebalance成功,可能會導致consumer group消費不正確。

2.2 Coordinator

基於zk的rebalance存在不可避免的羊群效應和腦裂問題,如何不用zk來協調,而是將失敗探測和Rebalance的邏輯放到一個高可用的中心,那么上述問題就能得以解決;因此kafka0.9.*的版本重新設計了consumer端,誕生了這樣一個高可用中心Coordinator,大大減少了zookeeper負載。

對於每一個Consumer Group,Kafka集群為其從broker集群中選擇一個broker作為其coordinator。coordinator主要做兩件事:

  • 維持group的成員組成。這包括加入新的成員,檢測成員的存活性,清除不再存活的成員。
  • 協調group成員的行為。

Coordinator有如下幾種類型:

  • GroupCoordinator:broker端的,每個kafka server都有一個實例,管理部分的consumer group和它們的offset
  • WorkerCoordinator:broker端的,管理GroupCoordinator程序,主要管理workers的分配。
  • ConsumerCoordinator:consumer端的,和GroupCoordinator通信的媒介。

ConsumerCoordinator是KafkaConsumer的一個成員,只負責與GroupCoordinator通信,所以真正的協調者還是GroupCoordinator。

2.3分區步驟:

Kafka為consumerGroup

 

 

步驟1:對於每1個consumer group,Kafka集群為其從broker集群中選擇一個broker作為其coordinator。因此,第1步就是找到這個coordinator。

 

 

 

2.4 group 如何選擇相應的 GroupCoordinator

要說這個,就必須介紹一下這個 __consumer_offsets topic 了,它是 Kafka 內部使用的一個 topic,專門用來存儲 group 消費的情況,默認情況下有50個 partition,每個 partition 默認有三個副本,而具體的一個 group 的消費情況要存儲到哪一個 partition 上,是根據 abs(GroupId.hashCode()) % NumPartitions來計算的(其中,NumPartitions 是 __consumer_offsets 的 partition 數,默認是50個)。

對於 consumer group 而言,是根據其 group.id 進行 hash 並計算得到其具對應的 partition 值,該 partition leader 所在 Broker 即為該 Group 所對應的 GroupCoordinator,GroupCoordinator 會存儲與該 group 相關的所有的 Meta 信息。

       此外,每一個分區,其實是4個offset:

 

 

Leo:代表了producer提交的message set的最后一個offset

HW:代表了最大可以供Consumer消費的message set的offset

Current Position:代表了某個Consumer Group消費到的位置

Last Committed Offset:代表了某個Consumer Group Commit的offset。

其中leo,hw都不必說了,他們是存在於recovery-point-offset-checkpoint, replication-checkpoint 兩個文件中的它兩個都是針對於分區來說的。

Current position 是有Consumer自身知道的。

Last Committed Offset則是記錄了Partition在某個Consumer Group的消費情況。

 

步驟2:找到coordinator之后,發送JoinGroup請求

 

 

 

 

步驟3:JoinGroup返回之后,發送SyncGroup,得到自己所分配到的partition

 

 

 

 

partition的分配策略和分配結果其實是由client決定的,而不是由coordinator決定的。在第2步,所有consumer都往coordinator發送JoinGroup消息之后,coordinator會指定其中一個consumer作為leader,其他consumer作為follower。

 

然后由這個leader進行partition分配。然后在第3步,leader通過SyncGroup消息,把分配結果發給coordinator,其他consumer也發送SyncGroup消息,獲得這個分配結果。

為什么要在consumer中選一個leader出來,進行分配,而不是由coordinator直接分配呢?關於這個, Kafka的官方文檔有詳細的分析。其中一個重要原因是為了靈活性:如果讓server分配,一旦需要新的分配策略,server集群要重新部署,這對於已經在線上運行的集群來說,代價是很大的;而讓client分配,server集群就不需要重新部署了。

 

Rebalance Generation

它表示了rebalance之后的一屆成員,主要是用於保護consumer group,隔離無效offset提交的。比如上一屆的consumer成員是無法提交位移到新一屆的consumer group中。每次group進行rebalance之后,generation號都會加1,表示group進入到了一個新的版本,如下圖所示: Generation 1時group有3個成員,隨后成員2退出組,coordinator觸發rebalance,consumer group進入Generation 2,之后成員4加入,再次觸發rebalance,group進入Generation 3.

 




 

2.5 heartbeat的實現原理

前面介紹了rebalance的條件,這些條件主要是通過heartbeat感知,每一個consumer都會定期的往coordinator發送heartbeat消息,一旦coordinator返回了某個特定的error code:ILLEGAL_GENERATION, 就說明之前的group無效了(解散了),要重新進行JoinGroup + SyncGroup操作。

 

那這個定期發送如何實現呢?一個直觀的想法就是開一個后台線程,定時發送heartbeat消息,但維護一個后台線程,很顯然會增大實現的復雜性。上面也說了, consumer是單線程程序。在這里是通過DelayedQueue來實現的。

 

DelayedQueue與HeartBeatTask

其基本思路是把HeartBeatRequest放入一個DelayedQueue中,然后在while循環的poll中,每次從DelayedQueue中把請求拿出來發送出去(只有時間到了,Task才能從Queue中拿出來)。

 

 

HeartbeatRequest消息體比較簡單,包含group_id(String),group_generation_id(int),member_id(String)三個字段。HeartbeatResponse消息體只包含short類型的error_code。

HeartbeatTask是一個實現DelayedTask接口的定時任務,負責定時發送HeartbeatRequest並處理其響應,實現邏輯都在run()方法中實現,HeartbeatTask.run()的具體流程:

 

 

(1)首先檢查是否需要發送HeartbeatRequest,條件有三個,一個不滿足就不能發送心跳:

  • GroupCoordinator 已經確定且已連接。
  • 不處於正在等待Partition分配結果的狀態。
  • 之前的HeartbeatRequest請求正常收到響應且沒有過期。
    如果不符合條件,就不會再執行HeartbeatTask,等待后續調用reset()方法重啟HeartbeatTask任務。


(2)調用Heartbeat.sessionTimeoutExpired(now),判斷HeartbeatResponse是否超時,如果超時,則認為GroupCoordinator宕機,調用coordinatorDead()清空其unsent集合中對應的請求隊列並將這些請求標記為異常后結束,將coordinator字段設置為Null,表示將重新選舉GroupCoordinator。同時停止HeartbeatTask的執行。 coordinatorDead()代碼:

 

 

(3)檢測HeartbeatTask是否到期,如果不到期則更新到期時間,將HeartbeatTask對象重新添加到DelayedTaskQueue中,等待其到期后執行;如果已經到期就發送HeartbeatRequest請求。

(4)更新最近一次發送HeartbeatRequest請求的時間,將requestInFlignt設置為true,表示有未響應的HeartbeatRequest請求,防止重復發送。

(5)創建HeartbeatRequest請求,並調用ConsumerNetworkClient.send()方法,將請求放入unsent集合中緩存並返回RequestFuture<Void>。然后ConsumerNetworkClient.poll()會將HeartbeatRequest請求發送給GroupCoordinator。
(6)在RequestFuture<Void>對象上添加RequestFutureListener。
HeartbeatTask.run()具體實現:

 

 

sendHeartbeatRequest()

使用HeartbeatCompletionHandler將client.send()方法返回的RequestFuture<ClientResponse>適配成RequestFuture<Void>后返回

 


使用HeartbeatCompletionHandler中實現的是HeartbeatResponse的核心邏輯:

CoordinatorResponseHandler是一個抽象類,其中有parse()和handle()兩個抽象方法,parse()方法對ClientResponse進行解析,得到指定類型的響應;handle()對解析后的響應進行處理。CoordinatorResponseHandler實現了RequestFuture抽象類的onSuccess()方法和onFailure方法

 

 

RequestFuture<ClientResponse>和RequestFutureListener<ClientResponse>
實現了配適器的功能。當ClientResponse傳遞到HeartbeatCompletionHandler時,會通過parse()方法解析成HeartbeatResponse,然后進入handle()方法處理。

在HeartbeatCompletionHandler.handle()方法中,判斷HeartbeatResponse中是否包含錯誤碼,如果不包含,則調用RequestFuture<Void>的complete(null)方法,將HeartbeatResponse成功的事件傳播下去,否則,根據錯誤碼分類處理,並調用raise()設置對應的異常。如:
錯誤碼是Errors.ILLEGAL_GENERATION,表示HeartbeatRequest中攜帶的generationId過期,GroupCoordinator已經開始新一輪的Rebalance操作,則將rejoinNeeded設置為true,這樣會重新發送JoinGroupRequest請求嘗試加入Consumer Group,也會導致HeartbeatTask任務停止。如果錯誤碼是UNKNOWN_MEMBER_ID,表示GroupCoordinator識別不了此Consumer,則清空memberId,嘗試重新加入Consumer Group。
分析handle()方法的具體實現代碼:

 



HeartbeatCompletionHandler.handle()方法中會調用RequestFuture<Void>的complete()方法或raise()方法,這兩個方法中沒有處理邏輯,但是會觸發其上的RequestFutureListener<Void>(在HeartbeatTask.run()方法中注冊),此監聽器會將requestInFlight設置為false,表示所有HeartbeatRequest都已經完成,並將HeartbeatTask重新放入定時任務隊列,等待下一次到期執行。

2.6 consumer group狀態機

和很多kafka組件一樣,group也做了個狀態機來表明組狀態的流轉。coordinator根據這個狀態機會對consumer group做不同的處理,如下圖所示

 

 

下圖中的各個狀態:

  • l  Dead:組內已經沒有任何成員的最終狀態,組的元數據也已經被coordinator移除了。這種狀態響應各種請求都是一個response: UNKNOWN_MEMBER_ID
  • l  Empty:組內無成員,但是位移信息還沒有過期。這種狀態只能響應JoinGroup請求
  • l  PreparingRebalance:組准備開啟新的rebalance,等待成員加入
  • l  AwaitingSync:正在等待leader consumer將分配方案傳給各個成員
  • l  Stable:rebalance完成!可以開始消費了

 

新成員加入:

 

 

成員崩潰:

 

 

成員主動離組:

 

 

提交offset:

 

 

 

 


3 offset管理

老版本的位移是提交到zookeeper中的,目錄結構是:/consumers/<group.id>/offsets/<topic>/<partitionId>,但是zookeeper其實並不適合進行大批量的讀寫操作,尤其是寫操作。

 

 


因此kafka提供了另一種解決方案:增加__consumeroffsets topic,將offset信息寫入這個topic,擺脫對zookeeper的依賴(指保存offset這件事情)。__consumer_offsets中的消息保存了每個consumer group某一時刻提交的offset信息。依然以上圖中的consumer group為例,格式大概如下:

 


 

__consumers_offsets topic配置了compact策略,使得它總是能夠保存最新的位移信息,既控制了該topic總體的日志容量,也能實現保存最新offset的目的。

 


3.1 offset的管理過程

offset提交消息會根據消費組的key(消費組名稱)進行分區. 對於一個給定的消費組,它的所有消息都會發送到唯一的broker(即Coordinator)
Coordinator上負責管理offset的組件是Offset manager。負責存儲,抓取,和維護消費者的offsets. 每個broker都有一個offset manager實例. 有兩種具體的實現:
ZookeeperOffsetManager: 調用zookeeper來存儲和接收offset(老版本的位移管理)。
DefaultOffsetManager: 提供消費者offsets內置的offset管理。
通過在config/server.properties中的offset.storage參數選擇。
DefaultOffsetManager
除了將offset作為logs保存到磁盤上,DefaultOffsetManager維護了一張能快速服務於offset抓取請求的consumer offsets表。這個表作為緩存,包含的含僅僅是”offsets topic”的partitions中屬於leader partition對應的條目(存儲的是offset)。
對於DefaultOffsetManager還有兩個其他屬性: “offsets.topic.replication.factor和”offsets.topic.num.partitions”,默認值都是1。這兩個屬性會用來自動地創建”offsets topic”。

offset manager接口的概要:

 

 

 

3.2 Offset Commit提交過程:

消費端
一條offset提交消息會作為生產請求.當消費者啟動時,會為”offsets topic”創建一個消費者。下面是內置的生產者的一些屬性:
可以使用異步.但是使用同步可以避免延遲的生產請求(因為是批量消息),並且我們需要立即知道offset消息是否被broker成功接收
|request.required.acks|-1|確保所有的replicas和leader是同步的,並且能看到所有的offset消息
|key.serializer.class|StringEncoder|key和payload都是strings
注意我們沒有對提交的offset消息進行壓縮,因為每條消息本身大小是很小的,如果壓縮了反而適得其反.
目前key和offset的值通過純文本方式傳遞. 我們可以轉換為更加緊湊的二進制協議,而不是把Long類型的offset和Int類型的partition作為字符串. 當然在不斷演進時還要考慮版本和格式協議.
broker
broker把接收到的offset提交信息當做一個正常的生產請求,對offset請求的處理和正常的生產者請求處理方式是一樣的.一旦將數據追加到leader的本地日志中,並且所有的replicas都趕上leader.leader檢查生產請求是”offsets topic”,(因為broker端的處理邏輯針對offset請求和普通生產請求是一樣的,如果是offset請求,還需要有不同的處理分支)它就會要求offset manager添加這個offset(對於延遲的生產請求,更新操作會在延遲的生產請求被完成的時候).
因為設置了acks=-1,只有當這些offsets成功地復制到ISR中的所有brokers,才會被提交給offset manager.

 

 

 

 

3.3 Offset Fetch獲取過程:

消費端
消費者啟動時,會首先創建到任意一個存活的brokers的通道.因此消費者會發送它所有”OffsetFetchRequest”到這個隨機選中的broker.。如果出現錯誤,這個通道就會被關閉,並重新創建一個隨機的通道。

broker
一個Offset抓取請求包含了多個topic-partitions. 接收請求的broker可能有也可能沒有請求的partitions的offset信息。因此接收請求的brokers也會和其他broker通信. 一個通道連接池會用來轉發請求給partition的leader broker.
下面是一個broker在接收到一個offset抓取請求后的步驟:

 


接收請求的broker首先決定”offset topic”的哪個partition負責這個請求從broker的leader cache中找出對應partition的leader(會在controller的每次metadata更新請求中更新緩存)
如果接收請求的broker就是leader,它會從自己的offset manager中讀取出offset,並添加到響應中如果offset不存在,返回UnknownTopicOrPartitionCode如果broker正在加載offsets table,返回OffsetLoadingCode.消費者受到這個狀態碼會在之后重試。如果接收請求的broker不是指定topic-partition的leader,它會將OffsetFetchRequest轉發給這個partition的當前leader,如果”offsets topic”這個時候不存在,它會嘗試自動創建,在創建成果后,會返回offset=-1

 

4 參考資料:

https://blog.csdn.net/chunlongyu/article/details/52791874

http://www.clouder.top/2018/03/12/kafka-consumer1/

https://segmentfault.com/a/1190000011441747

http://matt33.com/2018/01/28/server-group-coordinator/

https://www.jianshu.com/p/7e99830b1236

https://www.jianshu.com/p/5aa8776868bb

https://blog.csdn.net/qq_26222859/article/details/55101973

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM