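Preface
In the previous two sections we analyzed some of the entity classes used during election, along with the network IO mechanisms and their source code. This section analyzes FastLeaderElection, the core class of ZooKeeper leader election.
Basic structure of FastLeaderElection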

The basic structure of FastLeaderElection is fairly clear. Below we go through it via its member variables and inner classes.
Notification
/**
* Notifications are messages that let other peers know that
* a given peer has changed its vote, either because it has
* joined leader election or because it learned of another
* peer with higher zxid or same zxid and higher server id
*/
static public class Notification {
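The Javadoc of Notification tells us its purpose: to let other peers know that a peer has changed its vote. Judging from its member variables, Notification is essentially the same as the Vote class. However, Notification also has a version field that marks the version of the notification. It is probably meant for handling logic between ZooKeeper nodes running different versions, but so far I have not seen it used for anything substantial.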
ToSend
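ToSend also carries essentially the same data as Vote, but it adds a sid field that says which server the message is sent to. My guess is that wrapping a vote in this class simply makes the business logic in FastLeaderElection more convenient.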
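To make the difference between the two message types concrete, here is a minimal Java sketch. The names NotificationSketch, ToSendSketch, PeerState and the replyTo helper are hypothetical, and the real classes have more members. The point is only that both carry the same vote fields, while sid means the sender in a Notification and the recipient in a ToSend (compare `response.sid` in the ToSend constructors in the WorkerReceiver code).

```java
// Simplified sketch, not ZooKeeper's real classes: names, constructors and
// field types are assumptions; only the field meanings follow the article.
enum PeerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

/** Incoming message: sid is the peer that SENT this vote. */
final class NotificationSketch {
    final long leader, zxid, electionEpoch, peerEpoch, sid;
    final PeerState state;

    NotificationSketch(long leader, long zxid, long electionEpoch,
                       PeerState state, long sid, long peerEpoch) {
        this.leader = leader;
        this.zxid = zxid;
        this.electionEpoch = electionEpoch;
        this.state = state;
        this.sid = sid;
        this.peerEpoch = peerEpoch;
    }
}

/** Outgoing message: same vote fields, but sid is the peer to SEND to. */
final class ToSendSketch {
    final long leader, zxid, electionEpoch, peerEpoch, sid;
    final PeerState state;

    ToSendSketch(long leader, long zxid, long electionEpoch,
                 PeerState state, long sid, long peerEpoch) {
        this.leader = leader;
        this.zxid = zxid;
        this.electionEpoch = electionEpoch;
        this.state = state;
        this.sid = sid;
        this.peerEpoch = peerEpoch;
    }

    /** Hypothetical helper: answer a notification with my vote, addressed to its sender. */
    static ToSendSketch replyTo(NotificationSketch n, long myLeader, long myZxid,
                                long myClock, PeerState myState, long myPeerEpoch) {
        return new ToSendSketch(myLeader, myZxid, myClock, myState, n.sid, myPeerEpoch);
    }
}
```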
Messenger
As the code structure shows, Messenger mainly consists of two inner classes, WorkerReceiver and WorkerSender.
WorkerReceiver
/**
* Receives messages from instance of QuorumCnxManager on
* method run(), and processes such messages.
*/
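As the comment says, WorkerReceiver receives messages from peers and processes them. WorkerReceiver extends ZooKeeperThread, so it also runs as a separate thread. Its run method is rather long, so I borrowed a flowchart from reference 2 to illustrate it and will walk through the key parts.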

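After receiving a message, the receiver first checks whether it came from an observer; if so, it simply replies with its own current vote. If the sender is a voting peer, the receiver looks at its own state. If it is LOOKING, it hands the notification to lookForLeader via recvqueue, and if the sender is also LOOKING with a smaller election epoch (logical clock) than its own, it additionally sends its own proposal back. If it is not LOOKING but the sender is, it sends back the result of the previous election, i.e. the leader it currently believes in.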
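The branching just described can be condensed into a pure decision function. This is only a sketch: RxState, ReceiverAction and decide() are hypothetical names, and the real run method also parses the message, deals with version differences and builds the actual ToSend.

```java
// Hypothetical sketch of WorkerReceiver's branching; not ZooKeeper code.
enum RxState { LOOKING, FOLLOWING, LEADING, OBSERVING }

enum ReceiverAction {
    REPLY_CURRENT_VOTE,        // sender is not a voter (observer): reply right away
    ENQUEUE,                   // pass the notification to lookForLeader only
    ENQUEUE_AND_REPLY,         // pass it on and also send my proposal back
    REPLY_ESTABLISHED_LEADER,  // I'm done electing, sender isn't: tell it my leader
    IGNORE                     // neither side is LOOKING
}

final class ReceiverDecision {
    private ReceiverDecision() {}

    static ReceiverAction decide(boolean senderIsVoter, RxState myState,
                                 RxState senderState, long senderElectionEpoch,
                                 long myLogicalClock) {
        if (!senderIsVoter) {
            return ReceiverAction.REPLY_CURRENT_VOTE;
        }
        if (myState == RxState.LOOKING) {
            // Only a LOOKING sender from an older round gets an immediate reply.
            boolean senderLagging = senderState == RxState.LOOKING
                    && senderElectionEpoch < myLogicalClock;
            return senderLagging ? ReceiverAction.ENQUEUE_AND_REPLY : ReceiverAction.ENQUEUE;
        }
        return senderState == RxState.LOOKING
                ? ReceiverAction.REPLY_ESTABLISHED_LEADER
                : ReceiverAction.IGNORE;
    }
}
```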
/*
* If it is from an observer, respond right away.
* Note that the following predicate assumes that
* if a server is not a follower, then it must be
* an observer. If we ever have any other type of
* learner in the future, we'll have to change the
* way we check for observers.
*/
if(!self.getVotingView().containsKey(response.sid)){//the voting view lists the peers that may vote; a sid not in it is an observer
Vote current = self.getCurrentVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch());
sendqueue.offer(notmsg);
}
So if the message comes from an observer, the server just builds a notification-type message carrying its current vote and sends it straight to that peer.
/*
* If this server is looking, then send proposed leader
*/
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){//if this server is LOOKING
recvqueue.offer(n);//put the notification into recvqueue
/*
* Send a notification back if the peer that sent this
* message is also looking and its logical clock is
* lagging behind.
*/
if((ackstate == QuorumPeer.ServerState.LOOKING)//the sender is also LOOKING
&& (n.electionEpoch < logicalclock.get())){
Vote v = getVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch());
sendqueue.offer(notmsg);//send my vote back to the sender
}
} else {
/*
* If this server is not looking, but the one that sent the ack
* is looking, then send back what it believes to be the leader.
*/
Vote current = self.getCurrentVote();//not LOOKING: take the vote I currently consider final
if(ackstate == QuorumPeer.ServerState.LOOKING){
if(LOG.isDebugEnabled()){
LOG.debug("Sending new notification. My id = " +
self.getId() + " recipient=" +
response.sid + " zxid=0x" +
Long.toHexString(current.getZxid()) +
" leader=" + current.getId());
}
ToSend notmsg;
if(n.version > 0x0) {//build different messages depending on version; what version is really for is still unclear to me
notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch());
} else {
Vote bcVote = self.getBCVote();
notmsg = new ToSend(
ToSend.mType.notification,
bcVote.getId(),
bcVote.getZxid(),
bcVote.getElectionEpoch(),
self.getPeerState(),
response.sid,
bcVote.getPeerEpoch());
}
sendqueue.offer(notmsg);//put the message into sendqueue
}
}
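The logic here is quite clear, but one point deserves emphasis: FastLeaderElection has its own send and receive queues, and the network IO layer covered in the previous article also has send and receive queues. How do they work together? The beginning of WorkerReceiver's run method answers this: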
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
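That makes it clear: WorkerReceiver takes messages out of the network IO layer's receive queue (in QuorumCnxManager), turns them into Notifications, and puts them into FastLeaderElection's own receive queue. This is a two-layer queue design: the IO queues deal with the raw bytes and some basic logic, while the logic that belongs to the election algorithm is handled in FastLeaderElection, which produces and consumes messages through its own queues. Put simply, FastLeaderElection's queues sit on top of, and depend on, the network IO queues.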
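A minimal sketch of this two-layer hand-off, assuming a made-up wire layout of three longs (leader, zxid, electionEpoch); the real message format carries more fields, and the class TwoLayerQueues and its members are hypothetical. ioRecvQueue plays the role of QuorumCnxManager's receive queue, electionRecvQueue that of FastLeaderElection's recvqueue, and pumpOnce() is one iteration of a WorkerReceiver-like loop.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the two-layer queues; the wire layout is made up.
final class TwoLayerQueues {
    /** What the network layer holds: raw bytes plus the sender's sid. */
    static final class RawMessage {
        final long sid;
        final ByteBuffer buffer;

        RawMessage(long sid, ByteBuffer buffer) {
            this.sid = sid;
            this.buffer = buffer;
        }
    }

    /** What the election layer holds: a decoded vote. */
    static final class Vote {
        final long sid, leader, zxid, electionEpoch;

        Vote(long sid, long leader, long zxid, long electionEpoch) {
            this.sid = sid;
            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
        }
    }

    final BlockingQueue<RawMessage> ioRecvQueue = new LinkedBlockingQueue<>(); // ~ QuorumCnxManager
    final BlockingQueue<Vote> electionRecvQueue = new LinkedBlockingQueue<>(); // ~ FLE recvqueue

    /** Encodes a vote with the assumed layout: leader, zxid, electionEpoch. */
    static ByteBuffer encode(long leader, long zxid, long electionEpoch) {
        ByteBuffer b = ByteBuffer.allocate(3 * Long.BYTES);
        b.putLong(leader).putLong(zxid).putLong(electionEpoch);
        b.flip();
        return b;
    }

    /** One WorkerReceiver-like step: poll the IO queue, decode, offer to the election queue. */
    boolean pumpOnce(long timeoutMs) {
        try {
            RawMessage raw = ioRecvQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (raw == null) {
                return false; // nothing arrived within the timeout
            }
            ByteBuffer b = raw.buffer.duplicate(); // leave the original buffer untouched
            // Java evaluates arguments left to right, matching the encoding order.
            Vote v = new Vote(raw.sid, b.getLong(), b.getLong(), b.getLong());
            return electionRecvQueue.offer(v);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```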
WorkerSender
WorkerSender's logic is much simpler.
public void run() {
while (!stop) {
try {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);//take a message from the send queue
if(m == null) continue;
process(m);//hand it to the network IO layer's send queue
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
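The important detail is the poll with a timeout: the thread never blocks forever, so it re-checks the stop flag at least once per poll interval. A sketch of the same pattern, where SenderLoop is a hypothetical class and recording into a list stands in for process(m), which in ZooKeeper serializes the ToSend and hands it to QuorumCnxManager:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a WorkerSender-style loop.
final class SenderLoop implements Runnable {
    final BlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();
    final List<String> delivered = new CopyOnWriteArrayList<>();
    private volatile boolean stop = false;
    private final long pollMs;

    SenderLoop(long pollMs) {
        this.pollMs = pollMs;
    }

    void halt() {
        stop = true;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                String m = sendqueue.poll(pollMs, TimeUnit.MILLISECONDS);
                if (m == null) {
                    continue; // timed out: loop around and re-check the stop flag
                }
                delivered.add(m); // stand-in for process(m)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
```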
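Analysis of the election method
The main election logic of FastLeaderElection lives in the lookForLeader method, so let's first go through lookForLeader to see the overall flow. It calls many methods we have not analyzed yet; read the overall logic first, and then we will look at those methods one by one.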
/**
* Starts a new round of leader election. Whenever our QuorumPeer
* changes its state to LOOKING, this method is invoked, and it
* sends notifications to all other peers.
*/
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(//register the JMX bean for monitoring
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();//record when the election started
}
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();//votes received in this round, keyed by sender sid
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();//votes from peers that are FOLLOWING or LEADING
int notTimeout = finalizeWait;
synchronized(this){
logicalclock.incrementAndGet();//advance the election round
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());//initial vote: vote for myself
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
sendNotifications();//send my proposal to every peer
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){//the interactive election loop starts
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);//take a notification from the receive queue
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
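if(n == null){//the election isn't over but no notification arrived: reach out to the peers to get their votes
if(manager.haveDelivered()){//messages have been delivered, i.e. the send queues have drained
sendNotifications();//send everything again
} else {
manager.connectAll();//messages are still queued, so connections may be down: reconnect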
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);//the wait for the next notification keeps growing, up to maxNotificationInterval
LOG.info("Notification time out: " + notTimeout);
}
else if(self.getVotingView().containsKey(n.sid)) {//the sender is a voting peer
/*
* Only proceed if the vote comes from a replica in the
* voting view.
*/
switch (n.state) {
case LOOKING://the sender is LOOKING
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {//the sender's election epoch is higher than mine
logicalclock.set(n.electionEpoch);//catch up my logicalclock
recvset.clear();//drop previously received votes (they all belong to my old logicalclock, otherwise they would already have been cleared)
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {//the sender's vote is "better" than my own initial vote
updateProposal(n.leader, n.zxid, n.peerEpoch);//adopt the sender's vote as mine
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());//otherwise go back to my own initial vote
}
sendNotifications();//broadcast my updated vote to everyone
} else if (n.electionEpoch < logicalclock.get()) {//a lower epoch is ignored: once the sender hears from other peers it will update itself through the branch above
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {//same epoch: compare the sender's vote with my current one; the sender's is "better"
updateProposal(n.leader, n.zxid, n.peerEpoch);//update my proposal
sendNotifications();//send notifications
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));//record the sender's vote in recvset
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {//a quorum supports the server I am proposing as leader
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
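TimeUnit.MILLISECONDS)) != null){//more messages in the receive queue
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){//a new message is better than my vote (note: at this point I already consider my candidate elected)
recvqueue.put(n);//put it back. Why? My guess is network delay: the best vote in the cluster may arrive only after the others have formed a quorum. Since this server's state has not been changed yet, the vote is put back and the next iteration of the outer loop processes it normally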
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {//no more messages arrived in the receive queue
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());//set my own state
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);//clear the receive queue
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
if(n.electionEpoch == logicalclock.get()){//same election round as mine
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if(ooePredicate(recvset, outofelection, n)) {//has the election succeeded, i.e. is a leader elected and does that leader itself claim to be leading (it may have failed)
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());//set my state
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify
* a majority is following the same leader.
*/
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));//record the vote in the FOLLOWING/LEADING vote set
if(ooePredicate(outofelection, outofelection, n)) {
synchronized(this){
logicalclock.set(n.electionEpoch);//adopt the sender's election epoch
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);//unregister the JMX bean
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;//drop the reference to help GC
LOG.debug("Number of connection processing threads: {}",
manager.getConnectionThreadCount());
}
}
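The backoff in the n == null branch doubles notTimeout each time nothing arrives, capped at maxNotificationInterval. A small sketch of that arithmetic; the values 200 ms (finalizeWait, the starting notTimeout) and 60000 ms (maxNotificationInterval) are what I believe the 3.4.x source uses, so treat them as assumptions. NotificationBackoff is a hypothetical class name.

```java
// Sketch of lookForLeader's notification backoff; the numbers are assumptions.
final class NotificationBackoff {
    private NotificationBackoff() {}

    /** Same arithmetic as the n == null branch: double, but never exceed the cap. */
    static int next(int notTimeout, int maxNotificationInterval) {
        int tmpTimeOut = notTimeout * 2;
        return tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval;
    }

    /** The successive timeouts after `steps` consecutive empty polls. */
    static int[] schedule(int start, int cap, int steps) {
        int[] out = new int[steps];
        int t = start;
        for (int i = 0; i < steps; i++) {
            t = next(t, cap);
            out[i] = t;
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [400, 800, 1600, 3200, 6400, 12800, 25600, 51200, 60000, 60000]
        System.out.println(java.util.Arrays.toString(schedule(200, 60000, 10)));
    }
}
```

With these numbers a quiet cluster is probed less and less often, settling at one round of notifications per minute instead of flooding the network.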
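That is the rough logic. While reading material online I came across two diagrams that explain it well; they are included here, and you can walk through the logic again with them.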


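Both the flowcharts and the code show that if, after some updates, the requirements of termPredicate or ooePredicate are not met, that is, no sid is backed by a majority of the votes or the elected leader is not valid (wrong epoch, wrong state and so on), then the server does not change its own state. In the next loop iteration it will reconnect to the other servers or receive more votes and keep electing.
The election logic was covered in detail in the code walkthrough above; now let's go through the individual methods it uses.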
updateProposal
synchronized void updateProposal(long leader, long zxid, long epoch){
if(LOG.isDebugEnabled()){
LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
+ Long.toHexString(zxid) + " (newzxid), " + proposedLeader
+ " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
}
proposedLeader = leader;
proposedZxid = zxid;
proposedEpoch = epoch;
}
Very simple: it updates the leader information this server proposes.
getInitId(), getInitLastLoggedZxid(), getPeerEpoch()
private long getInitId(){
if(self.getLearnerType() == LearnerType.PARTICIPANT)
return self.getId();
else return Long.MIN_VALUE;
}
Returns its own sid (or Long.MIN_VALUE if this server is not a PARTICIPANT).
private long getInitLastLoggedZxid(){
if(self.getLearnerType() == LearnerType.PARTICIPANT)
return self.getLastLoggedZxid();
else return Long.MIN_VALUE;
}
Returns its own largest (last logged) zxid, again Long.MIN_VALUE for a non-PARTICIPANT.
private long getPeerEpoch(){
if(self.getLearnerType() == LearnerType.PARTICIPANT)
try {
return self.getCurrentEpoch();
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
else return Long.MIN_VALUE;
}
Equally simple: it fetches its own current epoch.
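Because a non-PARTICIPANT gets Long.MIN_VALUE for sid, zxid and epoch, its initial vote loses to any participant's vote under the ordering used by totalOrderPredicate. A sketch; LearnerKind and InitialVote are hypothetical names, and beats() leaves out the weight check.

```java
// Hypothetical sketch of the initial vote built from getInitId(),
// getInitLastLoggedZxid() and getPeerEpoch().
enum LearnerKind { PARTICIPANT, OBSERVER }

final class InitialVote {
    final long id, zxid, epoch;

    InitialVote(long id, long zxid, long epoch) {
        this.id = id;
        this.zxid = zxid;
        this.epoch = epoch;
    }

    /** Participants vote for themselves; anyone else proposes Long.MIN_VALUE everywhere. */
    static InitialVote of(LearnerKind kind, long myId, long lastLoggedZxid, long currentEpoch) {
        if (kind == LearnerKind.PARTICIPANT) {
            return new InitialVote(myId, lastLoggedZxid, currentEpoch);
        }
        return new InitialVote(Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE);
    }

    /** (epoch, zxid, sid) ordering of totalOrderPredicate, without the weight check. */
    boolean beats(InitialVote other) {
        if (epoch != other.epoch) return epoch > other.epoch;
        if (zxid != other.zxid) return zxid > other.zxid;
        return id > other.id;
    }
}
```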
totalOrderPredicate
As seen in lookForLeader, this method compares two votes to decide which one is better.
/**
* Check if a pair (server id, zxid) succeeds our
* current vote.
*
* @param id Server identifier
* @param zxid Last zxid observed by the issuer of this vote
*/
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
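As the comment says, there are three cases: 1. the new epoch is higher; 2. the epochs are equal and the new zxid is larger; 3. the epochs and zxids are equal and the new sid is larger. Before any of that, a server whose quorum weight is 0 can never win.
sendNotifications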
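A standalone version of the comparison, handy for trying out the three cases. The weights map is a stand-in for QuorumVerifier.getWeight (with a plain majority quorum every server weighs 1); treating unknown sids as weight 0 is my own assumption, and VoteOrder is a hypothetical class name.

```java
import java.util.Map;

// Sketch of totalOrderPredicate with the quorum verifier replaced by a weight map.
final class VoteOrder {
    private VoteOrder() {}

    /** True if (newId, newZxid, newEpoch) should replace (curId, curZxid, curEpoch). */
    static boolean totalOrderPredicate(Map<Long, Long> weights,
                                       long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        // A zero-weight server can never become leader.
        if (weights.getOrDefault(newId, 0L) == 0L) {
            return false;
        }
        return (newEpoch > curEpoch)
                || (newEpoch == curEpoch
                    && (newZxid > curZxid || (newZxid == curZxid && newId > curId)));
    }
}
```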
/**
* Send notifications to all peers upon a change in our vote
*/
private void sendNotifications() {
for (QuorumServer server : self.getVotingView().values()) {//iterate over all voting peers
long sid = server.id;
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch);
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
sendqueue.offer(notmsg);//send my vote to each peer
}
}
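Note that the voting view includes this server itself, so it also sends a notification to its own sid. As far as I can tell from the 3.4 source, QuorumCnxManager loops such a message straight back into its own receive queue, which is how a server's own vote ends up in recvset. A sketch of the fan-out with hypothetical names (Broadcast, Msg):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the sendNotifications fan-out.
final class Broadcast {
    private Broadcast() {}

    /** One outgoing notification; its state would always be LOOKING here. */
    static final class Msg {
        final long leader, zxid, round, recipient, peerEpoch;

        Msg(long leader, long zxid, long round, long recipient, long peerEpoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.round = round;
            this.recipient = recipient;
            this.peerEpoch = peerEpoch;
        }
    }

    /** Builds one message per voting member, this server's own sid included. */
    static List<Msg> sendNotifications(List<Long> votingView, long proposedLeader,
                                       long proposedZxid, long logicalClock,
                                       long proposedEpoch) {
        List<Msg> out = new ArrayList<>();
        for (long sid : votingView) {
            out.add(new Msg(proposedLeader, proposedZxid, logicalClock, sid, proposedEpoch));
        }
        return out;
    }
}
```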
termPredicate
/**
* Termination predicate. Given a set of votes, determines if
* have sufficient to declare the end of the election round.
*
* @param votes Set of votes
* @param l Identifier of the vote received last
* @param zxid zxid of the the vote received last
*/
protected boolean termPredicate(
HashMap<Long, Vote> votes,
Vote vote) {
HashSet<Long> set = new HashSet<Long>();
/*
* First make the views consistent. Sometimes peers will have
* different zxids for a server depending on timing.
*/
for (Map.Entry<Long,Vote> entry : votes.entrySet()) {//collect the sids whose vote equals the given vote
if (vote.equals(entry.getValue())){
set.add(entry.getKey());
}
}
return self.getQuorumVerifier().containsQuorum(set);//let the quorum verifier decide
}
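termPredicate decides whether the given vote is backed by a quorum, i.e. whether it is the vote for the elected leader and the election round can end.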
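A sketch of termPredicate with a simple majority check standing in for getQuorumVerifier().containsQuorum (for a plain majority verifier, more than half of the voting members). I assume Vote equality compares leader id, zxid, election epoch and peer epoch, which is how I read the 3.4 Vote.equals; Termination and its members are hypothetical names.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch of termPredicate with a majority quorum.
final class Termination {
    private Termination() {}

    static final class Vote {
        final long id, zxid, electionEpoch, peerEpoch;

        Vote(long id, long zxid, long electionEpoch, long peerEpoch) {
            this.id = id;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.peerEpoch = peerEpoch;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Vote)) return false;
            Vote v = (Vote) o;
            return id == v.id && zxid == v.zxid
                    && electionEpoch == v.electionEpoch && peerEpoch == v.peerEpoch;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, zxid, electionEpoch, peerEpoch);
        }
    }

    /** Majority check: strictly more than half of the voting members. */
    static boolean containsQuorum(Set<Long> supporters, int votingMembers) {
        return supporters.size() > votingMembers / 2;
    }

    /** Collects the sids whose vote equals `vote` and asks the majority check. */
    static boolean termPredicate(Map<Long, Vote> votes, Vote vote, int votingMembers) {
        Set<Long> set = new HashSet<>();
        for (Map.Entry<Long, Vote> e : votes.entrySet()) {
            if (vote.equals(e.getValue())) {
                set.add(e.getKey());
            }
        }
        return containsQuorum(set, votingMembers);
    }
}
```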
ooePredicate,checkLeader
/**
* In the case there is a leader elected, and a quorum supporting
* this leader, we have to check if the leader has voted and acked
* that it is leading. We need this check to avoid that peers keep
* electing over and over a peer that has crashed and it is no
* longer leading.
*
* @param votes set of votes
* @param leader leader id
* @param electionEpoch epoch id
*/
protected boolean checkLeader(
HashMap<Long, Vote> votes,
long leader,
long electionEpoch){
boolean predicate = true;
/*
* If everyone else thinks I'm the leader, I must be the leader.
* The other two checks are just for the case in which I'm not the
* leader. If I'm not the leader and I haven't received a message
* from leader stating that it is leading, then predicate is false.
*/
if(leader != self.getId()){//I am not the leader
if(votes.get(leader) == null) predicate = false;//the leader has not sent me a vote
else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false;//or the leader's state is not LEADING: the vote is invalid (as the comment says, if I am not the leader and the leader has not told me it is leading, the check fails)
} else if(logicalclock.get() != electionEpoch) {//I am the leader, but the epoch doesn't match, which suggests I failed at some point
predicate = false;
}
return predicate;
}
/**
* This predicate checks that a leader has been elected. It doesn't
* make a lot of sense without context (check lookForLeader) and it
* has been separated for testing purposes.
*
* @param recv map of received votes
* @param ooe map containing out of election votes (LEADING or FOLLOWING)
* @param n Notification
* @return
*/
protected boolean ooePredicate(HashMap<Long,Vote> recv,
HashMap<Long,Vote> ooe,
Notification n) {
return (termPredicate(recv, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state))
&& checkLeader(ooe, n.leader, n.electionEpoch));//quorum check plus leader check
}
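checkLeader can be isolated as a pure function; ooePredicate is then just termPredicate(...) && checkLeader(...). LeaderCheck, SrvState and the explicit myId / myLogicalClock parameters are hypothetical, replacing self.getId() and logicalclock.get().

```java
import java.util.Map;

// Hypothetical sketch of checkLeader as a pure function.
enum SrvState { LOOKING, FOLLOWING, LEADING, OBSERVING }

final class LeaderCheck {
    private LeaderCheck() {}

    static final class Vote {
        final long leader, electionEpoch;
        final SrvState state;

        Vote(long leader, long electionEpoch, SrvState state) {
            this.leader = leader;
            this.electionEpoch = electionEpoch;
            this.state = state;
        }
    }

    /** votes: out-of-election votes keyed by sender sid. */
    static boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch,
                               long myId, long myLogicalClock) {
        if (leader != myId) {
            // Someone else is leader: it must have told us itself that it is LEADING.
            Vote fromLeader = votes.get(leader);
            return fromLeader != null && fromLeader.state == SrvState.LEADING;
        }
        // I am the claimed leader: only accept a verdict from my current round.
        return myLogicalClock == electionEpoch;
    }
}
```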
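At this point the code of the election algorithm is basically clear. The core matches what was discussed in the earlier section on ZAB, but the production code also has to handle many exceptional situations, such as lost network messages, so the logic is considerably more complex.
Thoughts
- The election also relies on a currentVote variable (held by QuorumPeer and read via self.getCurrentVote()) that stores the vote this server has finally settled on, from the first round up to now; it is mainly used when notifying others. During the dynamic voting process, the temporary votes are all created anew by getVote(), and currentVote is not used.
- One piece of code deserves careful thought: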
while((n = recvqueue.poll(finalizeWait,
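TimeUnit.MILLISECONDS)) != null){//more messages in the receive queue
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){//a new message is better than my vote (note: at this point I already consider my candidate elected)
recvqueue.put(n);//put it back. Why? My guess is network delay: the best vote in the cluster may arrive only after the others have formed a quorum. Since this server's state has not been changed yet, the vote is put back and the next iteration of the outer loop processes it normally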
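Concretely: say five servers A, B, C, D and E are all electing and the network between them is healthy. While handling LOOKING notifications, A finds that a quorum in recvset backs D. During the finalizeWait window, E's notification lands in the queue, and A sees that this vote is even better than the one it is about to finalize. A puts it back into recvqueue and breaks out of the inner loop. Since n is then not null, A does not set its state and remains LOOKING, so the outer while loop goes around again: the notification is taken out once more, handled by the switch branch matching the sender's state (the LOOKING branch if E is still electing), and, being better, makes A call updateProposal and rebroadcast. So putting the vote back does not waste it; it turns a premature decision into one more exchange of votes. This deferral strategy is quite clever.
- Election checks
These are essentially the majority (quorum) check and the leader-validity check.
A vote only ends the election once it passes them: termPredicate for a majority in the LOOKING branch, and ooePredicate (termPredicate plus checkLeader) for votes from FOLLOWING or LEADING peers.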
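The finalizeWait put-back can be isolated like this. It is a sketch: FinalizeWait, canFinalize() and the simplified Vote are hypothetical, and beats() is totalOrderPredicate without the weight check. Votes that are not better are consumed and dropped, just as in the original inner loop; a better one goes back to the tail of the queue and the method reports that the decision cannot be final yet.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the finalizeWait check in lookForLeader.
final class FinalizeWait {
    private FinalizeWait() {}

    static final class Vote {
        final long id, zxid, epoch;

        Vote(long id, long zxid, long epoch) {
            this.id = id;
            this.zxid = zxid;
            this.epoch = epoch;
        }

        /** totalOrderPredicate without the weight check. */
        boolean beats(Vote o) {
            if (epoch != o.epoch) return epoch > o.epoch;
            if (zxid != o.zxid) return zxid > o.zxid;
            return id > o.id;
        }
    }

    /** True if the queue stays free of better votes for finalizeWaitMs. */
    static boolean canFinalize(BlockingQueue<Vote> recvqueue, Vote proposed, long finalizeWaitMs) {
        try {
            Vote n;
            while ((n = recvqueue.poll(finalizeWaitMs, TimeUnit.MILLISECONDS)) != null) {
                if (n.beats(proposed)) {
                    recvqueue.put(n); // hand it back to the main loop; stay LOOKING
                    return false;
                }
                // A vote that is not better is consumed and dropped.
            }
            return true; // nothing better arrived in time: the decision is final
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```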
