zk集群運行過程中,服務器選舉的源碼剖析
在zk服務器集群啟動過程中,經QuorumPeerMain中,不光會創建ZooKeeperServer對象,同時會生成QuorumPeer對象,代表了ZooKeeper集群中的一台機器。在整個機器運行期間,負責維護該機器的運行狀態,同時會根據情況發起Leader選舉。下圖是 《從PAXOS到ZOOKEEPER分布式一致性原理與實踐》的服務器啟動流程

QuorumPeer是一個獨立的線程,維護着zk機器的狀態。
@Override
public synchronized void start() {
loadDataBase();
cnxnFactory.start();
startLeaderElection();
super.start();
}
本次主要介紹的是選舉相關的內容,至於其他操作可以看其他博客。之后的行文都是從startLeaderElection中衍生出來的。
基本概念:
SID:服務器ID,用來標示ZooKeeper集群中的機器,每台機器不能重復,和myid的值一直
ZXID:事務ID
Vote: 選票,具體的數據結構后面有
Quorum:過半機器數
選舉輪次:logicalclock,zk服務器Leader選舉的輪次
服務器類型:
在zk中,引入了Leader、Follwer和Observer三種角色。zk集群中的所有機器通過一個Leader選舉過程來選定一台被稱為Leader的機器,Leader服務器為客戶端提供讀和寫服務。Follower和Observer都能夠提供讀服務,唯一的區別在於,Observer機器不參與Leader選舉過程,也不參與寫操作的過半寫成功策略。因此,Observer存在的意義是:在不影響寫性能的情況下提升集群的讀性能。
服務器狀態:
+ LOOKING:Leader選舉階段
+ FOLLOWING:Follower服務器和Leader保持同步狀態
+ LEADING:Leader服務器作為主進程領導狀態。
+ OBSERVING:觀察者狀態,表明當前服務器是Observer,不參與投票
選舉的目的就是選擇出合適的Leader機器,由Leader機器決定事務性的Proposal處理過程,實現類兩階段提交協議(具體是ZAB協議)
QuorumPeer維護集群機器狀態
QuorumPeer的職責就是不斷地檢測當前的zk機器的狀態,執行對應的邏輯,簡單來說,就是根據服務所處的不同狀態執行不同的邏輯。刪除了一部分邏輯后,代碼如下:
@Override
public void run() {
setName("QuorumPeer" + "[myid=" + getId() + "]" +
cnxnFactory.getLocalAddress());
try {
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
try {
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
break;
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
setObserver(null);
setPeerState(ServerState.LOOKING);
}
break;
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
setPeerState(ServerState.LOOKING);
}
break;
}
}
} finally {
LOG.warn("QuorumPeer main thread exited");
}
}
當機器處於LOOKING狀態時,QuorumPeer會進行選舉,但是具體的邏輯並不是由QuorumPeer來負責的,整體的投票過程獨立出來了,從邏輯執行的角度看,整個過程設計到兩個主要的環節:
- 與其他的zk集群通信的過程
- 實現具體的選舉算法
而QuorumPeer中默認使用的選舉算法是FastLeaderElection,之后的分析也是基於FastLeaderElection而言的。
選舉過程中的整體架構
在集群啟動的過程中,QuorumPeer會根據配置實現不同的選舉策略 this.electionAlg = createElectionAlgorithm(electionType);
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
switch (electionAlgorithm) {
case 3:
QuorumCnxManager qcm = new QuorumCnxManager(this);
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
如果ClientCnxn是zk客戶端中處理IO請求的管理器,QuorumCnxManager是zk集群間負責選舉過程中網絡IO的管理器,在每台服務器啟動的時候,都會啟動一個QuorumCnxManager,用來維持各台服務器之間的網絡通信。
對於每一台zk機器,都需要建立一個TCP的端口監聽,在QuorumCnxManager中交給Listener來處理,使用的是Socket的阻塞式IO(默認監聽的端口是3888,是在config文件里面設置的)。在兩兩相互連接的過程中,為了避免兩台機器之間重復地創建TCP連接,zk制定了連接的規則:只允許SID打的服務器主動和其他服務器建立連接。實現的方式也比較簡單,在receiveConnection中,服務器會對比與自己建立連接的服務器的SID,判斷是否接受請求,如果自己的SID更大,那么會斷開連接,然后自己主動去和遠程服務器建立連接。這段邏輯是由Listener來做的,且Listener獨立線程,receivedConnection,建立連接后的示意圖:

QuorumCnxManager是連接的管家,具體的TCP連接交給了Listener,但是對於選票的管理,內部還維護了一系列的隊列:
- recvQueue:消息接收隊列,用來存放那些從其他服務器接收到的消息,單獨的隊列
- 分組隊列(quorumCnxManager中將zk集群中的每台機器按照SID單獨分組形成隊列集合):
- queueSendMap:消息發送隊列,用於保存待發送的消息。
new ConcurrentHashMap<Long, ArrayBlockingQueue按照SID分組,分別為每台機器分配一個單獨隊列,保證各台機器之間的消息發放互不影響>(); - senderWorderMap:發送器集合。每個SendWorder消息發送器,都對應一台遠程zk服務器,負責消息的發放。
- lastMessageSent:最近發送過的消息,按照SID分組
- queueSendMap:消息發送隊列,用於保存待發送的消息。
基本的通信流程如下:

以上內容主要是建立各台zk服務器之間的連接通信過程,具體的選舉策略zk抽象成了
public interface Election {
public Vote lookForLeader() throws InterruptedException;
public void shutdown();
}
FastLeaderElection選舉算法
上面說過QuorumPeer檢測到當前服務器的狀態是LOOKING的時候,就會進行新一輪的選舉,通過 setCurrentVote(makeLEStrategy().lookForLeader());也就是FastLeaderElection的lookForLeader來進行初始選擇,實現的方式也很簡單,主要的邏輯在FastLeaderElection.lookForLeader中實現:

基本流程先說明一下:
- QuorumPeer會輪詢檢查當前服務器狀態,如果發現State是LOOKING,調用Election的lookForLeader來開始新一輪的選舉
- FastLeaderElection會首先將logicallock++,表示新的一輪選舉開始了
- 構造初始的選票,Vote的內容就是選自己,然后通知zk集群中的其他機器
- FastLeaderElection會一直輪詢查狀態,只要是LOOKING態,就會從recvqueue中獲取其他服務器同步的選票信息,為了方便說明,記錄為n
- 根據n的票選信息狀態,做相關的操作
- LOOKING: 都處於無Leader態,比較一下選票的優劣,看是否更新自己的選票,如果更新了就同時通知給其他服務器
- FOLLOWING、LEADING:說明集群中已經有Leader存在,更新一下自己的狀態,結束本輪投票
- OBSERVING:這票沒什么卵用,直接舍棄(OBSERVER是不參與投票的)
根據上面的流程,可以大概說明一下FasterLeaderElection確定選票更優的策略:
- 如果外部投票中被推舉的Leader服務器選舉輪次大於自身的輪次,那么就更新選票
- 如果選舉輪次一致,就對比兩者的ZXID,ZAB協議中ZXID越大的留存的信息也越多,因此如果ZXID大於自己的,那么就更新選票
- 如果ZXID也一致,對比兩者的SID,SID大,則優先級高
總結:
以上就是zk的默認選票流程,按照ZAB協議的兩種狀態分析:
- 初始化的時候,處於同一輪次進行投票直到投票選擇出一個Leader
- 崩潰恢復階段:
- Leader服務器掛了,那么經歷的和初始化流程類似的過程,選擇Leader
- Follower服務器掛了,那么自己在執行選舉的過程中,會收到其他服務器給的Leader選票信息,也可以確定Leader所屬
