Kafka 0.10 Producer網絡流程簡述


1.Producer 網絡請求

1.1 Producer Client角度

KafkaProducer主要靠Sender來發送數據給Broker。

  • Sender: 該線程handles the sending of produce requests to the Kafka cluster. 該線程發送metadata requests來更新它感知的整個集群的視圖;另外,主要負責發送produce請求到相關的broker。
  • Selector的主要目的是網絡事件的 loop 循環,通過調用selector.poll,不斷輪詢每個Channel上讀寫事件。
  • SocketChannel注冊到Selector,Selector輪詢到事件之后,讓SocketChanel和ServerSocketChannel進行通信,做實際的IO等操作。

關注三個方法

  • 1.注冊事件(connect,accept,read,write)
  • 2.輪詢IO是否就緒
  • 3.執行實際IO等操作。

思考一下,connect, accept, read, write 這4種事件,分別在這3個階段對應的函數。

Producer發送請求的調用順序:
KafkaProducer -- Sender -- KafkaClient(NetworkClient) -- Selector -- KafkaChannel

1.2 一次請求的詳細過程

  1. KafkaProducer 構造函數啟動了Sender線程
  2. Sender.run 調用 NetworkClient的send()函數,調用了selector.send(Send send), 但這個時候數據並沒有真的發送出去,只是暫存在了selector內部相對應的KafkaChannel里面。

KafkaChannel先進行了檢查,是否存在send的目的地,這是一個2輪詢。確保有相應的KafkaChannel之后,調用this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); 1注冊 write事件。接下來就交給Selector進行2輪詢3實際操作,詳細方法是Selector.poll

  1. KafkaProducer.send 調用doSend方法,首先調用waitOnMetadata獲取metaData信息, 最后調用的是nioSelector.wakeUp(),讓阻塞在select()的Selector立即返回,准備IO事件。(在send之前,會先讀取metadata。如果metadata讀不到,會一直阻塞在那,直到超時,拋出TimeoutException)

1.3 Selector處理注冊的事件

小知識

  • 一個Selector可以處理多個Channel。
  • SelectionKey用來記錄一個Channel上的事件集合,每個Channel對應一個SelectionKey。
  • SelectionKey也是Selector和Channel之間的關聯,通過SelectionKey可以取到對應的Selector和Channel。

poll 和 pollSelectionKeys 的關鍵流程(正常情況的處理流程)

我們以write事件coming來舉例,當有事件到來的時候,

  1. 找到該事件對應的SocketChannel(即KafkaChannel),為了后續與對應的ServerSocketChannel進行通信

如果這個KafkaChannel是可用的,在channel不是ready的狀態下,會channel.prepare進行初始化, 里面包括了權限認證。(會調用下面的這個類進行權限認證,這里也是出過問題的地方:SaslServerAuthenticator#handleKafkaRequest)

  1. 輪詢事件的類型,connect, read, write

write事件的情況下,調用Send send = channel.write(); // write--階段3: 實際的IO操作, 讀取完數據后,就取消write事件.

期間出現過任何異常,都會關閉這個KafkaChanel(上面的授權都沒有了),常見的是IOException異常,Server端日志經常出現。

KafkaChannel的授權創建是在Selector的connect、register方法中

2 同步和異步

Producer有同步發送和異步發送2種策略。在以前的Kafka client api實現中,同步和異步是分開實現的。

而在0.9以后的版本中,同步發送其實是通過異步發送間接實現,其接口如下:

public class KafkaProducer<K, V> implements Producer<K, V> {
...
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)  //異步發送接口
     {
     ...
     }
}
  • 要實現同步發送,只要在拿到返回的Future對象之后,直接調用get()就可以了。

2.1 基本思路

異步發送的基本思路就是:send的時候,KafkaProducer把消息放到本地的消息隊列RecordAccumulator,然后一個后台線程Sender不斷循環,把消息發給Kafka集群。

要實現這個,還得有一個前提條件:就是KafkaProducer/Sender都需要獲取集群的配置信息Metadata。

所謂Metadata:Topic/Partion與broker的映射關系:每一個Topic的每一個Partition,得知道其對應的broker列表是什么,其中leader是誰,follower是誰。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM