A few parameters that govern Kafka replica replication


producer

acks = 0, 1, -1

0: the client does not wait for any response; if the broker hits a write error, it simply closes the connection
1: the call returns once the partition leader has written the record to its FileChannel (i.e., into the OS page cache, not necessarily flushed to disk)
-1: works in combination with the broker-side min.insync.replicas parameter
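
A hedged sketch of setting acks on the Java client, here from Scala; the broker address and the topic name "demo-topic" are placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// "-1" is accepted as a synonym for "all": wait for the full ISR
props.put(ProducerConfig.ACKS_CONFIG, "-1")

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value"))
producer.close()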

broker

min.insync.replicas = 1
// this parameter can also be configured at the topic level
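
A hedged sketch of setting the topic-level override through the AdminClient (incrementalAlterConfigs needs Kafka 2.3+; older clusters used the now-deprecated alterConfigs; "demo-topic" is a placeholder):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = AdminClient.create(props)

val resource = new ConfigResource(ConfigResource.Type.TOPIC, "demo-topic")
val ops: java.util.Collection[AlterConfigOp] = Collections.singletonList(
  new AlterConfigOp(new ConfigEntry("min.insync.replicas", "2"), AlterConfigOp.OpType.SET))
admin.incrementalAlterConfigs(Collections.singletonMap(resource, ops)).all().get()
admin.close()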


// kafka.cluster.Partition#checkEnoughReplicasReachOffset
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
  leaderReplicaIfLocal match {
    case Some(leaderReplica) =>
      // keep the current immutable replica list reference
      val curInSyncReplicas = inSyncReplicas

      def numAcks = curInSyncReplicas.count { r =>
        if (!r.isLocal)
          if (r.logEndOffset.messageOffset >= requiredOffset) {
            trace(s"Replica ${r.brokerId} received offset $requiredOffset")
            true
          }
          else
            false
        else
          true /* also count the local (leader) replica */
      }

      trace(s"$numAcks acks satisfied with acks = -1")

      val minIsr = leaderReplica.log.get.config.minInSyncReplicas

      if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
        /*
         * The topic may be configured not to accept messages if there are not enough replicas in ISR
         * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
         */
        if (minIsr <= curInSyncReplicas.size)
          (true, Errors.NONE)
        else
          (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
      } else
        (false, Errors.NONE)
    case None =>
      (false, Errors.NOT_LEADER_FOR_PARTITION)
  }
}

checkEnoughReplicasReachOffset is executed on the broker only when the producer sends with acks = -1.

The check succeeds once the leader's HW has reached requiredOffset and the current ISR size is at least minIsr. If the HW has reached requiredOffset but the ISR has already shrunk below minIsr, the record is in the log anyway, so the producer gets NOT_ENOUGH_REPLICAS_AFTER_APPEND instead of a clean acknowledgment.
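
On the client these two outcomes surface as distinct exceptions. A hedged sketch, reusing the producer from the earlier snippet ("demo-topic" is still a placeholder):

import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.errors.{NotEnoughReplicasAfterAppendException, NotEnoughReplicasException}

producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value"), new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = exception match {
    case null =>
      // HW reached the record's offset while ISR size >= minIsr
      println(s"acked at offset ${metadata.offset}")
    case _: NotEnoughReplicasAfterAppendException =>
      // Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND above: the record is already in
      // the leader's log, so a retry may write a duplicate
      println("appended, but ISR shrank below min.insync.replicas")
    case _: NotEnoughReplicasException =>
      // rejected before the append; retrying is safe
      println("rejected: not enough in-sync replicas")
    case e =>
      e.printStackTrace()
  }
})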

replica.lag.time.max.ms = 10000

How does a replica drop out of the ISR? If a follower has not caught up with the leader within replica.lag.time.max.ms (10 s here), it is removed from the ISR.

// kafka.cluster.Partition#getOutOfSyncReplicas
def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
  // exclude the leader itself from the candidates
  val candidateReplicas = inSyncReplicas - leaderReplica
  // lagging iff: now - lastCaughtUpTimeMs > replica.lag.time.max.ms
  val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
  if (laggingReplicas.nonEmpty)
    debug("Lagging replicas are %s".format(laggingReplicas.map(_.brokerId).mkString(",")))

  laggingReplicas
}

// kafka.cluster.Replica#updateLogReadResult
def updateLogReadResult(logReadResult: LogReadResult) {
  // the follower has fetched up to (or beyond) the leader's current LEO
  if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
    _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs)
  // the follower has fetched up to (or beyond) the leader's LEO as of the previous fetch
  else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
    _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)

  logStartOffset = logReadResult.followerLogStartOffset
  logEndOffset = logReadResult.info.fetchOffsetMetadata
  lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
  lastFetchTimeMs = logReadResult.fetchTimeMs
}

A typical configuration (a creation sketch follows the list):

replication factor 3 per partition
acks = -1
min.insync.replicas = 2
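
A hedged sketch of creating a topic with exactly this configuration via the AdminClient (address and topic name are placeholders):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = AdminClient.create(props)

// 3 partitions is arbitrary here; the point is replication factor 3 + minIsr 2
val topic = new NewTopic("demo-topic", 3, 3.toShort)
  .configs(Collections.singletonMap("min.insync.replicas", "2"))
admin.createTopics(Collections.singleton(topic)).all().get()
admin.close()

With three replicas and min.insync.replicas = 2, a produce with acks = -1 still succeeds when one replica is down, but is rejected with NotEnoughReplicasException when two are down: the usual trade-off between availability and durability.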

High watermark (HW) updates: the leader sets its HW to the minimum LEO across the in-sync replicas; a follower sets its HW to min(its own LEO, the HW carried in the fetch response).
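
A toy model of those two rules (not Kafka code, just the arithmetic described above):

// Leader: HW = the minimum LEO across the in-sync replicas
def leaderHighWatermark(isrLeos: Seq[Long]): Long = isrLeos.min

// Follower: HW = min(its own LEO, the HW carried in the fetch response)
def followerHighWatermark(localLeo: Long, fetchResponseHw: Long): Long =
  math.min(localLeo, fetchResponseHw)

// e.g. ISR LEOs of 10, 8, 9 give a leader HW of 8;
// a follower at LEO 7 that sees HW 8 in the response keeps its HW at 7
assert(leaderHighWatermark(Seq(10L, 8L, 9L)) == 8L)
assert(followerHighWatermark(7L, 8L) == 7L)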

The leader advances its HW first; followers only learn the new HW on their next fetch, so a follower's HW always lags the leader's. If a follower crashes before updating its HW and then restarts, it truncates its log back to that stale HW, discarding messages it had in fact already replicated.

What is the leader epoch for?
It replaces the old HW-based log truncation, which could lose messages or leave replicas with inconsistent logs.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation


Instead of truncating to its HW, the follower sends an OffsetsForLeaderEpochRequest to the leader, receives the leader's end offset for the requested epoch, and truncates its log to that offset.
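
A toy sketch of that exchange (not Kafka code; the types and names here are made up for illustration, and the epoch cache is assumed sorted by epoch):

// One entry per leader epoch: the epoch and the first offset written in it
final case class EpochEntry(epoch: Int, startOffset: Long)

// Leader side: answer OffsetsForLeaderEpoch for `requestedEpoch`.
// If the leader is still on that epoch the answer is its LEO; otherwise it is
// the start offset of the first epoch newer than the requested one.
def endOffsetForEpoch(epochCache: List[EpochEntry], leaderLeo: Long, requestedEpoch: Int): Long =
  epochCache.find(_.epoch > requestedEpoch).map(_.startOffset).getOrElse(leaderLeo)

// Follower side: truncate to the leader's answer, never beyond its own LEO
def truncationOffset(followerLeo: Long, leaderAnswer: Long): Long =
  math.min(followerLeo, leaderAnswer)

// e.g. the leader went epoch 0 (from offset 0) -> epoch 1 (from offset 100);
// a follower still on epoch 0 with LEO 120 truncates to offset 100, not to its HW
assert(truncationOffset(120L, endOffsetForEpoch(List(EpochEntry(0, 0L), EpochEntry(1, 100L)), 150L, 0)) == 100L)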

