前言
根據源碼分析kafka java客戶端的生產者和消費者的流程。
基於zookeeper的舊消費者
kafka消費者從消費數據到關閉經歷的流程。
由於3個核心線程
- 基於zookeeper的連接器監聽該消費者是否觸發重平衡,並獲取該消費者客戶端消費的topic下group對應的partition以及offset。參考` ZookeeperConsumerConnector`
- 尋找partition leader線程循環尋找partition的leader,原理是基於zk的watch,並判斷哪些partition的leader發生改變,如果有發生改變,那么創建新的線程來消費這些partition,並把檢查之前的線程,如果有消費不是最新的leader那么就shutdown,參考`ConsumerFetcherManager`
- 數據拉取線程通過消費partition的leader,循環拉取數據
解放zookeeper的新消費者
總體思路
- 解放了消費者對zookeeper的依賴,現在只需要和broker交互就可以獲取所有需要的元數據。因此,和zk相關的線程都被解放了,取而代之的是基於事件觸發思想的流程。
- 減少了不必要的線程,將所有操作lazy化處理,也就是說整個消費流程是基於事件的。比如之前有專門維護心跳的線程,現在改為將心跳任務加入隊列,只有在某些事件達到時才會執行。由於在拉取數據時,頻繁去檢驗分區相關元數據會導致拉取性能下降,因此消費者每次在拉取完一批record后,會自動異步去拉取下一批的數據,並更新最新的元數據到內存,而此時使用者正在處理上一批的record,這樣處理就保證了基於事件思路下的元數據更新和拉取數據的性能。
拉取數據的兩種方式
- 手動分配topic和partition,也就是assign,分配之后,消費者客戶端不會感知到其他事件的觸發。
- 訂閱topic,自動分配到topic的partition,以及該groupid之前消費到的offset,當然也可以自動感知到kafka的rebalance,並獲取到相關事件。
拉取流程
- 確保kafka協調者認可了此次消費,並初始化和協調者的連接。認可很多層次的含義,包括kafka集群是否正常,安全認證是否通過之類。
- 確保分區被分配,除了手動assgin的topic,partition和offset,自動subscribe需要從kafka協調者獲取相關元數據,也是發生重平衡事件的來源。
- 確保已經獲取拉取的offset,否則為從協調者那重新獲取對應groupid的offset,如果獲取失敗(比如這是一個新的groupid),那么會重置offset,根據配置用最舊或者最新來代替。參考`ConsumerCoordinator`
- 拉取數據,通過拉取每個partition的leader,基於NIO思路拉取數據緩存在內存中;參考`Fetcher`。
- 提交offset,如果開啟自動提交offset的功能,那么消費者會在兩個情況同步提交offset,(1)重平衡或者和broker心跳超時,參考流程2;(2)消費者關閉。如果是手動提交的話可以采用異步或者同步兩種提交方式
重平衡觸發原因
- 消費的topic的partition數量發生了變化
- topic被創建或者刪除
- 有其他消費者客戶端使用相同的group來消費或者停止消費同一個topic
數據拉取線程分為兩類
kafka集群在同步副本數據的過程,也是基於客戶端拉取數據,和消費者稍有不同
- 基於消費者的數據拉取線程
這也就是我們常用的消費者,數據拉取后緩存在內存中給客戶端獲取
- 基於副本備份的數據拉取線程
kafka集群的副本通過此類線程拉取數據,並寫到本地日志中
kafka生產者
相對消費者,生產者的流程簡單很多。
生產者采用雙線程的方式,主線程也就是用戶線程,將數據交給kafka的生產者;另外一個是生產者自己的子線程,將數據源源不斷的發送到kafka。
生產流程
- 找到數據發送的partition;
- 將數據按照topic partition分類,每類采用一個雙向隊列存儲數據;檢查隊列最后一個批是否正在處理,嘗試加入到該批,否則創建一個新的批並加入到隊列末尾。如果最后一個批已經滿了(容量滿了或者數據超過1)或者有新的批被創建了,那么觸發生產者將數據發送到kafka
- 另外生產者的子線程也會不斷的自我循環,判斷內存中緩存的數據是否可以被發送到kafka,有5種情況,只要滿足其一就把數據發送到broker,full || expired || exhausted || closed || flushInProgress();(1) full參考流程2;(2) expired 距離上一次數據發送的時間超過了一個閾值;(3) exhausted 生產者內部維護了內存隊列,記錄了當前使用的內存大小,exhausted表示申請的總數>1的情況;(4) closed 生產者被關閉了;(5) 使用者顯示的調用了flush()