Kafka 之 async producer (1)


問題 

  1. 很多條消息是怎么打包在一起的?
  2. 如果消息是發給很多不同的topic的, async producer如何在按batch發送的同時區分topic的
  3. 它是如何用key來做partition的?
  4. 是如何實現對消息成批量的壓縮的?

async producer是將producer.type設為async時啟用的producer

此時,調用send方法的線程和實際完成消息發送的線程是分開的。

當調用java API中producer的send方法時,最終會調用kafka.producer.Producer的send方法。在kafka.producer.Producer類中,會根據producer.type配置使用不同的方法發送消息。

def send(messages: KeyedMessage[K,V]*) {
    lock synchronized {
      if (hasShutdown.get)
        throw new ProducerClosedException
      recordStats(messages)
      sync match {
        case true => eventHandler.handle(messages)
        case false => asyncSend(messages)
      }
    }
  }

  當async時,會使用asyncSend。asyncSend方法會根據“queue.enqueue.timeout.ms”配置選項采用BlockingQueue的put或offer方法把消息放入kafka.producer.Producer持有的一個LinkedBlockingQueue。一個ProducerSendThread線程從queue里取消息,成批量的用eventHandler來處理。

  當使用sync時,對每條消息會直接使用eventHandler來處理。這就是為什么前一種方式會被稱為"asynchornization",而這一種會稱為”synchronization"

  private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)

  在kafka.producer.Producer構造時,會檢查"producer.type“,如果是asnyc,就會開啟一個送發線程。

  config.producerType match {
    case "sync" =>
    case "async" =>
      sync = false
      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                       queue,
                                                       eventHandler,
                                                       config.queueBufferingMaxMs,
                                                       config.batchNumMessages,
                                                       config.clientId)
      producerSendThread.start()

  現在有了一個隊列,一個發送線程 。看來這個ProducerSendThread是來完成大部分發送的工作,而"async"的特性都主要都是由它來實現。

   這個線程的run方法實現為:

  override def run {
    try {
      processEvents
    }catch {
      case e: Throwable => error("Error in sending events: ", e)
    }finally {
      shutdownLatch.countDown
    }
  }

  看來實際工作由processEvents方法來實現嘍

  private def processEvents() {
    var lastSend = SystemTime.milliseconds //上一次發送的時間,每發送一次會更新
    var events = new ArrayBuffer[KeyedMessage[K,V]] //一起發送的消息的集合,發送完后也會更新
    var full: Boolean = false  //是否消息的數量已大於指定的batch大小(batch大小指多少消息在一起發送,由"batch.num.messages"確定)

    // drain the queue until you get a shutdown command
    //構造一個流,它的每個元素為queue.poll(timeout)取出來的值。
    //timeout的值是這么計算的:lastSend+queueTime表示下次發送的時間,再減去當前時間,就是最多還能等多長時間,也就是poll阻塞的最長時間
    //takeWhile接受的函數參數決定了當item是shutdownCommand時,流就結束了。這個shutdownCommand是shutdown()方法執行時,往隊列里發的一個特殊消息
    Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
                      .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
      currentQueueItem => 										//對每一條處理的消息
        val elapsed = (SystemTime.milliseconds - lastSend)  //距上次發送已逝去的時間,只記錄在debug里,並不會以它作為是否發送的條件
        // check if the queue time is reached. This happens when the poll method above returns after a timeout and
        // returns a null object
        val expired = currentQueueItem == null //當poll方法超時,就返回一個null,說明一定已經是時候發送這批消息了。當時間到了,poll(timeout)中timeout為負值時,poll一定返回null
        if(currentQueueItem != null) {
          trace("Dequeued item for topic %s, partition key: %s, data: %s"
              .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
          events += currentQueueItem //如果當前消息不為空,就附加在發送集合里
        }

        // check if the batch size is reached
        full = events.size >= batchSize //是否當前發送集合的大小已經大於batch size

        if(full || expired) {  //如果發送集合有了足夠多的消息或者按時間計可以發送了,就發送
          if(expired)
            debug(elapsed + " ms elapsed. Queue time reached. Sending..")
          if(full)
            debug("Batch full. Sending..")
          // if either queue time has reached or batch size has reached, dispatch to event handler
          tryToHandle(events)
          lastSend = SystemTime.milliseconds //更新lastSend,將一個新的ArrayBuffer的引用賦給events
          events = new ArrayBuffer[KeyedMessage[K,V]]
        }
    }
    // send the last batch of events
    tryToHandle(events) //當shutdownCommand遇到時,流會終結。此時之前的消息只要不是恰好發送完,就還會有一些在events里,做為最后一批發送。
    if(queue.size > 0) //些時producerSendThread已經不再發消息了,但是queue里若還有沒發完的,就是一種異常情況
      throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
        .format(queue.size))
  }

  看來Scala的Stream幫了不少忙。shutdown方法將一個特殊的shutdownCommand發給queue,也正好使得這個Stream可以用takeWhile方法正確結束。

  好吧,搞了這么多,這個ProducerSendThread只有打包的邏輯 ,並沒有處理topic、partition、壓縮的邏輯,這些邏輯都在另一個類中。明天再來看看這個handler


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM