1.Producer 網絡請求
1.1 Producer Client角度
KafkaProducer主要靠Sender
來發送數據給Broker。
- Sender: 該線程handles the sending of produce requests to the Kafka cluster. 該線程發送metadata requests來更新它感知的整個集群的視圖;另外,主要負責發送produce請求到相關的broker。
- Selector的主要目的是網絡事件的 loop 循環,通過調用selector.poll,不斷輪詢每個Channel上讀寫事件。
- SocketChannel注冊到Selector,Selector輪詢到事件之后,讓SocketChanel和ServerSocketChannel進行通信,做實際的IO等操作。
關注三個方法
- 1.注冊事件(connect,accept,read,write)
- 2.輪詢IO是否就緒
- 3.執行實際IO等操作。
思考一下,connect, accept, read, write 這4種事件,分別在這3個階段對應的函數。
Producer發送請求的調用順序:
KafkaProducer -- Sender -- KafkaClient(NetworkClient) -- Selector -- KafkaChannel
1.2 一次請求的詳細過程
KafkaProducer 構造函數
啟動了Sender線程
Sender.run
調用NetworkClient的send()
函數,調用了selector.send(Send send)
, 但這個時候數據並沒有真的發送出去,只是暫存在了selector內部相對應的KafkaChannel里面。
KafkaChannel先進行了檢查,是否存在send的目的地,這是一個2輪詢。確保有相應的KafkaChannel之后,調用this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
1注冊 write事件。接下來就交給Selector
進行2輪詢 和3實際操作,詳細方法是Selector.poll
。
KafkaProducer.send
調用doSend
方法,首先調用waitOnMetadata
獲取metaData信息, 最后調用的是nioSelector.wakeUp()
,讓阻塞在select()
的Selector立即返回,准備IO事件。(在send之前,會先讀取metadata。如果metadata讀不到,會一直阻塞在那,直到超時,拋出TimeoutException)
1.3 Selector
處理注冊的事件
小知識
- 一個Selector可以處理多個Channel。
- SelectionKey用來記錄一個Channel上的事件集合,每個Channel對應一個SelectionKey。
- SelectionKey也是Selector和Channel之間的關聯,通過SelectionKey可以取到對應的Selector和Channel。
poll 和 pollSelectionKeys
的關鍵流程(正常情況的處理流程)
我們以write事件coming來舉例,當有事件到來的時候,
- 找到該事件對應的SocketChannel(即KafkaChannel),為了后續與對應的ServerSocketChannel進行通信
如果這個KafkaChannel是可用的,在channel不是ready的狀態下,會channel.prepare
進行初始化, 里面包括了權限認證。(會調用下面的這個類進行權限認證,這里也是出過問題的地方:SaslServerAuthenticator#handleKafkaRequest)
- 輪詢事件的類型,
connect, read, write
write事件的情況下,調用Send send = channel.write();
// write--階段3: 實際的IO操作, 讀取完數據后,就取消write事件.
期間出現過任何異常,都會關閉這個KafkaChanel(上面的授權都沒有了),常見的是IOException
異常,Server端日志經常出現。
KafkaChannel的授權創建是在Selector的connect、register
方法中
2 同步和異步
Producer有同步發送和異步發送2種策略。在以前的Kafka client api實現中,同步和異步是分開實現的。
而在0.9以后的版本中,同步發送其實是通過異步發送間接實現,其接口如下:
public class KafkaProducer<K, V> implements Producer<K, V> {
...
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) //異步發送接口
{
...
}
}
- 要實現同步發送,只要在拿到返回的Future對象之后,直接調用get()就可以了。
2.1 基本思路
異步發送的基本思路就是:send的時候,KafkaProducer把消息放到本地的消息隊列RecordAccumulator,然后一個后台線程Sender不斷循環,把消息發給Kafka集群。
要實現這個,還得有一個前提條件:就是KafkaProducer/Sender都需要獲取集群的配置信息Metadata。
所謂Metadata:Topic/Partion與broker的映射關系:每一個Topic的每一個Partition,得知道其對應的broker列表是什么,其中leader是誰,follower是誰。