Kafka 0.8 Producer處理邏輯


Kafka Producer產生數據發送給Kafka Server,具體的分發邏輯及負載均衡邏輯,全部由producer維護。

1.Kafka Producer默認調用邏輯

image

1.1 默認Partition邏輯

1、沒有key時的分發邏輯

每隔 topic.metadata.refresh.interval.ms 的時間,隨機選擇一個partition。這個時間窗口內的所有記錄發送到這個partition。發送數據出錯后也會重新選擇一個partition

2、根據key分發

對key求hash,然后對partition數量求模
Utils.abs(key.hashCode) % numPartitions

1.2 如何獲取Partition的leader信息(元數據):

決定好發送到哪個Partition后,需要明確該Partition的leader是哪台broker才能決定發送到哪里。

具體實現位置:kafka.client.ClientUtils#fetchTopicMetadata

實現方案

  1. 從broker獲取Partition的元數據。由於Kafka所有broker存有所有的元數據,所以任何一個broker都可以返回所有的元數據
  2. broker選取策略:將broker列表隨機排序,從首個broker開始訪問,如果出錯,訪問下一個
  3. 出錯處理:出錯后向下一個broker請求元數據

注意

  • Producer是從broker獲取元數據的,並不關心zookeeper。
  • broker發生變化后,producer獲取元數據的功能不能動態變化。
  • 獲取元數據時使用的broker列表由producer的配置中的 metadata.broker.list 決定。該列表中的機器只要有一台正常服務,producer就能獲取元數據。
    獲取元數據后,producer可以寫數據到非 metadata.broker.list 列表中的broker

錯誤處理: producer的send函數默認沒有返回值。出錯處理有EventHandler實現。

DefaultEventHandler的錯誤處理如下:

  • 獲取出錯的數據
  • 等待一個間隔時間,由配置 retry.backoff.ms 決定這段時間長短
  • 重新獲取元數據
  • 重新發送數據

出錯重試次數由配置 message.send.max.retries 決定.所有重試全部失敗時,DefaultEventHandler會拋出異常。代碼如下:

if(outstandingProduceRequests.size > 0) {
        producerStats.failedSendRate.mark()
        val correlationIdEnd = correlationId.get()
        error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
          .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
          correlationIdStart, correlationIdEnd-1))
        throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
      }

2.Producer的發送方式剖析

Kafka提供了Producer類作為java producer的api,該類有sync和async兩種發送方式。

2.1 Producer sync發送消息流程

image

2.2 Producer async發送消息流程

image

Kafka中Producer異步發送消息是基於同步發送消息的接口來實現的,異步發送消息的實現很簡單,客戶端消息發送過來以后,先放入到一個BlockingQueue隊列中然后就返回了。Producer再開啟一個線程(ProducerSendThread)不斷從隊列中取出消息,然后調用同步發送消息的接口將消息發送給Broker。下面是具體的調用流程:
image

代碼流程如下:

Producer:當new Producer(new ProducerConfig()), 其底層實現,實際會產生兩個核心類的實例:ProducerDefaultEventHandler。在創建的同時,會默認new一個ProducerPool,即我們每new一個java的Producer類,就會有創建ProducerEventHandlerProducerPoolProducerPool是連接不同kafka broker的池,初始連接個數有broker.list參數決定。

調用producer.send方法流程:

當應用程序調用producer.send方法時,其內部其實調的是eventhandler.handle(message)方法,eventHandler會首先序列化該消息,
eventHandler.serialize(events)-->dispatchSerializedData()-->partitionAndCollate()-->send()-->SyncProducer.send()

調用邏輯解釋:當客戶端應用程序調用producer發送消息messages時(既可以發送單條消息,也可以發送List多條消息),調用eventhandler.serialize首先序列化所有消息,序列化操作用戶可以自定義實現Encoder接口; 下一步調用partitionAndCollate(): 根據topicsmessages進行分組操作,messages分配給dataPerBroker(多個不同的Broker的Map),根據不同Broker調用不同的SyncProducer.send批量發送消息數據,SyncProducer包裝了nio網絡操作信息

partitionAndCollate()方法詳細作用:獲取所有partitions的leader所在leaderBrokerId(就是在該partiionid的leader分布在哪個broker上),
創建一個HashMap<int, Map<TopicAndPartition, List<KeyedMessage<K,Message>>>> ,把messages按照brokerId分組組裝數據,然后為SyncProducer分別發送消息作准備工作。

名稱解釋:partKey:分區關鍵字,當客戶端應用程序實現Partitioner接口時,傳入參數key為分區關鍵字,根據key和numPartitions,返回分區(partitions)索引。記住partitions分區索引是從0開始的。

3.Producer平滑擴容機制

如果開發過producer客戶端代碼,會知道metadata.broker.list參數,它的含義是kafak broker的ip和port列表,producer初始化時,就連接這幾個broker,這時大家會有疑問,producer支持kafka cluster新增broker節點?它又沒有監聽zk broker節點或從zk中獲取broker信息,答案是肯定的,producer可以支持平滑擴容broker,是通過定時與現有的metadata.broker.list通信,獲取新增broker信息,然后把新建的SyncProducer放入ProducerPool中。等待后續應用程序調用。

DefaultEventHandler類中初始化實例化BrokerPartitionInfo類,然后定期brokerPartitionInfo.updateInfo方法,DefaultEventHandler部分代碼如下:

def handle(events: Seq[KeyedMessage[K,V]]) {  
  ......  
  while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {  
    topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)  
    if (topicMetadataRefreshInterval >= 0 &&  
        SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {  
      Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))  
      sendPartitionPerTopicCache.clear()  
      topicMetadataToRefresh.clear  
      lastTopicMetadataRefreshTime = SystemTime.milliseconds  
    }  
    outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)  
    if (outstandingProduceRequests.size > 0) {  
      info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))  
      //休眠時間,多長時間刷新一次  
      Thread.sleep(config.retryBackoffMs)  
      // 生產者定期請求刷新最新topics的broker元數據信息  
      Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))  
      .....  
    }  
  }  
  
}  

有空可以細看:ClientUtils.fetchTopicMetadata方法ProducerPool的updateProducer方法

4.重難點理解

刷新metadata並不僅在第一次初始化時做。為了能適應kafka broker運行中因為各種原因掛掉、paritition改變等變化,
eventHandler會定期的再去刷新一次該metadata,刷新的間隔用參數topic.metadata.refresh.interval.ms定義,默認值是10分鍾。
這里有三點需要強調:

  • 客戶端調用send, 才會新建SyncProducer,只有調用send才會去定期刷新metadata
  • 在每次取metadata時,kafka會新建一個SyncProducer去取metadata,邏輯處理完后再close。
  • 根據當前SyncProducer(一個Broker的連接)取得的最新的完整的metadata,刷新ProducerPool中到broker的連接.
  • 每10分鍾的刷新會直接重新把到每個broker的socket連接重建,意味着在這之后的第一個請求會有幾百毫秒的延遲。如果不想要該延遲,把topic.metadata.refresh.interval.ms值改為-1,這樣只有在發送失敗時,才會重新刷新。Kafka的集群中如果某個partition所在的broker掛了,可以檢查錯誤后重啟重新加入集群,手動做rebalance,producer的連接會再次斷掉,直到rebalance完成,那么刷新后取到的連接着中就會有這個新加入的broker。

說明:每個SyncProducer實例化對象會建立一個socket連接(這個好理解,SyncProducer是threadsafe的,每個SyncProducer的這個socket連接對應一個Broker。它會被放到ProducerPool中去)。

4.特別注意

ClientUtils.fetchTopicMetadata調用完成后,回到BrokerPartitionInfo.updateInfo繼續執行,在其末尾,pool會根據上面取得的最新的metadata建立所有的SyncProducer,即Socket通道

producerPool.updateProducer(topicsMetadata)
在ProducerPool中,SyncProducer的數目是由該topic的partition數目控制的,即每一個SyncProducer對應一個broker,內部封了一個到該broker的socket連接。每次刷新時,會把已存在SyncProducer給close掉,即關閉socket連接,然后新建SyncProducer,即新建socket連接,去覆蓋老的。
如果不存在,則直接創建新的。

5.美團的一個Producer應用問題

問題:一個topic的partition的所有broker宕機(這個可能性太小了。)當Kafka集群中某個Partition所有存活的節點都失效或掛掉。會造成Producer嘗試重新發送message.send.max.retries(默認值為3)次后拋出Exception,每次嘗試都會休眠一定時間(默認值為100ms)。用戶捕捉到異常其結果,停止發送會阻塞,繼續發送消息會丟失。

簡單提一下解決思路

Kafka中默認發送消息方式不變,給用戶提供一種可選擇方式,增加一個消息發送失效轉移的開關,當Producer發送到目標Partition的(所有副本本來存活的)節點都失效或掛掉,就轉發到其他Partition上。

image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM