在上一篇文章中我們大致瀏覽了zookeeper的啟動過程,並且提到在Zookeeper的啟動過程中leader選舉是非常重要而且最復雜的一個環節。那么什么是leader選舉呢?zookeeper為什么需要leader選舉呢?zookeeper的leader選舉的過程又是什么樣子的?本文的目的就是解決這三個問題。
首先我們來看看什么是leader選舉。其實這個很好理解,leader選舉就像總統選舉一樣,每人一票,獲得多數票的人就當選為總統了。在zookeeper集群中也是一樣,每個節點都會投票,如果某個節點獲得超過半數以上的節點的投票,則該節點就是leader節點了。
國家選舉總統是為了選一個最高統帥,治理國家。那么zookeeper集群選舉的目的又是什么呢?其實這個要清楚明白的解釋還是挺復雜的。我們可以簡單點想這個問題:我們有一個zookeeper集群,有好幾個節點。每個節點都可以接收請求,處理請求。那么,如果這個時候分別有兩個客戶端向兩個節點發起請求,請求的內容是修改同一個數據。比如客戶端c1,請求節點n1,請求是set a = 1; 而客戶端c2,請求節點n2,請求內容是set a = 2;
那么最后a是等於1還是等於2呢? 這在一個分布式環境里是很難確定的。解決這個問題有很多辦法,而zookeeper的辦法是,我們選一個總統出來,所有的這類決策都提交給總統一個人決策,那之前的問題不就沒有了么。
那我們現在的問題就是怎么來選擇這個總統呢? 在現實中,選擇總統是需要宣講拉選票的,那么在zookeeper的世界里這又如何處理呢?我們還是show code吧。
在QuorumPeer的startLeaderElection方法里包含leader選舉的邏輯。Zookeeper默認提供了4種選舉方式,默認是第4種: FastLeaderElection。
我們先假設我們這是一個嶄新的集群,嶄新的集群的選舉和之前運行過一段時間的選舉是有稍許不同的,后面會提及。
節點狀態: 每個集群中的節點都有一個狀態 LOOKING, FOLLOWING, LEADING, OBSERVING。都屬於這4種,每個節點啟動的時候都是LOOKING狀態,如果這個節點參與選舉但最后不是leader,則狀態是FOLLOWING,如果不參與選舉則是OBSERVING,leader的狀態是LEADING。
開始這個選舉算法前,每個節點都會在zoo.cfg上指定的監聽端口啟動監聽(server.1=127.0.0.1:20881:20882),這里的20882就是這里用於選舉的端口。
在FastLeaderElection里有一個Manager的內部類,這個類里有啟動了兩個線程:WorkerReceiver, WorkerSender。為什么說選舉這部分復雜呢,我覺得就是這些線程就像左右互搏一樣,非常難以理解。顧名思義,這兩個線程一個是處理從別的節點接收消息的,一個是向外發送消息的。對於外面的邏輯接收和發送的邏輯都是異步的。
這里配置好了,QuorumPeer的run方法就開始執行了,這里實現的是一個簡單的狀態機。因為現在是LOOKING狀態,所以進入LOOKING的分支,調用選舉算法開始選舉了:
setCurrentVote(makeLEStrategy().lookForLeader());
而在lookForLeader里主要是干什么呢?首先我們會更新一下一個叫邏輯時鍾的東西,這也是在分布式算法里很重要的一個概念,但是在這里先不介紹,可以參考后面的論文。然后決定我要投票給誰。不過zookeeper這里的選舉真直白,每個節點都選自己(汗),選我,選我,選我...... 然后向其他節點廣播這個選舉信息。這里實際上並沒有真正的發送出去,只是將選舉信息放到由WorkerSender管理的一個隊列里。
synchronized(this){ //邏輯時鍾 logicalclock++; //getInitLastLoggedZxid(), getPeerEpoch()這里先不關心是什么,后面會討論 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } //getInitId() 即是獲取選誰,id就是myid里指定的那個數字,所以說一定要唯一 private long getInitId(){ if(self.getQuorumVerifier().getVotingMembers().containsKey(self.getId())) return self.getId(); else return Long.MIN_VALUE; } //發送選舉信息,異步發送 sendNotifications();
現在我們去看看怎么把投票信息投遞出去。這個邏輯在WorkerSender里,WorkerSender從sendqueue里取出投票,然后交給QuorumCnxManager發送。因為前面發送投票信息的時候是向集群所有節點發送,所以當然也包括自己這個節點,所以QuorumCnxManager的發送邏輯里會判斷,如果這個要發送的投票信息是發送給自己的,則不發送了,直接進入接收隊列。
public void toSend(Long sid, ByteBuffer b) { if (self.getId() == sid) { b.position(0); addToRecvQueue(new Message(b.duplicate(), sid)); } else { //發送給別的節點,判斷之前是不是發送過 if (!queueSendMap.containsKey(sid)) { //這個SEND_CAPACITY的大小是1,所以如果之前已經有一個還在等待發送,則會把之前的一個刪除掉,發送新的 ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY); queueSendMap.put(sid, bq); addToSendQueue(bq, b); } else { ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); if(bq != null){ addToSendQueue(bq, b); } else { LOG.error("No queue for server " + sid); } } //這里是真正的發送邏輯了 connectOne(sid); } }
connectOne就是真正發送了。在發送之前會先把自己的id和選舉地址發送過去。然后判斷要發送節點的id是不是比自己的id大,如果大則不發送了。如果要發送又是啟動兩個線程:SendWorker,RecvWorker(這種一個進程內許多不同種類的線程,各自干活的狀態真的很難理解)。發送邏輯還算簡單,就是從剛才放到那個queueSendMap里取出,然后發送。並且發送的時候將發送出去的東西放到一個lastMessageSent的map里,如果queueSendMap里是空的,就發送lastMessageSent里的東西,確保對方一定收到了。
看完了SendWorker的邏輯,再來看看數據接收的邏輯吧。還記得前面提到的有個Listener在選舉端口上啟動了監聽么,現在這里應該接收到數據了。我們可以看到receiveConnection方法。在這里,如果接收到的的信息里的id比自身的id小,則斷開連接,並嘗試發送消息給這個id對應的節點(當然,如果已經有SendWorker在往這個節點發送數據,則不用了)。
如果接收到的消息的id比當前的大,則會有RecvWorker接收數據,RecvWorker會將接收到的數據放到recvQueue里。
而FastLeaderElection的WorkerReceiver線程里會不斷地從這個recvQueue里讀取Message處理。在WorkerReceiver會處理一些協議上的事情,比如消息格式等。除此之外還會看看接收到的消息是不是來自投票成員。如果是投票成員,則會看看這個消息里的狀態,如果是LOOKING狀態並且當前的邏輯時鍾比投票消息里的邏輯時鍾要高,則會發個通知過去,告訴誰是leader。在這里,剛剛啟動的嶄新集群,所以邏輯時鍾基本上都是相同的,所以這里還沒判斷出誰是leader。不過在這里我們注意到如果當前節點的狀態是LOOKING的話,接收邏輯會將接收到的消息放到FastLeaderElection的recvqueue里。而在FastLeaderElection會從這個recvqueue里讀取東西。
這里就是選舉的主要邏輯了:totalOrderPredicate
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }
1. 判斷消息里的epoch是不是比當前的大,如果大則消息里id對應的server我就承認它是leader
2. 如果epoch相等則判斷zxid,如果消息里的zxid比我的大我就承認它是leader
3. 如果前面兩個都相等那就比較一下server id吧,如果比我的大我就承認它是leader。
關於前面兩個東西暫時我們不去關心它,對於新啟動的集群這兩者都是相等的。
那這樣看來server id的大小也是leader選舉的一環啊(有的人生下來注定就不平凡,這都是命啊)。
最后我們來看看,很多文章所介紹的,如果超過一半的人說它是leader,那它就是leader的邏輯吧
private boolean termPredicate( HashMap<Long, Vote> votes, Vote vote) { HashSet<Long> set = new HashSet<Long>(); //遍歷已經收到的投票集合,將等於當前投票的集合取出放到set中 for (Map.Entry<Long,Vote> entry : votes.entrySet()) { if (self.getQuorumVerifier().getVotingMembers().containsKey(entry.getKey()) && vote.equals(entry.getValue())){ set.add(entry.getKey()); } } //統計set,也就是投某個id的票數是否超過一半 return self.getQuorumVerifier().containsQuorum(set); } public boolean containsQuorum(Set<Long> ackSet) { return (ackSet.size() > half); }
最后一關:如果選的是自己,則將自己的狀態更新為LEADING,否則根據type,要么是FOLLOWING,要么是OBSERVING。
到這里選舉就結束了。
這里介紹的是一個新集群啟動時候的選舉過程,啟動的時候就是根據zoo.cfg里的配置,向各個節點廣播投票,一般都是選投自己。然后收到投票后就會進行進行判斷。如果某個節點收到的投票數超過一半,那么它就是leader了。
了解了這個過程,我們來看看另外一個問題:
一個集群有3台機器,掛了一台后的影響是什么?掛了兩台呢?
掛了一台:掛了一台后就是收不到其中一台的投票,但是有兩台可以參與投票,按照上面的邏輯,它們開始都投給自己,后來按照選舉的原則,兩個人都投票給其中一個,那么就有一個節點獲得的票等於2,2 > (3/2)=1 的,超過了半數,這個時候是能選出leader的。
掛了兩台: 掛了兩台后,怎么弄也只能獲得一張票, 1 不大於 (3/2)=1的,這樣就無法選出一個leader了。
在前面介紹時,為了簡單我假設的是這是一個嶄新的剛啟動的集群,這樣的集群與工作一段時間后的集群有什么不同呢?不同的就是epoch和zxid這兩個參數。在新啟動的集群里這兩個一般是相等的,而工作一段時間后這兩個參數有可能有的節點落后其他節點,至於是為什么,這個還要在后面的存儲和處理額胡斷請求的文章里介紹。
* 關於邏輯時鍾,我們的分布式大牛Leslie Lamport曾寫過一篇論文:Time, Clocks, and the Ordering of Events in a Distributed System