一、前言
前面學習了Leader選舉的總體框架,接着來學習Zookeeper中默認的選舉策略,FastLeaderElection。
二、FastLeaderElection源碼分析
2.1 類的繼承關系
public class FastLeaderElection implements Election {}
說明:FastLeaderElection實現了Election接口,其需要實現接口中定義的lookForLeader方法和shutdown方法,其是標准的Fast Paxos算法的實現,各服務器之間基於TCP協議進行選舉。
2.2 類的內部類
FastLeaderElection有三個較為重要的內部類,分別為Notification、ToSend、Messenger。
1. Notification類

static public class Notification { /* * Format version, introduced in 3.4.6 */ public final static int CURRENTVERSION = 0x1; int version; /* * Proposed leader */ // 被推選的leader的id long leader; /* * zxid of the proposed leader */ // 被推選的leader的事務id long zxid; /* * Epoch */ // 推選者的選舉周期 long electionEpoch; /* * current state of sender */ // 推選者的狀態 QuorumPeer.ServerState state; /* * Address of sender */ // 推選者的id long sid; /* * epoch of the proposed leader */ // 被推選者的選舉周期 long peerEpoch; @Override public String toString() { return new String(Long.toHexString(version) + " (message format version), " + leader + " (n.leader), 0x" + Long.toHexString(zxid) + " (n.zxid), 0x" + Long.toHexString(electionEpoch) + " (n.round), " + state + " (n.state), " + sid + " (n.sid), 0x" + Long.toHexString(peerEpoch) + " (n.peerEpoch) "); } } static ByteBuffer buildMsg(int state, long leader, long zxid, long electionEpoch, long epoch) { byte requestBytes[] = new byte[40]; ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); /* * Building notification packet to send */ requestBuffer.clear(); requestBuffer.putInt(state); requestBuffer.putLong(leader); requestBuffer.putLong(zxid); requestBuffer.putLong(electionEpoch); requestBuffer.putLong(epoch); requestBuffer.putInt(Notification.CURRENTVERSION); return requestBuffer; }
說明:Notification表示收到的選舉投票信息(其他服務器發來的選舉投票信息),其包含了被選舉者的id、zxid、選舉周期等信息,其buildMsg方法將選舉信息封裝至ByteBuffer中再進行發送。
2. ToSend類

static public class ToSend { static enum mType {crequest, challenge, notification, ack} ToSend(mType type, long leader, long zxid, long electionEpoch, ServerState state, long sid, long peerEpoch) { this.leader = leader; this.zxid = zxid; this.electionEpoch = electionEpoch; this.state = state; this.sid = sid; this.peerEpoch = peerEpoch; } /* * Proposed leader in the case of notification */ //被推舉的leader的id long leader; /* * id contains the tag for acks, and zxid for notifications */ // 被推舉的leader的最大事務id long zxid; /* * Epoch */ // 推舉者的選舉周期 long electionEpoch; /* * Current state; */ // 推舉者的狀態 QuorumPeer.ServerState state; /* * Address of recipient */ // 推舉者的id long sid; /* * Leader epoch */ // 被推舉的leader的選舉周期 long peerEpoch; }
說明:ToSend表示發送給其他服務器的選舉投票信息,也包含了被選舉者的id、zxid、選舉周期等信息。
3. Messenger類
3.1 類的內部類
Messenger包含了WorkerReceiver和WorkerSender兩個內部類
1. WorkerReceiver

class WorkerReceiver implements Runnable { // 是否終止 volatile boolean stop; // 服務器之間的連接 QuorumCnxManager manager; WorkerReceiver(QuorumCnxManager manager) { this.stop = false; this.manager = manager; } public void run() { // 響應 Message response; while (!stop) { // 不終止 // Sleeps on receive try{ // 從recvQueue中取出一個選舉投票消息(從其他服務器發送過來) response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); // 無投票,跳過 if(response == null) continue; /* * If it is from an observer, respond right away. * Note that the following predicate assumes that * if a server is not a follower, then it must be * an observer. If we ever have any other type of * learner in the future, we'll have to change the * way we check for observers. */ if(!self.getVotingView().containsKey(response.sid)){ // 當前的投票者集合不包含服務器 // 獲取自己的投票 Vote current = self.getCurrentVote(); // 構造ToSend消息 ToSend notmsg = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), logicalclock, self.getPeerState(), response.sid, current.getPeerEpoch()); // 放入sendqueue隊列,等待發送 sendqueue.offer(notmsg); } else { // 包含服務器,表示接收到該服務器的選票消息 // Receive new message if (LOG.isDebugEnabled()) { LOG.debug("Receive new notification message. My id = " + self.getId()); } /* * We check for 28 bytes for backward compatibility */ // 檢查向后兼容性 if (response.buffer.capacity() < 28) { LOG.error("Got a short response: " + response.buffer.capacity()); continue; } // 若容量為28,則表示可向后兼容 boolean backCompatibility = (response.buffer.capacity() == 28); // 設置buffer中的position、limit等屬性 response.buffer.clear(); // Instantiate Notification and set its attributes // 創建接收通知 Notification n = new Notification(); // State of peer that sent this message // 推選者的狀態 QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING; switch (response.buffer.getInt()) { // 讀取狀態 case 0: ackstate = QuorumPeer.ServerState.LOOKING; break; case 1: ackstate = QuorumPeer.ServerState.FOLLOWING; break; case 2: ackstate = QuorumPeer.ServerState.LEADING; break; case 3: ackstate = QuorumPeer.ServerState.OBSERVING; break; default: continue; } // 獲取leader的id n.leader = response.buffer.getLong(); // 獲取zxid n.zxid = response.buffer.getLong(); // 獲取選舉周期 n.electionEpoch = response.buffer.getLong(); n.state = ackstate; // 設置服務器的id n.sid = response.sid; if(!backCompatibility){ // 不向后兼容 n.peerEpoch = response.buffer.getLong(); } else { // 向后兼容 if(LOG.isInfoEnabled()){ LOG.info("Backward compatibility mode, server id=" + n.sid); } // 獲取選舉周期 n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid); } /* * Version added in 3.4.6 */ // 確定版本號 n.version = (response.buffer.remaining() >= 4) ? response.buffer.getInt() : 0x0; /* * Print notification info */ if(LOG.isInfoEnabled()){ printNotification(n); } /* * If this server is looking, then send proposed leader */ if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ // 本服務器為LOOKING狀態 // 將消息放入recvqueue中 recvqueue.offer(n); /* * Send a notification back if the peer that sent this * message is also looking and its logical clock is * lagging behind. */ if((ackstate == QuorumPeer.ServerState.LOOKING) // 推選者服務器為LOOKING狀態 && (n.electionEpoch < logicalclock)){ // 選舉周期小於邏輯時鍾 // 創建新的投票 Vote v = getVote(); // 構造新的發送消息(本服務器自己的投票) ToSend notmsg = new ToSend(ToSend.mType.notification, v.getId(), v.getZxid(), logicalclock, self.getPeerState(), response.sid, v.getPeerEpoch()); // 將發送消息放置於隊列,等待發送 sendqueue.offer(notmsg); } } else { // 推選服務器狀態不為LOOKING /* * If this server is not looking, but the one that sent the ack * is looking, then send back what it believes to be the leader. */ // 獲取當前投票 Vote current = self.getCurrentVote(); if(ackstate == QuorumPeer.ServerState.LOOKING){ // 為LOOKING狀態 if(LOG.isDebugEnabled()){ LOG.debug("Sending new notification. My id = " + self.getId() + " recipient=" + response.sid + " zxid=0x" + Long.toHexString(current.getZxid()) + " leader=" + current.getId()); } ToSend notmsg; if(n.version > 0x0) { // 版本號大於0 // 構造ToSend消息 notmsg = new ToSend( ToSend.mType.notification, current.getId(), current.getZxid(), current.getElectionEpoch(), self.getPeerState(), response.sid, current.getPeerEpoch()); } else { // 版本號不大於0 // 構造ToSend消息 Vote bcVote = self.getBCVote(); notmsg = new ToSend( ToSend.mType.notification, bcVote.getId(), bcVote.getZxid(), bcVote.getElectionEpoch(), self.getPeerState(), response.sid, bcVote.getPeerEpoch()); } // 將發送消息放置於隊列,等待發送 sendqueue.offer(notmsg); } } } } catch (InterruptedException e) { System.out.println("Interrupted Exception while waiting for new message" + e.toString()); } } LOG.info("WorkerReceiver is down"); } }
說明:WorkerReceiver實現了Runnable接口,是選票接收器。其會不斷地從QuorumCnxManager中獲取其他服務器發來的選舉消息,並將其轉換成一個選票,然后保存到recvqueue中,在選票接收過程中,如果發現該外部選票的選舉輪次小於當前服務器的,那么忽略該外部投票,同時立即發送自己的內部投票。其是將QuorumCnxManager的Message轉化為FastLeaderElection的Notification。
其中,WorkerReceiver的主要邏輯在run方法中,其首先會從QuorumCnxManager中的recvQueue隊列中取出其他服務器發來的選舉消息,消息封裝在Message數據結構中。然后判斷消息中的服務器id是否包含在可以投票的服務器集合中,若不是,則會將本服務器的內部投票發送給該服務器,其流程如下
if(!self.getVotingView().containsKey(response.sid)){ // 當前的投票者集合不包含服務器 // 獲取自己的投票 Vote current = self.getCurrentVote(); // 構造ToSend消息 ToSend notmsg = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), logicalclock, self.getPeerState(), response.sid, current.getPeerEpoch()); // 放入sendqueue隊列,等待發送 sendqueue.offer(notmsg); }
若包含該服務器,則根據消息(Message)解析出投票服務器的投票信息並將其封裝為Notification,然后判斷當前服務器是否為LOOKING,若為LOOKING,則直接將Notification放入FastLeaderElection的recvqueue(區別於recvQueue)中。然后判斷投票服務器是否為LOOKING狀態,並且其選舉周期小於當前服務器的邏輯時鍾,則將本(當前)服務器的內部投票發送給該服務器,否則,直接忽略掉該投票。其流程如下
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ // 本服務器為LOOKING狀態 // 將消息放入recvqueue中 recvqueue.offer(n); /* * Send a notification back if the peer that sent this * message is also looking and its logical clock is * lagging behind. */ if((ackstate == QuorumPeer.ServerState.LOOKING) // 推選者服務器為LOOKING狀態 && (n.electionEpoch < logicalclock)){ // 選舉周期小於邏輯時鍾 // 創建新的投票 Vote v = getVote(); // 構造新的發送消息(本服務器自己的投票) ToSend notmsg = new ToSend(ToSend.mType.notification, v.getId(), v.getZxid(), logicalclock, self.getPeerState(), response.sid, v.getPeerEpoch()); // 將發送消息放置於隊列,等待發送 sendqueue.offer(notmsg); } }
若本服務器的狀態不為LOOKING,則會根據投票服務器中解析的version信息來構造ToSend消息,放入sendqueue,等待發送,起流程如下
else { // 本服務器狀態不為LOOKING /* * If this server is not looking, but the one that sent the ack * is looking, then send back what it believes to be the leader. */ // 獲取當前投票 Vote current = self.getCurrentVote(); if(ackstate == QuorumPeer.ServerState.LOOKING){ // 為LOOKING狀態 if(LOG.isDebugEnabled()){ LOG.debug("Sending new notification. My id = " + self.getId() + " recipient=" + response.sid + " zxid=0x" + Long.toHexString(current.getZxid()) + " leader=" + current.getId()); } ToSend notmsg; if(n.version > 0x0) { // 版本號大於0 // 構造ToSend消息 notmsg = new ToSend( ToSend.mType.notification, current.getId(), current.getZxid(), current.getElectionEpoch(), self.getPeerState(), response.sid, current.getPeerEpoch()); } else { // 版本號不大於0 // 構造ToSend消息 Vote bcVote = self.getBCVote(); notmsg = new ToSend( ToSend.mType.notification, bcVote.getId(), bcVote.getZxid(), bcVote.getElectionEpoch(), self.getPeerState(), response.sid, bcVote.getPeerEpoch()); } // 將發送消息放置於隊列,等待發送 sendqueue.offer(notmsg); } }
2. WorkerSender

class WorkerSender implements Runnable { // 是否終止 volatile boolean stop; // 服務器之間的連接 QuorumCnxManager manager; // 構造器 WorkerSender(QuorumCnxManager manager){ // 初始化屬性 this.stop = false; this.manager = manager; } public void run() { while (!stop) { // 不終止 try { // 從sendqueue中取出ToSend消息 ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); // 若為空,則跳過 if(m == null) continue; // 不為空,則進行處理 process(m); } catch (InterruptedException e) { break; } } LOG.info("WorkerSender is down"); } /** * Called by run() once there is a new message to send. * * @param m message to send */ void process(ToSend m) { // 構建消息 ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch); // 發送消息 manager.toSend(m.sid, requestBuffer); } }
說明:WorkerSender也實現了Runnable接口,為選票發送器,其會不斷地從sendqueue中獲取待發送的選票,並將其傳遞到底層QuorumCnxManager中,其過程是將FastLeaderElection的ToSend轉化為QuorumCnxManager的Message。
3.2 類的屬性
protected class Messenger { // 選票發送器 WorkerSender ws; // 選票接收器 WorkerReceiver wr; }
說明:Messenger中維護了一個WorkerSender和WorkerReceiver,分別表示選票發送器和選票接收器。
3.3 類的構造函數
Messenger(QuorumCnxManager manager) { // 創建WorkerSender this.ws = new WorkerSender(manager); // 新創建線程 Thread t = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); // 設置為守護線程 t.setDaemon(true); // 啟動 t.start(); // 創建WorkerReceiver this.wr = new WorkerReceiver(manager); // 創建線程 t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]"); // 設置為守護線程 t.setDaemon(true); // 啟動 t.start(); }
說明:會啟動WorkerSender和WorkerReceiver,並設置為守護線程。
2.3 類的屬性

public class FastLeaderElection implements Election { // 日志 private static final Logger LOG = LoggerFactory.getLogger(FastLeaderElection.class); /** * Determine how much time a process has to wait * once it believes that it has reached the end of * leader election. */ // 完成Leader選舉之后需要等待時長 final static int finalizeWait = 200; /** * Upper bound on the amount of time between two consecutive * notification checks. This impacts the amount of time to get * the system up again after long partitions. Currently 60 seconds. */ // 兩個連續通知檢查之間的最大時長 final static int maxNotificationInterval = 60000; /** * Connection manager. Fast leader election uses TCP for * communication between peers, and QuorumCnxManager manages * such connections. */ // 管理服務器之間的連接 QuorumCnxManager manager; // 選票發送隊列,用於保存待發送的選票 LinkedBlockingQueue<ToSend> sendqueue; // 選票接收隊列,用於保存接收到的外部投票 LinkedBlockingQueue<Notification> recvqueue; // 投票者 QuorumPeer self; Messenger messenger; // 邏輯時鍾 volatile long logicalclock; /* Election instance */ // 推選的leader的id long proposedLeader; // 推選的leader的zxid long proposedZxid; // 推選的leader的選舉周期 long proposedEpoch; // 是否停止選舉 volatile boolean stop; }
說明:其維護了服務器之間的連接(用於發送消息)、發送消息隊列、接收消息隊列、推選者的一些信息(zxid、id)、是否停止選舉流程標識等。
2.4 類的構造函數
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){ // 字段賦值 this.stop = false; this.manager = manager; // 初始化其他信息 starter(self, manager); }
說明:構造函數中初始化了stop字段和manager字段,並且調用了starter函數,其源碼如下
private void starter(QuorumPeer self, QuorumCnxManager manager) { // 賦值,對Leader和投票者的ID進行初始化操作 this.self = self; proposedLeader = -1; proposedZxid = -1; // 初始化發送隊列 sendqueue = new LinkedBlockingQueue<ToSend>(); // 初始化接收隊列 recvqueue = new LinkedBlockingQueue<Notification>(); // 創建Messenger,會啟動接收器和發送器線程 this.messenger = new Messenger(manager); }
說明:其完成在構造函數中未完成的部分,如會初始化FastLeaderElection的sendqueue和recvqueue,並且啟動接收器和發送器線程。
2.5 核心函數分析
1. sendNotifications函數
private void sendNotifications() { for (QuorumServer server : self.getVotingView().values()) { // 遍歷投票參與者集合 long sid = server.id; // 構造發送消息 ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock, QuorumPeer.ServerState.LOOKING, sid, proposedEpoch); if(LOG.isDebugEnabled()){ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" + Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock) + " (n.round), " + sid + " (recipient), " + self.getId() + " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)"); } // 將發送消息放置於隊列 sendqueue.offer(notmsg); } }
說明:其會遍歷所有的參與者投票集合,然后將自己的選票信息發送至上述所有的投票者集合,其並非同步發送,而是將ToSend消息放置於sendqueue中,之后由WorkerSender進行發送。
2. totalOrderPredicate函數
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" + Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid)); if(self.getQuorumVerifier().getWeight(newId) == 0){ // 使用計票器判斷當前服務器的權重是否為0 return false; } /* * We return true if one of the following three cases hold: * 1- New epoch is higher * 2- New epoch is the same as current epoch, but new zxid is higher * 3- New epoch is the same as current epoch, new zxid is the same * as current zxid, but server id is higher. */ // 1. 判斷消息里的epoch是不是比當前的大,如果大則消息中id對應的服務器就是leader // 2. 如果epoch相等則判斷zxid,如果消息里的zxid大,則消息中id對應的服務器就是leader // 3. 如果前面兩個都相等那就比較服務器id,如果大,則其就是leader return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }
說明:該函數將接收的投票與自身投票進行PK,查看是否消息中包含的服務器id是否更優,其按照epoch、zxid、id的優先級進行PK。
3. termPredicate函數
protected boolean termPredicate( HashMap<Long, Vote> votes, Vote vote) { HashSet<Long> set = new HashSet<Long>(); /* * First make the views consistent. Sometimes peers will have * different zxids for a server depending on timing. */ for (Map.Entry<Long,Vote> entry : votes.entrySet()) { // 遍歷已經接收的投票集合 if (vote.equals(entry.getValue())){ // 將等於當前投票的項放入set set.add(entry.getKey()); } } //統計set,查看投某個id的票數是否超過一半 return self.getQuorumVerifier().containsQuorum(set); }
說明:該函數用於判斷Leader選舉是否結束,即是否有一半以上的服務器選出了相同的Leader,其過程是將收到的選票與當前選票進行對比,選票相同的放入同一個集合,之后判斷選票相同的集合是否超過了半數。
4. checkLeader函數
protected boolean checkLeader( HashMap<Long, Vote> votes, long leader, long electionEpoch){ boolean predicate = true; /* * If everyone else thinks I'm the leader, I must be the leader. * The other two checks are just for the case in which I'm not the * leader. If I'm not the leader and I haven't received a message * from leader stating that it is leading, then predicate is false. */ if(leader != self.getId()){ // 自己不為leader if(votes.get(leader) == null) predicate = false; // 還未選出leader else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false; // 選出的leader還未給出ack信號,其他服務器還不知道leader } else if(logicalclock != electionEpoch) { // 邏輯時鍾不等於選舉周期 predicate = false; } return predicate; }
說明:該函數檢查是否已經完成了Leader的選舉,此時Leader的狀態應該是LEADING狀態。
5. lookForLeader函數

public Vote lookForLeader() throws InterruptedException { try { self.jmxLeaderElectionBean = new LeaderElectionBean(); MBeanRegistry.getInstance().register( self.jmxLeaderElectionBean, self.jmxLocalPeerBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); self.jmxLeaderElectionBean = null; } if (self.start_fle == 0) { self.start_fle = System.currentTimeMillis(); } try { HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>(); int notTimeout = finalizeWait; synchronized(this){ // 更新邏輯時鍾,每進行一輪選舉,都需要更新邏輯時鍾 logicalclock++; // 更新選票 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid)); // 想其他服務器發送自己的選票 sendNotifications(); /* * Loop in which we exchange notifications until we find a leader */ while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ // 本服務器狀態為LOOKING並且還未選出leader /* * Remove next notification from queue, times out after 2 times * the termination time */ // 從recvqueue接收隊列中取出投票 Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); /* * Sends more notifications if haven't received enough. * Otherwise processes new notification. */ if(n == null){ // 如果沒有收到足夠多的選票,則發送選票 if(manager.haveDelivered()){ // manager已經發送了所有選票消息 // 向所有其他服務器發送消息 sendNotifications(); } else { // 還未發送所有消息 // 連接其他每個服務器 manager.connectAll(); } /* * Exponential backoff */ int tmpTimeOut = notTimeout*2; notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); LOG.info("Notification time out: " + notTimeout); } else if(self.getVotingView().containsKey(n.sid)) { // 投票者集合中包含接收到消息中的服務器id /* * Only proceed if the vote comes from a replica in the * voting view. */ switch (n.state) { // 確定接收消息中的服務器狀態 case LOOKING: // If notification > current, replace and send messages out if (n.electionEpoch > logicalclock) { // 其選舉周期大於邏輯時鍾 // 重新賦值邏輯時鍾 logicalclock = n.electionEpoch; recvset.clear(); if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { // 選出較優的服務器 // 更新選票 updateProposal(n.leader, n.zxid, n.peerEpoch); } else { // 無法選出較優的服務器 // 更新選票 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } // 發送消息 sendNotifications(); } else if (n.electionEpoch < logicalclock) { // 選舉周期小於邏輯時鍾,不做處理 if(LOG.isDebugEnabled()){ LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch) + ", logicalclock=0x" + Long.toHexString(logicalclock)); } break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { // 等於,並且能選出較優的服務器 // 更新選票 updateProposal(n.leader, n.zxid, n.peerEpoch); // 發送消息 sendNotifications(); } if(LOG.isDebugEnabled()){ LOG.debug("Adding vote: from=" + n.sid + ", proposed leader=" + n.leader + ", proposed zxid=0x" + Long.toHexString(n.zxid) + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch)); } // recvset用於記錄當前服務器在本輪次的Leader選舉中收到的所有外部投票 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) { // 若能選出leader // Verify if there is any change in the proposed leader while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ // 遍歷已經接收的投票集合 if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ // 能夠選出較優的服務器 recvqueue.put(n); break; } } /* * This predicate is true once we don't read any new * relevant message from the reception queue */ if (n == null) { self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch); leaveInstance(endVote); return endVote; } } break; case OBSERVING: LOG.debug("Notification from observer: " + n.sid); break; case FOLLOWING: case LEADING: // 處於LEADING狀態 /* * Consider all notifications from the same epoch * together. */ if(n.electionEpoch == logicalclock){ // 與邏輯時鍾相等 // 將該服務器和選票信息放入recvset中 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if(ooePredicate(recvset, outofelection, n)) { // 判斷是否完成了leader選舉 // 設置本服務器的狀態 self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); // 創建投票信息 Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } } /* * Before joining an established ensemble, verify * a majority is following the same leader. */ outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); if(ooePredicate(outofelection, outofelection, n)) { synchronized(this){ logicalclock = n.electionEpoch; self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); } Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } break; default: LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)", n.state, n.sid); break; } } else { LOG.warn("Ignoring notification from non-cluster member " + n.sid); } } return null; } finally { try { if(self.jmxLeaderElectionBean != null){ MBeanRegistry.getInstance().unregister( self.jmxLeaderElectionBean); } } catch (Exception e) { LOG.warn("Failed to unregister with JMX", e); } self.jmxLeaderElectionBean = null; } }
說明:該函數用於開始新一輪的Leader選舉,其首先會將邏輯時鍾自增,然后更新本服務器的選票信息(初始化選票),之后將選票信息放入sendqueue等待發送給其他服務器,其流程如下
synchronized(this){ // 更新邏輯時鍾,每進行一輪新的leader選舉,都需要更新邏輯時鍾 logicalclock++; // 更新選票(初始化選票) updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid)); // 向其他服務器發送自己的選票(已更新的選票) sendNotifications();
之后每台服務器會不斷地從recvqueue隊列中獲取外部選票。如果服務器發現無法獲取到任何外部投票,就立即確認自己是否和集群中其他服務器保持着有效的連接,如果沒有連接,則馬上建立連接,如果已經建立了連接,則再次發送自己當前的內部投票,其流程如下
// 從recvqueue接收隊列中取出投票 Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); /* * Sends more notifications if haven't received enough. * Otherwise processes new notification. */ if(n == null){ // 無法獲取選票 if(manager.haveDelivered()){ // manager已經發送了所有選票消息(表示有連接) // 向所有其他服務器發送消息 sendNotifications(); } else { // 還未發送所有消息(表示無連接) // 連接其他每個服務器 manager.connectAll(); } /* * Exponential backoff */ int tmpTimeOut = notTimeout*2; notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); LOG.info("Notification time out: " + notTimeout); }
在發送完初始化選票之后,接着開始處理外部投票。在處理外部投票時,會根據選舉輪次來進行不同的處理。
· 外部投票的選舉輪次大於內部投票。若服務器自身的選舉輪次落后於該外部投票對應服務器的選舉輪次,那么就會立即更新自己的選舉輪次(logicalclock),並且清空所有已經收到的投票,然后使用初始化的投票來進行PK以確定是否變更內部投票。最終再將內部投票發送出去。
· 外部投票的選舉輪次小於內部投票。若服務器接收的外選票的選舉輪次落后於自身的選舉輪次,那么Zookeeper就會直接忽略該外部投票,不做任何處理。
· 外部投票的選舉輪次等於內部投票。此時可以開始進行選票PK,如果消息中的選票更優,則需要更新本服務器內部選票,再發送給其他服務器。
之后再對選票進行歸檔操作,無論是否變更了投票,都會將剛剛收到的那份外部投票放入選票集合recvset中進行歸檔,其中recvset用於記錄當前服務器在本輪次的Leader選舉中收到的所有外部投票,然后開始統計投票,統計投票是為了統計集群中是否已經有過半的服務器認可了當前的內部投票,如果確定已經有過半服務器認可了該投票,然后再進行最后一次確認,判斷是否又有更優的選票產生,若無,則終止投票,然后最終的選票,其流程如下
if (n.electionEpoch > logicalclock) { // 其選舉周期大於邏輯時鍾 // 重新賦值邏輯時鍾 logicalclock = n.electionEpoch; // 清空所有接收到的所有選票 recvset.clear(); if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { // 進行PK,選出較優的服務器 // 更新選票 updateProposal(n.leader, n.zxid, n.peerEpoch); } else { // 無法選出較優的服務器 // 更新選票 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } // 發送本服務器的內部選票消息 sendNotifications(); } else if (n.electionEpoch < logicalclock) { // 選舉周期小於邏輯時鍾,不做處理,直接忽略 if(LOG.isDebugEnabled()){ LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch) + ", logicalclock=0x" + Long.toHexString(logicalclock)); } break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { // PK,選出較優的服務器 // 更新選票 updateProposal(n.leader, n.zxid, n.peerEpoch); // 發送消息 sendNotifications(); } if(LOG.isDebugEnabled()){ LOG.debug("Adding vote: from=" + n.sid + ", proposed leader=" + n.leader + ", proposed zxid=0x" + Long.toHexString(n.zxid) + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch)); } // recvset用於記錄當前服務器在本輪次的Leader選舉中收到的所有外部投票 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) { // 若能選出leader // Verify if there is any change in the proposed leader while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ // 遍歷已經接收的投票集合 if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ // 選票有變更,比之前提議的Leader有更好的選票加入 // 將更優的選票放在recvset中 recvqueue.put(n); break; } } /* * This predicate is true once we don't read any new * relevant message from the reception queue */ if (n == null) { // 表示之前提議的Leader已經是最優的 // 設置服務器狀態 self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); // 最終的選票 Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch); // 清空recvqueue隊列的選票 leaveInstance(endVote); // 返回選票 return endVote; } }
若選票中的服務器狀態為FOLLOWING或者LEADING時,其大致步驟會判斷選舉周期是否等於邏輯時鍾,歸檔選票,是否已經完成了Leader選舉,設置服務器狀態,修改邏輯時鍾等於選舉周期,返回最終選票,其流程如下
if(n.electionEpoch == logicalclock){ // 與邏輯時鍾相等 // 將該服務器和選票信息放入recvset中 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if(ooePredicate(recvset, outofelection, n)) { // 已經完成了leader選舉 // 設置本服務器的狀態 self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); // 最終的選票 Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); // 清空recvqueue隊列的選票 leaveInstance(endVote); return endVote; } } /* * Before joining an established ensemble, verify * a majority is following the same leader. */ outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); if(ooePredicate(outofelection, outofelection, n)) { // 已經完成了leader選舉 synchronized(this){ // 設置邏輯時鍾 logicalclock = n.electionEpoch; // 設置狀態 self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); } // 最終選票 Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); // 清空recvqueue隊列的選票 leaveInstance(endVote); // 返回選票 return endVote; }
三、總結
本篇博文詳細分析了FastLeaderElection的算法,其是ZooKeeper的核心部分,結合前面的理論學習部分(點擊這里可查看),可以比較輕松的理解其具體過程。也謝謝各位園友的觀看~