1. 簡單流程概述
A. 流程描述
a. producer先從zookeeper的"/brokers/.../state"節點找到該partition的leader
b. producer將消息發送給該leader
c. leader將消息寫入到本地的log
d. follower從leader拉取消息,寫入本地的log后向leader發送ack
e. leader收到ISR中的replica的ack后,增加HW(high watermark,最后commit的offset)並向producer發送ACK。
2. 消息發送的方式
Kafka的Producer發送消息采用的是異步發送的方式,且在新版的Kafka ApI中只有異步的發送方式。但是可以通過異步發送的API達到同步的效果。
3. Kafka發送消息的具體流程
在消息發送過程中,涉及到兩個線程,以及一個線程共享變量-RecordAccumulator.
兩個線程:
一是主線程,負責將消息進行封裝和加工發送給消息中間件(RecordAccumulator)
二是send線程,負責從消息中間件中拉取數據發送到主題(Topic)的對應分區(Partition)
A. main線程
a.生產將要發送的數據封裝成ProducerRecord對象,目的是發送到消息中間件
b.中間要經過攔截器列表、序列化器和分區器將消息發送到消息中間件
c.RecordAccumulator中有多個隊列,與topic的分區相對應。消息發送時直接發送到分區對應的RecordAccumulator隊列中
B. sender線程
a.當RecordAccumulator中攢夠一批數據后,即達到指定量的數據之后,Sender線程將這一批數據拉取並發送給Topic。
控制參數:batch.size=>只有數據積累到batch.size之后,sender才會發送數據。
b.同時,如果RecordAccumulator中隊列遲遲到不到指定量的數據時,會等到一定時長時發送。
控制參數:linger.ms=>如果數據遲遲未達到batch.size,sender等待linger.time之后就會發送數據