Problems with multi-threaded consumption of a Kafka topic


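Case:

  Topic: my-topic, partitions: 6

  Consumers: deployed on three machines, each running 6 consumer threads.

  Result: only one machine consumes normally; the other two each immediately log six warnings: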

No broker partitions consumed by consumer thread my-topic-group_adfc6be4a509-1496976531798-d70f9a43-3 for topic my-topic
No broker partitions consumed by consumer thread my-topic-group_adfc6be4a509-1496976531798-d70f9a43-1 for topic my-topic
No broker partitions consumed by consumer thread my-topic-group_adfc6be4a509-1496976531798-d70f9a43-2 for topic my-topic
No broker partitions consumed by consumer thread my-topic-group_adfc6be4a509-1496976531798-d70f9a43-4 for topic my-topic
No broker partitions consumed by consumer thread my-topic-group_adfc6be4a509-1496976531798-d70f9a43-6 for topic my-topic
No broker partitions consumed by consumer thread my-topic-group_adfc6be4a509-1496976531798-d70f9a43-5 for topic my-topic

Before getting into the source code, here is a diagram of the assignment; it took two hours to draw.

 

Source code analysis:

 

for (topic <- ctx.myTopicThreadIds.keySet) {
      // curConsumers: all consumer threads for this topic; here 6 * 3 = 18
      val curConsumers = ctx.consumersForTopic(topic)
      // curPartitions: the partitions of this topic; here 6
      val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)
      // nPartsPerConsumer = 6 / 18 = 0: partitions each consumer gets on average (integer division)
      val nPartsPerConsumer = curPartitions.size / curConsumers.size
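      /*
         nConsumersWithExtraPart = 6 % 18 = 6. When the partitions do not divide evenly among the
         consumers, the first N consumers each take one extra partition.
         Here the result is 6: the first 6 consumers own one partition each,
         and the remaining 12 consumers own none.
       */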
      val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

      info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
        " for topic " + topic + " with consumers: " + curConsumers)

      for (consumerThreadId <- curConsumers) {
        // myConsumerPosition: index of the current consumerThreadId in the consumer list
        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
        assert(myConsumerPosition >= 0)
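        /*
           startPart = 0 * myConsumerPosition + myConsumerPosition.min(6). min returns the smaller of
           the two values, so startPart falls into two ranges: positions 0-5 -> 0-5, positions 6-17 -> 6.
           With the partitions sorted in ascending order, startPart is the index of the first
           partition this consumer takes.
         */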
        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
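        /*
           nParts = 0 + (if (myConsumerPosition + 1 > 6) 0 else 1), so nParts also falls into two
           ranges: positions 0-5 -> 1, positions 6-17 -> 0.
           nParts is the number of partitions this consumer may consume. In general, the first
           nConsumersWithExtraPart consumers get nPartsPerConsumer + 1 and the rest get
           nPartsPerConsumer, so with fewer consumers than partitions every consumer gets at least one.
         */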
        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

        /**
         *   Range-partition the sorted partitions to consumers for better locality.
         *  The first few consumers pick up an extra partition, if any.
         */
        // Consumers at positions 6-17 have nParts = 0 and only log this warning: they are the 12 consumers left without a partition
        if (nParts <= 0)
          warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
        else {
          // Consumers at positions 0-5 go on to claim their partition of the topic
          for (i <- startPart until startPart + nParts) {
            val partition = curPartitions(i)
            info(consumerThreadId + " attempting to claim partition " + partition)
            // record the partition ownership decision
            val assignmentForConsumer = partitionAssignment.getAndMaybePut(consumerThreadId.consumer)
            assignmentForConsumer += (TopicAndPartition(topic, partition) -> consumerThreadId)
          }
        }
      }
}
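
The arithmetic in the loop above can be checked in isolation. The following is a minimal sketch, not Kafka code: `RangeAssignSketch` and `assign` are hypothetical names, consumers are identified only by their position in the list, and partitions by their index in the sorted partition list.

```scala
object RangeAssignSketch {
  // For each consumer position, return the partition indices it would claim.
  // Assumes numConsumers > 0.
  def assign(numPartitions: Int, numConsumers: Int): Map[Int, List[Int]] = {
    val nPartsPerConsumer = numPartitions / numConsumers
    val nConsumersWithExtraPart = numPartitions % numConsumers
    (0 until numConsumers).map { pos =>
      // Same formulas as in the loop above
      val startPart = nPartsPerConsumer * pos + pos.min(nConsumersWithExtraPart)
      val nParts = nPartsPerConsumer + (if (pos + 1 > nConsumersWithExtraPart) 0 else 1)
      pos -> (startPart until startPart + nParts).toList
    }.toMap
  }
}
```

With 6 partitions and 18 consumers, `RangeAssignSketch.assign(6, 18)` gives positions 0-5 one partition each and positions 6-17 an empty list, which matches the warnings in the log. With 6 partitions and 4 consumers, the first two positions get two partitions each and the last two get one each.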

 

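Result: within a consumer group, each partition of a topic is consumed by exactly one thread. That ownership is decided at assignment time, and the assignment is recomputed whenever a consumer thread joins or leaves.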
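
To see how a change in membership changes the outcome, here is a rough sketch that reapplies the same formulas to named threads. Everything in it is illustrative: `RebalanceSketch`, `assign`, `threads`, and the ids `machineA-1` and so on are hypothetical, and the sketch assumes the thread ids are ordered lexicographically before positions are taken.

```scala
object RebalanceSketch {
  // Same arithmetic as the source above, keyed by a (hypothetical) thread id.
  // Assumes threadIds is non-empty.
  def assign(partitions: Seq[Int], threadIds: Seq[String]): Map[String, List[Int]] = {
    val sortedParts = partitions.sorted
    val sortedIds = threadIds.sorted // assumption: lexicographic ordering of ids
    val nPartsPerConsumer = sortedParts.size / sortedIds.size
    val nConsumersWithExtraPart = sortedParts.size % sortedIds.size
    sortedIds.zipWithIndex.map { case (id, pos) =>
      val startPart = nPartsPerConsumer * pos + pos.min(nConsumersWithExtraPart)
      val nParts = nPartsPerConsumer + (if (pos + 1 > nConsumersWithExtraPart) 0 else 1)
      id -> sortedParts.slice(startPart, startPart + nParts).toList
    }.toMap
  }

  // Build ids such as "machineA-1" for each machine and thread number.
  def threads(machines: Seq[String], perMachine: Int): Seq[String] =
    for (m <- machines; t <- 1 to perMachine) yield s"$m-$t"

  val partitions: Seq[Int] = 0 until 6

  // Three machines with six threads each: machineA's threads take everything.
  val before = assign(partitions, threads(Seq("machineA", "machineB", "machineC"), 6))
  // machineA leaves: the assignment is recomputed and machineB's threads take over.
  val after = assign(partitions, threads(Seq("machineB", "machineC"), 6))
  // Three machines with two threads each: every thread owns exactly one partition.
  val balanced = assign(partitions, threads(Seq("machineA", "machineB", "machineC"), 2))
}
```

Under these assumptions, the last case suggests one way to spread the load in the scenario above: keep the total number of consumer threads in the group no larger than the number of partitions.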

