kafka rebalance 部分分區沒有owner


 轉發請注明原創地址http://www.cnblogs.com/dongxiao-yang/p/6234673.html     

      最近業務同學反饋kafka上線的時候某個topic的部分分區一直沒有owner注冊上,監控界面形式如圖,其中分區5和7無法被消費者注冊到,重啟客戶端程序rebalance依舊是這兩個分區沒有被消費。

     由於最近業務方機房大遷移,第一反應是網絡連通性,但是消費端程序挨個測試網絡沒有問題,而且即使通過增加或者減少consumer數量,甚至消費端只開一個客戶端,rebalance結束后依然會有分區沒有owner,而且隨着消費端個數的變化,無owner的分區號也發生了變化,整個rebalance過程客戶端程序沒有任何錯誤日志。

    這種情況還得去過客戶端日志,在只起了兩個客戶端的時候發現有這么一段:

 
         

16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], Consumer xxx rebalancing the following partitions: ArrayBuffer(5, 7, 3, 8, 1, 4, 6, 2, 0, 9) for topic onlineAdDemographicPredict with consumers: List(aaa-0, yyy-0, xxx-0)
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 attempting to claim partition 2
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 attempting to claim partition 0
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 attempting to claim partition 9
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 successfully owned partition 0 for topic onlineAdDemographicPredict
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 successfully owned partition 9 for topic onlineAdDemographicPredict
16/12/29 15:52:56 INFO consumer.ZookeeperConsumerConnector: [xxx], xxx-0 successfully owned partition 2 for topic onlineAdDemographicPredict

 

ArrayBuffer里分區10個分區都全了說明客戶端讀取所有Partirtion個數是沒有問題的,出問題的是with consumers: List()這個信息,此時業務方只起了xxx和yyy兩個客戶端,

但是Consumer確拿到了三個client-id,然后經過計算自己正好需要注冊三個分區2,0,9,剩下的分區就沒人認領了。

查找日志對應kafka源碼如下

 

class RangeAssignor() extends PartitionAssignor with Logging {

  def assign(ctx: AssignmentContext) = {
    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()

    for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) {
      val curConsumers = ctx.consumersForTopic(topic)
      val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)

      val nPartsPerConsumer = curPartitions.size / curConsumers.size
      val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

      info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions + " for topic " + topic + " with consumers: " + curConsumers)

      for (consumerThreadId <- consumerThreadIdSet) {
        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
        assert(myConsumerPosition >= 0)
        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

        /**
         *   Range-partition the sorted partitions to consumers for better locality.
         *  The first few consumers pick up an extra partition, if any.
         */
        if (nParts <= 0)
          warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
        else {
          for (i <- startPart until startPart + nParts) {
            val partition = curPartitions(i)
            info(consumerThreadId + " attempting to claim partition " + partition)
            // record the partition ownership decision
            partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
          }
        }
      }
    }

    partitionOwnershipDecision
  }
}

 

object PartitionAssignor {
  def createInstance(assignmentStrategy: String) = assignmentStrategy match {
    case "roundrobin" => new RoundRobinAssignor()
    case _ => new RangeAssignor()
  }
}

class AssignmentContext(group: String, val consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient) {
  val myTopicThreadIds: collection.Map[String, collection.Set[ConsumerThreadId]] = {
    val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkClient, excludeInternalTopics)
    myTopicCount.getConsumerThreadIdsPerTopic
  }

  val partitionsForTopic: collection.Map[String, Seq[Int]] =
    ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIds.keySet.toSeq)

  val consumersForTopic: collection.Map[String, List[ConsumerThreadId]] = ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics)

  val consumers: Seq[String] = ZkUtils.getConsumersInGroup(zkClient, group).sorted
}

 

class ZKGroupDirs(val group: String) {
  def consumerDir = ConsumersPath
  def consumerGroupDir = consumerDir + "/" + group
  def consumerRegistryDir = consumerGroupDir + "/ids"
  def consumerGroupOffsetsDir = consumerGroupDir + "/offsets"
  def consumerGroupOwnersDir = consumerGroupDir + "/owners"
}



  def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean): mutable.Map[String, List[ConsumerThreadId]] = {
    val dirs = new ZKGroupDirs(group)
    val consumers = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
    val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]]
    for (consumer <- consumers) {
      val topicCount = TopicCount.constructTopicCount(group, consumer, this, excludeInternalTopics)
      for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {
        for (consumerThreadId <- consumerThreadIdSet)
          consumersPerTopicMap.get(topic) match {
            case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
            case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))
          }
      }
    }
    for ( (topic, consumerList) <- consumersPerTopicMap )
      consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t))
    consumersPerTopicMap
  }

 

 def constructTopicCount(group: String, consumerId: String, zkUtils: ZkUtils, excludeInternalTopics: Boolean) : TopicCount = {
    val dirs = new ZKGroupDirs(group) val topicCountString = zkUtils.readData(dirs.consumerRegistryDir + "/" + consumerId)._1
    var subscriptionPattern: String = null
    var topMap: Map[String, Int] = null
    try {
      Json.parseFull(topicCountString) match {
        case Some(m) =>
          val consumerRegistrationMap = m.asInstanceOf[Map[String, Any]]
          consumerRegistrationMap.get("pattern") match {
            case Some(pattern) => subscriptionPattern = pattern.asInstanceOf[String]
            case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
          }
          consumerRegistrationMap.get("subscription") match {
            case Some(sub) => topMap = sub.asInstanceOf[Map[String, Int]]
            case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
          }
        case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
      }
    } catch {
      case e: Throwable =>

    

      通過上面着色的代碼一路跟下來,可以看出來Consumer獲取group所有客戶端數量邏輯是讀取zk上 /kafkachroot/consumers/{groupid}/ids路徑下

所有存在的consumerid,然后讀取這些consumerid對應的topic信息,最終返回一個[topic, List[ConsumerThreadId]] 的二維數組。

      於是跑到zk上看節點結構,發現在出問題的group/ids 路徑下果然存在aaa這個臨時節點,通知應用方發現原來有個很老的程序之前也用同樣的groupid消費過這個topic,但是現在業務程序很久沒人管處在一個半假死的狀態,所以這個臨時節點一直不過期,導致后來使用同樣group消費同樣的每次都會感知到一個多余的消費段存在,所以每次都有部分分區無法被消費。

 

附:

 1  Consumer Rebalance的算法

2  本文討論的版本建立在kafka 0.8.2-beta版本前提上,新出的版本目前沒有研究,可能情況不符。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM