kafka學習筆記(七)kafka的狀態機模塊


概述

 這一篇隨筆介紹kafka的狀態機模塊,Kafka 源碼中有很多狀態機和管理器,比如之前我們學過的 Controller 通道管理器 ControllerChannelManager、處理 Controller 事件的 ControllerEventManager,等等。這些管理器和狀態機,大多與各自的“宿主”組件關系密切,可以說是大小不同、功能各異。就比如 Controller 的這兩個管理器,必須要與 Controller 組件緊耦合在一起才能實現各自的功能。不過,Kafka 中還是有一些狀態機和管理器具有相對獨立的功能框架,不嚴重依賴使用方,也就是我在這個模塊為你精選的 TopicDeletionManager(主題刪除管理器)、ReplicaStateMachine(副本狀態機)和 PartitionStateMachine(分區狀態機)。TopicDeletionManager:負責對指定 Kafka 主題執行刪除操作,清除待刪除主題在集群上的各類“痕跡”。

ReplicaStateMachine:負責定義 Kafka 副本狀態、合法的狀態轉換,以及管理狀態之間的轉換。

PartitionStateMachine:負責定義 Kafka 分區狀態、合法的狀態轉換,以及管理狀態之間的轉換。

TopicDeletionManager

TopicDeletionManager.scala 這個源文件,包括 3 個部分。

DeletionClient 接口:負責實現刪除主題以及后續的動作,比如更新元數據等。這個接口里定義了 4 個方法,分別是 deleteTopic、deleteTopicDeletions、mutePartitionModifications 和 sendMetadataUpdate。我們后面再詳細學習它們的代碼。

ControllerDeletionClient 類:實現 DeletionClient 接口的類,分別實現了剛剛說到的那 4 個方法。

TopicDeletionManager 類:主題刪除管理器類,定義了若干個方法維護主題刪除前后集群狀態的正確性。比如,什么時候才能刪除主題、什么時候主題不能被刪除、主題刪除過程中要規避哪些操作,等等。

DeletionClient 接口及其實現

DeletionClient 接口定義的方法用於刪除主題,並將刪除主題這件事兒同步給其他 Broker。目前,DeletionClient 這個接口只有一個實現類,即 ControllerDeletionClient。我們看下這個實現類的代碼:

 1 class ControllerDeletionClient(controller: KafkaController, zkClient: KafkaZkClient) extends DeletionClient {
 2   // 刪除給定主題
 3   override def deleteTopic(topic: String, epochZkVersion: Int): Unit = {
 4     // 刪除/brokers/topics/<topic>節點
 5     zkClient.deleteTopicZNode(topic, epochZkVersion)
 6     // 刪除/config/topics/<topic>節點
 7     zkClient.deleteTopicConfigs(Seq(topic), epochZkVersion)
 8     // 刪除/admin/delete_topics/<topic>節點
 9     zkClient.deleteTopicDeletions(Seq(topic), epochZkVersion)
10   }
11   // 刪除/admin/delete_topics下的給定topic子節點
12   override def deleteTopicDeletions(topics: Seq[String], epochZkVersion: Int): Unit = {
13     zkClient.deleteTopicDeletions(topics, epochZkVersion)
14   }
15   // 取消/brokers/topics/<topic>節點數據變更的監聽
16   override def mutePartitionModifications(topic: String): Unit = {
17     controller.unregisterPartitionModificationsHandlers(Seq(topic))
18   }
19   // 向集群Broker發送指定分區的元數據更新請求
20   override def sendMetadataUpdate(partitions: Set[TopicPartition]): Unit = {
21     controller.sendUpdateMetadataRequest(
22       controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
23   }
24 }

這個類的構造函數接收兩個字段。同時,由於是 DeletionClient 接口的實現類,因而該類實現了 DeletionClient 接口定義的四個方法。先來說構造函數的兩個字段:KafkaController 實例和 KafkaZkClient 實例。KafkaController 實例,我們已經很熟悉了,就是 Controller 組件對象;而 KafkaZkClient 實例,就是 Kafka 與 ZooKeeper 交互的客戶端對象。接下來,我們再結合代碼看下 DeletionClient 接口實現類 ControllerDeletionClient 定義的 4 個方法。我來簡單介紹下這 4 個方法大致是做什么的。

1.deleteTopic它用於刪除主題在 ZooKeeper 上的所有“痕跡”。具體方法是,分別調用 KafkaZkClient 的 3 個方法去刪除 ZooKeeper 下 /brokers/topics/節點、/config/topics/節點和 /admin/delete_topics/節點。2.deleteTopicDeletions它用於刪除 ZooKeeper 下待刪除主題的標記節點。具體方法是,調用 KafkaZkClient 的 deleteTopicDeletions 方法,批量刪除一組主題在 /admin/delete_topics 下的子節點。注意,deleteTopicDeletions 這個方法名結尾的 Deletions,表示 /admin/delete_topics 下的子節點。所以,deleteTopic 是刪除主題,deleteTopicDeletions 是刪除 /admin/delete_topics 下的對應子節點。到這里,我們還要注意的一點是,這兩個方法里都有一個 epochZkVersion 的字段,代表期望的 Controller Epoch 版本號。如果你使用一個舊的 Epoch 版本號執行這些方法,ZooKeeper 會拒絕,因為和它自己保存的版本號不匹配。如果一個 Controller 的 Epoch 值小於 ZooKeeper 中保存的,那么這個 Controller 很可能是已經過期的 Controller。這種 Controller 就被稱為 Zombie Controller。epochZkVersion 字段的作用,就是隔離 Zombie Controller 發送的操作。

3.mutePartitionModifications它的作用是屏蔽主題分區數據變更監聽器,具體實現原理其實就是取消 /brokers/topics/節點數據變更的監聽。這樣當該主題的分區數據發生變更后,由於對應的 ZooKeeper 監聽器已經被取消了,因此不會觸發 Controller 相應的處理邏輯。那為什么要取消這個監聽器呢?其實,主要是為了避免操作之間的相互干擾。設想下,用戶 A 發起了主題刪除,而同時用戶 B 為這個主題新增了分區。此時,這兩個操作就會相互沖突,如果允許 Controller 同時處理這兩個操作,勢必會造成邏輯上的混亂以及狀態的不一致。為了應對這種情況,在移除主題副本和分區對象前,代碼要先執行這個方法,以確保不再響應用戶對該主題的其他操作。mutePartitionModifications 方法的實現原理很簡單,它會調用 unregisterPartitionModificationsHandlers,並接着調用 KafkaZkClient 的 unregisterZNodeChangeHandler 方法,取消 ZooKeeper 上對給定主題的分區節點數據變更的監聽。

4.sendMetadataUpdate它會調用 KafkaController 的 sendUpdateMetadataRequest 方法,給集群所有 Broker 發送更新請求,告訴它們不要再為已刪除主題的分區提供服務。

TopicDeletionManager 定義及初始化

有了這些鋪墊,我們再來看主題刪除管理器的主要入口:TopicDeletionManager 類。這個類的定義代碼,如下:

 1 class TopicDeletionManager(
 2   // KafkaConfig類,保存Broker端參數
 3   config: KafkaConfig, 
 4   // 集群元數據
 5   controllerContext: ControllerContext,
 6   // 副本狀態機,用於設置副本狀態
 7   replicaStateMachine: ReplicaStateMachine,
 8   // 分區狀態機,用於設置分區狀態
 9   partitionStateMachine: PartitionStateMachine,
10   // DeletionClient接口,實現主題刪除
11   client: DeletionClient) extends Logging {
12   this.logIdent = s"[Topic Deletion Manager ${config.brokerId}] "
13   // 是否允許刪除主題
14   val isDeleteTopicEnabled: Boolean = config.deleteTopicEnable
15   ......
16 }

該類主要的屬性有 6 個,我們分別來看看。

config:KafkaConfig 實例,可以用作獲取 Broker 端參數 delete.topic.enable 的值。該參數用於控制是否允許刪除主題,默認值是 true,即 Kafka 默認允許用戶刪除主題。

controllerContext:Controller 端保存的元數據信息。刪除主題必然要變更集群元數據信息,因此 TopicDeletionManager 需要用到 controllerContext 的方法,去更新它保存的數據。

replicaStateMachine 和 partitionStateMachine:副本狀態機和分區狀態機。它們各自負責副本和分區的狀態轉換,以保持副本對象和分區對象在集群上的一致性狀態。這兩個狀態機是后面兩講的重要知識點。

client:前面介紹的 DeletionClient 接口。TopicDeletionManager 通過該接口執行 ZooKeeper 上節點的相應更新。

isDeleteTopicEnabled:表明主題是否允許被刪除。它是 Broker 端參數 delete.topic.enable 的值,默認是 true,表示 Kafka 允許刪除主題。源碼中大量使用這個字段判斷主題的可刪除性。前面的 config 參數的主要目的就是設置這個字段的值。被設定之后,config 就不再被源碼使用了。

TopicDeletionManager 重要方法

最重要的當屬 resumeDeletions 方法。它是重啟主題刪除操作過程的方法。主題因為某些事件可能一時無法完成刪除,比如主題分區正在進行副本重分配等。一旦這些事件完成后,主題重新具備可刪除的資格。此時,代碼就需要調用 resumeDeletions 重啟刪除操作。這個方法之所以很重要,是因為它還串聯了 TopicDeletionManager 類的很多方法,如 completeDeleteTopic 和 onTopicDeletion 等。因此,你完全可以從 resumeDeletions 方法開始,逐漸深入到其他方法代碼的學習。那我們就先學習 resumeDeletions 的實現代碼吧。

 1 private def resumeDeletions(): Unit = {
 2   // 從元數據緩存中獲取要刪除的主題列表
 3   val topicsQueuedForDeletion = Set.empty[String] ++ controllerContext.topicsToBeDeleted
 4   // 待重試主題列表
 5   val topicsEligibleForRetry = mutable.Set.empty[String]
 6   // 待刪除主題列表
 7   val topicsEligibleForDeletion = mutable.Set.empty[String]
 8   if (topicsQueuedForDeletion.nonEmpty)
 9     info(s"Handling deletion for topics ${topicsQueuedForDeletion.mkString(",")}")
10   // 遍歷每個待刪除主題
11   topicsQueuedForDeletion.foreach { topic =>
12     // 如果該主題所有副本已經是ReplicaDeletionSuccessful狀態
13     // 即該主題已經被刪除  
14     if (controllerContext.areAllReplicasInState(topic, ReplicaDeletionSuccessful)) {
15       // 調用completeDeleteTopic方法完成后續操作即可
16       completeDeleteTopic(topic)
17       info(s"Deletion of topic $topic successfully completed")
18      // 如果主題刪除尚未開始並且主題當前無法執行刪除的話
19     } else if (!controllerContext.isAnyReplicaInState(topic, ReplicaDeletionStarted)) {
20       if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
21         // 把該主題加到待重試主題列表中用於后續重試
22         topicsEligibleForRetry += topic
23       }
24     }
25     // 如果該主題能夠被刪除
26     if (isTopicEligibleForDeletion(topic)) {
27       info(s"Deletion of topic $topic (re)started")
28       topicsEligibleForDeletion += topic
29     }
30   }
31   // 重試待重試主題列表中的主題刪除操作
32   if (topicsEligibleForRetry.nonEmpty) {
33     retryDeletionForIneligibleReplicas(topicsEligibleForRetry)
34   }
35   // 調用onTopicDeletion方法,對待刪除主題列表中的主題執行刪除操作
36   if (topicsEligibleForDeletion.nonEmpty) {
37     onTopicDeletion(topicsEligibleForDeletion)
38   }
39 }

通過代碼我們發現,這個方法首先從元數據緩存中獲取要刪除的主題列表,之后定義了兩個空的主題列表,分別保存待重試刪除主題和待刪除主題。然后,代碼遍歷每個要刪除的主題,去看它所有副本的狀態。如果副本狀態都是 ReplicaDeletionSuccessful,就表明該主題已經被成功刪除,此時,再調用 completeDeleteTopic 方法,完成后續的操作就可以了。對於那些刪除操作尚未開始,並且暫時無法執行刪除的主題,源碼會把這類主題加到待重試主題列表中,用於后續重試;如果主題是能夠被刪除的,就將其加入到待刪除列表中。最后,該方法調用 retryDeletionForIneligibleReplicas 方法,來重試待重試主題列表中的主題刪除操作。對於待刪除主題列表中的主題則調用 onTopicDeletion 刪除之。值得一提的是,retryDeletionForIneligibleReplicas 方法用於重試主題刪除。這是通過將對應主題副本的狀態,從 ReplicaDeletionIneligible 變更到 OfflineReplica 來完成的。這樣,后續再次調用 resumeDeletions 時,會嘗試重新刪除主題。

總結:在主題刪除過程中,Kafka 會調整集群中三個地方的數據:ZooKeeper、元數據緩存和磁盤日志文件。刪除主題時,ZooKeeper 上與該主題相關的所有 ZNode 節點必須被清除;Controller 端元數據緩存中的相關項,也必須要被處理,並且要被同步到集群的其他 Broker 上;而磁盤日志文件,更是要清理的首要目標。這三個地方必須要統一處理,就好似我們常說的原子性操作一樣。

ReplicaStateMachine

我們看下 ReplicaStateMachine 及其子類 ZKReplicaStateMachine 在代碼中是如何定義的,請看這兩個代碼片段:

 1 // ReplicaStateMachine抽象類定義
 2 abstract class ReplicaStateMachine(controllerContext: ControllerContext) extends Logging {
 3   ......
 4 }
 5 
 6 // ZkReplicaStateMachine具體實現類定義
 7 class ZkReplicaStateMachine(config: KafkaConfig, 
 8   stateChangeLogger: StateChangeLogger,
 9   controllerContext: ControllerContext,
10   zkClient: KafkaZkClient,
11   controllerBrokerRequestBatch: ControllerBrokerRequestBatch) 
12   extends ReplicaStateMachine(controllerContext) with Logging {
13   ......
14 }

KafkaController 對象在構建的時候,就會初始化一個 ZkReplicaStateMachine 實例,如下列代碼所示:

1 val replicaStateMachine: ReplicaStateMachine = new   
2   ZkReplicaStateMachine(config, stateChangeLogger, 
3     controllerContext, zkClient,
4     new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))

你可能會問:“如果一個 Broker 沒有被選舉為 Controller,它也會構建 KafkaController 對象實例嗎?”沒錯!所有 Broker 在啟動時,都會創建 KafkaController 實例,因而也會創建 ZKReplicaStateMachine 實例。每個 Broker 都會創建這些實例,並不代表每個 Broker 都會啟動副本狀態機。事實上,只有在 Controller 所在的 Broker 上,副本狀態機才會被啟動。具體的啟動代碼位於 KafkaController 的 onControllerFailover 方法。

副本狀態及狀態管理流程

副本狀態機一旦被啟動,就意味着它要行使它最重要的職責了:管理副本狀態的轉換。不過,在學習如何管理狀態之前,我們必須要弄明白,當前都有哪些狀態,以及它們的含義分別是什么。源碼中的 ReplicaState 定義了 7 種副本狀態。NewReplica:副本被創建之后所處的狀態。OnlineReplica:副本正常提供服務時所處的狀態。OfflineReplica:副本服務下線時所處的狀態。ReplicaDeletionStarted:副本被刪除時所處的狀態。ReplicaDeletionSuccessful:副本被成功刪除后所處的狀態。ReplicaDeletionIneligible:開啟副本刪除,但副本暫時無法被刪除時所處的狀態。NonExistentReplica:副本從副本狀態機被移除前所處的狀態。具體到代碼而言,ReplicaState 接口及其實現對象定義了每種狀態的序號,以及合法的前置狀態。我以 OnlineReplica 代碼為例進行說明:

 1 // ReplicaState接口
 2 sealed trait ReplicaState {
 3   def state: Byte
 4   def validPreviousStates: Set[ReplicaState] // 定義合法的前置狀態
 5 }
 6 
 7 // OnlineReplica狀態
 8 case object OnlineReplica extends ReplicaState {
 9   val state: Byte = 2
10   val validPreviousStates: Set[ReplicaState] = Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
11 }

狀態轉換方法定義

在詳細介紹 handleStateChanges 方法前,我稍微花點時間,給你簡單介紹下其他 7 個方法都是做什么用的。就像前面說過的,這些方法主要是起輔助的作用。只有清楚了這些方法的用途,你才能更好地理解 handleStateChanges 的實現邏輯。logFailedStateChange:僅僅是記錄一條錯誤日志,表明執行了一次無效的狀態變更。

logInvalidTransition:同樣也是記錄錯誤之用,記錄一次非法的狀態轉換。

logSuccessfulTransition:記錄一次成功的狀態轉換操作。

getTopicPartitionStatesFromZk:從 ZooKeeper 中獲取指定分區的狀態信息,包括每個分區的 Leader 副本、ISR 集合等數據。

doRemoveReplicasFromIsr:把給定的副本對象從給定分區 ISR 中移除。

removeReplicasFromIsr:調用 doRemoveReplicasFromIsr 方法,實現將給定的副本對象從給定分區 ISR 中移除的功能。

doHandleStateChanges:執行狀態變更和轉換操作的主力方法。接下來,我們會詳細學習它的源碼部分。

handleStateChanges 方法

handleStateChange 方法的作用是處理狀態的變更,是對外提供狀態轉換操作的入口方法。其方法如下:

 1 override def handleStateChanges(
 2   replicas: Seq[PartitionAndReplica], 
 3   targetState: ReplicaState): Unit = {
 4   if (replicas.nonEmpty) {
 5     try {
 6       // 清空Controller待發送請求集合
 7       controllerBrokerRequestBatch.newBatch()
 8       // 將所有副本對象按照Broker進行分組,依次執行狀態轉換操作
 9       replicas.groupBy(_.replica).foreach {
10         case (replicaId, replicas) =>
11           doHandleStateChanges(replicaId, replicas, targetState)
12       }
13       // 發送對應的Controller請求給Broker
14       controllerBrokerRequestBatch.sendRequestsToBrokers(
15         controllerContext.epoch)
16     } catch {
17       // 如果Controller易主,則記錄錯誤日志然后拋出異常
18       case e: ControllerMovedException =>
19         error(s"Controller moved to another broker when moving some replicas to $targetState state", e)
20         throw e
21       case e: Throwable => error(s"Error while moving some replicas to $targetState state", e)
22     }
23   }
24 }

代碼邏輯總體上分為兩步:第 1 步是調用 doHandleStateChanges 方法執行真正的副本狀態轉換;第 2 步是給集群中的相應 Broker 批量發送請求。在執行第 1 步的時候,它會將 replicas 按照 Broker ID 進行分組。舉個例子,如果我們使用 < 主題名,分區號,副本 Broker ID> 表示副本對象,假設 replicas 為集合(<test, 0,="" 0="">, <test, 0,="" 1="">, <test, 1,="" 0="">, <test, 1,="" 1="">),那么,在調用 doHandleStateChanges 方法前,代碼會將 replicas 按照 Broker ID 進行分組,即變成:Map(0 -> Set(<test, 0,="" 0="">, <test, 1,="" 0="">),1 -> Set(<test, 0,="" 1="">, <test, 1,="" 1="">))。待這些都做完之后,代碼開始調用 doHandleStateChanges 方法,執行狀態轉換操作。這個方法看着很長,其實都是不同的代碼分支。

我們可以發現,代碼的第 1 步,會嘗試獲取給定副本對象在 Controller 端元數據緩存中的當前狀態,如果沒有保存某個副本對象的狀態,代碼會將其初始化為 NonExistentReplica 狀態。第 2 步,代碼根據不同 ReplicaState 中定義的合法前置狀態集合以及傳入的目標狀態(targetState),將給定的副本對象集合划分成兩部分:能夠合法轉換的副本對象集合,以及執行非法狀態轉換的副本對象集合。doHandleStateChanges 方法會為后者中的每個副本對象記錄一條錯誤日志。第 3 步,代碼攜帶能夠執行合法轉換的副本對象集合,進入到不同的代碼分支。

由於當前 Kafka 為副本定義了 7 類狀態,因此,這里的代碼分支總共有 7 路。我挑選幾路最常見的狀態轉換路徑詳細說明下,包括副本被創建時被轉換到 NewReplica 狀態,副本正常工作時被轉換到 OnlineReplica 狀態,副本停止服務后被轉換到 OfflineReplica 狀態。至於剩下的記錄代碼,你可以在課后自行學習下,它們的轉換操作原理大致是相同的。

第 1 路:轉換到 NewReplica 狀態

首先,我們先來看第 1 路,即目標狀態是 NewReplica 的代碼。代碼如下:

 1 case NewReplica =>
 2   // 遍歷所有能夠執行轉換的副本對象
 3   validReplicas.foreach { replica =>
 4     // 獲取該副本對象的分區對象,即<主題名,分區號>數據
 5     val partition = replica.topicPartition
 6     // 獲取副本對象的當前狀態
 7     val currentState = controllerContext.replicaState(replica)
 8     // 嘗試從元數據緩存中獲取該分區當前信息
 9     // 包括Leader是誰、ISR都有哪些副本等數據
10     controllerContext.partitionLeadershipInfo.get(partition) match {
11       // 如果成功拿到分區數據信息
12       case Some(leaderIsrAndControllerEpoch) =>
13         // 如果該副本是Leader副本
14         if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) {
15           val exception = new StateChangeFailedException(s"Replica $replicaId for partition $partition cannot be moved to NewReplica state as it is being requested to become leader")
16           // 記錄錯誤日志。Leader副本不能被設置成NewReplica狀態
17           logFailedStateChange(replica, currentState, OfflineReplica, exception)
18         // 否則,給該副本所在的Broker發送LeaderAndIsrRequest
19         // 向它同步該分區的數據, 之后給集群當前所有Broker發送
20         // UpdateMetadataRequest通知它們該分區數據發生變更
21         } else {
22           controllerBrokerRequestBatch
23             .addLeaderAndIsrRequestForBrokers(
24               Seq(replicaId),
25               replica.topicPartition,
26               leaderIsrAndControllerEpoch,
27               controllerContext.partitionFullReplicaAssignment(
28                 replica.topicPartition),
29               isNew = true)
30           if (traceEnabled)
31             logSuccessfulTransition(
32               stateLogger, replicaId, 
33               partition, currentState, NewReplica)
34           // 更新元數據緩存中該副本對象的當前狀態為NewReplica
35           controllerContext.putReplicaState(replica, NewReplica)
36         }
37       // 如果沒有相應數據
38       case None =>
39         if (traceEnabled)
40           logSuccessfulTransition(
41             stateLogger, replicaId, 
42             partition, currentState, NewReplica)
43         // 僅僅更新元數據緩存中該副本對象的當前狀態為NewReplica即可
44         controllerContext.putReplicaState(replica, NewReplica)
45     }
46   }

這一路主要做的事情是,嘗試從元數據緩存中,獲取這些副本對象的分區信息數據,包括分區的 Leader 副本在哪個 Broker 上、ISR 中都有哪些副本,等等。如果找不到對應的分區數據,就直接把副本狀態更新為 NewReplica。否則,代碼就需要給該副本所在的 Broker 發送請求,讓它知道該分區的信息。同時,代碼還要給集群所有運行中的 Broker 發送請求,讓它們感知到新副本的加入。

第 2 路:轉換到 OnlineReplica 狀態

下面我們來看第 2 路,即轉換副本對象到 OnlineReplica。剛剛我說過,這是副本對象正常工作時所處的狀態。我們來看下要轉換到這個狀態,源碼都做了哪些事情:

 1 case OnlineReplica =>
 2   validReplicas.foreach { replica =>
 3     // 獲取副本所在分區
 4     val partition = replica.topicPartition
 5     // 獲取副本當前狀態
 6     val currentState = controllerContext.replicaState(replica)
 7     currentState match {
 8       // 如果當前狀態是NewReplica
 9       case NewReplica =>
10         // 從元數據緩存中拿到分區副本列表
11         val assignment = controllerContext
12           .partitionFullReplicaAssignment(partition)
13         // 如果副本列表不包含當前副本,視為異常情況
14         if (!assignment.replicas.contains(replicaId)) {
15           error(s"Adding replica ($replicaId) that is not part of the assignment $assignment")
16           // 將該副本加入到副本列表中,並更新元數據緩存中該分區的副本列表
17           val newAssignment = assignment.copy(
18             replicas = assignment.replicas :+ replicaId)
19           controllerContext.updatePartitionFullReplicaAssignment(
20             partition, newAssignment)
21         }
22       // 如果當前狀態是其他狀態
23       case _ =>
24         // 嘗試獲取該分區當前信息數據
25         controllerContext.partitionLeadershipInfo
26           .get(partition) match {
27           // 如果存在分區信息
28           // 向該副本對象所在Broker發送請求,令其同步該分區數據
29           case Some(leaderIsrAndControllerEpoch) =>
30             controllerBrokerRequestBatch
31               .addLeaderAndIsrRequestForBrokers(Seq(replicaId),
32                 replica.topicPartition,
33                 leaderIsrAndControllerEpoch,
34                 controllerContext
35                   .partitionFullReplicaAssignment(partition), 
36                 isNew = false)
37           case None =>
38         }
39     }
40     if (traceEnabled)
41       logSuccessfulTransition(
42         stateLogger, replicaId, 
43         partition, currentState, OnlineReplica)
44     // 將該副本對象設置成OnlineReplica狀態
45     controllerContext.putReplicaState(replica, OnlineReplica)
46   }

代碼依然會對副本對象進行遍歷,並依次執行下面的幾個步驟。

第 1 步,獲取元數據中該副本所屬的分區對象,以及該副本的當前狀態。

第 2 步,查看當前狀態是否是 NewReplica。如果是,則獲取分區的副本列表,並判斷該副本是否在當前的副本列表中,假如不在,就記錄錯誤日志,並更新元數據中的副本列表;如果狀態不是 NewReplica,就說明,這是一個已存在的副本對象,那么,源碼會獲取對應分區的詳細數據,然后向該副本對象所在的 Broker 發送 LeaderAndIsrRequest 請求,令其同步獲知,並保存該分區數據。

第 3 步,將該副本對象狀態變更為 OnlineReplica。至此,該副本處於正常工作狀態。

第 3 路:轉換到 OfflineReplica 狀態

最后,再來看下第 3 路分支。這路分支要將副本對象的狀態轉換成 OfflineReplica。我依然以代碼注釋的方式給出主要的代碼邏輯:

 1 case OfflineReplica =>
 2   validReplicas.foreach { replica =>
 3     // 向副本所在Broker發送StopReplicaRequest請求,停止對應副本
 4     controllerBrokerRequestBatch
 5       .addStopReplicaRequestForBrokers(Seq(replicaId), 
 6         replica.topicPartition, deletePartition = false)
 7   }
 8   // 將副本對象集合划分成有Leader信息的副本集合和無Leader信息的副本集合
 9   val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = 
10     validReplicas.partition { replica =>
11       controllerContext.partitionLeadershipInfo
12         .contains(replica.topicPartition)
13     }
14   // 對於有Leader信息的副本集合而言從,
15   // 它們對應的所有分區中移除該副本對象並更新ZooKeeper節點
16   val updatedLeaderIsrAndControllerEpochs = 
17     removeReplicasFromIsr(replicaId,  
18       replicasWithLeadershipInfo.map(_.topicPartition))
19   // 遍歷每個更新過的分區信息
20   updatedLeaderIsrAndControllerEpochs.foreach {
21     case (partition, leaderIsrAndControllerEpoch) =>
22       stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
23       // 如果分區對應主題並未被刪除
24       if (!controllerContext.isTopicQueuedUpForDeletion(
25         partition.topic)) {
26         // 獲取該分區除給定副本以外的其他副本所在的Broker  
27         val recipients = controllerContext
28           .partitionReplicaAssignment(partition)
29           .filterNot(_ == replicaId)
30         // 向這些Broker發送請求更新該分區更新過的分區LeaderAndIsr數據
31         controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(
32           recipients,
33           partition,
34           leaderIsrAndControllerEpoch,
35           controllerContext.partitionFullReplicaAssignment(partition), 
36           isNew = false)
37       }
38       val replica = PartitionAndReplica(partition, replicaId)
39       val currentState = controllerContext.replicaState(replica)
40       if (traceEnabled)
41         logSuccessfulTransition(stateLogger, replicaId, 
42           partition, currentState, OfflineReplica)
43       // 設置該分區給定副本的狀態為OfflineReplica
44       controllerContext.putReplicaState(replica, OfflineReplica)
45   }
46   // 遍歷無Leader信息的所有副本對象
47   replicasWithoutLeadershipInfo.foreach { replica =>
48     val currentState = controllerContext.replicaState(replica)
49     if (traceEnabled)
50       logSuccessfulTransition(stateLogger, replicaId, 
51         replica.topicPartition, currentState, OfflineReplica)
52      // 向集群所有Broker發送請求,更新對應分區的元數據
53     controllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(
54       controllerContext.liveOrShuttingDownBrokerIds.toSeq,
55       Set(replica.topicPartition))
56     // 設置該分區給定副本的狀態為OfflineReplica
57     controllerContext.putReplicaState(replica, OfflineReplica)
58   }

首先,代碼會給所有符合狀態轉換的副本所在的 Broker,發送 StopReplicaRequest 請求,顯式地告訴這些 Broker 停掉其上的對應副本。Kafka 的副本管理器組件(ReplicaManager)負責處理這個邏輯。后面我們會用兩節課的時間專門討論 ReplicaManager 的實現,這里你只需要了解,StopReplica 請求被發送出去之后,這些 Broker 上對應的副本就停止工作了。其次,代碼根據分區是否保存了 Leader 信息,將副本集合划分成兩個子集:有 Leader 副本集合和無 Leader 副本集合。有無 Leader 信息並不僅僅包含 Leader,還有 ISR 和 controllerEpoch 等數據。不過,你大致可以認為,副本集合是根據有無 Leader 進行划分的。接下來,源碼會遍歷有 Leader 的子集合,向這些副本所在的 Broker 發送 LeaderAndIsrRequest 請求,去更新停止副本操作之后的分區信息,再把這些分區狀態設置為 OfflineReplica。最后,源碼遍歷無 Leader 的子集合,執行與上一步非常類似的操作。只不過,對於無 Leader 而言,因為我們沒有執行任何 Leader 選舉操作,所以給這些副本所在的 Broker 發送的就不是 LeaderAndIsrRequest 請求了,而是 UpdateMetadataRequest 請求,顯式去告知它們更新對應分區的元數據即可,然后再把副本狀態設置為 OfflineReplica。從這段描述中,我們可以知道,把副本狀態變更為 OfflineReplica 的主要邏輯,其實就是停止對應副本 + 更新遠端 Broker 元數據的操作。

PartitionStateMachine

代碼總共有 5 大部分。PartitionStateMachine:分區狀態機抽象類。它定義了諸如 startup、shutdown 這樣的公共方法,同時也給出了處理分區狀態轉換入口方法 handleStateChanges 的簽名。ZkPartitionStateMachine:PartitionStateMachine 唯一的繼承子類。它實現了分區狀態機的主體邏輯功能。和 ZkReplicaStateMachine 類似,ZkPartitionStateMachine 重寫了父類的 handleStateChanges 方法,並配以私有的 doHandleStateChanges 方法,共同實現分區狀態轉換的操作。PartitionState 接口及其實現對象:定義 4 類分區狀態,分別是 NewPartition、OnlinePartition、OfflinePartition 和 NonExistentPartition。除此之外,還定義了它們之間的流轉關系。PartitionLeaderElectionStrategy 接口及其實現對象:定義 4 類分區 Leader 選舉策略。你可以認為它們是發生 Leader 選舉的 4 種場景。PartitionLeaderElectionAlgorithms:分區 Leader 選舉的算法實現。既然定義了 4 類選舉策略,就一定有相應的實現代碼,PartitionLeaderElectionAlgorithms 就提供了這 4 類選舉策略的實現代碼。

每個 Broker 啟動時,都會創建對應的分區狀態機和副本狀態機實例,但只有 Controller 所在的 Broker 才會啟動它們。如果 Controller 變更到其他 Broker,老 Controller 所在的 Broker 要調用這些狀態機的 shutdown 方法關閉它們,新 Controller 所在的 Broker 調用狀態機的 startup 方法啟動它們。

分區狀態

既然 ZkPartitionStateMachine 是管理分區狀態轉換的,那么,我們至少要知道分區都有哪些狀態,以及 Kafka 規定的轉換規則是什么。這就是 PartitionState 接口及其實現對象做的事情。和 ReplicaState 類似,PartitionState 定義了分區的狀態空間以及流轉規則。我以 OnlinePartition 狀態為例,說明下代碼是如何實現流轉的:

1 sealed trait PartitionState {
2   def state: Byte // 狀態序號,無實際用途
3   def validPreviousStates: Set[PartitionState] // 合法前置狀態集合
4 }
5 
6 case object OnlinePartition extends PartitionState {
7   val state: Byte = 1
8   val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
9 }

如代碼所示,每個 PartitionState 都定義了名為 validPreviousStates 的集合,也就是每個狀態對應的合法前置狀態集。對於 OnlinePartition 而言,它的合法前置狀態集包括 NewPartition、OnlinePartition 和 OfflinePartition。在 Kafka 中,從合法狀態集以外的狀態向目標狀態進行轉換,將被視為非法操作。目前,Kafka 為分區定義了 4 類狀態。

NewPartition:分區被創建后被設置成這個狀態,表明它是一個全新的分區對象。處於這個狀態的分區,被 Kafka 認為是“未初始化”,因此,不能選舉 Leader。

OnlinePartition:分區正式提供服務時所處的狀態。

OfflinePartition:分區下線后所處的狀態。

NonExistentPartition:分區被刪除,並且從分區狀態機移除后所處的狀態。

分區 Leader 選舉的場景及方法

剛剛我們說了兩個狀態機的相同點,接下來,我們要學習的分區 Leader 選舉,可以說是 PartitionStateMachine 特有的功能了。每個分區都必須選舉出 Leader 才能正常提供服務,因此,對於分區而言,Leader 副本是非常重要的角色。既然這樣,我們就必須要了解 Leader 選舉什么流程,以及在代碼中是如何實現的。我們重點學習下選舉策略以及具體的實現方法代碼。

PartitionLeaderElectionStrategy

先明確下分區 Leader 選舉的含義,其實很簡單,就是為 Kafka 主題的某個分區推選 Leader 副本。那么,Kafka 定義了哪幾種推選策略,或者說,在什么情況下需要執行 Leader 選舉呢?這就是 PartitionLeaderElectionStrategy 接口要做的事情,請看下面的代碼:

 1 // 分區Leader選舉策略接口
 2 sealed trait PartitionLeaderElectionStrategy
 3 // 離線分區Leader選舉策略
 4 final case class OfflinePartitionLeaderElectionStrategy(
 5   allowUnclean: Boolean) extends PartitionLeaderElectionStrategy
 6 // 分區副本重分配Leader選舉策略  
 7 final case object ReassignPartitionLeaderElectionStrategy 
 8   extends PartitionLeaderElectionStrategy
 9 // 分區Preferred副本Leader選舉策略
10 final case object PreferredReplicaPartitionLeaderElectionStrategy 
11   extends PartitionLeaderElectionStrategy
12 // Broker Controlled關閉時Leader選舉策略
13 final case object ControlledShutdownPartitionLeaderElectionStrategy 
14   extends PartitionLeaderElectionStrategy

當前,分區 Leader 選舉有 4 類場景。

OfflinePartitionLeaderElectionStrategy:因為 Leader 副本下線而引發的分區 Leader 選舉。

ReassignPartitionLeaderElectionStrategy:因為執行分區副本重分配操作而引發的分區 Leader 選舉。

PreferredReplicaPartitionLeaderElectionStrategy:因為執行 Preferred 副本 Leader 選舉而引發的分區 Leader 選舉。

ControlledShutdownPartitionLeaderElectionStrategy:因為正常關閉 Broker 而引發的分區 Leader 選舉。

PartitionLeaderElectionAlgorithms

針對這 4 類場景,分區狀態機的 PartitionLeaderElectionAlgorithms 對象定義了 4 個方法,分別負責為每種場景選舉 Leader 副本,這 4 種方法是:

offlinePartitionLeaderElection;

reassignPartitionLeaderElection;

preferredReplicaPartitionLeaderElection;

controlledShutdownPartitionLeaderElection。

offlinePartitionLeaderElection 方法的邏輯是這 4 個方法中最復雜的,我們就先從它開始學起。

 1 def offlinePartitionLeaderElection(assignment: Seq[Int], 
 2   isr: Seq[Int], liveReplicas: Set[Int], 
 3   uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = {
 4   // 從當前分區副本列表中尋找首個處於存活狀態的ISR副本
 5   assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
 6     // 如果找不到滿足條件的副本,查看是否允許Unclean Leader選舉
 7     // 即Broker端參數unclean.leader.election.enable是否等於true
 8     if (uncleanLeaderElectionEnabled) {
 9       // 選擇當前副本列表中的第一個存活副本作為Leader
10       val leaderOpt = assignment.find(liveReplicas.contains)
11       if (leaderOpt.isDefined)
12         controllerContext.stats.uncleanLeaderElectionRate.mark()
13       leaderOpt
14     } else {
15       None // 如果不允許Unclean Leader選舉,則返回None表示無法選舉Leader
16     }
17   }
18 }

處理分區狀態轉換的方法

掌握了剛剛的這些知識之后,現在,我們正式來看 PartitionStateMachine 的工作原理。

handleStateChanges

如果用一句話概括 handleStateChanges 的作用,應該這樣說:handleStateChanges 把 partitions 的狀態設置為 targetState,同時,還可能需要用 leaderElectionStrategy 策略為 partitions 選舉新的 Leader,最終將 partitions 的 Leader 信息返回。其中,partitions 是待執行狀態變更的目標分區列表,targetState 是目標狀態,leaderElectionStrategy 是一個可選項,如果傳入了,就表示要執行 Leader 選舉。下面是 handleStateChanges 方法的完整代碼,我以注釋的方式給出了主要的功能說明:

 1 override def handleStateChanges(
 2     partitions: Seq[TopicPartition],
 3     targetState: PartitionState,
 4     partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
 5   ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
 6     if (partitions.nonEmpty) {
 7       try {
 8         // 清空Controller待發送請求集合,准備本次請求發送
 9         controllerBrokerRequestBatch.newBatch()
10         // 調用doHandleStateChanges方法執行真正的狀態變更邏輯
11         val result = doHandleStateChanges(
12           partitions,
13           targetState,
14           partitionLeaderElectionStrategyOpt
15         )
16         // Controller給相關Broker發送請求通知狀態變化
17         controllerBrokerRequestBatch.sendRequestsToBrokers(
18           controllerContext.epoch)
19         // 返回狀態變更處理結果
20         result
21       } catch {
22         // 如果Controller易主,則記錄錯誤日志,然后重新拋出異常
23         // 上層代碼會捕獲該異常並執行maybeResign方法執行卸任邏輯
24         case e: ControllerMovedException =>
25           error(s"Controller moved to another broker when moving some partitions to $targetState state", e)
26           throw e
27         // 如果是其他異常,記錄錯誤日志,封裝錯誤返回
28         case e: Throwable =>
29           error(s"Error while moving some partitions to $targetState state", e)
30           partitions.iterator.map(_ -> Left(e)).toMap
31       }
32     } else { // 如果partitions為空,什么都不用做
33       Map.empty
34     }
35   }

doHandleStateChanges

 1 private def doHandleStateChanges(
 2     partitions: Seq[TopicPartition],
 3     targetState: PartitionState,
 4     partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
 5   ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
 6     val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
 7     val traceEnabled = stateChangeLog.isTraceEnabled
 8     // 初始化新分區的狀態為NonExistentPartition
 9     partitions.foreach(partition => controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition))
10     // 找出要執行非法狀態轉換的分區,記錄錯誤日志
11     val (validPartitions, invalidPartitions) = controllerContext.checkValidPartitionStateChange(partitions, targetState)
12     invalidPartitions.foreach(partition => logInvalidTransition(partition, targetState))
13     // 根據targetState進入到不同的case分支
14     targetState match {
15       ......
16     }
17 }

這個方法首先會做狀態初始化的工作,具體來說就是,在方法調用時,不在元數據緩存中的所有分區的狀態,會被初始化為 NonExistentPartition。接着,檢查哪些分區執行的狀態轉換不合法,並為這些分區記錄相應的錯誤日志。之后,代碼攜合法狀態轉換的分區列表進入到 case 分支。由於分區狀態只有 4 個,因此,它的 case 分支代碼遠比 ReplicaStateMachine 中的簡單,而且,只有 OnlinePartition 這一路的分支邏輯相對復雜,其他 3 路僅僅是將分區狀態設置成目標狀態而已,所以,我們來深入研究下目標狀態是 OnlinePartition 的分支。

 1 case OnlinePartition =>
 2   // 獲取未初始化分區列表,也就是NewPartition狀態下的所有分區
 3   val uninitializedPartitions = validPartitions.filter(
 4     partition => partitionState(partition) == NewPartition)
 5   // 獲取具備Leader選舉資格的分區列表
 6   // 只能為OnlinePartition和OfflinePartition狀態的分區選舉Leader 
 7   val partitionsToElectLeader = validPartitions.filter(
 8     partition => partitionState(partition) == OfflinePartition ||
 9      partitionState(partition) == OnlinePartition)
10   // 初始化NewPartition狀態分區,在ZooKeeper中寫入Leader和ISR數據
11   if (uninitializedPartitions.nonEmpty) {
12     val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
13     successfulInitializations.foreach { partition =>
14       stateChangeLog.info(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
15         s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
16       controllerContext.putPartitionState(partition, OnlinePartition)
17     }
18   }
19   // 為具備Leader選舉資格的分區推選Leader
20   if (partitionsToElectLeader.nonEmpty) {
21     val electionResults = electLeaderForPartitions(
22       partitionsToElectLeader,
23       partitionLeaderElectionStrategyOpt.getOrElse(
24         throw new IllegalArgumentException("Election strategy is a required field when the target state is OnlinePartition")
25       )
26     )
27     electionResults.foreach {
28       case (partition, Right(leaderAndIsr)) =>
29         stateChangeLog.info(
30           s"Changed partition $partition from ${partitionState(partition)} to $targetState with state $leaderAndIsr"
31         )
32         // 將成功選舉Leader后的分區設置成OnlinePartition狀態
33         controllerContext.putPartitionState(
34           partition, OnlinePartition)
35       case (_, Left(_)) => // 如果選舉失敗,忽略之
36     }
37     // 返回Leader選舉結果
38     electionResults
39   } else {
40     Map.empty
41   }

第 1 步是為 NewPartition 狀態的分區做初始化操作,也就是在 ZooKeeper 中,創建並寫入分區節點數據。節點的位置是/brokers/topics//partitions/,每個節點都要包含分區的 Leader 和 ISR 等數據。而 Leader 和 ISR 的確定規則是:選擇存活副本列表的第一個副本作為 Leader;選擇存活副本列表作為 ISR。至於具體的代碼,可以看下 initializeLeaderAndIsrForPartitions 方法代碼片段的倒數第 5 行:

 1 private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = {
 2   ......
 3     // 獲取每個分區的副本列表
 4     val replicasPerPartition = partitions.map(partition => partition -> controllerContext.partitionReplicaAssignment(partition))
 5     // 獲取每個分區的所有存活副本
 6     val liveReplicasPerPartition = replicasPerPartition.map { case (partition, replicas) =>
 7         val liveReplicasForPartition = replicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
 8         partition -> liveReplicasForPartition
 9     }
10     // 按照有無存活副本對分區進行分組
11     // 分為兩組:有存活副本的分區;無任何存活副本的分區
12     val (partitionsWithoutLiveReplicas, partitionsWithLiveReplicas) = liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.isEmpty }
13     ......
14     // 為"有存活副本的分區"確定Leader和ISR
15     // Leader確認依據:存活副本列表的首個副本被認定為Leader
16     // ISR確認依據:存活副本列表被認定為ISR
17     val leaderIsrAndControllerEpochs = partitionsWithLiveReplicas.map { case (partition, liveReplicas) =>
18       val leaderAndIsr = LeaderAndIsr(liveReplicas.head, liveReplicas.toList)
19       ......
20     }.toMap
21     ......
22 }

第 2 步是為具備 Leader 選舉資格的分區推選 Leader,代碼調用 electLeaderForPartitions 方法實現。這個方法會不斷嘗試為多個分區選舉 Leader,直到所有分區都成功選出 Leader。選舉 Leader 的核心代碼位於 doElectLeaderForPartitions 方法中,該方法主要有 3 步。

這個方法大體分為 3 步。第 1 步是從 ZooKeeper 中獲取給定分區的 Leader、ISR 信息,並將結果封裝進名為 validLeaderAndIsrs 的容器中,代碼如下:

 

 1 // doElectLeaderForPartitions方法的第1部分
 2 val getDataResponses = try {
 3   // 批量獲取ZooKeeper中給定分區的znode節點數據
 4   zkClient.getTopicPartitionStatesRaw(partitions)
 5 } catch {
 6   case e: Exception =>
 7     return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty)
 8 }
 9 // 構建兩個容器,分別保存可選舉Leader分區列表和選舉失敗分區列表
10 val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
11 val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]
12 // 遍歷每個分區的znode節點數據
13 getDataResponses.foreach { getDataResponse =>
14   val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
15   val currState = partitionState(partition)
16   // 如果成功拿到znode節點數據
17   if (getDataResponse.resultCode == Code.OK) {
18     TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
19       // 節點數據中含Leader和ISR信息
20       case Some(leaderIsrAndControllerEpoch) =>
21         // 如果節點數據的Controller Epoch值大於當前Controller Epoch值
22         if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
23           val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " +
24             s"already written by another controller. This probably means that the current controller $controllerId went through " +
25             s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
26           // 將該分區加入到選舉失敗分區列表
27           failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
28         } else {
29           // 將該分區加入到可選舉Leader分區列表 
30           validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr
31         }
32       // 如果節點數據不含Leader和ISR信息
33       case None =>
34         val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
35         // 將該分區加入到選舉失敗分區列表
36         failedElections.put(partition, Left(exception))
37     }
38   // 如果沒有拿到znode節點數據,則將該分區加入到選舉失敗分區列表
39   } else if (getDataResponse.resultCode == Code.NONODE) {
40     val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
41     failedElections.put(partition, Left(exception))
42   } else {
43     failedElections.put(partition, Left(getDataResponse.resultException.get))
44   }
45 }
46 
47 if (validLeaderAndIsrs.isEmpty) {
48   return (failedElections.toMap, Seq.empty)
49 }

 

首先,代碼會批量讀取 ZooKeeper 中給定分區的所有 Znode 數據。之后,會構建兩個容器,分別保存可選舉 Leader 分區列表和選舉失敗分區列表。接着,開始遍歷每個分區的 Znode 節點數據,如果成功拿到 Znode 節點數據,節點數據包含 Leader 和 ISR 信息且節點數據的 Controller Epoch 值小於當前 Controller Epoch 值,那么,就將該分區加入到可選舉 Leader 分區列表。倘若發現 Zookeeper 中保存的 Controller Epoch 值大於當前 Epoch 值,說明該分區已經被一個更新的 Controller 選舉過 Leader 了,此時必須終止本次 Leader 選舉,並將該分區放置到選舉失敗分區列表中。遍歷完這些分區之后,代碼要看下 validLeaderAndIsrs 容器中是否包含可選舉 Leader 的分區。如果一個滿足選舉 Leader 的分區都沒有,方法直接返回。至此,doElectLeaderForPartitions 方法的第一大步完成。下面,我們看下該方法的第 2 部分代碼:

 1 // doElectLeaderForPartitions方法的第2部分
 2 // 開始選舉Leader,並根據有無Leader將分區進行分區
 3 val (partitionsWithoutLeaders, partitionsWithLeaders) = 
 4   partitionLeaderElectionStrategy match {
 5   case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>
 6     val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(
 7       validLeaderAndIsrs,
 8       allowUnclean
 9     )
10     // 為OffinePartition分區選舉Leader
11     leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty)
12   case ReassignPartitionLeaderElectionStrategy =>
13     // 為副本重分配的分區選舉Leader
14     leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
15   case PreferredReplicaPartitionLeaderElectionStrategy =>
16     // 為分區執行Preferred副本Leader選舉
17     leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
18   case ControlledShutdownPartitionLeaderElectionStrategy =>
19     // 為因Broker正常關閉而受影響的分區選舉Leader
20     leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
21 }

這一步是根據給定的 PartitionLeaderElectionStrategy,調用 PartitionLeaderElectionAlgorithms 的不同方法執行 Leader 選舉,同時,區分出成功選舉 Leader 和未選出 Leader 的分區。前面說過了,這 4 種不同的策略定義了 4 個專屬的方法來進行 Leader 選舉。其實,如果你打開這些方法的源碼,就會發現它們大同小異。基本上,選擇 Leader 的規則,就是選擇副本集合中首個存活且處於 ISR 中的副本作為 Leader。現在,我們再來看這個方法的最后一部分代碼,這一步主要是更新 ZooKeeper 節點數據,以及 Controller 端元數據緩存信息。

 1 // doElectLeaderForPartitions方法的第3部分
 2 // 將所有選舉失敗的分區全部加入到Leader選舉失敗分區列表
 3 partitionsWithoutLeaders.foreach { electionResult =>
 4   val partition = electionResult.topicPartition
 5   val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
 6   failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
 7 }
 8 val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap
 9 val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
10 // 使用新選舉的Leader和ISR信息更新ZooKeeper上分區的znode節點數據
11 val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(
12   adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
13 // 對於ZooKeeper znode節點數據更新成功的分區,封裝對應的Leader和ISR信息
14 // 構建LeaderAndIsr請求,並將該請求加入到Controller待發送請求集合
15 // 等待后續統一發送
16 finishedUpdates.foreach { case (partition, result) =>
17   result.foreach { leaderAndIsr =>
18     val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
19     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
20     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
21     controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
22       leaderIsrAndControllerEpoch, replicaAssignment, isNew = false)
23   }
24 }
25 // 返回選舉結果,包括成功選舉並更新ZooKeeper節點的分區、選舉失敗分區以及
26 // ZooKeeper節點更新失敗的分區
27 (finishedUpdates ++ failedElections, updatesToRetry)

首先,將上一步中所有選舉失敗的分區,全部加入到 Leader 選舉失敗分區列表。然后,使用新選舉的 Leader 和 ISR 信息,更新 ZooKeeper 上分區的 Znode 節點數據。對於 ZooKeeper Znode 節點數據更新成功的那些分區,源碼會封裝對應的 Leader 和 ISR 信息,構建 LeaderAndIsr 請求,並將該請求加入到 Controller 待發送請求集合,等待后續統一發送。最后,方法返回選舉結果,包括成功選舉並更新 ZooKeeper 節點的分區列表、選舉失敗分區列表,以及 ZooKeeper 節點更新失敗的分區列表。

總結

以后關於kafka系列的總結大部分來自Geek Time的課件,大家可以自行關鍵字搜索。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM