前言
ZooKeeper對Zab協議的實現有自己的主備模型,即Leader和learner(Observer + Follower),有如下幾種情況需要進行領導者的選舉工作
- 情形1: 集群在啟動的過程中,需要選舉Leader
- 情形2: 集群正常啟動后,leader因故障掛掉了,需要選舉Leader
- 情形3: 集群中的Follower數量不足以通過半數檢驗,Leader會掛掉自己,選舉新leader
- 情景4: 集群正常運行,新增加1個Follower
本篇博文,從這四個方面進行源碼的追蹤閱讀
程序入口
QuorumPeer.java
相當於集群中的每一個節點server,在它的start()
方法中,完成當前節點的啟動工作,源碼如下:
// todo 進入了 QuorumPeer(意為仲裁人數)類中,可以把這個類理解成集群中的某一個點
@Override
public synchronized void start() {
// todo 從磁盤中加載數據到內存中
loadDataBase();
// todo 啟動上下文的這個工廠,他是個線程類, 接受客戶端的請求
cnxnFactory.start();
// todo 開啟leader的選舉工作
startLeaderElection();
// todo 確定服務器的角色, 啟動的就是當前類的run方法在900行
super.start();
}
第一個loadDataBase();
目的是將數據從集群中恢復到內存中
第二個cnxnFactory.start();
是當前的節點可以接受來自客戶端(java代碼,或者控制台)發送過來的連接請求
第三個 startLeaderElection();
開啟leader的選舉工作, 但是其實他是初始化了一系列的輔助類,用來輔助leader的選舉,並非真正在選舉
當前類,quorumPeer
繼承了ZKThread,它本身就是一個線程類, super.start();
就是啟動它的run方法,在他的Run方法中有一個while循環,一開始在程序啟動的階段,所有的節點的默認值都是Looking
,於是會進入這個分支中,在這個分之中會進行真正的leader選舉工作
小結
從程序的入口介紹中,可以看出本篇文章在會着重看下startLeaderElection();
做了哪些工作? 以及在looking
分支中如何選舉leader
情形1:集群在啟動的過程中,選舉新Leader
進入 startLeaderElection();
方法,源碼如下, 他主要做了兩件事
- 對本類
QuorumPeer.java
維護的變量(volatile private Vote currentVote;
)初始化 createElectionAlgorithm()
創建一個leader選舉的方法
其實到現在,就剩下一個算法沒過期了,就是
fastLeaderElection
// TODO 開啟投票選舉Leader的工作
synchronized public void startLeaderElection() {
try {
// todo 創建了一個封裝了投票結果對象 包含myid 最大的zxid 第幾輪Leader
// todo 先投票給自己
// todo 跟進它的構造函數
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
for (QuorumServer p : getView().values()) {
if (p.id == myid) {
myQuorumAddr = p.addr;
break;
}
}
if (myQuorumAddr == null) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(myQuorumAddr.getPort());
responder = new ResponderThread();
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
// todo 創建一個領導者選舉的算法,這個算法還剩下一個唯一的實現 快速選舉
this.electionAlg = createElectionAlgorithm(electionType);
}
繼續跟進 createElectionAlgorithm(electionType)
, 在這個方法中做了如下三件大事
- 創建了
QuorumCnxnManager
- 創建
Listenner
- 創建
FastLeaderElection
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
// todo 創建CnxnManager 上下文的管理器
qcm = createCnxnManager();
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
// todo 在這里將listener 開啟
listener.start();
// todo 實例化領導者選舉的算法
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
准備選舉環境
QuorumManager
上圖是QuorumCnxManager的類圖,看一下,它有6個內部類, 其中的除了Message
外其他都是可以單獨運行的線程類
這個類有着舉足輕重的作用,它是集群中全體節點共享輔助類, 那到底有什么作用呢? 我不賣關子直接說,因為leader的選舉是通過投票決議出來的,既然要相互投票,那集群中的各個點就得兩兩之間建立連接,這個QuorumCnxManager
就負責維護集群中的各個點的通信
它維護了兩種隊列,源碼在下面,第一個隊列被存入了ConcurrentHashMap
中 key就是節點的myid(或者說是serverId),值可以理解成存儲它往其他服務器發送投票的隊列
第二個隊列是收到的其他服務器發送過來的msg
// todo key=serverId(myid) value = 保存着當前服務器向其他服務器發送消息的隊列
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
// todo 接收到的所有數據都在這個隊列中
public final ArrayBlockingQueue<Message> recvQueue;
如上圖是手繪的QuorumCnxManager.java
的體系圖,最直觀的可以看到它內部的三條線程類,那三條線程類的run()方法又分別做了什么呢?
SendWorker的run(), 可以看到它根據sid取出了當前節點對應的隊列,然后將隊列中的數據往外發送
public void run() {
threadCnt.incrementAndGet();
try {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq == null || isSendQueueEmpty(bq)) {
ByteBuffer b = lastMessageSent.get(sid);
if (b != null) {
LOG.debug("Attempting to send lastMessage to sid=" + sid);
send(b);
}
}
} catch (IOException e) {
LOG.error("Failed to send last message. Shutting down thread.", e);
this.finish();
}
try {
while (running && !shutdown && sock != null) {
ByteBuffer b = null;
try {
// todo 取出任務所在的隊列
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq != null) {
// todo 將bq,添加進sendQueue
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
} else {
LOG.error("No queue of incoming messages for " +
"server " + sid);
break;
}
if(b != null){
lastMessageSent.put(sid, b);
// todo
send(b);
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for message on queue",
e);
}
}
RecvWorker的run方法,接受到了msg,然后將msg存入了recvQueue
隊列中
public void run() {
threadCnt.incrementAndGet();
try {
while (running && !shutdown && sock != null) {
/**
* Reads the first int to determine the length of the
* message
*/
int length = din.readInt();
if (length <= 0 || length > PACKETMAXSIZE) {
throw new IOException(
"Received packet with invalid packet: "
+ length);
}
/**
* Allocates a new ByteBuffer to receive the message
*/
// todo 從數組中把數據讀取到數組中
byte[] msgArray = new byte[length];
din.readFully(msgArray, 0, length);
// todo 將數組包裝成ByteBuf
ByteBuffer message = ByteBuffer.wrap(msgArray);
// todo 添加到RecvQueue中
addToRecvQueue(new Message(message.duplicate(), sid));
}
]
Listenner的run(),它會使用我們在配置文件中配置的集群鍵通信使用的端口(如上圖的3888)建立彼此之間的連接
還能發現,集群中各個點之間的通信使用的傳統socket通信
InetSocketAddress addr;
while((!shutdown) && (numRetries < 3)){
try {
// todo 創建serversocket
ss = new ServerSocket();
ss.setReuseAddress(true);
if (listenOnAllIPs) {
int port = view.get(QuorumCnxManager.this.mySid)
.electionAddr.getPort();
//todo 它取出來的地址就是address就是我們在配置文件中配置集群時添加進去的 port 3888...
addr = new InetSocketAddress(port);
} else {
addr = view.get(QuorumCnxManager.this.mySid)
.electionAddr;
}
LOG.info("My election bind port: " + addr.toString());
setName(view.get(QuorumCnxManager.this.mySid)
.electionAddr.toString());
// todo 綁定端口
ss.bind(addr);
while (!shutdown) {
// todo 阻塞接受其他的服務器發起連接
Socket client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ client.getRemoteSocketAddress());
// todo 如果啟用了仲裁SASL身份驗證,則異步接收和處理連接請求
// todo 這是必需的,因為sasl服務器身份驗證過程可能需要幾秒鍾才能完成,這可能會延遲下一個對等連接請求。
if (quorumSaslAuthEnabled) {
// todo 異步接受一個連接
receiveConnectionAsync(client);
} else {
// todo 跟進這個方法
receiveConnection(client);
}
numRetries = 0;
}
繼續跟進源碼,回到QuorumPeer.java
的createElectionAlgorithm()
方法中,重新截取源碼如下,完成了QuorumCnxManager
的創建,后進行Listener的啟動, Listenner的啟動標記着集群中的各個節點之間有了兩兩之間建立通信能力, 同時Listenner是個線程類,它的Run()方法就在上面的代碼中
FastLeaderElection
啟動Listenner之后, 開始實例化領導者選舉的算法對象new FastLeaderElection(this, qcm)
...
break;
case 3:
// todo 創建CnxnManager 上下文的管理器
qcm = createCnxnManager();
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
// todo 在這里將listener 開啟
listener.start();
// todo 實例化領導者選舉的算法
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
如下圖是FasterElection
的類圖
直觀的看到它三個直接內部類
- Messager(它又有兩個內部線程類)
- WorkerRecriver
- 負責將
- WorkerSender
- WorkerRecriver
- Notification
- 一般是當新節點啟動時狀態為looking,然后發起投票決議,其他節點收到后會用
Notification
告訴它自己信任的leader
- 一般是當新節點啟動時狀態為looking,然后發起投票決議,其他節點收到后會用
- ToSend
- 給對方發送,或者來自其他節點的消息。這些消息既可以是通知,也可以是接收通知的ack
對應着QuorumCnxManager
維護的兩種隊列,FasterElection
同樣維護下面兩個隊列與之照應,一個是sendqueue
另一個是recvqueue
LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;
具體怎么玩呢? 如下圖
就是當節點啟動過程中對外的投票會存入FasterElection
的sendqueue
,然后經過QuorumCnxManager
的sendWorker
通過NIO發送出去, 與之相反的過程,收到的其他節點的投票會被QuorumCnxManager
的recvWorker
收到,然后存入QuorumCnxManager
的recvQueue中
,這個隊列中的msg會繼續被FasterElection
的內部線程類workerRecviver
取出存放到FasterElection
的recvqueue中
通過追蹤代碼,可以發現,Message的兩個內部線程都被作為守護線程的方式開啟
Messenger(QuorumCnxManager manager) {
// todo WorderSender 作為一條新的線程
this.ws = new WorkerSender(manager);
Thread t = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
//todo------------------------------------
// todo WorkerReceiver 作為一條新的線程
this.wr = new WorkerReceiver(manager);
t = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
}
小結
代碼看到這里,其實選舉leader的准備工作已經完成了,也就是說quorumPeer.java
的start()
方法中的startLeaderElection();
已經准備領導選舉的環境,就是上圖
真正開始選舉
下面就去看一下quorumPeer.java
的這個線程類的啟動,部分run()
方法的截取,我們關心它的lookForLeader()
方法
while (running) {
switch (getPeerState()) {
/**
* todo 四種可能的狀態, 經過了leader選舉之后, 不同的服務器就有不同的角色
* todo 也就是說,不同的服務器會會走動下面不同的分支中
* LOOKING 正在進行領導者選舉
* Observing
* Following
* Leading
*/
case LOOKING:
// todo 當為Looking狀態時,會進入領導者選舉的階段
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
// todo 創建了一個 只讀的server但是不着急立即啟動它
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
logFactory, this,
new ZooKeeperServer.BasicDataTreeBuilder(),
this.zkDb);
// Instead of starting roZk immediately, wait some grace(優雅) period(期間) before we decide we're partitioned.
// todo 為了立即啟動roZK ,在我們決定分區之前先等一會
// Thread is used here because otherwise it would require changes in each of election strategy classes which is
// unnecessary code coupling.
//todo 這里新開啟一條線程,避免每一個選舉策略類上有不同的改變 而造成的代碼的耦合
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
// todo 啟動上面那個只讀的Server
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
setBCVote(null);
// todo 上面的代碼都不關系,直接看它的 lookForLeader()方法
// todo 直接點進去,進入的是接口,我們看它的實現類
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
下面是lookForLeader()
的源碼解讀
說實話這個方法還真的是挺長的,但是吧這個方法真的很重要,因為我們可以從這個方法中找到網絡上大家針對Leader的選舉總結的點點滴滴
第一點: 每次的投票都會先投自己一票,說白了new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
將自己的myid,最大的zxid,以及第幾屆封裝起來,但是還有一個細節,就是在投自己的同時,還是會將存有自己信息的這一票通過socket發送給其他的節點
接受別人的投票是通過QuorumManager
的recvWorker
線程類將投票添加進recvQueue
隊列中,投票給自己時,就不走這條路線了,而是選擇直接將票添加進recvQueue
隊列中
在下面代碼中存在一行 HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
這個map可以理解成一個小信箱,每一個節點都會維護一個信箱,這里面可能存放着自己投給自己的票,或者別人投給自己的票,或者別人投給別人的票,或者自己投給別人的票,通過統計這個信箱中的票數可以決定某一個節點是否可以成為leader,源碼如下, 使用信箱中的信息,
// todo 根據別人的投票,以及自己的投票判斷,本輪得到投票的集群能不能成為leader
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// todo 到這里說明接收到投票的機器已經是准leader了
// Verify if there is any change in the proposed leader
// todo 校驗一下, leader有沒有變動
while ((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
if (n == null) {
// todo 判斷自己是不是leader, 如果是,更改自己的狀態未leading , 否則根據配置文件確定狀態是 Observer 還是Follower
// todo leader選舉出來后, QuorumPeer中的run方法中的while再循環,不同角色的服務器就會進入到 不同的分支
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING : learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
在termPredicate()
函數中有如下的邏輯,self.getQuorumVerifier().containsQuorum(set);
它的實現如下,實際上就是在進行過半機制的檢驗,結論就是當某個節點擁有了集群中一半以上的節點的投票時,它就會把自己的狀態修改成leading, 其他的節點根據自己的需求將狀態該變成following或者observing
public boolean containsQuorum(Set<Long> set){
return (set.size() > half);
}
維護着一個時鍾,標記這是第幾次投票了logicalclock
他是AutomicLong類型的變量,他有什么用呢? 通過下面的代碼可以看到如下的邏輯,就是當自己的時鍾比當前接收到投票的時鍾小時,說明自己可能因為其他原因錯過了某次投票,所以更新自己的時鍾,重新判斷投自己還是投別人, 同理,如果接收到的投票的時鍾小於自己當前的時鍾,說明這個票是沒有意義的,直接丟棄不理會
if (n.electionEpoch > logicalclock.get()) {
// todo 將自己的時鍾調整為更新的時間
logicalclock.set(n.electionEpoch);
// todo 清空自己的投票箱
recvset.clear();
那么根據什么判斷是投給自己還是投給別人呢? 通過解析出票的封裝類中封裝的節點的信息,什么信息呢?zxid,myid,epoch 通常情況是epoch大的優先成為leader,一般來說epoch都會相同,所以zxid大的優先成為leader,如果zxid再相同,則myid大的優先成為leader
檢查到別的節點比自己更適合當leader,會重新投票,選舉更適合的節點
完整的源碼
// todo 當前進入的是FastLeaderElection.java的實現類
public Vote lookForLeader() throws InterruptedException {
try {
// todo 創建用來選舉Leader的Bean
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();
}
try {
// todo 每台服務器獨有的投票箱 , 存放其他服務器投過來的票的map
// todo long類型的key (sid)標記誰給當前的server投的票 Vote類型的value 投的票
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized (this) {
//todo Automic 類型的時鍾
logicalclock.incrementAndGet();
//todo 一開始啟動時,入參位置的值都取自己的,相當於投票給自己
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
// todo 發送出去,投票自己
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
// todo 如果自己一直處於LOOKING的狀態,一直循環
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
//todo 嘗試獲取其他服務器的投票的信息
// todo 從接受消息的隊列中取出一個msg(這個隊列中的數據就是它投票給自己的票)
// todo 在QuorumCxnManager.java中 發送的投票的邏輯中,如果是發送給自己的,就直接加到recvQueue,而不經過socket
// todo 所以它在這里是取出了自己的投票
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
// todo 第一輪投票這里不為空
if (n == null) {
// todo 第二輪就沒有投票了,為null, 進入這個分支
// todo 進行判斷 ,如果集群中有三台服務器,現在僅僅啟動一台服務器,還剩下兩台服務器沒啟動
// todo 那就會有3票, 其中1票直接放到 recvQueue , 另外兩票需要發送給其他兩台機器的邏輯就在這里判斷
// todo 驗證是通不過的,因為queueSendMap中的兩條隊列都不為空
if (manager.haveDelivered()) {
sendNotifications();
} else {
// todo 進入這個邏輯
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout * 2;
notTimeout = (tmpTimeOut < maxNotificationInterval ?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
} else if (validVoter(n.sid) && validVoter(n.leader)) {
// todo 收到了其他服務器的投票信息后,來到下面的分支中處理
/*
* Only proceed if the vote comes from a replica in the
* voting view for a replica in the voting view.
* todo 僅當投票來自投票視圖中的副本時,才能繼續進行投票。
*/
switch (n.state) {
case LOOKING:
// todo 表示獲取到投票的服務器的狀態也是looking
// If notification > current, replace and send messages out
// todo 對比接收到的頭片的 epoch和當前時鍾先后
// todo 接收到的投票 > 當前服務器的時鍾
// todo 表示當前server在投票過程中可能以為故障比其他機器少投了幾次,需要重新投票
if (n.electionEpoch > logicalclock.get()) {
// todo 將自己的時鍾調整為更新的時間
logicalclock.set(n.electionEpoch);
// todo 清空自己的投票箱
recvset.clear();
// todo 用別人的信息和自己的信息對比,選出一個更適合當leader的,如果還是自己適合,不作為, 對方適合,修改投票,投 對方
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
// todo 接收到的投票 < 當前服務器的時鍾
// todo 說明這個投票已經不能再用了
} else if (n.electionEpoch < logicalclock.get()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
// todo 別人的投票時鍾和我的時鍾是相同的
// todo 滿足 totalOrderPredicate 后,會更改當前的投票,重新投票
/**
* 在 totalOrderPredicate 比較兩者之間誰更滿足條件
* ((newEpoch > curEpoch) ||
* ((newEpoch == curEpoch) &&
* ((newZxid > curZxid) ||
* ((newZxid == curZxid) &&
* (newId > curId)))));
*/
// todo 返回true說明 對方更適合當leader
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// todo 將自己的投票存放到投票箱子中
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// todo 根據別人的投票,以及自己的投票判斷,本輪得到投票的集群能不能成為leader
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// todo 到這里說明接收到投票的機器已經是准leader了
// Verify if there is any change in the proposed leader
// todo 校驗一下, leader有沒有變動
while ((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
// todo 判斷自己是不是leader, 如果是,更改自己的狀態未leading , 否則根據配置文件確定狀態是 Observer 還是Follower
// todo leader選舉出來后, QuorumPeer中的run方法中的while再循環,不同角色的服務器就會進入到 不同的分支
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING : learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
// todo 禁止Observer參加投票
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
if (n.electionEpoch == logicalclock.get()) {
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if (ooePredicate(recvset, outofelection, n)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING : learningState());
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify
* a majority is following the same leader.
*/
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
if (ooePredicate(outofelection, outofelection, n)) {
synchronized (this) {
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING : learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
經過如上的判斷各個節點的就可以選舉出不同的角色,再次回到QuorumPeer.java
的run()
中進行循環時,不再會進入 case LOOKING:
代碼塊了,而是按照自己不同的角色各司其職,完成不同的初始化啟動
情形2: 集群正常啟動后,leader因故障掛掉了,選舉新Leader
第二種選舉leader的情況,集群正常啟動后,leader因故障掛掉了,選舉新Leader
這部分的邏輯是怎樣的呢?
leader雖然掛了,但是角色為Follower的server依然會去執行QuorumPeer.java
的run()
方法中的無限while循環,當它執行follower.followLeader();
方法時找不到leader,就會出異常,最終執行finally
代碼塊中的邏輯,可以看到它修改了自己的狀態為looking,進而重新選舉leader
break;
case FOLLOWING:
// todo server 當選follow角色
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;
情形3: 集群中的Follower數量不足以通過半數檢驗,Leader會掛掉自己,然后選舉新leader
情形3: 假設集群中2台Follower,1台leader,那么當掛掉一台Follower時,剩下1台Follower無法滿足過半檢查機制因此會重新選舉leader
回到源碼:leader每次都進入case LEADING:
去執行leader.lead();
case LEADING:
// todo 服務器成功當選成leader
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
// todo 跟進lead
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
setPeerState(ServerState.LOOKING);
}
break;
但是在leader.lead();
中每次執行都會進行如下的判斷,很明顯,當不滿足半數檢驗時,leader直接掛掉自己,最終將集群中所有節點的狀態改成LOOKING
,重新選舉
if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
//if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
// Lost quorum, shutdown
shutdown("Not sufficient followers synced, only synced with sids: [ "
+ getSidSetString(syncedSet) + " ]");
// make sure the order is the same!
// the leader goes to looking
return;
}
情景4: 集群正常運行,新增加1個Follower
新增加的進來的Follower在啟動時它的狀態是looking, 同樣她也會去嘗試選舉leader,同樣會把第一票投給自己,但是對於一個穩定的集群來說
集群中的各個角色已經確定下來了,在這種情況下,會進入FastLeaderElection.java
的lookForLeader()
方法的如下分支,使當前新添加進來的節點
直接認Leader
case OBSERVING:
// todo 禁止Observer參加投票
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
if (n.electionEpoch == logicalclock.get()) {
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if (ooePredicate(recvset, outofelection, n)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING : learningState());
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
如果有錯誤歡迎指出,如果對您有幫助,歡迎點支持