Kafka源碼分析11:PartitionStateMachine分區狀態機(圖解+秒懂+史上最全)


文章很長,建議收藏起來,慢慢讀! Java 高並發 發燒友社群:瘋狂創客圈 奉上以下珍貴的學習資源:


推薦:入大廠 、做架構、大力提升Java 內功 的 精彩博文

入大廠 、做架構、大力提升Java 內功 必備的精彩博文 2021 秋招漲薪1W + 必備的精彩博文
1:Redis 分布式鎖 (圖解-秒懂-史上最全) 2:Zookeeper 分布式鎖 (圖解-秒懂-史上最全)
3: Redis與MySQL雙寫一致性如何保證? (面試必備) 4: 面試必備:秒殺超賣 解決方案 (史上最全)
5:面試必備之:Reactor模式 6: 10分鍾看懂, Java NIO 底層原理
7:TCP/IP(圖解+秒懂+史上最全) 8:Feign原理 (圖解)
9:DNS圖解(秒懂 + 史上最全 + 高薪必備) 10:CDN圖解(秒懂 + 史上最全 + 高薪必備)
11: 分布式事務( 圖解 + 史上最全 + 吐血推薦 ) 12:seata AT模式實戰(圖解+秒懂+史上最全)
13:seata 源碼解讀(圖解+秒懂+史上最全) 14:seata TCC模式實戰(圖解+秒懂+史上最全)

Java 面試題 30個專題 , 史上最全 , 面試必刷 阿里、京東、美團... 隨意挑、橫着走!!!
1: JVM面試題(史上最強、持續更新、吐血推薦) 2:Java基礎面試題(史上最全、持續更新、吐血推薦
3:架構設計面試題 (史上最全、持續更新、吐血推薦) 4:設計模式面試題 (史上最全、持續更新、吐血推薦)
17、分布式事務面試題 (史上最全、持續更新、吐血推薦) 一致性協議 (史上最全)
29、多線程面試題(史上最全) 30、HR面經,過五關斬六將后,小心陰溝翻船!
9.網絡協議面試題(史上最全、持續更新、吐血推薦) 更多專題, 請參見【 瘋狂創客圈 高並發 總目錄

SpringCloud 精彩博文
nacos 實戰(史上最全) sentinel (史上最全+入門教程)
SpringCloud gateway (史上最全) 更多專題, 請參見【 瘋狂創客圈 高並發 總目錄

背景:

下一個視頻版本,從架構師視角,尼恩為大家打造史上最強kafka源碼視頻

並且,進一步,帶大家實現一個超高質量的項目實操:10WQPS超高並發消息隊列架構與實操

why kafka:

kafka 是高性能、高並發應用的經典案例,從技術學習、架構學習的角度來講,渾身是寶。 netty 僅僅是通訊架構,kafka還有存儲架構、高並發架構、高可用架構等等,都是經典中的經典。

但是,kafka很難,大家要做好思想准備。不過,跟着尼恩一起學架構,估計大家也不難了。

最終的目標,帶大家穿透kafka, 掌握其存儲架構、高並發架構、高可用架構的精髓。

最后,結合netty高性能架構,最終手寫一個10WQPS超高並發消息隊列。

此視頻版本的整體的次序:

  • 首先,開始Kafka源碼分析,對kafka來一次徹底穿透
  • 然后,10WQPS超高並發消息隊列架構與實操,結合牛逼的Seata 源碼中的RPC框架

此文為Kafka源碼分析之11.

本系列博客的具體內容,請參見 Java 高並發 發燒友社群:瘋狂創客圈

在這里插入圖片描述

Kafka源碼分析11:PartitionStateMachine分區狀態機

kafka分區機制

分區機制是kafka實現高吞吐的秘密武器,但這個武器用得不好的話也容易出問題,作為背景,這里介紹分區的機制。

首先,從數據組織形式來說,kafka有三層形式,kafka有多個主題,每個主題有多個分區,每個分區又有多條消息。

而每個分區可以分布到不同的機器上,這樣一來,從服務端來說,分區可以實現高伸縮性,以及負載均衡,動態調節的能力。

下圖是一個3個分區的topic例子,並且每個分區有3個副本:

img

我們可以通過replication-factor指定創建topic時候所創建的分區數。

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

比如這里就是創建了1個分區,的主題。

值得注意的是,還有一種創建主題的方法,是使用zookeeper參數的,那種是比較舊的創建方法,這里是使用bootstrap參數的。

PartitionStateMachine:分區狀態機

PartitionStateMachine 負責管理 Kafka 分區狀態的轉換,和 ReplicaStateMachine 是一 脈相承的。

二者的對比

  • ReplicaStateMachine:

負責定義 Kafka 副本狀態、合法的狀態轉換,以及管理狀態之間的轉換。

  • PartitionStateMachine:

負責定義 Kafka 分區狀態、合法的狀態轉換,以及管理狀態之間的轉換。

分區狀態機相關的類設計

從代碼結構、實現功能和設計原理來看,二者都極為相似。

  • PartitionStateMachine:分區狀態機抽象類

它定義了諸如 startup、shutdown 這 樣的公共方法,定義了處理分區狀態轉換入口方法 handleStateChanges ,另外,定義了一個私有的 doHandleStateChanges方法,實現分區狀態轉換的操作。

  • PartitionState 接口及其實現對象:
    定義 4 類分區狀態,分別是 NewPartition、 OnlinePartition、OfflinePartition 和 NonExistentPartition。除此之外,還定義了它 們之間的依賴關系。

  • PartitionLeaderElectionStrategy 接口及其實現對象:

定義 4 類分區 Leader 選舉策 略,對應到 Leader 選舉的 4 種場景。

  • PartitionLeaderElectionAlgorithms:分區 Leader 選舉的算法實現。
    4 類分區 Leader 選舉策 略的實現代碼,PartitionLeaderElectionAlgorithms 就提供了 這 4 類選舉策略的實現代碼。

分區狀態機的啟用

每個 Broker 啟動時,都會創建對應的分區狀態機和副本狀態機實例,但只有 Controller 所在的 Broker 才會啟動它們。

如果 Controller 變更到其他 Broker,老 Controller 所在的 Broker 要調用這些狀態機的 shutdown 方法關閉它們,新 Controller 所在的 Broker 調用狀態機的 startup 方法啟動它們。

分區狀態

PartitionState 接口

PartitionState 接口及其實現類,用來定義分區狀態。

sealed trait PartitionState {
  def state: Byte // 狀態序號
  def validPreviousStates: Set[PartitionState] // 合法前置狀態集合
}

和 ReplicaState 類似,PartitionState 定義了分區的狀態空間以及依賴規則。

OnlinePartition 狀態

下面以 OnlinePartition 狀態為例,說明下代碼是如何實現流轉的:


case object OnlinePartition extends PartitionState {
  val state: Byte = 1
  val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
}

如代碼所示,每個 PartitionState 都定義了名為 validPreviousStates 的集合,也就是每個狀態對應的合法前置狀態集。

對於 OnlinePartition 而言,它的合法前置狀態集包括 NewPartition、OnlinePartition 和 OfflinePartition。

在這里插入圖片描述

在 Kafka 中,從合法狀態集以外的狀態向目標狀態進行轉換,將被視為非法操作。

在這里插入圖片描述

Kafka 的 4 類分區狀態

Kafka 為分區定義了 4 類狀態,分別是 NewPartition、OnlinePartition、OfflinePartition 和 NonExistentPartition。

在這里插入圖片描述

1. NewPartition:

分區被創建后被設置成這個狀態,表明它是一個全新的分區對象。

處於 這個狀態的分區,被 Kafka 認為是“未初始化”,因此,不能選舉 Leader。

case object NewPartition extends PartitionState {
  val state: Byte = 0
  val validPreviousStates: Set[PartitionState] = Set(NonExistentPartition)
}

2. OnlinePartition:

分區正式提供服務時所處的狀態。


case object OnlinePartition extends PartitionState {
  val state: Byte = 1
  val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
}

3. OfflinePartition:

分區下線后所處的狀態。


case object OfflinePartition extends PartitionState {
  val state: Byte = 2
  val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
}

4. NonExistentPartition:

分區被刪除,並且從分區狀態機移除后所處的狀態。


case object NonExistentPartition extends PartitionState {
  val state: Byte = 3
  val validPreviousStates: Set[PartitionState] = Set(OfflinePartition)
}

分區狀態之間的轉換關系

在這里插入圖片描述

處理分區狀態轉換的方法

handleStateChanges

handleStateChanges 把 partitions 的狀態設置為 targetState。

handleStateChanges 調用doHandleStateChanges方法執行真正的狀態變更邏輯,在這個方法中,可能需要用 為 partitions 選舉新的 Leader,最終將 partitions 的 Leader 信息返回。

def handleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
                     partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Unit = {
if (partitions.nonEmpty) {
  try {
    // raise error if the previous batch is not empty
    //為了提高KafkaController Leader和集群其他broker的通信效率,實現批量發送請求的功能
    // 檢查上一批請求KafkaController請求,如果沒有發送完成,就報錯
    controllerBrokerRequestBatch.newBatch()
    // 調用doHandleStateChanges方法執行真正的狀態變更邏輯
    doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt)
    // Controller給相關Broker發送請求通知狀態變化
    controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
  } catch {
    case e: Throwable => error(s"Error while moving some partitions to $targetState state", e)
  }
}
}

三個參數說明:

  • partitions 是待執行狀態變更的目標分區列表
  • targetState 是目標狀態
  • partitionLeaderElectionStrategyOpt 是一個可選項,如果傳入了,就表示要執行 Leader 選舉。

doHandleStateChanges

doHandleStateChanges方法執行真正的狀態變更邏輯。

在這個方法中,可能需要用 為 partitions 選舉新的 Leader,最終將 partitions 的 Leader 信息返回

private def doHandleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
                        partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Unit = {
 val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
 // 檢查分區的狀態,如果沒有,分區的狀態設置為NonExistentPartition
 partitions.foreach(partition => partitionState.getOrElseUpdate(partition, NonExistentPartition))
 // 找出要執行非法狀態轉換的分區,記錄錯誤日志
 val (validPartitions, invalidPartitions) = partitions.partition(partition => isValidTransition(partition, targetState))
 invalidPartitions.foreach(partition => logInvalidTransition(partition, targetState))
 // 根據targetState進入到不同的case分支
 targetState match {
   case NewPartition =>
   ....  

   //4大分支
    
 }  

首先,這個方法會做狀態檢查工作。

檢查分區的狀態,如果沒有,分區的狀態設置為NonExistentPartition

接着,檢查哪些分區執行的狀態轉換不合法,如果當前的狀態不屬於targetState的前置依賴,則為不合法。然后為這些分區記錄相應的錯誤日志。

然后,就是重點和核心。

根據targetState進入到 case 分支。由於分區狀態只有 4 個,其中,只有 OnlinePartition 這一路的分支邏輯相對復雜,其他 3 路僅僅是將分區狀態設置成目標狀態而已。

先看簡單的3路。

targetState match {
 case NewPartition =>
   validPartitions.foreach { partition =>
     stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState with " +
       s"assigned replicas ${controllerContext.partitionReplicaAssignment(partition).mkString(",")}")
     partitionState.put(partition, NewPartition)
   }
 case OnlinePartition =>
   ....
   }
 case OfflinePartition =>
   validPartitions.foreach { partition =>
     stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
     partitionState.put(partition, OfflinePartition)
   }
 case NonExistentPartition =>
   validPartitions.foreach { partition =>
     stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
     partitionState.put(partition, NonExistentPartition)
   }
}

接下來,是負責的那一路,目標狀態是 OnlinePartition 的分支。

圖解:目標狀態是 OnlinePartition 的分支

流程圖如下:

在這里插入圖片描述

代碼如下:

case OnlinePartition =>
  // 獲取未初始化的NewPartition狀態下的所有分區
  val uninitializedPartitions = validPartitions.filter(partition => partitionState(partition) == NewPartition)
  // 獲取具備Leader選舉資格的分區列表
 // 只能為OnlinePartition和OfflinePartition狀態的分區選舉Leader
  val partitionsToElectLeader = validPartitions.filter(partition => partitionState(partition) == OfflinePartition || partitionState(partition) == OnlinePartition)

  // 處理所有的未初始化的NewPartition狀態下的所有分區
  if (uninitializedPartitions.nonEmpty) {
    // 初始化NewPartition狀態分區,在ZooKeeper中寫入Leader和ISR數據
    // Initialize leader and isr partition state in zookeeper
    val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
    successfulInitializations.foreach { partition =>
      stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
        s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
      partitionState.put(partition, OnlinePartition)
    }
  }

  // 處理所有的獲取具備Leader選舉資格的分區列表
  if (partitionsToElectLeader.nonEmpty) {
    // 為具備Leader選舉資格的分區推選Leader
    val successfulElections = electLeaderForPartitions(partitionsToElectLeader, partitionLeaderElectionStrategyOpt.get)
    // 將成功選舉Leader后的分區設置成OnlinePartition狀態
    successfulElections.foreach { partition =>
      stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
        s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
      partitionState.put(partition, OnlinePartition)
    }
  }

圖解:initializeLeaderAndIsrForPartitions初始化分區

處理所有的未初始化的NewPartition狀態下的所有分區,需要在 ZooKeeper 中,創建並寫入分區的znode節點數據。

znode節點的位置是/brokers/topics/partitions/,每個節點都要包含分區的 Leader 和 ISR 等數據。

ZK中partition狀態信息

/brokers/topics/[topic]/partitions/[0...N] 其中[0..N]表示partition索引號

/brokers/topics/[topic]/partitions/[partitionId]/state

Schema:

{
"controller_epoch": 表示kafka集群中的中央控制器選舉次數,
"leader": 表示該partition選舉leader的brokerId,
"version": 版本編號默認為1,
"leader_epoch": 該partition leader選舉次數,
"isr": [同步副本組brokerId列表]
}

Example:

{
"controller_epoch": 1,
"leader": 2,
"version": 1,
"leader_epoch": 0,
"isr": [2, 1]
}

在這里插入圖片描述

分區的Leader 和 ISR 的確定規則是:選擇存活副本列表的第一個副本作為 Leader;選擇存活副本列表作為 ISR。

具體的代碼如下:

private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = {
   val successfulInitializations = mutable.Buffer.empty[TopicPartition]

   // 獲取每個分區的副本列表
   val replicasPerPartition = partitions.map(partition => partition -> controllerContext.partitionReplicaAssignment(partition))

   // 獲取每個分區的所有存活副本
   val liveReplicasPerPartition = replicasPerPartition.map { case (partition, replicas) =>
       val liveReplicasForPartition = replicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
       partition -> liveReplicasForPartition
   }

   // 按照有無存活副本對分區進行分組:有活副本的分區、無活副本的分區
   val (partitionsWithoutLiveReplicas, partitionsWithLiveReplicas) = liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.isEmpty }

   partitionsWithoutLiveReplicas.foreach { case (partition, replicas) =>
     val failMsg = s"Controller $controllerId epoch ${controllerContext.epoch} encountered error during state change of " +
       s"partition $partition from New to Online, assigned replicas are " +
       s"[${replicas.mkString(",")}], live brokers are [${controllerContext.liveBrokerIds}]. No assigned " +
       "replica is alive."
     logFailedStateChange(partition, NewPartition, OnlinePartition, new StateChangeFailedException(failMsg))
   }

   // 為"有活副本的分區"確定Leader和ISR
   // Leader確認依據:存活副本列表的首個副本被認定為Leader
   // ISR確認依據:存活副本列表被認定為ISR
   val leaderIsrAndControllerEpochs = partitionsWithLiveReplicas.map { case (partition, liveReplicas) =>
     val leaderAndIsr = LeaderAndIsr(liveReplicas.head, liveReplicas.toList)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
     partition -> leaderIsrAndControllerEpoch
   }.toMap
   val createResponses = try {
     zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
   } catch {
     case e: Exception =>
       partitionsWithLiveReplicas.foreach { case (partition,_) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) }
       Seq.empty
   }
   createResponses.foreach { createResponse =>
     val code = createResponse.resultCode
     val partition = createResponse.ctx.get.asInstanceOf[TopicPartition]
     val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochs(partition)
     if (code == Code.OK) {
       controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
       controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(leaderIsrAndControllerEpoch.leaderAndIsr.isr,
         partition, leaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(partition), isNew = true)
       successfulInitializations += partition
     } else {
       logFailedStateChange(partition, NewPartition, OnlinePartition, code)
     }
   }
   successfulInitializations
 }

圖解:electLeaderForPartitions 選舉分區Leader

處理所有的獲取具備Leader選舉資格的分區列表,為具備 Leader 選舉資格的分區推選 Leader,代碼調用 electLeaderForPartitions 方法實現。

這個方法會不斷嘗試為多個分區選舉 Leader,直到所有分區都成功選出 Leader。

/**
* Repeatedly attempt to elect leaders for multiple partitions until there are no more remaining partitions to retry.
* @param partitions The partitions that we're trying to elect leaders for.
* @param partitionLeaderElectionStrategy The election strategy to use.
* @return The partitions that successfully had a leader elected.
*/
private def electLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): Seq[TopicPartition] = {
 val successfulElections = mutable.Buffer.empty[TopicPartition]
 var remaining = partitions

 //處理所有的獲取具備Leader選舉資格的分區列表,為具備 Leader 選舉資格的分區推選 Leader,代碼調用 electLeaderForPartitions 方法實現。
 //不斷嘗試為多個分區選舉 Leader,直到所有分區都成功選出 Leader。
 while (remaining.nonEmpty) {
   val (success, updatesToRetry, failedElections) = doElectLeaderForPartitions(partitions, partitionLeaderElectionStrategy)
   remaining = updatesToRetry
   successfulElections ++= success
   failedElections.foreach { case (partition, e) =>
     logFailedStateChange(partition, partitionState(partition), OnlinePartition, e)
   }
 }
 successfulElections
}

選舉 Leader 的核心代碼位於 doElectLeaderForPartitions 方法中。

由於分區leader選舉的代碼比較復雜,在介紹之前,作為鋪墊,為大家先介紹一下分區 Leader 選舉的場景及方法。

分區 Leader 選舉的場景及方法

分區online狀態和分區leader選舉有關,這里說說 分區 Leader 選舉的場景及方法。

在kafka中,每個主題可以有多個分區,每個分區又可以有多個副本。這多個副本中,只有一個是leader,而其他的都是follower副本。

注意:每個分區都必須選舉出 Leader副本 才能正常提供服務,沒有leader副本,分區無法提供服務。

總之,在kafka的集群中,會存在着多個主題topic,在每個topic中,又被划分為多個partition,為了防止數據不丟失,每個partition又有多個副本。

kafka主要的三種副本角色

kafka主要的三種副本角色:

  • 首領副本(leader):也就是leader主副本,每一個分區都有一個首領副本,為了保證數據一致性,全部的生產者與消費者的請求都會通過該副原本處理。
  • 跟隨者副本(follower):除了首領副本外的其余全部副本都是跟隨者副本,跟隨者副本不處理來自客戶端的任何請求,只負責從首領副本同步數據,保證與首領保持一致。若是首領副本發生崩潰,就會從這其中選舉出一個leader。
  • 首選首領副本:建立分區時指定的首選首領。若是不指定,則為分區的第一個副本。

follower須要從leader中同步數據,可是因為網絡或者其余緣由,致使數據阻塞,出現不一致的狀況,為了不這種狀況,follower會向leader發送請求信息,這些請求信息中包含了follower須要數據的偏移量offset,並且這些offset是有序的。

若是有follower向leader發送了請求1,接着發送請求2,請求3,那么再發送請求4,這時就意味着follower已經同步了前三條數據,不然不會發送請求4。leader經過跟蹤 每個follower的offset來判斷它們的復制進度。

默認的,若是follower與leader之間超過10s內沒有發送請求,或者說沒有收到請求數據,此時該follower就會被認為“不一樣步副本”, 而持續請求的副本就是“同步副本”。

當leader發生故障時,只有“同步副本”才能夠被選舉為leader。其中的請求超時時間能夠經過參數replica.lag.time.max.ms參數來配置。

負載均衡的最佳目標:

每一個分區的leader能夠分布到不一樣的broker中,盡量的達到最最佳的負載均衡效果。

img

因此會有一個首選首領,若是咱們設置參數auto.leader.rebalance.enable為true,那么它會檢查首選首領是不是真正的首領,若是不是,則會觸發選舉,讓首選首領成為首領。

啰嗦:Replica副本的幾個術語

1、assignments

這是分區的副本列表。
該列表有個專屬的名稱,叫 Assigned Replicas,簡稱 AR。當我們 創建主題之后,使用 kafka-topics 腳本查看主題時,應該可以看到名為 Replicas 的一列數 據。這列數據顯示的,就是主題下每個分區的 AR。assignments 參數類型是 Seq[Int]。這 揭示了一個重要的事實:AR 是有順序的,而且不一定和 ISR 的順序相同!

2、isr

ISR 在 Kafka 中很有名氣,它保存了分區所有與 Leader 副本保持同步的副本列表。
注意, Leader 副本自己也在 ISR 中。另外,作為 Seq[Int]類型的變量,isr 自身也是有順序的。

3、liveReplicas

從名字可以推斷出,它保存了該分區下所有處於存活狀態的副本。
怎么判斷副本是否存活 呢?可以根據 Controller 元數據緩存中的數據來判定。簡單來說,所有在運行中的 Broker 上的副本,都被認為是存活的。

4、uncleanLeaderElectionEnabled

在默認配置下,只要不是由 AdminClient 發起的 Leader 選舉,這個參數的值一般是 false,即 Kafka 不允許執行 Unclean Leader 選舉。
所謂的 Unclean Leader 選舉,是指 在 ISR 列表為空的情況下,Kafka 選擇一個非 ISR 副本作為新的 Leader。
由於存在丟失數 據的風險,目前,社區已經通過把 Broker 端參數 unclean.leader.election.enable 的默認 值設置為 false 的方式,禁止 Unclean Leader 選舉了。

代碼首先會順序搜索 AR 列表,並把第一個同時滿足以下兩個條件的副本作為新的 Leader 返回:

  1. 該副本是存活狀態,即副本所在的 Broker 依然在運行中;
  2. 該副本在 ISR 列表中。

分區 Leader 選舉有 4 類場景

// 分區Leader選舉策略接口
sealed trait PartitionLeaderElectionStrategy
// 離線分區Leader選舉策略
case object OfflinePartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
// 分區副本重分配Leader選舉策略
case object ReassignPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
// 分區Preferred副本Leader選舉策略
case object PreferredReplicaPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
// Broker Controlled關閉時Leader選舉策略
case object ControlledShutdownPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy

  1. OfflinePartitionLeaderElectionStrategy:

因為 Leader 副本下線而引發的分區 Leader 選舉。

  1. ReassignPartitionLeaderElectionStrategy:

因為執行分區副本重分配操作而引發的分 區 Leader 選舉。

  1. PreferredReplicaPartitionLeaderElectionStrategy:

因為執行 Preferred 副本 Leader 選舉而引發的分區 Leader 選舉。

  1. ControlledShutdownPartitionLeaderElectionStrategy:

因為正常關閉 Broker 而引發 的分區 Leader 選舉。

scala基礎

上面使用到了 case object,補充下scala基礎知識

Scala中class、object、case class、case object區別

/** class、object、case class、case object區別
  *
  * class: 類似Java中的class;
  * object: 類似java的單例對象,Scala不能定義靜態成員,用定義單例對象代之;
  * case class:被稱為樣例類,是一種特殊的類,常被用於模式匹配。
  *
  * 一、class 和 object 關系:
  * 1.單例對象不能帶參數,類可以
  * 2.當對象和類名一樣時,object被稱為伴生對象,class被稱為伴生類;
  * 3.類和伴生對象可以相互訪問其私有屬性,但是它們必須在一個源文件當中;
  * 4.類只會被編譯,不會被執行。要執行,必須在Object中。
  *
  * 二、case class 與 class 區別:
  * 1.初始化的時候可以不用new,也可以加上,但是普通類必須加new;
  * 2.默認實現了equals、hashCode方法;
  * 3.默認是可以序列化的,實現了Serializable;
  * 4.自動從scala.Product中繼承一些函數;
  * 5.case class 構造函數參數是public的,我們可以直接訪問;
  * 6.case class默認情況下不能修改屬性值;
  * 7.case class最重要的功能,支持模式匹配,這也是定義case class的重要原因。
  *
  * 三、case class 和 case object 區別:
  * 1.類中有參和無參,當類有參數的時候,用case class ,當類沒有參數的時候那么用case object。
  *
  * 四、當一個類被聲名為case class的時候,scala會幫助我們做下面幾件事情:
  * 1.構造器中的參數如果不被聲明為var的話,它默認的話是val類型的,但一般不推薦將構造器中的參數聲明為var
  * 2.自動創建伴生對象,同時在里面給我們實現子apply方法,使得我們在使用的時候可以不直接顯示地new對象
  * 3.伴生對象中同樣會幫我們實現unapply方法,從而可以將case class應用於模式匹配,關於unapply方法我們在后面的“提取器”那一節會重點講解
  * 4.實現自己的toString、hashCode、copy、equals方法
  * 除此之此,case class與其它普通的scala類沒有區別
  */

case class Iteblog(name:String)

object TestScala {

  def main(args: Array[String]): Unit = {

    val iteblog = new Iteblog("iteblog_hadoop")

    val iteblog2 = Iteblog("iteblog_hadoop")

    println(iteblog == iteblog2)

    println(iteblog.hashCode)

    println(iteblog2.hashCode)
  }

}

PartitionLeaderElectionAlgorithms

針對這 4 類場景,分區狀態機的 PartitionLeaderElectionAlgorithms 對象定義了 4 個方 法,分別負責為每種場景選舉 Leader 副本,這 4 種方法是:

  • offlinePartitionLeaderElection;
  • reassignPartitionLeaderElection;
  • preferredReplicaPartitionLeaderElection;
  • controlledShutdownPartitionLeaderElection。

具體的模式匹配代碼在doElectLeaderForPartitions方法中如下:

val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {
  case OfflinePartitionLeaderElectionStrategy =>
    // 離線分區Leader選舉策略
    leaderForOffline(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
  case ReassignPartitionLeaderElectionStrategy =>
    // 分區副本重分配Leader選舉策略
    leaderForReassign(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
  case PreferredReplicaPartitionLeaderElectionStrategy =>
    // 分區Preferred副本Leader選舉策略
    leaderForPreferredReplica(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
  case ControlledShutdownPartitionLeaderElectionStrategy =>
    // Broker Controlled關閉時Leader選舉策略
    leaderForControlledShutdown(validPartitionsForElection, shuttingDownBrokers).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
}

接下來,我們回到重點方法:doElectLeaderForPartitions的介紹。

重點方法:doElectLeaderForPartitions

step1:獲取可選舉Leader分區列表

首先,doElectLeaderForPartitions是從 ZooKeeper 中獲取給定partitions分區的 Leader、ISR 信息,並將結果封裝進名為 leaderIsrAndControllerEpochPerPartition 的容器中,代碼如下:

private def doElectLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy):
(Seq[TopicPartition], Seq[TopicPartition], Map[TopicPartition, Exception]) = {
  val getDataResponses = try {
  //step1:獲取可選舉Leader分區列表
    // 獲取ZooKeeper中給定partitions分區的znode節點數據
    zkClient.getTopicPartitionStatesRaw(partitions)
  } catch {
    case e: Exception =>
      return (Seq.empty, Seq.empty, partitions.map(_ -> e).toMap)
  }
  //選舉失敗分區列表map容器
  val failedElections = mutable.Map.empty[TopicPartition, Exception]
  //可選舉Leader分區列表map容器
  val leaderIsrAndControllerEpochPerPartition = mutable.Buffer.empty[(TopicPartition, LeaderIsrAndControllerEpoch)]
  // 遍歷每個分區的znode節點數據
  getDataResponses.foreach { getDataResponse =>
    val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
    val currState = partitionState(partition)
    // 如果成功拿到znode節點數據
    if (getDataResponse.resultCode == Code.OK) {
      val leaderIsrAndControllerEpochOpt = TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
      if (leaderIsrAndControllerEpochOpt.isEmpty) {
        val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
        failedElections.put(partition, exception)
      }
      // 將該分區加入到可選舉Leader分區列表
      leaderIsrAndControllerEpochPerPartition += partition -> leaderIsrAndControllerEpochOpt.get
    } else if (getDataResponse.resultCode == Code.NONODE) {
      val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
      // 如果沒有拿到znode節點數據,則將該分區加入到選舉失敗分區列表
      failedElections.put(partition, exception)
    } else {
      failedElections.put(partition, getDataResponse.resultException.get)
    }
  }
  
  ...
}

遍歷完這些分區之后,leaderIsrAndControllerEpochPerPartition 容器中是否包含可選舉 Leader 的分區。

step2,進行 Controller Epoch 的年代判斷

接着,進行 Controller Epoch 的年代判斷:

  • 節點數據包含 Leader 和 ISR 信息且節點數據的 Controller Epoch 值小於當前 Controller Epoch 值,那么,就將該分區加入到可選舉 Leader 分區列表validPartitionsForElection。

  • 倘若發現 Zookeeper 中保存的 Controller Epoch 值大於當前 Epoch 值,說明該分區已經被一個更新的 Controller 選舉過 Leader 了,將該分區放置到選舉失敗分區列表中invalidPartitionsForElection。

如果validPartitionsForElection為空,則終止本次選舉。


//接着,進行 Controller Epoch 的年代判斷:
// 節點數據包含 Leader 和 ISR 信息且節點數據的 Controller Epoch 值小於當前 Controller Epoch 值,
// 那么,就將該分區加入到可選舉 Leader 分區列表。
// 倘若 Controller Epoch 值大於當前Controller Epoch 值,
// 說明該分區已經被一個更新的 Controller 選舉過 Leader 了,此時必須終止本次 Leader 選舉,並將該分區放置到選舉失敗分區列表中
val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (partition, leaderIsrAndControllerEpoch) =>
  leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch
}

invalidPartitionsForElection.foreach { case (partition, leaderIsrAndControllerEpoch) =>
  val failMsg = s"aborted leader election for partition $partition since the LeaderAndIsr path was " +
    s"already written by another controller. This probably means that the current controller $controllerId went through " +
    s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
  failedElections.put(partition, new StateChangeFailedException(failMsg))
}
if (validPartitionsForElection.isEmpty) {
  return (Seq.empty, Seq.empty, failedElections.toMap)
}

step3:調用 PartitionLeaderElectionAlgorithms 的不同方法執行 Leader 選舉

這一步是根據給定的 PartitionLeaderElectionStrategy,調用 PartitionLeaderElectionAlgorithms 的不同方法執行 Leader 選舉

//step3:調用 PartitionLeaderElectionAlgorithms 的不同方法執行 Leader 選舉
//根據給定的 PartitionLeaderElectionStrategy,調用 PartitionLeaderElectionAlgorithms 的不同方法執行 Leader 選舉
val shuttingDownBrokers  = controllerContext.shuttingDownBrokerIds.toSet
val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {
  case OfflinePartitionLeaderElectionStrategy =>
    // 離線分區Leader選舉策略
    leaderForOffline(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
  case ReassignPartitionLeaderElectionStrategy =>
    // 分區副本重分配Leader選舉策略
    leaderForReassign(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
  case PreferredReplicaPartitionLeaderElectionStrategy =>
    // 分區Preferred副本Leader選舉策略
    leaderForPreferredReplica(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
  case ControlledShutdownPartitionLeaderElectionStrategy =>
    // Broker Controlled關閉時Leader選舉策略
    leaderForControlledShutdown(validPartitionsForElection, shuttingDownBrokers).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
}

這 4 種不同的策略定義了 4 個專屬的方法來進行 Leader 選舉。

大致的選擇 Leader 的規則,就是選擇副本集合中首個存活且處於 ISR 中的副本作為 Leader

以leaderForOffline為例子吧:

private def leaderForOffline(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]):
Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
....
  } ++ partitionsWithLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) => (partition, Option(leaderIsrAndControllerEpoch), false) }
  partitionsWithUncleanLeaderElectionState.map { case (partition, leaderIsrAndControllerEpochOpt, uncleanLeaderElectionEnabled) =>
   //獲取全部副本
    val assignment = controllerContext.partitionReplicaAssignment(partition)
   //獲取存活副本
    val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
    if (leaderIsrAndControllerEpochOpt.nonEmpty) {
      val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get
      val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
      //通過算法選舉一個leader
      val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)
      val newLeaderAndIsrOpt = leaderOpt.map { leader =>
      val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
       else List(leader)
        leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr)
      }
      (partition, newLeaderAndIsrOpt, liveReplicas)
    } else {
      (partition, None, liveReplicas)
    }
  }
}

4種選舉leader的算法:

4種選舉leader的算法也類同,大致選擇副本集合中首個存活且處於 ISR 中的副本作為 Leader。

object PartitionLeaderElectionAlgorithms {
def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = {
  //返回第一個存活的、在isr中的副本
  //如果沒有找到,就找一個存活副本
  assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
    if (uncleanLeaderElectionEnabled) {
      val leaderOpt = assignment.find(liveReplicas.contains)
      if (!leaderOpt.isEmpty)
        controllerContext.stats.uncleanLeaderElectionRate.mark()
      leaderOpt
    } else {
      None
    }
  }
}

def reassignPartitionLeaderElection(reassignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
  //返回第一個存活的、在isr中的副本
  reassignment.find(id => liveReplicas.contains(id) && isr.contains(id))
}

def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
  assignment.headOption.filter(id => liveReplicas.contains(id) && isr.contains(id))
}

def controlledShutdownPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], shuttingDownBrokers: Set[Int]): Option[Int] = {
  assignment.find(id => liveReplicas.contains(id) && isr.contains(id) && !shuttingDownBrokers.contains(id))
}
}

step4:zk元數據和Controller 端元數據緩存信息的更新

再來看這個方法的最后一部分代碼,這一步主要是更新 ZooKeeper 節點數據,以及 Controller 端元數據緩存信息。

 // 將所有選舉失敗的分區全部加入到Leader選舉失敗分區列表
 partitionsWithoutLeaders.foreach { case (partition, leaderAndIsrOpt, recipients) =>
   val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
   failedElections.put(partition, new StateChangeFailedException(failMsg))
 }
 val recipientsPerPartition = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> recipients }.toMap
 val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> leaderAndIsrOpt.get }.toMap
 // 使用新選舉的Leader和ISR信息更新ZooKeeper上分區的znode節點數據
 val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
   adjustedLeaderAndIsrs, controllerContext.epoch)

 // 對於ZooKeeper znode節點數據更新成功的分區,封裝對應的Leader和ISR信息
 // 構建LeaderAndIsr請求,並將該請求加入到Controller待發送請求的集合中
 // 等待后續批量發送
 successfulUpdates.foreach { case (partition, leaderAndIsr) =>
   val replicas = controllerContext.partitionReplicaAssignment(partition)
   val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
   controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
   controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
     leaderIsrAndControllerEpoch, replicas, isNew = false)
 }
 // 返回選舉結果,包括成功選舉並更新ZooKeeper節點的分區、選舉失敗分區以及
 // ZooKeeper節點更新失敗的分區
 (successfulUpdates.keys.toSeq, updatesToRetry, failedElections.toMap ++ failedUpdates)

對於 ZooKeeper Znode 節點數據更新成功的那些分區,源碼會封裝對應的 Leader 和 ISR 信息,構建 LeaderAndIsr 請求,並將該請求加入到 Controller 待發送請求集合,等待后續統一發送。最后,方法返回選舉結果,包括成功選舉並更新 ZooKeeper 節點的分區列表、選舉失敗分區列表,以及 ZooKeeper 節點更新失敗的分區列表。

在這里插入圖片描述

參考文獻

https://www.cnblogs.com/boanxin/p/13696136.html

https://www.cnblogs.com/listenfwind/p/12465409.html

https://www.shangmayuan.com/a/5e15939288954d3cb3ad613e.html

https://my.oschina.net/u/3070368/blog/4338739

https://www.cnblogs.com/shimingjie/p/10374451.html

https://www.bbsmax.com/A/VGzlAONYJb/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM