Apache Kafka源碼分析 - KafkaApis


kafka apis反映出kafka broker server可以提供哪些服務,
broker server主要和producer,consumer,controller有交互,搞清這些api就清楚了broker server的所有行為

 

handleOffsetRequest

提供對offset的查詢的需求,比如查詢earliest,latest offset是什么,或before某個時間戳的offset是什么

   try {
        // ensure leader exists
        // Only the leader replica may answer an offset request (debugging clients are exempt);
        // the lookup throws when this broker does not hold the required replica.
        val localReplica = if(!offsetRequest.isFromDebuggingClient)
          replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
        else
          replicaManager.getReplicaOrException(topicAndPartition.topic, topicAndPartition.partition)
        val offsets = {
          val allOffsets = fetchOffsets(replicaManager.logManager,  // fetch the list of candidate offsets
                                        topicAndPartition,
                                        partitionOffsetRequestInfo.time,
                                        partitionOffsetRequestInfo.maxNumOffsets)
          if (!offsetRequest.isFromOrdinaryClient) {
            allOffsets
          } else {
            val hw = localReplica.highWatermark.messageOffset
            if (allOffsets.exists(_ > hw))   // offsets above the high watermark must not be visible to ordinary clients; cap the list at hw
              hw +: allOffsets.dropWhile(_ > hw)
            else 
              allOffsets
          }
        }
        (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets))
      } catch {
        // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages
        // are typically transient and there is no value in logging the entire stack trace for the same
        // Every failure branch below answers with an error code and an empty (Nil) offsets list.
        case utpe: UnknownTopicOrPartitionException =>
          warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
               offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition, utpe.getMessage))
          (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), Nil) )
        case nle: NotLeaderForPartitionException =>
          warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
               offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage))
          (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) )
        case e: Throwable =>
          warn("Error while responding to offset request", e)
          (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
      }

可以看到,當沒有找到topic->partition, 或partition leader,或其他異常的時候,就會導致返回offsets為nil
這樣在客戶端,經常通過獲取latestOffset來算spoutLag,會出現負值的情況

然后,fetchOffsets調用fetchOffsetsBefore,來完成offset的獲取,

/**
 * Returns at most `maxNumOffsets` offsets of `log`, in descending order, chosen from a table of
 * (offset, timestamp) entries: one (baseOffset, lastModified) entry per segment, plus a trailing
 * (logEndOffset, now) entry when the last segment contains data.
 *
 * @param log           the log whose segments are inspected
 * @param timestamp     `OffsetRequest.LatestTime`, `OffsetRequest.EarliestTime`, or a timestamp;
 *                      for a timestamp, the newest entry whose time is <= `timestamp` is the starting point
 * @param maxNumOffsets upper bound on the number of offsets returned
 * @return the starting entry's offset followed by the offsets of the entries before it
 *         (empty when no entry qualifies)
 */
def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
    val segsArray = log.logSegments.toArray
    // Evaluate the "last segment has data" check once, so that the array size and the decision to
    // fill the extra slot can never disagree.
    val activeSegmentHasData = segsArray.last.size > 0
    val offsetTimeArray =
      new Array[(Long, Long)](if (activeSegmentHasData) segsArray.length + 1 else segsArray.length)

    // One (baseOffset, last-modified time) entry per segment.
    for(i <- 0 until segsArray.length)
      offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
    // The extra entry carries the log end offset; it is what a LatestTime request resolves to.
    if(activeSegmentHasData)
      offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds)

    val startIndex = timestamp match {
      case OffsetRequest.LatestTime =>
        offsetTimeArray.length - 1   // newest entry
      case OffsetRequest.EarliestTime =>
        0                            // base offset of the first segment
      case _ =>
        // Render the entries explicitly: concatenating the result of foreach would only log "()".
        debug("Offset time array = " + offsetTimeArray.map(o => "%d, %d".format(o._1, o._2)).mkString("; "))
        // Scan from the newest entry backwards for the first one not newer than timestamp (-1 if none).
        offsetTimeArray.lastIndexWhere(_._2 <= timestamp)
    }

    // Return the starting entry and the entries before it, limited to maxNumOffsets.
    val retSize = maxNumOffsets.min(startIndex + 1)
    val ret = new Array[Long](retSize)
    for(j <- 0 until retSize)
      ret(j) = offsetTimeArray(startIndex - j)._1
    // ensure that the returned seq is in descending order of offsets
    ret.toSeq.sortBy(- _)
  }

 

handleProducerOrOffsetCommitRequest

這個用於處理Producer的請求,其實就是寫數據
名字有些tricky,和offsetCommit有什么關系,因為對於kafka的highlevel consumer,consumeroffset是被寫入kafka topic的,所以offsetCommitRequest其實就是一種特殊的producer request
你看他實際也是,用producerRequestFromOffsetCommit,將它轉換成了producer request

主要調用appendToLocalLog,核心邏輯

val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) // look up the partition; a missing partition is turned into an exception below
        val info = partitionOpt match {
          case Some(partition) =>
            partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet],producerRequest.requiredAcks)  // append the messages through the partition
          case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
            .format(topicAndPartition, brokerId))
        }

Partition.appendMessagesToLeader

/**
 * Appends `messages` to the local log of this partition, provided the local replica is the leader.
 *
 * @param messages     the message set to append
 * @param requiredAcks the producer's ack setting; with -1 the append is refused while the ISR is
 *                     smaller than the log's configured minInSyncReplicas
 * @return the result of `log.append`
 * @throws NotEnoughReplicasException      if requiredAcks == -1 and the ISR is below the configured minimum
 * @throws NotLeaderForPartitionException  if the leader replica is not local
 */
def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int=0) = {
    inReadLock(leaderIsrUpdateLock) {
      val leaderReplicaOpt = leaderReplicaIfLocal()
      leaderReplicaOpt match {
        case Some(leaderReplica) =>
          val log = leaderReplica.log.get
          val minIsr = log.config.minInSyncReplicas  // configured minimum ISR size
          val inSyncSize = inSyncReplicas.size       // current ISR size

          // Avoid writing to leader if there are not enough insync replicas to make it safe
          if (inSyncSize < minIsr && requiredAcks == -1) {
            // Argument order matches the message: current ISR size first, then the required minimum.
            throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
              .format(topic,partitionId,inSyncSize,minIsr))
          }

          val info = log.append(messages, assignOffsets = true)
          // New data has arrived, so fetch requests that were parked waiting for it can be re-checked.
          // probably unblock some follower fetch requests since log end offset has been updated
          replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
          // we may need to increment high watermark since ISR could be down to 1
          maybeIncrementLeaderHW(leaderReplica)
          info
        case None =>
          // The leader is not local (the article notes this typically follows a leadership move).
          throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
            .format(topic, partitionId, localBrokerId))
      }
    }
  }

對於producer的寫策略,取決於配置的acker機制,

acks = 0,那沒有failover處理的,發就發了
acks = 1,當寫leader replica成功后就返回,其他的replica都是通過fetcher去同步的,所以kafka是異步寫
不過有數據丟失的風險,如果leader的數據沒有來得及同步,leader掛了,那么會丟失數據
acks = -1, 要等待所有的replicas都成功后,才能返回
所以這里需要產生DelayedProducerRequest,這個request只有在所有的follower都fetch成功后才能response
所以DelayedProducerRequest會在fetch request中被觸發unblock

   if(produceRequest.requiredAcks == 0) {
      // acks == 0: no acknowledgement is wanted, so nothing is sent here
    } else if (produceRequest.requiredAcks == 1 ||  // acks == 1: respond right away
        produceRequest.numPartitions <= 0 ||  // the request carries no partitions
        numPartitionsInError == produceRequest.numPartitions) {  // every partition failed
      // in all of these cases the response is sent immediately
      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
    } else { // remaining case (uncommented upstream; presumably acks == -1)
      // create a list of (topic, partition) pairs to use as keys for this delayed request
      val producerRequestKeys = produceRequest.data.keys.toSeq
      val statuses = localProduceResults.map(r =>
        r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
      val delayedRequest =  new DelayedProduce(
        producerRequestKeys,
        request,
        produceRequest.ackTimeoutMs.toLong,
        produceRequest,
        statuses,
        offsetCommitRequestOpt)

      // add the produce request for watch if it's not satisfied, otherwise send the response back
      val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
      if (satisfiedByMe)
        producerRequestPurgatory.respond(delayedRequest)
    }

 

handleFetchRequest

響應讀數據的請求,來自consumer或follower fetcher

/**
 * Serves a fetch request. The data currently readable is returned at once when the request cannot
 * or need not wait; otherwise the request is handed to the fetch purgatory as a DelayedFetch.
 *
 * @param request the channel request whose requestObj is a FetchRequest
 */
def handleFetchRequest(request: RequestChannel.Request) {
    val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
    val dataRead = replicaManager.readMessageSets(fetchRequest)

    // A fetch coming from a follower also reports that follower's position:
    // record its log end offsets.
    if (fetchRequest.isFromFollower) {
      recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset))
    }

    val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum
    val errorReadingData = dataRead.values.exists(_.data.error != ErrorMapping.NoError)

    // Respond immediately if 1) the request does not want to wait
    //                        2) the request asks for no partitions
    //                        3) enough bytes are already available
    //                        4) reading any partition produced an error
    val respondImmediately =
      fetchRequest.maxWait <= 0 ||
      fetchRequest.numPartitions <= 0 ||
      bytesReadable >= fetchRequest.minBytes ||
      errorReadingData

    if (!respondImmediately) {
      // Park the request; it is re-examined later through the purgatory.
      debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId,
        fetchRequest.clientId))
      // the (topic, partition) pairs serve as watch keys for the delayed request
      val watchKeys = fetchRequest.requestInfo.keys.toSeq
      val parkedFetch = new DelayedFetch(watchKeys, request, fetchRequest.maxWait, fetchRequest,
        dataRead.mapValues(_.offset))

      // watch the request unless it is already satisfied, in which case answer it now
      if (fetchRequestPurgatory.checkAndMaybeWatch(parkedFetch))
        fetchRequestPurgatory.respond(parkedFetch)
    } else {
      debug("Returning fetch response %s for fetch request with correlation id %d to client %s"
        .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId))
      val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data))
      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
    }
  }

readMessageSets其實就是對每個topicAndPartition調用readMessageSet

/**
 * Reads up to `maxSize` bytes of one partition starting at `offset`.
 *
 * @param fromReplicaId id carried by the fetch request; the debugging consumer id may read from any
 *                      local replica, every other caller requires the local replica to be the leader
 * @return the data read together with the replica's current high watermark offset
 */
private def readMessageSet(topic: String,
                             partition: Int,
                             offset: Long,
                             maxSize: Int,
                             fromReplicaId: Int): (FetchDataInfo, Long) = {
    // Non-leaders cannot serve fetches, except for the debugging consumer.
    val servingReplica =
      if (fromReplicaId != Request.DebuggingConsumerId) getLeaderReplicaIfLocal(topic, partition)
      else getReplicaOrException(topic, partition)
    trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))

    // A request with a valid broker id (presumably a follower fetch) reads without an upper bound;
    // any other request may not read past the high watermark.
    val readCeiling =
      if (!Request.isValidBrokerId(fromReplicaId)) Some(servingReplica.highWatermark.messageOffset)
      else None

    val fetchInfo = servingReplica.log.map(_.read(offset, maxSize, readCeiling)).getOrElse {
      error("Leader for partition [%s,%d] does not have a local log".format(topic, partition))
      FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
    }
    (fetchInfo, servingReplica.highWatermark.messageOffset)
  }

如果是follower fetch request,需要做recordFollowerLogEndOffsets更新follower的leo,

  /**
   * Records, for every partition in `offsets`, the position reported by follower `replicaId`,
   * then re-checks the delayed produce requests watching that partition.
   */
  private def recordFollowerLogEndOffsets(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) {
    debug("Record follower log end offsets: %s ".format(offsets))
    offsets.foreach { case (tp, followerLeo) =>
      // update the follower's log end offset and, through it, the partition high watermark
      replicaManager.updateReplicaLEOAndPartitionHW(tp.topic, tp.partition, replicaId, followerLeo)

      // for producer requests with ack = -1, we need to check
      // if they can be unblocked after some follower's log end offsets have moved
      replicaManager.unblockDelayedProduceRequests(tp)
    }
  }

最終調用到ReplicaManager.updateReplicaLEOAndPartitionHW,並修正該partition的ISR

/**
 * Stores `offset` as the log end offset of replica `replicaId` of the given partition and lets the
 * partition re-evaluate its high watermark and ISR.
 *
 * Logs a warning and does nothing when the partition is unknown locally.
 *
 * @throws NotAssignedReplicaException if `replicaId` is not a known replica of the partition
 */
def updateReplicaLEOAndPartitionHW(topic: String, partitionId: Int, replicaId: Int, offset: LogOffsetMetadata) = {
    getPartition(topic, partitionId) match {
      case None =>
        warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId))
      case Some(partition) =>
        val followerReplica = partition.getReplica(replicaId).getOrElse {
          throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" +
            " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId,
            offset.messageOffset, partition.assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId))
        }
        // remember how far the follower has fetched
        followerReplica.logEndOffset = offset
        // check if we need to update HW and expand Isr
        partition.updateLeaderHWAndMaybeExpandIsr(replicaId)
        debug("Recorded follower %d position %d for partition [%s,%d].".format(replicaId, offset.messageOffset, topic, partitionId))
    }
  }

最終調到partition.updateLeaderHWAndMaybeExpandIsr來更新ISR

/**
 * Called after replica `replicaId` has reported a new log end offset. If the local replica is the
 * leader, adds the reporting replica to the ISR when it qualifies and then tries to advance the
 * leader's high watermark. Does nothing when the local replica is no longer the leader.
 *
 * @param replicaId broker id of the replica whose position changed
 */
def updateLeaderHWAndMaybeExpandIsr(replicaId: Int) {
    inWriteLock(leaderIsrUpdateLock) {
      // check if this replica needs to be added to the ISR
      leaderReplicaIfLocal() match {  // only the leader maintains the ISR
        case Some(leaderReplica) =>
          val replica = getReplica(replicaId).get
          val leaderHW = leaderReplica.highWatermark
          // For a replica to get added back to ISR, it has to satisfy 3 conditions-
          // 1. It is not already in the ISR
          // 2. It is part of the assigned replica list. See KAFKA-1097
          // 3. It's log end offset >= leader's high watermark
          if (!inSyncReplicas.contains(replica) &&
            assignedReplicas.map(_.brokerId).contains(replicaId) &&
            replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
            // expand ISR
            val newInSyncReplicas = inSyncReplicas + replica
            info("Expanding ISR for partition [%s,%d] from %s to %s"
                 .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
            // update ISR in ZK and cache
            updateIsr(newInSyncReplicas)
            replicaManager.isrExpandRate.mark()
          }
          maybeIncrementLeaderHW(leaderReplica) // try to advance the high watermark
        case None => // nothing to do if no longer leader
      }
    }
  }

maybeIncrementLeaderHW

/**
 * Advances the leader's high watermark to the smallest log end offset among the in-sync replicas,
 * provided the current high watermark precedes that value. When it advances, the delayed fetch and
 * delayed produce requests watching this partition are re-checked.
 *
 * @param leaderReplica the local leader replica whose high watermark may be updated
 */
private def maybeIncrementLeaderHW(leaderReplica: Replica) {
    val isrLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
    // the minimum LEO across the ISR is the furthest point every in-sync replica has reached
    val candidateHW = isrLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
    val currentHW = leaderReplica.highWatermark
    if (!currentHW.precedes(candidateHW)) {
      debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
        .format(currentHW, candidateHW, topic, partitionId, isrLogEndOffsets.mkString(",")))
    } else {
      leaderReplica.highWatermark = candidateHW
      debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, candidateHW))
      // some delayed requests may be unblocked after HW changed
      val watchKey = new TopicAndPartition(this.topic, this.partitionId)
      replicaManager.unblockDelayedFetchRequests(watchKey)
      replicaManager.unblockDelayedProduceRequests(watchKey)
    }
  }

 

handleControlledShutdownRequest

響應broker發來的shutdown請求,

/**
 * Handles a controlled-shutdown request: asks the controller to shut the requesting broker down and
 * replies with the set of partitions returned by that call.
 */
def handleControlledShutdownRequest(request: RequestChannel.Request) {
    val shutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
    val remainingPartitions = controller.shutdownBroker(shutdownRequest.brokerId)
    val payload = new BoundedByteBufferSend(
      new ControlledShutdownResponse(shutdownRequest.correlationId, ErrorMapping.NoError, remainingPartitions))
    requestChannel.sendResponse(new Response(request, payload))
  }

單純的調用,controller.shutdownBroker,這種是優雅的shutdown,會做很多的准備工作

/**
 * Performs the controller side of a controlled shutdown of broker `id`: marks the broker as
 * shutting down, then, for each replicated partition on it, either moves leadership away or stops
 * the follower replica and takes it offline.
 *
 * @param id the broker being shut down
 * @return the replicated partitions (more than one assigned replica) still led by `id` afterwards
 * @throws ControllerMovedException    if this broker is not the active controller
 * @throws BrokerNotAvailableException if `id` is neither live nor already shutting down
 */
def shutdownBroker(id: Int) : Set[TopicAndPartition] = {

    if (!isActive()) { // this broker is not the active controller: abort with an exception
      throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
    }

    controllerContext.brokerShutdownLock synchronized {
      info("Shutting down broker " + id)

      inLock(controllerContext.controllerLock) {
        if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id)) // unknown broker
          throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))

        controllerContext.shuttingDownBrokerIds.add(id)  // add the broker to the set of brokers currently shutting down
        debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
        debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
      }

      val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] = // every partition on the broker, paired with its number of assigned replicas
        inLock(controllerContext.controllerLock) {
          controllerContext.partitionsOnBroker(id)
            .map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
        }

      allPartitionsAndReplicationFactorOnBroker.foreach {
        case(topicAndPartition, replicationFactor) =>
          // Move leadership serially to relinquish lock.
          inLock(controllerContext.controllerLock) {
            controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
              if (replicationFactor > 1) { // only replicated partitions are handled; a factor of 1 means there is no other replica
                if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) { // the broker leads this partition
                  // If the broker leads the topic partition, transition the leader and update isr. Updates zk and
                  // notifies all affected brokers
                  partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
                    controlledShutdownPartitionLeaderSelector)    // trigger a leader re-election
                } else { // the broker only holds a follower replica: send it a stop-replica request
                  // Stop the replica first. The state change below initiates ZK changes which should take some time
                  // before which the stop replica request should be completed (in most cases)
                  brokerRequestBatch.newBatch()
                  brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
                    topicAndPartition.partition, deletePartition = false)
                  brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)

                  // If the broker is a follower, updates the isr in ZK and notifies the current leader
                  replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
                    topicAndPartition.partition, id)), OfflineReplica)
                }
              }
            }
          }
      }
      // Replicated partitions whose recorded leader is still the broker being shut down.
      def replicatedPartitionsBrokerLeads() = inLock(controllerContext.controllerLock) {
        trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
        controllerContext.partitionLeadershipInfo.filter {
          case (topicAndPartition, leaderIsrAndControllerEpoch) =>
            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
        }.map(_._1)
      }
      replicatedPartitionsBrokerLeads().toSet
    }
  }

這里做leader重新選舉用的是controlledShutdownPartitionLeaderSelector
這個選舉策略很簡單,
從當前ISR中排除正在shuttingDown的broker,產生新的ISR,然后選擇新ISR的head作為新的leader

// New ISR = current ISR minus every broker that is shutting down; its first member, if any, is the new leader.
val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
val newLeaderOpt = newIsr.headOption

 

handleTopicMetadataRequest,handleUpdateMetadataRequest

就是處理讀取和更新MetadataCache的請求,

KafkaApis.metadataCache
首先看看MetadataCache是什么?
/**
 *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
 *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
 */
private[server] class MetadataCache {
  private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
    new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
  private var aliveBrokers: Map[Int, Broker] = Map()
  private val partitionMetadataLock = new ReentrantReadWriteLock()

可見cache為,Map[String, mutable.Map[Int, PartitionStateInfo]],記錄每個topic,每個partition的PartitionStateInfo

/**
 * State of one partition.
 *
 * @param leaderIsrAndControllerEpoch leader/ISR information together with a controller epoch
 * @param allReplicas                 ids of all replicas of the partition
 */
case class PartitionStateInfo(
  leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
  allReplicas: Set[Int]
)

包含,leaderIsrAndControllerEpoch,記錄leader和isr
allReplicas記錄所有的replicas,即AR,注意這里只會記錄replica id,replica的具體情況,只會在replicaManager里面記錄
這里為每個partition記錄leaderIsrAndControllerEpoch,是不是有點浪費

而aliveBrokers,記錄所有活的brokers的id和ip:port

所以也比較簡單,這個cache在每個brokers之間是會被異步更新的,通過handleUpdateMetadataRequest

 

handleStopReplicaRequest

停止replica請求,一般是當broker stop或需要刪除某replica時被調用

處理很簡單,主要就是停止fetcher線程,並刪除partition目錄

stopReplicas

stopReplica,注意很多情況下是不需要真正刪除replica的,比如宕機

 

handleLeaderAndIsrRequest

處理leaderAndIsr的更新,這個和handleUpdateMetadataRequest的區別是,不光更新cache,需要真正去做replica的leader切換
主要調用,
replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager)
核心邏輯如下,前面那段主要是判斷這個request是否有效,根據controllerEpoch和leaderEpoch

/**
 * Applies a LeaderAndIsr request: validates the controller epoch and each partition's leader
 * epoch, then makes the local replica leader or follower for every accepted partition.
 *
 * @param leaderAndISRRequest the request received from the controller
 * @return a map of per-partition error codes plus an overall error code
 *         (StaleControllerEpochCode when the request's controller epoch is older than the local one)
 */
def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
    replicaStateChangeLock synchronized {// serialize state changes
      val responseMap = new collection.mutable.HashMap[(String, Int), Short]
      if(leaderAndISRRequest.controllerEpoch < controllerEpoch) { // reject requests from a stale controller epoch
        (responseMap, ErrorMapping.StaleControllerEpochCode)
      } else {
        val controllerId = leaderAndISRRequest.controllerId
        val correlationId = leaderAndISRRequest.correlationId
        controllerEpoch = leaderAndISRRequest.controllerEpoch

        // First check partition's leader epoch
        // The request-level epoch was checked above; the leader epoch inside each partitionStateInfo is checked here.
        val partitionState = new HashMap[Partition, PartitionStateInfo]()
        leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
          val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor) // fetch the Partition object, creating it if it is not known yet
          val partitionLeaderEpoch = partition.getLeaderEpoch()
          // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
          // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
          if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) { // the local leader epoch must be older than the requested one, otherwise the request is stale
            if(partitionStateInfo.allReplicas.contains(config.brokerId)) // is this partition assigned to the current broker?
              partitionState.put(partition, partitionStateInfo) // keep only partitions assigned to this broker: key = local Partition, value = state requested by the controller
            else { }
          } else { // Received invalid LeaderAndIsr request
            // Otherwise record the error code in response
            responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
          }
        }

        // Core step: split the accepted partitions into those to lead and those to follow, then apply makeLeaders / makeFollowers.
        val partitionsTobeLeader = partitionState  // partitions whose requested leader is this broker
          .filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
        val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)

        if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap)
        if (!partitionsToBeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)

        // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
        // have been completely populated before starting the checkpointing there by avoiding weird race conditions
        if (!hwThreadInitialized) {
          startHighWaterMarksCheckPointThread() // start the high watermark checkpoint thread (started only once, guarded by hwThreadInitialized)
          hwThreadInitialized = true
        }
        replicaFetcherManager.shutdownIdleFetcherThreads() // stop fetcher threads that are now idle
        (responseMap, ErrorMapping.NoError)
      }
    }
  }

replicaManager里面有個allPartitions,記錄所有partition的情況,

// Partition objects known to this ReplicaManager, keyed by (topic, partitionId).
private val allPartitions = new Pool[(String, Int), Partition]

其中Partition結構中,比較主要的數據是,

// Replica objects of this partition, keyed by an Int id (the broker id, per the surrounding text).
private val assignedReplicaMap = new Pool[Int, Replica]

這個記錄brokerid和replica的對應關系

/**
 * Returns the Partition registered under (topic, partitionId), registering a new one first when
 * none is present.
 */
def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
    val key = (topic, partitionId)
    val existing = allPartitions.get(key)
    if (existing != null) {
      existing
    } else {
      // Register a new Partition if none is there, then read back whichever instance is stored.
      allPartitions.putIfNotExists(key, new Partition(topic, partitionId, time, this))
      allPartitions.get(key)
    }
  }

所以getOrCreatePartition,只是get當前replicaManager里面保存的該partition的情況

 

replicaManager.makeLeaders

關閉所有成為leader的replica對應的fetcher,然后關鍵是調用,

// Update the partition information to be the leader
// Each entry pairs the local Partition object with the state requested by the controller.
      partitionState.foreach{ case (partition, partitionStateInfo) =>
        partition.makeLeader(controllerId, partitionStateInfo, correlationId)}

上面提到case (partition, partitionStateInfo)中,partition是replicaManager當前的情況,而partitionStateInfo中間放的是request的新的分配情況,

/**
 * Makes the local replica the leader of this partition, taking the replica assignment, ISR, leader
 * epoch and zk version from the controller's request.
 *
 * @param controllerId       id of the controller that issued the request
 * @param partitionStateInfo the requested state (all replicas, leader and ISR, controller epoch)
 * @param correlationId      correlation id of the request
 * @param offsetManager      used to load offsets from the log when this partition belongs to the offsets topic
 * @return always true
 */
def makeLeader(controllerId: Int,
                 partitionStateInfo: PartitionStateInfo, correlationId: Int,
                 offsetManager: OffsetManager): Boolean = {
    inWriteLock(leaderIsrUpdateLock) {
      val allReplicas = partitionStateInfo.allReplicas
      val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
      val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
      // to maintain the decision maker controller's epoch in the zookeeper path
      controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
      // add replicas that are new
      allReplicas.foreach(replica => getOrCreateReplica(replica))
      // replicas named in the request's ISR
      val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
      // remove assigned replicas that have been removed by the controller:
      // any currently assigned replica id absent from the request's allReplicas is dropped
      (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
      // adopt the requested ISR, leader epoch and zk version
      inSyncReplicas = newInSyncReplicas
      leaderEpoch = leaderAndIsr.leaderEpoch
      zkVersion = leaderAndIsr.zkVersion
      leaderReplicaIdOpt = Some(localBrokerId)
      // construct the high watermark metadata for the new leader replica
      val newLeaderReplica = getReplica().get
      newLeaderReplica.convertHWToLocalOffsetMetadata()
      // reset log end offset for remote replicas
      // A remote replica's LEO is only refreshed when that follower fetches from this leader, so any
      // value still present may be left over from an earlier term as leader.
      assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = LogOffsetMetadata.UnknownOffsetMetadata)
      // maybeIncrementLeaderHW takes the smallest LEO in the ISR. With a freshly reset remote replica
      // in the ISR that minimum is the unknown offset (presumably -1), so the high watermark stays
      // put; it only advances here when the ISR is the leader alone.
      // we may need to increment high watermark since ISR could be down to 1
      maybeIncrementLeaderHW(newLeaderReplica)
      if (topic == OffsetManager.OffsetsTopicName)
        offsetManager.loadOffsetsFromLog(partitionId)
      true
    }
  }

這里還有個函數getOrCreateReplica,知道兩點,
a. 在這里當local replica不存在的時候,會真正的創建replica
b. 所有生成replica都是用這個函數,所以其他的replica list都是assignedReplicaMap中replica的引用,比如inSyncReplicas

/**
 * Returns the Replica registered for `replicaId`, creating and registering it first when it is
 * missing. A local replica is created together with its log and an initial high watermark taken
 * from the checkpoint; a remote replica is created without a log.
 *
 * @param replicaId id of the replica; defaults to the local broker id
 */
def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
    val replicaOpt = getReplica(replicaId)//assignedReplicaMap.get(replicaId)
    replicaOpt match {
      case Some(replica) => replica
      case None =>
        if (isReplicaLocal(replicaId)) { // local replica that is not registered yet: create it
          val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic))
          val log = logManager.createLog(TopicAndPartition(topic, partitionId), config) // create the log backing the replica
          val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath) // high watermark checkpoint for the log's parent directory
          val offsetMap = checkpoint.read
          if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
            warn("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
          val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset) // checkpointed hw (0 if absent), capped at the log end offset so hw never exceeds it
          val localReplica = new Replica(replicaId, this, time, offset, Some(log))
          addReplicaIfNotExists(localReplica)
        } else { // remote replica: no local log
          val remoteReplica = new Replica(replicaId, this, time)
          addReplicaIfNotExists(remoteReplica)
        }
        getReplica(replicaId).get
    }
  }

 

replicaManager.makeFollowers

var partitionsToMakeFollower: Set[Partition] = Set() // partitions whose leader actually changed
// call partition.makeFollower
if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager)) // true only when the partition's leader changed; an unchanged leader needs no further work
    partitionsToMakeFollower += partition
// the leader changed, so the old fetchers are removed
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))

// Data fetched from the old leader may differ from the new leader's log, but everything below the high watermark
// is consistent across replicas, so each log is truncated to its high watermark to avoid divergence.
logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)

if (isShuttingDown.get()) {
    // shutting down: do not add fetchers again
}
else {
    // we do not need to check if the leader exists again since this has been done at the beginning of this process
    val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => // partition -> (new leader broker, local log end offset to start fetching from)
      new TopicAndPartition(partition) -> BrokerAndInitialOffset(
        leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,
        partition.getReplica().get.logEndOffset.messageOffset)).toMap

    replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) // add the new fetchers
}

 

partition.makeFollower
比較簡單,只是更新assignedReplicas和ISR

/**
 * Makes the local replica a follower of this partition: adopts the replica assignment, leader
 * epoch and zk version from the controller's request and clears the local ISR.
 *
 * @return true if the leader recorded for the partition changed, false if it was already the
 *         requested leader
 */
def makeFollower(controllerId: Int,
                   partitionStateInfo: PartitionStateInfo,
                   correlationId: Int, offsetManager: OffsetManager): Boolean = {
    inWriteLock(leaderIsrUpdateLock) {
      val requestedReplicas = partitionStateInfo.allReplicas
      val requestedState = partitionStateInfo.leaderIsrAndControllerEpoch
      val requestedLeaderAndIsr = requestedState.leaderAndIsr
      val requestedLeaderId: Int = requestedLeaderAndIsr.leader

      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
      // to maintain the decision maker controller's epoch in the zookeeper path
      controllerEpoch = requestedState.controllerEpoch

      // register replicas that are new, then drop those the controller no longer assigns
      requestedReplicas.foreach(r => getOrCreateReplica(r))
      (assignedReplicas().map(_.brokerId) -- requestedReplicas).foreach(removeReplica(_))

      // unlike makeLeader, the ISR is emptied here
      inSyncReplicas = Set.empty[Replica]
      leaderEpoch = requestedLeaderAndIsr.leaderEpoch
      zkVersion = requestedLeaderAndIsr.zkVersion

      val leaderUnchanged = leaderReplicaIdOpt.exists(_ == requestedLeaderId)
      if (!leaderUnchanged)
        leaderReplicaIdOpt = Some(requestedLeaderId)
      !leaderUnchanged
    }
  }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM