1.概述
平時在使用Kafka的時候,可能關注的更多的是Kafka系統層面的。今天來給大家剖析一下Kafka的控制器,了解一下Kafka控制器的選舉流程。
2.內容
Kafka控制器,其實就是一個Kafka系統的Broker。它除了具有一般Broker的功能之外,還具有選舉主題分區Leader節點的功能。在啟動Kafka系統時,其中一個Broker會被選舉為控制器,負責管理主題分區和副本狀態,還會執行分區重新分配的管理任務。
如果在Kafka系統運行過程中,當前的控制器出現故障導致不可用,那么Kafka系統會從其他正常運行的Broker中重新選舉出新的控制器。
2.1 控制器啟動順序
在Kafka集群中,每個Broker在啟動時會實例化一個KafkaController類。該類會執行一系列業務邏輯,選舉出主題分區的Leader節點,步驟如下:
- 第一個啟動的代理節點,會在Zookeeper系統里面創建一個臨時節點/controller,並寫入該節點的注冊信息,使該節點成為控制器;
- 其他的代理節點陸續啟動時,也會嘗試在Zookeeper系統中創建/controller節點,但是由於/controller節點已經存在,所以會拋出“創建/controller節點失敗異常”的信息。創建失敗的代理節點會根據返回的結果,判斷出在Kafka集群中已經有一個控制器被成功創建了,所以放棄創建/controller節點,這樣就確保了Kafka集群控制器的唯一性;
- 其他的代理節點,會在控制器上注冊相應的監聽器,各個監聽器負責監聽各自代理節點的狀態變化。當監聽到節點狀態發生變化時,會觸發相應的監聽函數進行處理。
2.2 如何查看控制器優先級 ?
控制器創建的優先級是按照Kafka系統代理節點成功啟動的順序來創建的。用戶可以通過改變Kafka系統代理節點的啟動順序,來查看控制器的創建優先級。之后,可以在Zookeeper系統中查看/controller臨時節點的內容,例如:
# 進入Zookeeper集群 [hadoop@dn1 bin]$ zkCli.sh -server dn1:2181 # 執行查看命令 [zk: dn1:2181(CONNECTED) 1] get /controller
成功執行命令后,可以看到代理節點0(即dn1節點)上成功創建了控制器,如下圖所示:

當前啟動順序為:dn1、dn2、dn3,修改啟動順序為:dn3、dn1、dn2。再次查看Zookeeper系統中執行“get /controller”命令,輸出結果如下圖所示:

2.3 切換控制器所屬的代理節點
當控制器被關閉或者與Zookeeper系統斷開連接時,Zookeeper系統上的臨時節點就會被清除。Kafka集群中的監聽器會接收到變更通知,各個代理節點會嘗試到Zookeeper系統中創建一個控制器的臨時節點。第一個成功在Zookeeper系統中創建的代理節點,將會成為新的控制器。每個新選舉出來的控制器,會在Zookeeper系統中獲取一個遞增的controller_epoch值。
3.主題分區Leader節點的選舉過程
選舉控制器的核心思路是:各個代理節點公平競爭搶占Zookeeper系統中創建/controller臨時節點,最先創建成功的代理節點會成為控制器,並擁有選舉主題分區Leader節點的功能。選舉流程如下圖所示:

當Kafka系統實例化KafkaController類時,主題分區Leader節點的選舉流程便會開始。其中涉及的核心類包含KafkaController、ZookeeperLeaderElector、LeaderChangeListener、SessionExpirationListener。
- KafkaController:在實例化ZookeeperLeaderElector類時,分別設置了兩個關鍵的回調函數,即onControllerFailover和onControllerResignation;
- ZookeeperLeaderElector:實現主題分區的Leader節點選舉功能,但是它並不會處理“代理節點與Zookeeper系統之間出現的會話超時”這種情況,它主要負責創建元數據存儲路徑、實例化變更監聽器等,並通過訂閱數據變更監聽器來實時監聽數據的變化,進而開始執行選舉Leader的邏輯;
- LeaderChangeListener:如果節點數據發送變化,則Kafka系統中的其他代理節點可能已經成為Leader,接着Kafka控制器會調用onResigningAsLeader函數。當Kafka代理節點宕機或者被人為誤刪除時,則處於該節點上的Leader會被重新選舉,通過調用onResigningAsLeader函數重新選擇其他正常運行的代理節點成為新的Leader;
- SessionExpirationListener:當Kafka系統的代理節點和Zookeeper系統建立連接后,SessionExpirationListener中的handleNewSession函數會被調用,對於Zookeeper系統中會話過期的連接,會先進行一次判斷。
4.注冊分區和副本狀態機
Kafka系統的控制器主要負責管理主題、分區和副本。 Kafka系統在操作主題、分區和副本時,控制器會在Zookeeper系統的/brokers/topics節點,以及其子節點路徑上注冊一系列的監聽器。 使用Kafka應用接口或者是Kafka系統腳本創建一個主題時,服務端會將創建后的結果返回給客戶端。當客戶端收到創建成功的提示時,其實服務端並沒有實際創建主題,而只是在Zookeeper系統的/brokers/topics節點中創建了該主題對應的子節點名稱。
代理節點調用onBecomingLeader()函數實際上調用的是onControllerFailover()函數,所以在控制器調用onControllerFailover()函數時,會在初始化階段分別創建分區狀態機和副本狀態機。代碼如下所示:
def onControllerFailover() { if(isRunning) { info("Broker %d starting become controller state transition".format(config.brokerId)) readControllerEpochFromZookeeper() incrementControllerEpoch(zkUtils.zkClient) // 在/brokers/topics節點注冊監聽器 registerReassignedPartitionsListener() registerIsrChangeNotificationListener() registerPreferredReplicaElectionListener() partitionStateMachine.registerListeners() // 注冊分區狀態機 replicaStateMachine.registerListeners() // 注冊副本狀態機 initializeControllerContext() // 在控制器初始化之后,在狀態機啟動之前,需要發送更新元數據請求 sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) replicaStateMachine.startup() // 啟動副本狀態機 partitionStateMachine.startup() // 啟動分區狀態機 // 在自動故障轉移中為所有主題注冊分區更改監聽器 controllerContext.allTopics.foreach(topic => partitionStateMachine. registerPartitionChangeListener(topic)) info("Broker %d is ready to serve as the new controller with epoch %d". format(config.brokerId, epoch)) maybeTriggerPartitionReassignment() maybeTriggerPreferredReplicaElection() if (config.autoLeaderRebalanceEnable) { info("starting the partition rebalance scheduler") autoRebalanceScheduler.startup() autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance, 5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS) } deleteTopicManager.start() } else info("Controller has been shut down, aborting startup/failover") }
主題的分區狀態機通過registerListeners()函數,在Zookeeper系統中的/brokers/topics節點上注冊了TopicChangeListener和DeleteTopicListener兩個監聽器。創建一個主題時,主題信息、主題分區和副本會被寫到Zookeeper系統的/brokers/topics節點中,這就會觸發分區和副本狀態機注冊監聽器。
5.總結
Kafka系統整體來說,調試還算方便。下載Kafka源代碼,導入到IDE中,就可以啟動整個Kafka系統了,可以通過DEBUG的方式來親自了解控制器的執行流程。
6.結束語
這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!
另外,博主出書了《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那里點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。
