Zookeeper快速領導者選舉原理
本文略長,更適合在電腦端觀看,可以收藏或直接關注微信公眾號:1點25
人類選舉的基本原理
正常情況下,選舉是一定要投票的。
我們應該都經歷過投票,在投票時我們可能會將票投給和我們關系比較好的人,如果你和幾個候選人都比較熟,這種情況下你會將選票投給你認為能力比較強的人,如果你和幾個候選人都不熟,並且你自己也是候選人的話,這時你應該會認為你是這些候選人里面最厲害的那個人,大家都應該選你,這時你就會去和別人交流以獲得別人的投票,但是很有可能在交流的過程中,你發現了比你更厲害的人,這時你如果臉皮不是那么厚的話,你應該會改變你的決定,去投你覺得更厲害的人,最終你將得到在你心中認為最厲害的人,且將票投給他,選票將會放在投票中,最后從投票箱中進行統計,獲得票數最多的人當選。
在這樣一個選舉過程中我們提煉出四個基本概念:
- 個人能力:投我認為能力最強的人,這是投票的基本規則
- 改票:能力最強的人是逐漸和其他人溝通之后的結果,類似改票,先投給A,但是后來發現B更厲害,則改為投B
- 投票箱:所有人公用一個投票箱
- 領導者:獲得投票數最多的人為領導者
Zookeeper選舉的基本原理
Zookeeper集群模式下才需要選舉。
Zookeeper的選舉和人類的選舉邏輯類似,Zookeeper需要實現上面人類選舉的四個基本概念;
- 個人能力:Zookeeper是一個數據庫,集群中節點的數據越新就代表此節點能力越強,而在Zookeeper中可以通事務id(zxid)來表示數據的新舊,一個節點最新的zxid越大則該節點的數據越新。所以Zookeeper選舉時會根據zxid的大小來作為投票的基本規則。
- 改票:Zookeeper集群中的某一個節點在開始進行選舉時,首先認為自己的數據是最新的,會先投自己一票,並且把這張選票發送給其他服務器,這張選票里包含了兩個重要信息:zxid和sid,sid表示這張選票投的服務器id,zxid表示這張選票投的服務器上最大的事務id,同時也會接收到其他服務器的選票,接收到其他服務器的選票后,可以根據選票信息中的zxid來與自己當前所投的服務器上的最大zxid來進行比較,如果其他服務器的選票中的zxid較大,則表示自己當前所投的機器數據沒有接收到的選票所投的服務器上的數據新,所以本節點需要改票,改成投給和剛剛接收到的選票一樣。
- 投票箱:Zookeeper集群中會有很多節點,和人類選舉不一樣,Zookeeper集群並不會單獨去維護一個投票箱應用,而是在每個節點內存里利用一個數組來作為投票箱。每個節點里都有一個投票箱,節點會將自己的選票以及從其他服務器接收到的選票放在這個投票箱中。因為集群節點是相互交互的,並且選票的PK規則是一致的,所以每個節點里的這個投票箱所存儲的選票都會是一樣的,這樣也可以達到公用一個投票箱的目的。
- 領導者:Zookeeper集群中的每個節點,開始進行領導選舉后,會不斷的接收其他節點的選票,然后進行選票PK,將自己的選票修改為投給數據最新的節點,這樣就保證了,每個節點自己的選票代表的都是自己暫時所認為的數據最新的節點,再因為其他服務器的選票都會存儲在投票箱內,所以可以根據投票箱里去統計是否有超過一半的選票和自己選擇的是同一個節點,都認為這個節點的數據最新,一旦整個集群里超過一半的節點都認為某一個節點上的數據最新,則該節點就是領導者。
通過對四個概念的在Zookeeper中的解析,也同時介紹了一下Zookeeper領導者選舉的基本原理,只是說選舉過程中還有更多的細節需要我們了解,下面我結合源碼來給大家詳細的分析一下Zookeeper的快速領導者選舉原理。
領導者選舉入口
ZooKeeperServer表示單機模式中的一個zkServer。
QuoruPeer表示集群模式中的一個zkServer。
QuoruPeer類定義如下:
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
定義表明QuorumPeer是一個ZooKeeperThread,表示是一個線程。
當集群中的某一個台zkServer啟動時QuorumPeer類的start方法將被調用。
public synchronized void start() { loadDataBase(); // 1 cnxnFactory.start(); // 2 startLeaderElection(); // 3 super.start(); // 4 }
- zkServer中有一個內存數據庫對象ZKDatabase, zkServer在啟動時需要將已被持久化的數據加載進內存中,也就是加載至ZKDatabase。
- 這一步會開啟一個線程來接收客戶端請求,但是需要注意,這一步執行完后雖然成功開啟了一個線程,並且也可以接收客戶端線程,但是因為現在zkServer還沒有經過初始化,實際上把請求拒絕掉,知道zkServer初始化完成才能正常的接收請求。
- 這個方法名很有誤導性,這個方法並沒有真正的開始領導選舉,而是進行一些初始化
- 繼續啟動,包括進行領導者選舉、zkServer初始化。
領導者選舉策略
上文QuorumPeer類的startLeaderElection會進行領導者選舉初始化。
首先,領導者選舉在Zookeeper中有3種實現:
其中LeaderElection、AuthFastLeaderElection已經被標為過期,不建議使用,所以現在用的都是快速領導者選舉FastLeaderElection,我們着重來介紹FastLeaderElection。
快速領導者選舉
快速領導者選舉實現架構如下圖:
傳輸層初始化
從架構圖我們可以發現,快速領導者選舉實現架構分為兩層:應用層和傳輸層。所以初始化核心就是初始化傳輸層。
初始化步驟:
- 初始化QuorumCnxManager
- 初始化QuorumCnxManager.Listener
- 運行QuorumCnxManager.Listener
- 運行QuorumCnxManager
- 返回FastLeaderElection對象
QuorumCnxManager介紹
QuorumCnxManager就是傳輸層實現,QuorumCnxManager中幾個重要的屬性:
- ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap
- ConcurrentHashMap<Long, SendWorker> senderWorkerMap
- ArrayBlockingQueue<Message> recvQueue
- QuorumCnxManager.Listener
傳輸層的每個zkServer需要發送選票信息給其他服務器,這些選票信息來至應用層,在傳輸層中將會按服務器id分組保存在queueSendMap中。
傳輸層的每個zkServer需要發送選票信息給其他服務器,SendWorker就是封裝了Socket的發送器,而senderWorkerMap就是用來記錄其他服務器id以及對應的SendWorker的。
傳輸層的每個zkServer將接收其他服務器發送的選票信息,這些選票會保存在recvQueue中,以提供給應用層使用。
QuorumCnxManager.Listener負責開啟socket監聽。
細化后的架構圖如下:
服務器之間連接問題
在集群啟動時,一台服務器需要去連另外一台服務器,從而建立Socket用來進行選票傳輸。那么如果現在A服務器去連B服務器,同時B服務器也去連A服務器,那么就會導致建立了兩條Socket,我們知道Socket是雙向的,Socket的雙方是可以相互發送和接收數據的,那么現在A、B兩台服務器建立兩條Socket是沒有意義的,所以ZooKeeper在實現時做了限制,只允許服務器ID較大者去連服務器ID較小者,小ID服務器去連大ID服務器會被拒絕,偽代碼如下:
if (對方服務器id < 本服務器id) { closeSocket(sock); // 關閉這條socket connectOne(sid); // 由本服務器去連對方服務器 } else { // 繼續建立連接 }
SendWorker、RecvWorker介紹
上文介紹到了SendWorker,它是zkServer用來向其他服務器發送選票信息的。
類結構如下:
class SendWorker extends ZooKeeperThread { Long sid; Socket sock; RecvWorker recvWorker; volatile boolean running = true; DataOutputStream dout; }
它封裝了socket並且是一個線程,實際上SendWorker的底層實現是:SendWorker線程會不停的從queueSendMap中獲取選票信息然后發送到Socket上。
基於同樣的思路,我們還需要一個線程從Socket上獲取數據然后添加到recvQueue中,這就是RecvWorker的功能。
所以架構可以演化為下圖,通過這個架構,選舉應用層直接從recvQueue中獲取選票,或者選票添加到queueSendMap中既可以完成選票發送:
應用層初始化
FastLeaderElection類介紹
FastLeaderElection類是快速領導者選舉實現的核心類,這個類有三個重要的屬性:
- LinkedBlockingQueue<ToSend> sendqueue;
- LinkedBlockingQueue<Notification> recvqueue;
- Messenger messenger;
- Messenger.WorkerSender
- Messenger.WorkerReceiver
服務器在進行領導者選舉時,在發送選票時也會同時接受其他服務器的選票,FastLeaderElection類也提供了和傳輸層類似的實現,將待發送的選票放在sendqueue中,由Messenger.WorkerSender發送到傳輸層queueSendMap中。
同樣,由Messenger.WorkerReceiver負責從傳輸層獲取數據並放入recvqueue中。
這樣在應用層了,只需要將待發送的選票信息添加到sendqueue中即可完成選票信息發送,或者從recvqueue中獲取元素即可得到選票信息。
在構造FastLeaderElection對象時,會對sendqueue、recvqueue隊列進行初始化,並且運行Messenger.WorkerSender與Messenger.WorkerReceiver線程。
此時架構圖如下:
到這里,QuorumPeer類的startLeaderElection方法已經執行完成,完成了傳輸層和應用層的初始化。
快速領導者選舉實現
QuorumPeer類的start方法前三步分析完,接下來我們來看看第四步:
super.start();
QuorumPeer類是一個ZooKeeperThread線程,上述代碼實際就是運行一個線程,相當於運行QuorumPeer類中的run方法,這個方法也是集群模式下Zkserver啟動最核心的方法。
總結一下QuorumPeer類的start方法:
- 加載持久化數據到內存
- 初始化領導者選舉策略
- 初始化快速領導者選舉傳輸層
- 初始化快速領導者選舉應用層
- 開啟主線程
主線程開啟之后,QuorumPeer類的start方法即執行完成,這時回到上層代碼可以看到主線程會被join住:
quorumPeer.start(); // 開啟線程 quorumPeer.join(); // join線程
接下來我們着重來分析一下主線程內的邏輯。
主線程
在主線程里,會有一個主循環(Main loop),主循環偽代碼如下:
while (服務是否正在運行) { switch (當前服務器狀態) { case LOOKING: // 領導者選舉 setCurrentVote(makeLEStrategy().lookForLeader()); break; case OBSERVING: try { // 初始化為觀察者 } catch (Exception e) { LOG.warn("Unexpected exception",e ); } finally { observer.shutdown(); setPeerState(ServerState.LOOKING); } break; case FOLLOWING: try { // 初始化為跟隨者 } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { follower.shutdown(); setPeerState(ServerState.LOOKING); } break; case LEADING: try { // 初始化為領導者 } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { leader.shutdown("Forcing shutdown"); setPeerState(ServerState.LOOKING); } break; } }
這個偽代碼實際上非常非常重要,大家細心的多看幾遍。
根據偽代碼可以看到,當服務器狀態為LOOKING時會進行領導者選舉,所以我們着重來看領導者選舉。
lookForLeader
當服務器狀態為LOOKING時會調用FastLeaderElection類的lookForLeader方法,這就是領導者選舉的應用層。
1.初始化一個投票箱
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
2.更新選票,將票投給自己
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
3.發送選票
sendNotifications();
4.不斷獲取其他服務器的投票信息,直到選出Leader
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ // 從recvqueue中獲取接收到的投票信息 Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); if (獲得的投票為空) { // 連接其他服務器 } else { // 處理投票 } }
5.連接其他服務器
因為在這一步之前,都只進行了服務器的初始化,並沒有真正的去與其他服務器建立連接,所以在這里建立連接。
6.處理投票
判斷接收到的投票所對應的服務器的狀態,也就是投此票的服務器的狀態:
switch (n.state) { case LOOKING: // PK選票、過半機制驗證等 break; case OBSERVING: // 觀察者節點不應該發起投票,直接忽略 break; case FOLLOWING: case LEADING: // 如果接收到跟隨者或領導者節點的選票,則可以認為當前集群已經存在Leader了,直接return,退出lookForLeader方法。 }
7. PK選票
if (接收到的投票的選舉周期 > 本服務器當前的選舉周期) { // 修改本服務器的選舉周期為接收到的投票的選舉周期 // 清空本服務器的投票箱(表示選舉周期落后,重新開始投票) // 比較接收到的選票所選擇的服務器與本服務器的數據誰更新,本服務器將選票投給數據較新者 // 發送選票 } else if(接收到的投票的選舉周期 < 本服務器當前的選舉周期){ // 接收到的投票的選舉周期落后了,本服務器直接忽略此投票 } else if(選舉周期一致) { // 比較接收到的選票所選擇的服務器與本服務器當前所選擇的服務器的數據誰更新,本服務器將選票投給數據較新者 // 發送選票 }
8.過半機制驗證
本服務器的選票經過不停的PK會將票投給數據更新的服務器,PK完后,將接收到的選票以及本服務器自己所投的選票放入投票箱中,然后從投票箱中統計出與本服務器當前所投服務器一致的選票數量,判斷該選票數量是否超過集群中所有跟隨者的一半(選票數量 > 跟隨者數量/2),如果滿足這個過半機制就選出了一個准Leader。
9.最終確認
選出准Leader之后,再去獲取其他服務器的選票,如果獲取到的選票所代表的服務器的數據比准Leader更新,則准Leader卸職,繼續選舉。如果沒有准Leader更新,則繼續獲取投票,直到沒有獲取到選票,則選出了最終的Leader。
Leader確定后,其他服務器的角色也確定好了。
領導選舉完成后
上文主線程小節有一段非常重要的偽代碼,這段偽代碼達到了一個非常重要的功能,就是:
ZooKeeper集群在進行領導者選舉的過程中不能對外提供服務
根據偽代碼我們可以發現,只有當集群中服務器的角色確定了之后,while才會進行下一次循環,當進入下一次循環后,就會根據服務器的角色進入到對應的初始化邏輯,初始化完成之后才能對外提供服務。
總結
希望本文對大家理解Zookeeper領導者選舉原理有所幫助,看源碼不易,碼字畫圖更不容易呀...