kafka負載均衡相關資料收集(三)


 

apache kafka系列之Producer處理邏輯

下文是轉載的,原文鏈接地址:點這兒
【轉】 Kafka ProducerKafka Producer處理邏輯kafka生產者處理邏輯apache kafka系列

apache kafka中國社區QQ群:162272557

 

轉載自同事(董重)寫得一篇wiki博客

Kafka Producer處理邏輯

 

Kafka Producer產生數據發送給Kafka Server,具體的分發邏輯及負載均衡邏輯,全部由producer維護。

Kafka結構圖

Kafka Producer默認調用邏輯

默認Partition邏輯

1、沒有key時的分發邏輯

每隔 topic.metadata.refresh.interval.ms 的時間,隨機選擇一個partition。這個時間窗口內的所有記錄發送到這個partition。

發送數據出錯后也會重新選擇一個partition

2、根據key分發

對key求hash,然后對partition數量求模

Utils.abs(key.hashCode) % numPartitions

如何獲取Partition的leader信息(元數據)

決定好發送到哪個Partition后,需要明確該Partition的leader是哪台broker才能決定發送到哪里。

具體實現位置

kafka.client.ClientUtils#fetchTopicMetadata

 實現方案

1、從broker獲取Partition的元數據。由於Kafka所有broker存有所有的元數據,所以任何一個broker都可以返回所有的元數據

2、broker選取策略:將broker列表隨機排序,從首個broker開始訪問,如果出錯,訪問下一個

3、出錯處理:出錯后向下一個broker請求元數據

注意

  • Producer是從broker獲取元數據的,並不關心zookeeper。
  • broker發生變化后,producer獲取元數據的功能不能動態變化。
  • 獲取元數據時使用的broker列表由producer的配置中的 metadata.broker.list 決定。該列表中的機器只要有一台正常服務,producer就能獲取元數據。
  • 獲取元數據后,producer可以寫數據到非 metadata.broker.list 列表中的broker

錯誤處理

producer的send函數默認沒有返回值。出錯處理有EventHandler實現。

DefaultEventHandler的錯誤處理如下:

  • 獲取出錯的數據
  • 等待一個間隔時間,由配置 retry.backoff.ms 決定這段時間長短
  • 重新獲取元數據
  • 重新發送數據

出錯重試次數由配置 message.send.max.retries 決定

所有重試全部失敗時,DefaultEventHandler會拋出異常。代碼如下

 

if(outstandingProduceRequests.size >0) {

  producerStats.failedSendRate.mark()

  val correlationIdEnd = correlationId.get()

  error("Failed to send requests for topics %s with correlation ids in [%d,%d]"

    .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),

    correlationIdStart, correlationIdEnd-1))

  thrownewFailedToSendMessageException("Failed to send messages after "+ config.messageSendMaxRetries +" tries.", null)

}


 

請注明轉載自:http://write.blog.csdn.NET/postedit/26687109


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM