kafka發送消息流程


1. 簡單流程概述

  A. 流程描述

    a. producer先從zookeeper的"/brokers/.../state"節點找到該partition的leader
    b. producer將消息發送給該leader
    c. leader將消息寫入到本地的log
    d. follower從leader拉取消息,寫入本地的log后向leader發送ack
    e. leader收到ISR中的replica的ack后,增加HW(high watermark,最后commit的offset)並向producer發送ACK。

 

2. 消息發送的方式

  Kafka的Producer發送消息采用的是異步發送的方式,且在新版的Kafka ApI中只有異步的發送方式。但是可以通過異步發送的API達到同步的效果。

3. Kafka發送消息的具體流程

  

 

  在消息發送過程中,涉及到兩個線程,以及一個線程共享變量-RecordAccumulator.

  兩個線程:

    一是主線程,負責將消息進行封裝和加工發送給消息中間件(RecordAccumulator)

    二是send線程,負責從消息中間件中拉取數據發送到主題(Topic)的對應分區(Partition)

  A. main線程
    a.生產將要發送的數據封裝成ProducerRecord對象,目的是發送到消息中間件
    b.中間要經過攔截器列表、序列化器和分區器將消息發送到消息中間件
    c.RecordAccumulator中有多個隊列,與topic的分區相對應。消息發送時直接發送到分區對應的RecordAccumulator隊列中

  B. sender線程

    a.當RecordAccumulator中攢夠一批數據后,即達到指定量的數據之后,Sender線程將這一批數據拉取並發送給Topic。

       控制參數:batch.size=>只有數據積累到batch.size之后,sender才會發送數據。

    b.同時,如果RecordAccumulator中隊列遲遲到不到指定量的數據時,會等到一定時長時發送。

       控制參數:linger.ms=>如果數據遲遲未達到batch.size,sender等待linger.time之后就會發送數據


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM