Kafka消息發送流程


1. 簡單流程概述

 

  A. 流程描述

    a. producer先從zookeeper的"/brokers/.../state"節點找到該partition的leader
    b. producer將消息發送給該leader
    c. leader將消息寫入到本地的log
    d. follower從leader拉取消息,寫入本地的log后向leader發送ack
    e. leader收到ISR中的replica的ack后,增加HW(high watermark,最后commit的offset)並向producer發送ACK。

 

2. 消息發送的方式

  Kafka的Producer發送消息采用的是異步發送的方式,且在新版的Kafka ApI中只有異步的發送方式。

  但是可以通過異步發送的API達到同步的效果。

3. Kafka發送消息的具體流程

  

 

  在消息發送過程中,涉及到兩個線程,以及一個線程共享變量-RecordAccumulator.

  兩個線程:

    一是主線程,負責將消息進行封裝和加工發送給消息中間件(RecordAccumulator)

    二是send線程,負責從消息中間件中拉取數據發送到主題(Topic)的對應分區(Partition)

  A. 流程描述

    main線程

      a. 生產將要發送的數據封裝成ProducerRecord對象,目的是發送到消息中間件

      b. 中間要經過攔截器列表、序列化器和分區器將消息發送到消息中間件

      c. RecordAccumulator中有多個隊列,與topic的分區相對應。消息發送時直接發送到分區對應的RecordAccumulator隊列中

    sender線程

      d. 當RecordAccumulator中攢夠一批數據后,即達到指定量的數據之后,Sender線程將這一批數據拉取並發送給Topic。

          控制參數:batch.size=>只有數據積累到batch.size之后,sender才會發送數據。

      e. 同時,如果RecordAccumulator中隊列遲遲到不到指定量的數據時,會等到一定時長時發送。

          控制參數:linger.ms=>如果數據遲遲未達到batch.size,sender等待linger.time之后就會發送數據

  

 

 

 4. Kafka的重試機制

  

 

  消息的重試的實現需要在API的send方法中完成回調函數,回調函數會在producer收到ack時調用。

  回調函數為異步調用,相當於另一個線程去實現。

  該方法有兩個參數,分別是RecordMetadata和Exception。

  如果Exception為空,說明消息發送成功,如果Exception不為空,說明消息發送失敗。

  在回調函數中除了記錄日之外,基本不能實現其他有意義的事情。

  注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。

   A. 流程描述

    a. sender線程發送數據到Kafka Server,當Kafka處理數據結束后,會判斷數據是否寫入成功。

    b. 如果數據處理成功,則返回給Producer相關RecordMetadata和Exception,此時Exception為空

    c. 如果數據處理失敗,則再次判定是否可以進行重發消息。

    d. 如果消息可以重試,則再次放入到RecordAcumulater,進行消息的重發。

    e. 如果消息不可進行重試,則將消息返回給Producer相關的RecordMetadata和Exception,此時Exception不為空。

   

    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM