Kafka 之 async producer (2) kafka.producer.async.DefaultEventHandler


上次留下來的問題

  1. 如果消息是發給很多不同的topic的, async producer如何在按batch發送的同時區分topic的
  2. 它是如何用key來做partition的?
  3. 是如何實現對消息成批量的壓縮的?
  • async producer如何在按batch發送的同時區分topic的

  這個問題的答案是: DefaultEventHandler會把發給它的一個batch的消息(實際上是Seq[KeyedMessage[K,V]]類型)拆開,確定每條消息該發送給哪個broker。對發給每個broker的消息,會按topic和partition來組合。即:拆包=>根據metaData組裝

這個功能是通過partitionAndCollate方法實現的

def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]]

  它返回一個Option對象,這個Option的元素是一個Map,Key是brokerId,value是發給這個broker的消息。對每一條消息,先確定它要被發給哪一個topic的哪個parition。然后確定這個parition的leader broker,然后去Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]這個Map里找到對應的broker,然后把這條消息填充給對應的topic+partition對應的Seq[KeyedMessage[K,Message]]。這樣就得到了最后的結果。這個結果表示了哪些消息要以怎樣的結構發給一個broker。真正發送的時候,會按照brokerId的不同,把打包好的消息發給不同的broker。

首先,看一下kafka protocol里對於Producer Request結構的說明:

ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
   RequiredAcks => int16
   Timeout => int32
   Partition => int32
   MessageSetSize => int32

發給一個broker的消息就是這樣的結構。

同時,在kafka wiki里對於Produce API 有如下說明:

The produce API is used to send message sets to the server. For efficiency it allows sending message sets intended for many topic partitions in a single request.

即在一個produce request里,可以同時發消息給多個topic+partition的組合。當然一個produce request是發給一個broker的。

使用

send(brokerid, messageSetPerBroker)

  把消息set發給對應的brokerid。

  • 它是如何用key來做partition的?

首先看下KeyedMessage類的定義:

case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
  if(topic == null)
    throw new IllegalArgumentException("Topic cannot be null.") 
  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
  def this(topic: String, key: K, message: V) = this(topic, key, key, message)
  def partitionKey = {
    if(partKey != null)
      partKey
    else if(hasKey)
      key
    else
      null  
  }
  def hasKey = key != null
}

  當使用三個參數的構造函數時, partKey會等於key。partKey是用來做partition的,但它不會最當成消息的一部分被存儲。

前邊提到了,在確定一個消息應該發給哪個broker之前,要先確定它發給哪個partition,這樣才能根據paritionId去找到對應的leader所在的broker。

val topicPartitionsList = getPartitionListForTopic(message) //獲取這個消息發送給的topic的partition信息
val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)//確定這個消息發給哪個partition

  注意傳給getPartition方法中時使用的是partKey。getPartition方法為:

  private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
    val numPartitions = topicPartitionList.size
    if(numPartitions <= 0)
      throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
    val partition =
      if(key == null) {
        // If the key is null, we don't really need a partitioner
        // So we look up in the send partition cache for the topic to decide the target partition
        val id = sendPartitionPerTopicCache.get(topic)
        id match {
          case Some(partitionId) =>
            // directly return the partitionId without checking availability of the leader,
            // since we want to postpone the failure until the send operation anyways
            partitionId
          case None =>
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId)
            partitionId
        }
      } else
        partitioner.partition(key, numPartitions)

  當partKey為null時,首先它從sendParitionPerTopicCache里取這個topic緩存的partitionId,這個cache是一個Map.如果之前己經使用sendPartitionPerTopicCache.put(topic, partitionId)緩存了一個,就直接取出它。否則就隨機從可用的partitionId里取出一個,把它緩存到sendParitionPerTopicCache。這就使得當sendParitionPerTopicCache里有一個可用的partitionId時,很多消息都會被發送給這同一個partition。因此若所有消息的partKey都為空,在一段時間內只會有一個partition能收到消息。之所以會說“一段”時間,而不是永久,是因為handler隔一段時間會重新獲取它發送過的消息對應的topic的metadata,這個參數通過topic.metadata.refresh.interval.ms來設置。當它重新獲取metadata之后,會消空一些緩存,就包括這個sendParitionPerTopicCache。因此,接下來就會生成另一個隨機的被緩存的partitionId。

  if (topicMetadataRefreshInterval >= 0 && 
          SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {  //若該refresh topic metadata 了,do the refresh
        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
        sendPartitionPerTopicCache.clear()
        topicMetadataToRefresh.clear
        lastTopicMetadataRefreshTime = SystemTime.milliseconds
      }

  當partKey不為null時,就用傳給handler的partitioner的partition方法,根據partKey和numPartitions來確定這個消息被發給哪個partition。注意這里的numPartition是topicPartitionList.size獲取的,有可能會有parition不存在可用的leader。這樣的問題將留給send時解決。實際上發生這種情況時,partitionAndCollate會將這個消息分派給brokerId為-1的broker。而send方法會在發送前判斷brokerId

    if(brokerId < 0) {
      warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
      messagesPerTopic.keys.toSeq

  當brokerId<0時,就返回一個非空的Seq,包括了所有沒有leader的topic+partition的組合,如果重試了指定次數還不能發送,將最終導致handle方法拋出一個 FailedToSendMessageException異常。

  • 是如何實現對消息成批量的壓縮的?

這個是在

private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]])

中處理。

說明為:

/** enforce the compressed.topics config here.
* If the compression codec is anything other than NoCompressionCodec,
* Enable compression only for specified topics if any
* If the list of compressed topics is empty, then enable the specified compression codec for all topics
* If the compression codec is NoCompressionCodec, compression is disabled for all topics
*/

即,如果沒有設置壓縮,就所有topic對應的消息集都不壓縮。如果設置了壓縮,並且沒有設置對個別topic啟用壓縮,就對所有topic都使用壓縮;否則就只對設置了壓縮的topic壓縮。

在這個gruopMessageToSet中,並不有具體的壓縮邏輯。而是返回一個ByteBufferMessageSet對象。它的注釋為:

/**
* A sequence of messages stored in a byte buffer
*
* There are two ways to create a ByteBufferMessageSet
*
* Option 1: From a ByteBuffer which already contains the serialized message set. Consumers will use this method.
*
* Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.

 看來它是對於消息集進行序列化和反序列化的工具。

在它的實現里用到了CompressionFactory對象。從它的實現里可以看到Kafka只支持GZIP和Snappy兩種壓縮方式。

compressionCodec match {
      case DefaultCompressionCodec => new GZIPOutputStream(stream)
      case GZIPCompressionCodec => new GZIPOutputStream(stream)
      case SnappyCompressionCodec => 
        import org.xerial.snappy.SnappyOutputStream
        new SnappyOutputStream(stream)
      case _ =>
        throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM