kafka leader選舉機制原理


kafka在所有broker中選出一個controller,所有Partition的Leader選舉都由controller決定。controller會將Leader的改變直接通過RPC的方式(比Zookeeper Queue的方式更高效)通知需為此作出響應的Broker。同時controller也負責增刪Topic以及Replica的重新分配。

當有broker fari over controller的處理過程如下:

1.Controller在Zookeeper注冊Watch,一旦有Broker宕機(這是用宕機代表任何讓系統認為其die的情景,包括但不限於機器斷電,網絡不可用,GC導致的Stop The World,進程crash等),其在Zookeeper對應的znode會自動被刪除,Zookeeper會fire Controller注冊的watch,Controller讀取最新的幸存的Broker
2.Controller決定set_p,該集合包含了宕機的所有Broker上的所有Partition
3.對set_p中的每一個Partition
3.1 從/brokers/topics/[topic]/partitions/[partition]/state讀取該Partition當前的ISR
3.2 決定該Partition的新Leader。如果當前ISR中有至少一個Replica還幸存,則選擇其中一個作為新Leader,新的ISR則包含當前ISR中所有幸存的Replica (選舉算法的實現類似於微軟的PacificA)。否則選擇該Partition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數據丟失)。如果該Partition的所有Replica都宕機了,則將新的Leader設置為-1。
3.3 將新的Leader,ISR和新的leader_epoch及controller_epoch寫入/brokers/topics/[topic]/partitions/[partition]/state。注意,該操作只有其version在3.1至3.3的過程中無變化時才會執行,否則跳轉到3.1
4. 直接通過RPC向set_p相關的Broker發送LeaderAndISRRequest命令。Controller可以在一個RPC操作中發送多個命令從而提高效率。

 

 

 

 

 

LeaderAndIsrRequest響應過程

 

1.若請求中controllerEpoch小於當前最新的controllerEpoch,則直接返回ErrorMapping.StaleControllerEpochCode。2.對於請求中partitionStateInfos中的每一個元素,即((topic, partitionId), partitionStateInfo):
2.1 若partitionStateInfo中的leader epoch大於當前ReplicManager中存儲的(topic, partitionId)對應的partition的leader epoch,則:
2.1.1 若當前brokerid(或者說replica id)在partitionStateInfo中,則將該partition及partitionStateInfo存入一個名為partitionState的HashMap中
2.1.2否則說明該Broker不在該Partition分配的Replica list中,將該信息記錄於log中2.2否則將相應的Error code(ErrorMapping.StaleLeaderEpochCode)存入Response中
3.篩選出partitionState中Leader與當前Broker ID相等的所有記錄存入partitionsTobeLeader中,其它記錄存入partitionsToBeFollower中。
4.若partitionsTobeLeader不為空,則對其執行makeLeaders方。
5.若partitionsToBeFollower不為空,則對其執行makeFollowers方法
6.若highwatermak線程還未啟動,則將其啟動,並將hwThreadInitialized設為true。
7.關閉所有Idle狀態的Fetcher。
LeaderAndIsrRequest處理過程如下圖所示
  對於收到的LeaderAndIsrRequest,Broker主要通過ReplicaManager的becomeLeaderOrFollower處理,流程如下:

 

 

 

 

如何處理所有Replica都不工作

 

  上文提到,在ISR中至少有一個follower時,Kafka可以確保已經commit的數據不丟失,但如果某個Partition的所有Replica都宕機了,就無法保證數據不丟失了。這種情況下有兩種可行的方案:
1.等待ISR中的任一個Replica“活”過來,並且選它作為Leader
2.選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader
  這就需要在可用性和一致性當中作出一個簡單的折衷。如果一定要等待ISR中的Replica“活”過來,那不可用的時間就可能會相對較長。而且如果ISR中的所有Replica都無法“活”過來了,或者數據都丟失了,這個Partition將永遠不可用。選擇第一個“活”過來的Replica作為Leader,而這個Replica不是ISR中的Replica,那即使它並不保證已經包含了所有已commit的消息,它也會成為Leader而作為consumer的數據源(前文有說明,所有讀寫都由Leader完成)。Kafka0.8.*使用了第二種方式。根據Kafka的文檔,在以后的版本中,Kafka支持用戶通過配置選擇這兩種方式中的一種,從而根據不同的使用場景選擇高可用性還是強一致性。 unclean.leader.election.enable 參數決定使用哪種方案,默認是true,采用第二種方案 
參考:
1. http://www.jasongj.com/2015/04/24/KafkaColumn2/
2. http://www.jasongj.com/2015/06/08/KafkaColumn3/
3. http://orchome.com/22
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM