Kafka partition 副本遷移與broker上下線
1 前言
Controller 在初始化時,會利用 ZK 的 watch 機制注冊很多不同類型的監聽器,當監聽的事件被觸發時,Controller 就會觸發相應的操作。
Controller 在初始化時,會注冊多種類型的監聽器,主要有以下幾種:
- l 監聽 /admin/reassign_partitions 節點,用於分區副本遷移的監聽;
- l 監聽 /isr_change_notification 節點,用於 Partition Isr 變動的監聽,;
- l 監聽 /admin/preferred_replica_election 節點,用於需要進行 Partition 最優 leader 選舉的監聽;
- l 監聽 /brokers/topics 節點,用於 Topic 新建的監聽;
- l 監聽 /brokers/topics/TOPIC_NAME 節點,用於 Topic Partition 擴容的監聽;
- l 監聽 /admin/delete_topics 節點,用於 Topic 刪除的監聽;
- l 監聽 /brokers/ids 節點,用於 Broker 上下線的監聽。
2 Partition 副本遷移整體流程
Partition 的副本遷移實際上就是將分區的副本重新分配到不同的代理節點上,如果 zk 中新副本的集合與 Partition 原來的副本集合相同,那么這個副本就不需要重新分配了。
Partition 的副本遷移是通過監聽 zk 的 /admin/reassign_partitions 節點觸發的,Kafka 也向用戶提供相應的腳本工具進行副本遷移,副本遷移的腳本使用方法如下所示:

在調用腳本向 zk 提交 Partition 的遷移計划時,遷移計划更新到 zk 前需要進行一步判斷,如果該節點(寫入遷移計划的節點)已經存在,即副本遷移還在進行,那么本次副本遷移計划是無法提交的,實現的邏輯如下所示:

2.1 ZK PartitionsReassignedListener 副本遷移處理
在 zk 的 /admin/reassign_partitions 節點數據有變化時,就會觸發 PartitionsReassignedListener 的 doHandleDataChange() 方法,實現如下:

如果 Partition 出現下面的情況,將不會進行副本遷移,直接將 Partition 的遷移計划從 ZK 移除:
- l 這個 Partition 的 reassignment 之前已經存在, 即正在遷移中;
- l 這個 Partition 新分配的 replica 與之前的 replicas 相同;
- l 這個 Partition 所有新分配 replica 都已經 dead;
- l 這個 Partition 已經被設置了刪除標志。
對於可以進行副本遷移的 Partition 集合,這里將會調用 Kafka Controller 的 initiateReassignReplicasForTopicPartition() 方法對每個 Partition 進行處理。
2.2 副本遷移初始化
進行了前面的判斷后,這個 Partition 滿足了可以遷移的條件,Controller 會首先初始化副本遷移的流程,實現如下所示

對於副本遷移流程初始化如下:
- l 通過 watchIsrChangesForReassignedPartition() 方法監控這個 Partition 的 LeaderAndIsr 變化,如果有新的副本數據同步完成,那么 leader 會將其加到 isr 中更新到 zk 中,這時候 Controller 是可以接收到相關的信息通知的;
- l 將正在遷移的 Partition 添加到 partitionsBeingReassigned 中,它會記錄當前正在遷移的 Partition 列表;
- l 將要遷移的 Topic 設置為非法刪除刪除狀態,在這個狀態的 Topic 是無法進行刪除的;
- l 調用 onPartitionReassignment(),進行副本遷移。
在第一步中,會向這個 Partition 注冊一個額外的監聽器,監聽其 LeaderAndIsr 信息變化,如下所示:

如果該 Partition 的 LeaderAndIsr 信息有變動,那么就會觸發這個 listener 的 doHandleDataChange() 方法:
- l 首先檢查這個 Partition 是否在還在遷移中,不在的話直接結束流程,因為這個監聽器本來就是為了 Partition 副本遷移而服務的;
- l 從 zk 獲取最新的 leader 和 isr 信息,如果新分配的副本全部都在 isr 中,那么就再次觸發 controller 的 onPartitionReassignment() 方法,再次調用時實際上已經證明了這個 Partition 的副本遷移已經完成,否則的話就會不進行任何處理,等待新分配的所有副本遷移完成。
2.3 副本遷移
Partition 副本遷移真正實際處理是在 Controller 的 onPartitionReassignment() 方法完成的,在看這個方法之前,先介紹幾個基本的概念(假設一個 Partition 原來的 replica 是 {1、2、3},新分配的副本列表是:{2、3、4}):
- RAR = Reassigned replicas,即新分配的副本列表,也就是 {2、3、4};
- OAR = Original list of replicas for partition,即這個 Partition 原來的副本列表,也就是 {1、2、3};
- AR = current assigned replicas,該 Partition 當前的副本列表,這個會隨着階段的不同而變化;
- RAR-OAR:需要創建、數據同步的新副本,也就是 {4};
- OAR-RAR:需要刪除的副本,也就是{1}
這個方法的實現如下所示:

這個方法整體分為以下12個步驟:
- l 把 AR = OAR+RAR ({1、2、3、4})更新到 zk 及本地 Controller 緩存中;
- l 發送 LeaderAndIsr 給 AR 中每一個副本,並且會強制更新 zk 中 leader 的 epoch;
- l 創建需要新建的副本(【RAR-OAR】,即 {4}),將其狀態設置為 NewReplica;
- l 等待直到 RAR({2、3、4}) 中的所有副本都在 ISR 中;
- l 把 RAR({2、3、4}) 中的所有副本設置為 OnReplica 狀態;
- l 將緩存中 AR 更新為 RAR(重新分配的副本列表,即 {2、3、4});
- l 如果 leader 不在 RAR 中, 就從 RAR 選擇對應的 leader, 然后發送 LeaderAndIsr 請求;如果不需要,那么只會更新 leader epoch,然后發送 LeaderAndIsr 請求; 在發送 LeaderAndIsr 請求前設置了 AR=RAR, 這將確保了 leader 在 isr 中不會添加任何 【RAR-OAR】中的副本(old replica,即 {1});
- l 將【OAR-RAR】({1})中的副本設置為 OfflineReplica 狀態,OfflineReplica 狀態的變化,將會從 ISR 中刪除【OAR-RAR】的副本,更新到 zk 中並發送 LeaderAndIsr 請求給 leader,通知 leader isr 變動。之后再發送 StopReplica 請求(delete=false)給【OAR-RAR】中的副本;
- l 將【OAR-RAR】中的副本設置為 NonExistentReplica 狀態。這將發送 StopReplica 請求(delete=true)給【OAR-RAR】中的副本,這些副本將會從本地上刪除數據;
- l 在 zk 中更新 AR 為 RAR;
- l 更新 zk 中路徑 【/admin/reassign_partitions】信息,移除已經成功遷移的 Partition;
- l leader 選舉之后,這個 replica 和 isr 信息將會變動,發送 metadata 更新給所有的 broker。
上面的流程簡單來說,就是先創建新的 replica,開始同步數據,等待所有新的分配都加入到了 isr 中后,開始進行 leader 選舉(需要的情況下),下線不需要的副本(OAR-RAR),下線完成后將 Partition 的最新 AR (即 RAR)信息更新到 zk 中,最后發送相應的請求給 broker,到這里一個 Partition 的副本遷移算是完成了。
3 Broker上下線
每台 Broker 在上線時,都會與 ZK 建立一個建立一個 session,並在 /brokers/ids 下注冊一個節點,節點名字就是 broker id,這個節點是臨時節點,該節點內部會有這個 Broker 的詳細節點信息。Controller 會監聽 /brokers/ids 這個路徑下的所有子節點,如果有新的節點出現,那么就代表有新的 Broker 上線,如果有節點消失,就代表有 broker 下線,Controller 會進行相應的處理,Kafka 就是利用 ZK 的這種 watch 機制及臨時節點的特性來完成集群 Broker 的上下線。
3.1 ZK 回調處理
BrokerChangeListener 是監聽 /brokers/ids 節點的監聽器,當該節點有變化時會觸發 doHandleChildChange() 方法,具體實現如下:

這里需要重點關注 doHandleChildChange() 方法的實現,該方法處理邏輯如下:
- l 從 ZK 獲取當前的 Broker 列表(curBrokers)及 broker id 的列表(curBrokerIds);
- l 獲取當前 Controller 中緩存的 broker id 列表(liveOrShuttingDownBrokerIds);
- l 獲取新上線 broker id 列表:newBrokerIds = curBrokerIds – liveOrShuttingDownBrokerIds;
- l 獲取掉線的 broker id 列表:deadBrokerIds = liveOrShuttingDownBrokerIds – curBrokerIds;
- l 對於新上線的 broker,先在 ControllerChannelManager 中添加該 broker(即建立與該 Broker 的連接、初始化相應的發送線程和請求隊列),最后 Controller 調用 onBrokerStartup() 上線該 Broker;
- l 對於掉線的 broker,先在 ControllerChannelManager 中移除該 broker(即關閉與 Broker 的連接、關閉相應的發送線程和清空請求隊列),最后 Controller 調用 onBrokerFailure() 下線該 Broker。
3.2 broker上線
一台 Broker 上線主要有以下兩步:
- l 在 Controller Channel Manager 中添加該 Broker 節點,主要的內容是:Controller 建立與該 Broker 的連接、初始化相應的請求發送線程與請求隊列;
- l 調用 Controller 的 onBrokerStartup() 方法上線該節點。
ontroller Channel Manager 添加 Broker 的實現如下,這里就不重復講述了,前面講述 Controller 服務初始化的文章已經講述過這部分的內容。下面再看下 Controller 如何在 onBrokerStartup() 方法中實現 Broker 上線操作的,具體實現如下所示:

onBrokerStartup() 方法在實現的邏輯上分為以下幾步:
- l 調用
sendUpdateMetadataRequest()方法向當前集群所有存活的 Broker 發送 Update Metadata 請求,這樣的話其他的節點就會知道當前的 Broker 已經上線了; - l 獲取當前節點分配的所有的 Replica 列表,並將其狀態轉移為 OnlineReplica 狀態;
- l 觸發 PartitionStateMachine 的
triggerOnlinePartitionStateChange()方法,為所有處於 NewPartition/OfflinePartition 狀態的 Partition 進行 leader 選舉,如果 leader 選舉成功,那么該 Partition 的狀態就會轉移到 OnlinePartition 狀態,否則狀態轉移失敗; - l 如果副本遷移中有新的 Replica 落在這台新上線的節點上,那么開始執行副本遷移操作;
- l 如果之前由於這個 Topic 設置為刪除標志,但是由於其中有 Replica 掉線而導致無法刪除,這里在節點啟動后,嘗試重新執行刪除操作。
到此為止,一台 Broker 算是真正加入到了 Kafka 的集群中,在上述過程中,涉及到 leader 選舉的操作,都會觸發 LeaderAndIsr 請求及 Metadata 請求的發送。
3.3 broker下線
一台 Broker 掉線后主要有以下兩步:
l 首先在 Controller Channel Manager 中移除該 Broker 節點,主要的內容是:關閉 Controller 與 Broker 的連接和相應的請求發送線程,並清空請求隊列;
l 調用 Controller 的 onBrokerFailure() 方法下線該節點。
Controller Channel Manager 下線 Broker 的處理如下所示:

在 Controller Channel Manager 處理完掉線的 Broker 節點后,下面 KafkaController 將會調用 onBrokerFailure() 進行相應的處理,其實現如下:

Controller 對於掉線 Broker 的處理過程主要有以下幾步:
- l 首先找到 Leader 在該 Broker 上所有 Partition 列表,然后將這些 Partition 的狀態全部轉移為 OfflinePartition 狀態;
- l 觸發 PartitionStateMachine 的 triggerOnlinePartitionStateChange() 方法,為所有處於 NewPartition/OfflinePartition 狀態的 Partition 進行 Leader 選舉,如果 Leader 選舉成功,那么該 Partition 的狀態就會遷移到 OnlinePartition 狀態,否則狀態轉移失敗(Broker 上線/掉線、Controller 初始化時都會觸發這個方法);
- l 獲取在該 Broker 上的所有 Replica 列表,將其狀態轉移成 OfflineReplica 狀態;
- l 過濾出設置為刪除、並且有副本在該節點上的 Topic 列表,先將該 Replica 的轉移成 ReplicaDeletionIneligible 狀態,然后再將該 Topic 標記為非法刪除,即因為有 Replica 掉線導致該 Topic 無法刪除;
- l 如果 leader 在該 Broker 上所有 Partition 列表不為空,證明有 Partition 的 leader 需要選舉,在最后一步會觸發全局 metadata 信息的更新。
到這里,一台掉線的 Broker 算是真正下線完成了。
3.4 主動關閉broker
Controller 在接收這個關閉服務的請求,通過 shutdownBroker() 方法進行處理,實現如下所示:


上述方法的處理邏輯如下:
- l 先將要下線的 Broker 添加到 shuttingDownBrokerIds 集合中,該集合記錄了當前正在進行關閉的 broker 列表;
- l 獲取副本在該節點上的所有 Partition 的列表集合;
- l 遍歷上述 Partition 列表進行處理:如果該 Partition 的 leader 是要下線的節點,那么通過 PartitionStateMachine 進行狀態轉移(OnlinePartition –> OnlinePartition)觸發 leader 選舉,使用的 leader 選舉方法是 ControlledShutdownLeaderSelector,它會選舉 isr 中第一個沒有正在關閉的 Replica 作為 leader,否則拋出 StateChangeFailedException 異常;
- l 否則的話,即要下線的節點不是 leader,那么就向要下線的節點發送 StopReplica 請求停止副本同步,並將該副本設置為 OfflineReplica 狀態,這里對 Replica 進行處理的原因是為了讓要下線的機器關閉副本同步流程,這樣 Kafka 服務才能正常關閉。
參考資料:
http://matt33.com/2018/06/16/partition-reassignment/
http://matt33.com/2018/06/17/broker-online-offline/
