kafka生產者和消費者流程


前言

根據源碼分析kafka java客戶端的生產者和消費者的流程。

 

基於zookeeper的舊消費者

kafka消費者從消費數據到關閉經歷的流程。

由於3個核心線程

  1. 基於zookeeper的連接器監聽該消費者是否觸發重平衡,並獲取該消費者客戶端消費的topic下group對應的partition以及offset。參考` ZookeeperConsumerConnector`
  2. 尋找partition leader線程循環尋找partition的leader,原理是基於zk的watch,並判斷哪些partition的leader發生改變,如果有發生改變,那么創建新的線程來消費這些partition,並把檢查之前的線程,如果有消費不是最新的leader那么就shutdown,參考`ConsumerFetcherManager`
  3. 數據拉取線程通過消費partition的leader,循環拉取數據

 

解放zookeeper的新消費者

總體思路

  1. 解放了消費者對zookeeper的依賴,現在只需要和broker交互就可以獲取所有需要的元數據。因此,和zk相關的線程都被解放了,取而代之的是基於事件觸發思想的流程。
  2. 減少了不必要的線程,將所有操作lazy化處理,也就是說整個消費流程是基於事件的。比如之前有專門維護心跳的線程,現在改為將心跳任務加入隊列,只有在某些事件達到時才會執行。由於在拉取數據時,頻繁去檢驗分區相關元數據會導致拉取性能下降,因此消費者每次在拉取完一批record后,會自動異步去拉取下一批的數據,並更新最新的元數據到內存,而此時使用者正在處理上一批的record,這樣處理就保證了基於事件思路下的元數據更新和拉取數據的性能。

 

拉取數據的兩種方式

  1. 手動分配topic和partition,也就是assign,分配之后,消費者客戶端不會感知到其他事件的觸發。
  2. 訂閱topic,自動分配到topic的partition,以及該groupid之前消費到的offset,當然也可以自動感知到kafka的rebalance,並獲取到相關事件。

 

拉取流程

  1. 確保kafka協調者認可了此次消費,並初始化和協調者的連接。認可很多層次的含義,包括kafka集群是否正常,安全認證是否通過之類。
  2. 確保分區被分配,除了手動assgin的topic,partition和offset,自動subscribe需要從kafka協調者獲取相關元數據,也是發生重平衡事件的來源。
  3. 確保已經獲取拉取的offset,否則為從協調者那重新獲取對應groupid的offset,如果獲取失敗(比如這是一個新的groupid),那么會重置offset,根據配置用最舊或者最新來代替。參考`ConsumerCoordinator`
  4. 拉取數據,通過拉取每個partition的leader,基於NIO思路拉取數據緩存在內存中;參考`Fetcher`。
  5. 提交offset,如果開啟自動提交offset的功能,那么消費者會在兩個情況同步提交offset,(1)重平衡或者和broker心跳超時,參考流程2;(2)消費者關閉。如果是手動提交的話可以采用異步或者同步兩種提交方式

 

重平衡觸發原因

  1. 消費的topic的partition數量發生了變化
  2. topic被創建或者刪除
  3. 有其他消費者客戶端使用相同的group來消費或者停止消費同一個topic

 

數據拉取線程分為兩類

kafka集群在同步副本數據的過程,也是基於客戶端拉取數據,和消費者稍有不同

  1. 基於消費者的數據拉取線程

這也就是我們常用的消費者,數據拉取后緩存在內存中給客戶端獲取

  1. 基於副本備份的數據拉取線程

kafka集群的副本通過此類線程拉取數據,並寫到本地日志中

 

kafka生產者

相對消費者,生產者的流程簡單很多。

生產者采用雙線程的方式,主線程也就是用戶線程,將數據交給kafka的生產者;另外一個是生產者自己的子線程,將數據源源不斷的發送到kafka。

 

生產流程

  1. 找到數據發送的partition;
  2. 將數據按照topic partition分類,每類采用一個雙向隊列存儲數據;檢查隊列最后一個批是否正在處理,嘗試加入到該批,否則創建一個新的批並加入到隊列末尾。如果最后一個批已經滿了(容量滿了或者數據超過1)或者有新的批被創建了,那么觸發生產者將數據發送到kafka
  3. 另外生產者的子線程也會不斷的自我循環,判斷內存中緩存的數據是否可以被發送到kafka,有5種情況,只要滿足其一就把數據發送到broker,full || expired || exhausted || closed || flushInProgress();(1) full參考流程2;(2) expired 距離上一次數據發送的時間超過了一個閾值;(3) exhausted 生產者內部維護了內存隊列,記錄了當前使用的內存大小,exhausted表示申請的總數>1的情況;(4) closed 生產者被關閉了;(5) 使用者顯示的調用了flush()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM