【Original】Uncle's Troubleshooting Series (40): Kafka reassign stuck


For the details of the Kafka reassign process, see: the reassign process

Recently one of our Kafka clusters had a reassignment get stuck. The problem unfolded as follows.
Problem log:

2021-07-16 10:35:41,193 INFO kafka.controller.KafkaController: [Controller id=3] 0/2 replicas have caught up with the leader for partition kafka-9 being reassigned. Replica(s) 3,2 still need to catch up

The corresponding code logic:

  case class PartitionReassignmentIsrChange(partition: TopicPartition) extends ControllerEvent {
    override def state: ControllerState = ControllerState.PartitionReassignment

    override def process(): Unit = {
      if (!isActive) return
      // check if this partition is still being reassigned or not
      controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext =>
        val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
        zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
          case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
            val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
            val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
            if (caughtUpReplicas == reassignedReplicas) {
              // resume the partition reassignment process
              info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
                s"partition $partition being reassigned. Resuming partition reassignment")
              onPartitionReassignment(partition, reassignedPartitionContext)
            }
            else {
              info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
                s"partition $partition being reassigned. Replica(s) " +
                s"${(reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")} still need to catch up")
            }
          case None => error(s"Error handling reassignment of partition $partition to replicas " +
                         s"${reassignedReplicas.mkString(",")} as it was never created")
        }
      }
    }
  }

At this point the replicas in the reassignment plan are not in the ISR, so the controller can only wait; the problem is that it keeps waiting forever.
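
To make the check concrete, here is a minimal, self-contained sketch of the same set arithmetic. The values are assumed from the log line above: the reassignment targets replicas 3 and 2, and the ISR is assumed to contain only the current leader (say broker 1), since neither target replica has joined yet.

object IsrCheckExample extends App {
  // assumed from the controller log: the plan targets brokers 3 and 2
  val reassignedReplicas = Set(3, 2)
  // assumed ISR contents: only the current leader, neither target has joined
  val isr = Set(1)
  val caughtUpReplicas = reassignedReplicas & isr
  if (caughtUpReplicas == reassignedReplicas)
    println("Resuming partition reassignment")
  else
    // prints: 0/2 replicas have caught up ... Replica(s) 3,2 still need to catch up
    println(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader. " +
      s"Replica(s) ${(reassignedReplicas -- isr).mkString(",")} still need to catch up")
}

As long as the target replicas never enter the ISR, caughtUpReplicas never equals reassignedReplicas, so onPartitionReassignment is never called and the reassignment never resumes.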

The logs on the broker being waited on look like this:

2021-07-16 10:35:41,150 WARN state.change.logger: [Broker id=3] Ignoring LeaderAndIsr request from controller 3 with correlation id 969 epoch 44 for partition kafka-9 as the local replica for the partition is in an offline log directory

The corresponding code:

        leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
          val partition = getOrCreatePartition(topicPartition)
          val partitionLeaderEpoch = partition.getLeaderEpoch
          if (partition eq ReplicaManager.OfflinePartition) {
            stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
              s"controller $controllerId with correlation id $correlationId " +
              s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
              "partition is in an offline log directory")
            responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
          }
          // ... (the else branches that actually apply the state change are omitted)
        }

Because the partition is in the offline state, the LeaderAndIsr request is not actually carried out, so the replica never joins the ISR, and the reassignment waits forever.

So why was the partition offline? Tracing further through the logs reveals:

2021-07-16 00:25:17,836 INFO kafka.log.LogManager: Stopping serving logs in dir /data/kafka/data

And at the same time there were a large number of disk-full errors:

java.io.IOException: No space left on device

Because the disk had filled up earlier, Kafka marked this log directory as offline. Every partition stored in that directory is then marked offline as well, and LeaderAndIsr requests for them are not executed.
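
The mechanism can be illustrated with a small sketch. This is a minimal model of the behavior, not Kafka's actual classes (internally Kafka routes this through LogManager and its log-dir failure handling), and the dir-to-partition mapping is hypothetical:

import java.io.IOException

// Sketch: the first IOException while writing to a log dir marks the whole
// dir offline, and every partition hosted in that dir goes offline with it.
object OfflineLogDirSketch {
  // hypothetical mapping of log dir -> partitions stored there
  private val partitionsByDir = Map("/data/kafka/data" -> Seq("kafka-9"))
  private var offlineDirs = Set.empty[String]

  def write(dir: String): Unit =
    try {
      // a full disk makes the underlying write fail like this:
      throw new IOException("No space left on device")
    } catch {
      case e: IOException =>
        offlineDirs += dir
        println(s"Stopping serving logs in dir $dir: ${e.getMessage}")
        // from here on, LeaderAndIsr requests for these partitions are
        // answered with KAFKA_STORAGE_ERROR and never applied locally
        partitionsByDir(dir).foreach(p => println(s"partition $p is now offline"))
    }

  def main(args: Array[String]): Unit = write("/data/kafka/data")
}

Note that the offline marking survives until the broker process is restarted, which is why the reassignment stays stuck even after the disk is no longer full.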

To diagnose this, search the controller log for 'still need to catch up' to see which brokers are being waited on, then check on those servers whether a 'Stopping serving logs' line appears after startup. If it does, the reassignment is stuck. The fix is to restart the waiting broker, after which the reassignment can proceed; a rough helper that scripts these two searches is sketched below.
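
The sketch below automates the two searches. The log file paths are assumptions, not Kafka defaults; point them at the controller log and the server log of your actual deployment:

import scala.io.Source

// Rough diagnostic helper for the two searches described above.
object StuckReassignCheck {
  def grep(path: String, needle: String): List[String] = {
    val src = Source.fromFile(path)
    try src.getLines().filter(_.contains(needle)).toList
    finally src.close()
  }

  def main(args: Array[String]): Unit = {
    // on the controller: which partitions/replicas is it still waiting for?
    grep("/var/log/kafka/controller.log", "still need to catch up").foreach(println)
    // on each waiting broker: did a log dir go offline after startup?
    val offline = grep("/var/log/kafka/server.log", "Stopping serving logs")
    if (offline.nonEmpty) {
      offline.foreach(println)
      println("Offline log dir found; restart this broker to unblock the reassignment")
    }
  }
}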

