Kafka partition 副本遷移與broker上下線


Kafka partition 副本遷移與broker上下線

1 前言

Controller 在初始化時,會利用 ZK 的 watch 機制注冊很多不同類型的監聽器,當監聽的事件被觸發時,Controller 就會觸發相應的操作。

Controller 在初始化時,會注冊多種類型的監聽器,主要有以下幾種:

  • l  監聽 /admin/reassign_partitions 節點,用於分區副本遷移的監聽;
  • l  監聽 /isr_change_notification 節點,用於 Partition Isr 變動的監聽,;
  • l  監聽 /admin/preferred_replica_election 節點,用於需要進行 Partition 最優 leader 選舉的監聽;
  • l  監聽 /brokers/topics 節點,用於 Topic 新建的監聽;
  • l  監聽 /brokers/topics/TOPIC_NAME 節點,用於 Topic Partition 擴容的監聽;
  • l  監聽 /admin/delete_topics 節點,用於 Topic 刪除的監聽;
  • l  監聽 /brokers/ids 節點,用於 Broker 上下線的監聽。

2 Partition 副本遷移整體流程

Partition 的副本遷移實際上就是將分區的副本重新分配到不同的代理節點上,如果 zk 中新副本的集合與 Partition 原來的副本集合相同,那么這個副本就不需要重新分配了。

Partition 的副本遷移是通過監聽 zk 的 /admin/reassign_partitions 節點觸發的,Kafka 也向用戶提供相應的腳本工具進行副本遷移,副本遷移的腳本使用方法如下所示:

 

在調用腳本向 zk 提交 Partition 的遷移計划時,遷移計划更新到 zk 前需要進行一步判斷,如果該節點(寫入遷移計划的節點)已經存在,即副本遷移還在進行,那么本次副本遷移計划是無法提交的,實現的邏輯如下所示:

 

 

2.1 ZK PartitionsReassignedListener 副本遷移處理

在 zk 的 /admin/reassign_partitions 節點數據有變化時,就會觸發 PartitionsReassignedListener 的 doHandleDataChange() 方法,實現如下:

 

 

如果 Partition 出現下面的情況,將不會進行副本遷移,直接將 Partition 的遷移計划從 ZK 移除:

  • l  這個 Partition 的 reassignment 之前已經存在, 即正在遷移中;
  • l  這個 Partition 新分配的 replica 與之前的 replicas 相同;
  • l  這個 Partition 所有新分配 replica 都已經 dead;
  • l  這個 Partition 已經被設置了刪除標志。

對於可以進行副本遷移的 Partition 集合,這里將會調用 Kafka Controller 的 initiateReassignReplicasForTopicPartition() 方法對每個 Partition 進行處理。

2.2 副本遷移初始化

進行了前面的判斷后,這個 Partition 滿足了可以遷移的條件,Controller 會首先初始化副本遷移的流程,實現如下所示

 

 

對於副本遷移流程初始化如下:

  • l  通過 watchIsrChangesForReassignedPartition() 方法監控這個 Partition 的 LeaderAndIsr 變化,如果有新的副本數據同步完成,那么 leader 會將其加到 isr 中更新到 zk 中,這時候 Controller 是可以接收到相關的信息通知的;
  • l  將正在遷移的 Partition 添加到 partitionsBeingReassigned 中,它會記錄當前正在遷移的 Partition 列表;
  • l  將要遷移的 Topic 設置為非法刪除刪除狀態,在這個狀態的 Topic 是無法進行刪除的;
  • l  調用 onPartitionReassignment(),進行副本遷移。

在第一步中,會向這個 Partition 注冊一個額外的監聽器,監聽其 LeaderAndIsr 信息變化,如下所示:

 

 

如果該 Partition 的 LeaderAndIsr 信息有變動,那么就會觸發這個 listener 的 doHandleDataChange() 方法:

  • l  首先檢查這個 Partition 是否在還在遷移中,不在的話直接結束流程,因為這個監聽器本來就是為了 Partition 副本遷移而服務的;
  • l  從 zk 獲取最新的 leader 和 isr 信息,如果新分配的副本全部都在 isr 中,那么就再次觸發 controller 的 onPartitionReassignment() 方法,再次調用時實際上已經證明了這個 Partition 的副本遷移已經完成,否則的話就會不進行任何處理,等待新分配的所有副本遷移完成。

2.3 副本遷移

Partition 副本遷移真正實際處理是在 Controller 的 onPartitionReassignment() 方法完成的,在看這個方法之前,先介紹幾個基本的概念(假設一個 Partition 原來的 replica 是 {1、2、3},新分配的副本列表是:{2、3、4}):

  • RAR = Reassigned replicas,即新分配的副本列表,也就是 {2、3、4};
  • OAR = Original list of replicas for partition,即這個 Partition 原來的副本列表,也就是 {1、2、3};
  • AR = current assigned replicas,該 Partition 當前的副本列表,這個會隨着階段的不同而變化;
  • RAR-OAR:需要創建、數據同步的新副本,也就是 {4};
  • OAR-RAR:需要刪除的副本,也就是{1}

這個方法的實現如下所示:

 

 

這個方法整體分為以下12個步驟:

  • l  把 AR = OAR+RAR ({1、2、3、4})更新到 zk 及本地 Controller 緩存中;
  • l  發送 LeaderAndIsr 給 AR 中每一個副本,並且會強制更新 zk 中 leader 的 epoch;
  • l  創建需要新建的副本(【RAR-OAR】,即 {4}),將其狀態設置為 NewReplica;
  • l  等待直到 RAR({2、3、4}) 中的所有副本都在 ISR 中;
  • l  把 RAR({2、3、4}) 中的所有副本設置為 OnReplica 狀態;
  • l  將緩存中 AR 更新為 RAR(重新分配的副本列表,即 {2、3、4});
  • l  如果 leader 不在 RAR 中, 就從 RAR 選擇對應的 leader, 然后發送 LeaderAndIsr 請求;如果不需要,那么只會更新 leader epoch,然后發送 LeaderAndIsr 請求; 在發送 LeaderAndIsr 請求前設置了 AR=RAR, 這將確保了 leader 在 isr 中不會添加任何 【RAR-OAR】中的副本(old replica,即 {1});
  • l  將【OAR-RAR】({1})中的副本設置為 OfflineReplica 狀態,OfflineReplica 狀態的變化,將會從 ISR 中刪除【OAR-RAR】的副本,更新到 zk 中並發送 LeaderAndIsr 請求給 leader,通知 leader isr 變動。之后再發送 StopReplica 請求(delete=false)給【OAR-RAR】中的副本;
  • l  將【OAR-RAR】中的副本設置為 NonExistentReplica 狀態。這將發送 StopReplica 請求(delete=true)給【OAR-RAR】中的副本,這些副本將會從本地上刪除數據;
  • l  在 zk 中更新 AR 為 RAR;
  • l  更新 zk 中路徑 【/admin/reassign_partitions】信息,移除已經成功遷移的 Partition;
  • l  leader 選舉之后,這個 replica 和 isr 信息將會變動,發送 metadata 更新給所有的 broker。

上面的流程簡單來說,就是先創建新的 replica,開始同步數據,等待所有新的分配都加入到了 isr 中后,開始進行 leader 選舉(需要的情況下),下線不需要的副本(OAR-RAR),下線完成后將 Partition 的最新 AR (即 RAR)信息更新到 zk 中,最后發送相應的請求給 broker,到這里一個 Partition 的副本遷移算是完成了。

3 Broker上下線

每台 Broker 在上線時,都會與 ZK 建立一個建立一個 session,並在 /brokers/ids 下注冊一個節點,節點名字就是 broker id,這個節點是臨時節點,該節點內部會有這個 Broker 的詳細節點信息。Controller 會監聽 /brokers/ids 這個路徑下的所有子節點,如果有新的節點出現,那么就代表有新的 Broker 上線,如果有節點消失,就代表有 broker 下線,Controller 會進行相應的處理,Kafka 就是利用 ZK 的這種 watch 機制及臨時節點的特性來完成集群 Broker 的上下線。

3.1 ZK 回調處理

BrokerChangeListener 是監聽 /brokers/ids 節點的監聽器,當該節點有變化時會觸發 doHandleChildChange() 方法,具體實現如下:

 

 

這里需要重點關注 doHandleChildChange() 方法的實現,該方法處理邏輯如下:

  • l  從 ZK 獲取當前的 Broker 列表(curBrokers)及 broker id 的列表(curBrokerIds);
  • l  獲取當前 Controller 中緩存的 broker id 列表(liveOrShuttingDownBrokerIds);
  • l  獲取新上線 broker id 列表:newBrokerIds = curBrokerIds – liveOrShuttingDownBrokerIds;
  • l  獲取掉線的 broker id 列表:deadBrokerIds = liveOrShuttingDownBrokerIds – curBrokerIds;
  • l  對於新上線的 broker,先在 ControllerChannelManager 中添加該 broker(即建立與該 Broker 的連接、初始化相應的發送線程和請求隊列),最后 Controller 調用 onBrokerStartup() 上線該 Broker;
  • l  對於掉線的 broker,先在 ControllerChannelManager 中移除該 broker(即關閉與 Broker 的連接、關閉相應的發送線程和清空請求隊列),最后 Controller 調用 onBrokerFailure() 下線該 Broker。

3.2 broker上線

一台 Broker 上線主要有以下兩步:

  • l  在 Controller Channel Manager 中添加該 Broker 節點,主要的內容是:Controller 建立與該 Broker 的連接、初始化相應的請求發送線程與請求隊列;
  • l  調用 Controller 的 onBrokerStartup() 方法上線該節點。

 

ontroller Channel Manager 添加 Broker 的實現如下,這里就不重復講述了,前面講述 Controller 服務初始化的文章已經講述過這部分的內容。下面再看下 Controller 如何在 onBrokerStartup() 方法中實現 Broker 上線操作的,具體實現如下所示:

 

 

onBrokerStartup() 方法在實現的邏輯上分為以下幾步:

  • l  調用 sendUpdateMetadataRequest() 方法向當前集群所有存活的 Broker 發送 Update Metadata 請求,這樣的話其他的節點就會知道當前的 Broker 已經上線了;
  • l  獲取當前節點分配的所有的 Replica 列表,並將其狀態轉移為 OnlineReplica 狀態;
  • l  觸發 PartitionStateMachine 的 triggerOnlinePartitionStateChange() 方法,為所有處於 NewPartition/OfflinePartition 狀態的 Partition 進行 leader 選舉,如果 leader 選舉成功,那么該 Partition 的狀態就會轉移到 OnlinePartition 狀態,否則狀態轉移失敗;
  • l  如果副本遷移中有新的 Replica 落在這台新上線的節點上,那么開始執行副本遷移操作;
  • l  如果之前由於這個 Topic 設置為刪除標志,但是由於其中有 Replica 掉線而導致無法刪除,這里在節點啟動后,嘗試重新執行刪除操作。

到此為止,一台 Broker 算是真正加入到了 Kafka 的集群中,在上述過程中,涉及到 leader 選舉的操作,都會觸發 LeaderAndIsr 請求及 Metadata 請求的發送。

3.3 broker下線

一台 Broker 掉線后主要有以下兩步:

l  首先在 Controller Channel Manager 中移除該 Broker 節點,主要的內容是:關閉 Controller 與 Broker 的連接和相應的請求發送線程,並清空請求隊列;

l  調用 Controller 的 onBrokerFailure() 方法下線該節點。

Controller Channel Manager 下線 Broker 的處理如下所示:

 

 

在 Controller Channel Manager 處理完掉線的 Broker 節點后,下面 KafkaController 將會調用 onBrokerFailure() 進行相應的處理,其實現如下:

 

 

Controller 對於掉線 Broker 的處理過程主要有以下幾步:

  • l  首先找到 Leader 在該 Broker 上所有 Partition 列表,然后將這些 Partition 的狀態全部轉移為 OfflinePartition 狀態;
  • l  觸發 PartitionStateMachine 的 triggerOnlinePartitionStateChange() 方法,為所有處於 NewPartition/OfflinePartition 狀態的 Partition 進行 Leader 選舉,如果 Leader 選舉成功,那么該 Partition 的狀態就會遷移到 OnlinePartition 狀態,否則狀態轉移失敗(Broker 上線/掉線、Controller 初始化時都會觸發這個方法);
  • l  獲取在該 Broker 上的所有 Replica 列表,將其狀態轉移成 OfflineReplica 狀態;
  • l  過濾出設置為刪除、並且有副本在該節點上的 Topic 列表,先將該 Replica 的轉移成 ReplicaDeletionIneligible 狀態,然后再將該 Topic 標記為非法刪除,即因為有 Replica 掉線導致該 Topic 無法刪除;
  • l  如果 leader 在該 Broker 上所有 Partition 列表不為空,證明有 Partition 的 leader 需要選舉,在最后一步會觸發全局 metadata 信息的更新。

到這里,一台掉線的 Broker 算是真正下線完成了。

3.4 主動關閉broker

Controller 在接收這個關閉服務的請求,通過 shutdownBroker() 方法進行處理,實現如下所示:

 

 

上述方法的處理邏輯如下:

  • l  先將要下線的 Broker 添加到 shuttingDownBrokerIds 集合中,該集合記錄了當前正在進行關閉的 broker 列表;
  • l  獲取副本在該節點上的所有 Partition 的列表集合;
  • l  遍歷上述 Partition 列表進行處理:如果該 Partition 的 leader 是要下線的節點,那么通過 PartitionStateMachine 進行狀態轉移(OnlinePartition –> OnlinePartition)觸發 leader 選舉,使用的 leader 選舉方法是 ControlledShutdownLeaderSelector,它會選舉 isr 中第一個沒有正在關閉的 Replica 作為 leader,否則拋出 StateChangeFailedException 異常;
  • l  否則的話,即要下線的節點不是 leader,那么就向要下線的節點發送 StopReplica 請求停止副本同步,並將該副本設置為 OfflineReplica 狀態,這里對 Replica 進行處理的原因是為了讓要下線的機器關閉副本同步流程,這樣 Kafka 服務才能正常關閉。

 

參考資料:

http://matt33.com/2018/06/16/partition-reassignment/

http://matt33.com/2018/06/17/broker-online-offline/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM