Kafka Producer產生數據發送給Kafka Server,具體的分發邏輯及負載均衡邏輯,全部由producer維護。
1.Kafka Producer默認調用邏輯
1.1 默認Partition邏輯
1、沒有key時的分發邏輯
每隔 topic.metadata.refresh.interval.ms
的時間,隨機選擇一個partition。這個時間窗口內的所有記錄發送到這個partition。發送數據出錯后也會重新選擇一個partition
2、根據key分發
對key求hash,然后對partition數量求模
Utils.abs(key.hashCode) % numPartitions
1.2 如何獲取Partition的leader信息(元數據):
決定好發送到哪個Partition后,需要明確該Partition的leader是哪台broker才能決定發送到哪里。
具體實現位置:kafka.client.ClientUtils#fetchTopicMetadata
實現方案
- 從broker獲取Partition的元數據。由於Kafka所有broker存有所有的元數據,所以任何一個broker都可以返回所有的元數據
- broker選取策略:將broker列表隨機排序,從首個broker開始訪問,如果出錯,訪問下一個
- 出錯處理:出錯后向下一個broker請求元數據
注意
- Producer是從broker獲取元數據的,並不關心zookeeper。
- broker發生變化后,producer獲取元數據的功能不能動態變化。
- 獲取元數據時使用的broker列表由producer的配置中的 metadata.broker.list 決定。該列表中的機器只要有一台正常服務,producer就能獲取元數據。
獲取元數據后,producer可以寫數據到非 metadata.broker.list 列表中的broker
錯誤處理: producer的send函數默認沒有返回值。出錯處理有EventHandler實現。
DefaultEventHandler的錯誤處理如下:
- 獲取出錯的數據
- 等待一個間隔時間,由配置
retry.backoff.ms
決定這段時間長短 - 重新獲取元數據
- 重新發送數據
出錯重試次數由配置 message.send.max.retries 決定.所有重試全部失敗時,DefaultEventHandler會拋出異常。代碼如下:
if(outstandingProduceRequests.size > 0) {
producerStats.failedSendRate.mark()
val correlationIdEnd = correlationId.get()
error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
.format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
correlationIdStart, correlationIdEnd-1))
throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
}
2.Producer的發送方式剖析
Kafka提供了Producer類作為java producer的api,該類有sync和async兩種發送方式。
2.1 Producer sync發送消息流程
2.2 Producer async發送消息流程
Kafka中Producer異步發送消息是基於同步發送消息的接口來實現的,異步發送消息的實現很簡單,客戶端消息發送過來以后,先放入到一個BlockingQueue隊列中然后就返回了。Producer再開啟一個線程(ProducerSendThread)不斷從隊列中取出消息,然后調用同步發送消息的接口將消息發送給Broker。下面是具體的調用流程:
代碼流程如下:
Producer:當new Producer(new ProducerConfig())
, 其底層實現,實際會產生兩個核心類的實例:Producer
、DefaultEventHandler
。在創建的同時,會默認new一個ProducerPool
,即我們每new一個java的Producer類,就會有創建Producer
、EventHandler
和ProducerPool
,ProducerPool
是連接不同kafka broker的池,初始連接個數有broker.list
參數決定。
調用producer.send方法流程:
當應用程序調用producer.send
方法時,其內部其實調的是eventhandler.handle(message)
方法,eventHandler
會首先序列化該消息,
eventHandler.serialize(events)-->dispatchSerializedData()-->partitionAndCollate()-->send()-->SyncProducer.send()
調用邏輯解釋:當客戶端應用程序調用producer
發送消息messages
時(既可以發送單條消息,也可以發送List多條消息),調用eventhandler.serialize
首先序列化所有消息,序列化操作用戶可以自定義實現Encoder接口
; 下一步調用partitionAndCollate()
: 根據topics
的messages
進行分組操作,messages分配給dataPerBroker(多個不同的Broker的Map)
,根據不同Broker調用不同的SyncProducer.send
批量發送消息數據,SyncProducer包裝了nio網絡操作信息。
partitionAndCollate()
方法詳細作用:獲取所有partitions的leader所在leaderBrokerId(就是在該partiionid的leader分布在哪個broker上),
創建一個HashMap<int, Map<TopicAndPartition, List<KeyedMessage<K,Message>>>>
,把messages
按照brokerId分組組裝數據,然后為SyncProducer
分別發送消息作准備工作。
名稱解釋:partKey:分區關鍵字,當客戶端應用程序實現Partitioner接口時,傳入參數key為分區關鍵字,根據key和numPartitions,返回分區(partitions)索引。記住partitions分區索引是從0開始的。
3.Producer平滑擴容機制
如果開發過producer客戶端
代碼,會知道metadata.broker.list
參數,它的含義是kafak broker的ip和port列表,producer初始化時,就連接這幾個broker,這時大家會有疑問,producer支持kafka cluster新增broker節點?它又沒有監聽zk broker節點或從zk中獲取broker信息,答案是肯定的,producer可以支持平滑擴容broker,是通過定時與現有的metadata.broker.list
通信,獲取新增broker信息,然后把新建的SyncProducer
放入ProducerPool
中。等待后續應用程序調用。
DefaultEventHandler類
中初始化實例化BrokerPartitionInfo類
,然后定期brokerPartitionInfo.updateInfo
方法,DefaultEventHandler部分代碼如下:
def handle(events: Seq[KeyedMessage[K,V]]) {
......
while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
if (topicMetadataRefreshInterval >= 0 &&
SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
sendPartitionPerTopicCache.clear()
topicMetadataToRefresh.clear
lastTopicMetadataRefreshTime = SystemTime.milliseconds
}
outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
if (outstandingProduceRequests.size > 0) {
info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
//休眠時間,多長時間刷新一次
Thread.sleep(config.retryBackoffMs)
// 生產者定期請求刷新最新topics的broker元數據信息
Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
.....
}
}
}
有空可以細看:
ClientUtils.fetchTopicMetadata方法
、ProducerPool的updateProducer方法
4.重難點理解
刷新metadata
並不僅在第一次初始化時做。為了能適應kafka broker運行中因為各種原因掛掉、paritition改變等變化,
eventHandler會定期的再去刷新一次該metadata,刷新的間隔用參數topic.metadata.refresh.interval.ms
定義,默認值是10分鍾。
這里有三點需要強調:
- 客戶端調用send, 才會新建
SyncProducer
,只有調用send才會去定期刷新metadata - 在每次取metadata時,kafka會新建一個
SyncProducer
去取metadata,邏輯處理完后再close。 - 根據當前SyncProducer(一個Broker的連接)取得的最新的完整的metadata,刷新
ProducerPool
中到broker的連接. - 每10分鍾的刷新會直接重新把到每個broker的socket連接重建,意味着在這之后的第一個請求會有幾百毫秒的延遲。如果不想要該延遲,把
topic.metadata.refresh.interval.ms
值改為-1,這樣只有在發送失敗時,才會重新刷新。Kafka的集群中如果某個partition所在的broker掛了,可以檢查錯誤后重啟重新加入集群,手動做rebalance,producer的連接會再次斷掉,直到rebalance完成,那么刷新后取到的連接着中就會有這個新加入的broker。
說明:每個SyncProducer實例化對象會建立一個socket連接(這個好理解,SyncProducer是threadsafe的,每個SyncProducer的這個socket連接對應一個Broker。它會被放到ProducerPool
中去)。
4.特別注意
在ClientUtils.fetchTopicMetadata
調用完成后,回到BrokerPartitionInfo.updateInfo
繼續執行,在其末尾,pool會根據上面取得的最新的metadata建立所有的SyncProducer,即Socket通道
producerPool.updateProducer(topicsMetadata)
在ProducerPool中,SyncProducer的數目是由該topic的partition數目控制的,即每一個SyncProducer對應一個broker,內部封了一個到該broker的socket連接。每次刷新時,會把已存在SyncProducer給close掉,即關閉socket連接,然后新建SyncProducer,即新建socket連接,去覆蓋老的。
如果不存在,則直接創建新的。
5.美團的一個Producer應用問題
問題:一個topic的partition的所有broker宕機(這個可能性太小了。)當Kafka集群中某個Partition所有存活的節點都失效或掛掉。會造成Producer嘗試重新發送message.send.max.retries(默認值為3)次后拋出Exception,每次嘗試都會休眠一定時間(默認值為100ms)。用戶捕捉到異常其結果,停止發送會阻塞,繼續發送消息會丟失。
簡單提一下解決思路
Kafka中默認發送消息方式不變,給用戶提供一種可選擇方式,增加一個消息發送失效轉移的開關,當Producer發送到目標Partition的(所有副本本來存活的)節點都失效或掛掉,就轉發到其他Partition上。