1. 簡單流程概述
A. 流程描述
a. producer先從zookeeper的"/brokers/.../state"節點找到該partition的leader
b. producer將消息發送給該leader
c. leader將消息寫入到本地的log
d. follower從leader拉取消息,寫入本地的log后向leader發送ack
e. leader收到ISR中的replica的ack后,增加HW(high watermark,最后commit的offset)並向producer發送ACK。
2. 消息發送的方式
Kafka的Producer發送消息采用的是異步發送的方式,且在新版的Kafka ApI中只有異步的發送方式。
但是可以通過異步發送的API達到同步的效果。
3. Kafka發送消息的具體流程
在消息發送過程中,涉及到兩個線程,以及一個線程共享變量-RecordAccumulator.
兩個線程:
一是主線程,負責將消息進行封裝和加工發送給消息中間件(RecordAccumulator)
二是send線程,負責從消息中間件中拉取數據發送到主題(Topic)的對應分區(Partition)
A. 流程描述
main線程
a. 生產將要發送的數據封裝成ProducerRecord對象,目的是發送到消息中間件
b. 中間要經過攔截器列表、序列化器和分區器將消息發送到消息中間件
c. RecordAccumulator中有多個隊列,與topic的分區相對應。消息發送時直接發送到分區對應的RecordAccumulator隊列中
sender線程
d. 當RecordAccumulator中攢夠一批數據后,即達到指定量的數據之后,Sender線程將這一批數據拉取並發送給Topic。
控制參數:batch.size=>只有數據積累到batch.size之后,sender才會發送數據。
e. 同時,如果RecordAccumulator中隊列遲遲到不到指定量的數據時,會等到一定時長時發送。
控制參數:linger.ms=>如果數據遲遲未達到batch.size,sender等待linger.time之后就會發送數據
4. Kafka的重試機制
消息的重試的實現需要在API的send方法中完成回調函數,回調函數會在producer收到ack時調用。
回調函數為異步調用,相當於另一個線程去實現。
該方法有兩個參數,分別是RecordMetadata和Exception。
如果Exception為空,說明消息發送成功,如果Exception不為空,說明消息發送失敗。
在回調函數中除了記錄日之外,基本不能實現其他有意義的事情。
注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。
A. 流程描述
a. sender線程發送數據到Kafka Server,當Kafka處理數據結束后,會判斷數據是否寫入成功。
b. 如果數據處理成功,則返回給Producer相關RecordMetadata和Exception,此時Exception為空
c. 如果數據處理失敗,則再次判定是否可以進行重發消息。
d. 如果消息可以重試,則再次放入到RecordAcumulater,進行消息的重發。
e. 如果消息不可進行重試,則將消息返回給Producer相關的RecordMetadata和Exception,此時Exception不為空。