apache kafka中國社區QQ群:162272557
轉載自同事(董重)寫得一篇wiki博客
Kafka Producer處理邏輯
Kafka Producer產生數據發送給Kafka Server,具體的分發邏輯及負載均衡邏輯,全部由producer維護。
Kafka結構圖

Kafka Producer默認調用邏輯

默認Partition邏輯
1、沒有key時的分發邏輯
每隔 topic.metadata.refresh.interval.ms 的時間,隨機選擇一個partition。這個時間窗口內的所有記錄發送到這個partition。
發送數據出錯后也會重新選擇一個partition
2、根據key分發
對key求hash,然后對partition數量求模
| Utils.abs(key.hashCode) % numPartitions |
如何獲取Partition的leader信息(元數據)
決定好發送到哪個Partition后,需要明確該Partition的leader是哪台broker才能決定發送到哪里。
具體實現位置
| kafka.client.ClientUtils#fetchTopicMetadata |
實現方案
1、從broker獲取Partition的元數據。由於Kafka所有broker存有所有的元數據,所以任何一個broker都可以返回所有的元數據
2、broker選取策略:將broker列表隨機排序,從首個broker開始訪問,如果出錯,訪問下一個
3、出錯處理:出錯后向下一個broker請求元數據
注意
- Producer是從broker獲取元數據的,並不關心zookeeper。
- broker發生變化后,producer獲取元數據的功能不能動態變化。
- 獲取元數據時使用的broker列表由producer的配置中的 metadata.broker.list 決定。該列表中的機器只要有一台正常服務,producer就能獲取元數據。
- 獲取元數據后,producer可以寫數據到非 metadata.broker.list 列表中的broker
錯誤處理
producer的send函數默認沒有返回值。出錯處理有EventHandler實現。
DefaultEventHandler的錯誤處理如下:
- 獲取出錯的數據
- 等待一個間隔時間,由配置 retry.backoff.ms 決定這段時間長短
- 重新獲取元數據
- 重新發送數據
出錯重試次數由配置 message.send.max.retries 決定
所有重試全部失敗時,DefaultEventHandler會拋出異常。代碼如下
| if(outstandingProduceRequests.size >0) { producerStats.failedSendRate.mark() val correlationIdEnd = correlationId.get() error("Failed to send requests for topics %s with correlation ids in [%d,%d]" .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","), correlationIdStart, correlationIdEnd-1)) thrownewFailedToSendMessageException("Failed to send messages after "+ config.messageSendMaxRetries +" tries.", null) } |
請注明轉載自:http://write.blog.csdn.NET/postedit/26687109
