Kafka consumer partition rebalance algorithm


When reposting, please credit the original address: http://www.cnblogs.com/dongxiao-yang/p/6238029.html

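     Recently I needed to study in detail how partitions are computed during a Kafka rebalance. I searched online and found a few explanations, but they were rather obscure and hard to follow, so digging through the source code myself turned out to be simpler.

The part of the Kafka code that computes the rebalance is as follows: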

class RangeAssignor() extends PartitionAssignor with Logging {

  def assign(ctx: AssignmentContext) = {
    val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
    val partitionAssignment =
      new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
    for (topic <- ctx.myTopicThreadIds.keySet) {
      val curConsumers = ctx.consumersForTopic(topic)
      val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)

      val nPartsPerConsumer = curPartitions.size / curConsumers.size
      val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

      info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
        " for topic " + topic + " with consumers: " + curConsumers)

      for (consumerThreadId <- curConsumers) {
        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
        assert(myConsumerPosition >= 0)
        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

        /**
         *   Range-partition the sorted partitions to consumers for better locality.
         *  The first few consumers pick up an extra partition, if any.
         */
        if (nParts <= 0)
          warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
        else {
          for (i <- startPart until startPart + nParts) {
            val partition = curPartitions(i)
            info(consumerThreadId + " attempting to claim partition " + partition)
            // record the partition ownership decision
            val assignmentForConsumer = partitionAssignment.getAndMaybePut(consumerThreadId.consumer)
            assignmentForConsumer += (TopicAndPartition(topic, partition) -> consumerThreadId)
          }
        }
      }
    }

 

  def getPartitionsForTopics(topics: Seq[String]): mutable.Map[String, Seq[Int]] = {
    getPartitionAssignmentForTopics(topics).map { topicAndPartitionMap =>
      val topic = topicAndPartitionMap._1
      val partitionMap = topicAndPartitionMap._2
      debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
      (topic -> partitionMap.keys.toSeq.sortWith((s,t) => s < t))
    }
  }

 

  def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[ConsumerThreadId]] = {
    val dirs = new ZKGroupDirs(group)
    val consumers = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
    val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]]
    for (consumer <- consumers) {
      val topicCount = TopicCount.constructTopicCount(group, consumer, this, excludeInternalTopics)
      for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {
        for (consumerThreadId <- consumerThreadIdSet)
          consumersPerTopicMap.get(topic) match {
            case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
            case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))
          }
      }
    }
    for ( (topic, consumerList) <- consumersPerTopicMap )
      consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t))
    consumersPerTopicMap
  }

 

 

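The calculation is carried out mainly by the key lines of the code above (highlighted in the original post). As an example, take a topic with ten partitions and a group with three consumers whose consumer ids are aaa, ccc and bbb.

1 From the last two code snippets, both the consumer id list and the partition list are already sorted when they are obtained, so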

curConsumers=(aaa,bbb,ccc)

curPartitions=(0,1,2,3,4,5,6,7,8,9)
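Step 1 can be illustrated with a small standalone sketch. It assumes consumer ids are plain strings (in the Kafka code the sorted elements are ConsumerThreadId values), and the object name SortedInputsSketch and the helpers sortedIds and sortedPartitions are hypothetical, introduced only for this illustration. It shows that sorting with the same comparison used in the helper methods yields the same list, and therefore the same indexOf position, whatever order the ids were discovered in.

```scala
// Illustrative only: not part of Kafka. Ids are modelled as plain strings.
object SortedInputsSketch {
  // Hypothetical helper mirroring the sortWith call in getConsumersPerTopic.
  def sortedIds(ids: Seq[String]): Seq[String] =
    ids.sortWith((s, t) => s < t)

  // Hypothetical helper mirroring the sortWith call in getPartitionsForTopics.
  def sortedPartitions(parts: Seq[Int]): Seq[Int] =
    parts.sortWith((s, t) => s < t)

  def main(args: Array[String]): Unit = {
    // Discovery order from the example: aaa, ccc, bbb.
    val curConsumers = sortedIds(Seq("aaa", "ccc", "bbb"))
    val curPartitions = sortedPartitions(Seq(9, 8, 7, 6, 5, 4, 3, 2, 1, 0))
    println(curConsumers.mkString(","))
    println(curPartitions.mkString(","))
    // Position of each consumer in the sorted list.
    curConsumers.foreach(c => println(s"$c -> ${curConsumers.indexOf(c)}"))
  }
}
```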

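2 Integer division gives each consumer's base share, and the remainder gives the number of consumers that receive one extra partition

nPartsPerConsumer = 10 / 3 = 3

nConsumersWithExtraPart = 10 % 3 = 1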

3 Assume the current client's id is aaa

myConsumerPosition = curConsumers.indexOf(aaa) = 0

4 Compute the partition range

startPart= 3*0+0.min(1) = 0

nParts = 3+(if (0 + 1 > 1) 0 else 1)=3+1=4

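So aaa is assigned the partition numbers [0,4), i.e. the first four partitions 0,1,2,3

Likewise, bbb has myConsumerPosition=1, so startPart = 3*1+1.min(1) = 4 and nParts = 3+0 = 3, giving the middle three partitions 4,5,6

ccc has myConsumerPosition=2, so startPart = 3*2+2.min(1) = 7 and nParts = 3+0 = 3, giving the last three partitions 7,8,9.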
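The whole walkthrough can be checked with a minimal standalone sketch. It reuses the same arithmetic as assign in RangeAssignor, but everything else is an assumption made for illustration: the object RangeAssignSketch and the function rangeAssign are hypothetical names, consumers are plain strings, a single topic is assumed, and the consumer list is assumed to be non-empty (otherwise the division would fail).

```scala
// Illustrative only: a simplified, single-topic version of the range calculation.
object RangeAssignSketch {
  // Hypothetical helper: returns consumer id -> partitions assigned to it.
  // Assumes `consumers` is non-empty.
  def rangeAssign(partitions: Seq[Int], consumers: Seq[String]): Map[String, Seq[Int]] = {
    // Both inputs are sorted first, as in the two helper methods.
    val curPartitions = partitions.sorted
    val curConsumers = consumers.sorted

    val nPartsPerConsumer = curPartitions.size / curConsumers.size
    val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

    curConsumers.zipWithIndex.map { case (consumer, myConsumerPosition) =>
      // The first nConsumersWithExtraPart consumers each take one extra partition.
      val startPart = nPartsPerConsumer * myConsumerPosition +
        myConsumerPosition.min(nConsumersWithExtraPart)
      val nParts = nPartsPerConsumer +
        (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
      consumer -> curPartitions.slice(startPart, startPart + nParts)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val result = rangeAssign(0 until 10, Seq("aaa", "ccc", "bbb"))
    result.toSeq.sortBy(_._1).foreach { case (consumer, parts) =>
      println(s"$consumer -> ${parts.mkString(",")}")
    }
  }
}
```

With the ten partitions and three consumers from the example, this prints 0,1,2,3 for aaa, 4,5,6 for bbb and 7,8,9 for ccc. When there are more consumers than partitions, for instance two partitions and three consumers, the last consumer gets nParts = 0 and an empty range, which is the case in which the original code logs the "No broker partitions consumed" warning.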

 

