zookeeper集群
配置多個實例共同構成一個集群對外提供服務以達到水平擴展的目的,每個服務器上的數據是相同的,每一個服務器均可以對外提供讀和寫的服務,這點和redis是相同的,即對客戶端來講每個服務器都是平等的。
這篇主要分析leader的選擇機制,zookeeper提供了三種方式:
- LeaderElection
- AuthFastLeaderElection
- FastLeaderElection
默認的算法是FastLeaderElection,所以這篇主要分析它的選舉機制。
選擇機制中的概念
服務器ID
比如有三台服務器,編號分別是1,2,3。
編號越大在選擇算法中的權重越大。
數據ID
服務器中存放的最大數據ID.
值越大說明數據越新,在選舉算法中數據越新權重越大。
邏輯時鍾
或者叫投票的次數,同一輪投票過程中的邏輯時鍾值是相同的。每投完一次票這個數據就會增加,然后與接收到的其它服務器返回的投票信息中的數值相比,根據不同的值做出不同的判斷。
選舉狀態
- LOOKING,競選狀態。
- FOLLOWING,隨從狀態,同步leader狀態,參與投票。
- OBSERVING,觀察狀態,同步leader狀態,不參與投票。
- LEADING,領導者狀態。
選舉消息內容
在投票完成后,需要將投票信息發送給集群中的所有服務器,它包含如下內容。
- 服務器ID
- 數據ID
- 邏輯時鍾
- 選舉狀態
選舉流程圖
因為每個服務器都是獨立的,在啟動時均從初始狀態開始參與選舉,下面是簡易流程圖。
選舉狀態圖
描述Leader選擇過程中的狀態變化,這是假設全部實例中均沒有數據,假設服務器啟動順序分別為:A,B,C。
源碼分析
QuorumPeer
主要看這個類,只有LOOKING狀態才會去執行選舉算法。每個服務器在啟動時都會選擇自己做為領導,然后將投票信息發送出去,循環一直到選舉出領導為止。
public void run() {
//.......
try {
while (running) {
switch (getPeerState()) {
case LOOKING:
if (Boolean.getBoolean("readonlymode.enabled")) {
//...
try {
//投票給自己...
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
//...
} finally {
//...
}
} else {
try {
//...
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
//...
}
}
break;
case OBSERVING:
//...
break;
case FOLLOWING:
//...
break;
case LEADING:
//...
break;
}
}
} finally {
//...
}
}
FastLeaderElection
它是zookeeper默認提供的選舉算法,核心方法如下:具體的可以與本文上面的流程圖對照。
public Vote lookForLeader() throws InterruptedException {
//...
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
//給自己投票
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
//將投票信息發送給集群中的每個服務器
sendNotifications();
//循環,如果是競選狀態一直到選舉出結果
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
//沒有收到投票信息
if(n == null){
if(manager.haveDelivered()){
sendNotifications();
} else {
manager.connectAll();
}
//...
}
//收到投票信息
else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
switch (n.state) {
case LOOKING:
// 判斷投票是否過時,如果過時就清除之前已經接收到的信息
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
//更新投票信息
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
//發送投票信息
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
//忽略
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
//更新投票信息
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//判斷是否投票結束
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
//忽略
break;
case FOLLOWING:
case LEADING:
//如果是同一輪投票
if(n.electionEpoch == logicalclock.get()){
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//判斷是否投票結束
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
//記錄投票已經完成
outofelection.put(n.sid, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
//忽略
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
return null;
} finally {
//...
}
}
判斷是否已經勝出
默認是采用投票數大於半數則勝出的邏輯。
選舉流程簡述
目前有5台服務器,每台服務器均沒有數據,它們的編號分別是1,2,3,4,5,按編號依次啟動,它們的選擇舉過程如下:
- 服務器1啟動,給自己投票,然后發投票信息,由於其它機器還沒有啟動所以它收不到反饋信息,服務器1的狀態一直屬於Looking。
- 服務器2啟動,給自己投票,同時與之前啟動的服務器1交換結果,由於服務器2的編號大所以服務器2勝出,但此時投票數沒有大於半數,所以兩個服務器的狀態依然是LOOKING。
- 服務器3啟動,給自己投票,同時與之前啟動的服務器1,2交換信息,由於服務器3的編號最大所以服務器3勝出,此時投票數正好大於半數,所以服務器3成為領導者,服務器1,2成為小弟。
- 服務器4啟動,給自己投票,同時與之前啟動的服務器1,2,3交換信息,盡管服務器4的編號大,但之前服務器3已經勝出,所以服務器4只能成為小弟。
- 服務器5啟動,后面的邏輯同服務器4成為小弟。