ZK介紹
ZK = zookeeper
ZK是微服務解決方案中擁有服務注冊發現最為核心的環境,是微服務的基石。作為服務注冊發現模塊,並不是只有ZK一種產品,目前得到行業認可的還有:Eureka、Consul。
這里我們只聊ZK,這個工具本身很小zip包就幾兆,安裝非常傻瓜,能夠支持集群部署。
官網地址:https://zookeeper.apache.org/
背景
在集群環境下ZK的leader&follower的概念,已經節點異常ZK面臨的問題以及如何解決。ZK本身是java語言開發,也開源到Github上但官方文檔對內部介紹的很少,零散的博客很多,有些寫的很不錯。
提問:
ZK節點狀態角色
ZK集群單節點狀態(每個節點有且只有一個狀態),ZK的定位一定需要一個leader節點處於lading狀態。
- looking:尋找leader狀態,當前集群沒有leader,進入leader選舉流程。
- following:跟隨者狀態,接受leading節點同步和指揮。
- leading:領導者狀態。
- observing:觀察者狀態,表名當前服務器是observer。
ZAB協議(原子廣播)
Zookeeper專門設計了一種名為原子廣播(ZAB)的支持崩潰恢復的一致性協議。ZK實現了一種主從模式的系統架構來保持集群中各個副本之間的數據一致性,所有的寫操作都必須通過Leader完成,Leader寫入本地日志后再復制到所有的Follower節點。一旦Leader節點無法工作,ZAB協議能夠自動從Follower節點中重新選出一個合適的替代者,即新的Leader,該過程即為領導選舉。
ZK集群中事務處理是leader負責,follower會轉發到leader來統一處理。簡單理解就是ZK的寫統一leader來做,讀可以follower處理,這也就是CAP理論中ZK更適合讀多寫少的服務。
腦裂問題
腦裂問題出現在集群中leader死掉,follower選出了新leader而原leader又復活了的情況下,因為ZK的過半機制是允許損失一定數量的機器而扔能正常提供給服務,當leader死亡判斷不一致時就會出現多個leader。
什么是腦裂?
簡單點來說,腦裂(Split-Brain) 就是比如當你的 cluster 里面有兩個節點,它們都知道在這個 cluster 里需要選舉出一個 master。那么當它們兩個之間的通信完全沒有問題的時候,就會達成共識,選出其中一個作為 master。
但是如果它們之間的通信出了問題,那么兩個結點都會覺得現在沒有 master,所以每個都把自己選舉成 master,於是 cluster 里面就會有兩個 master。
對於Zookeeper來說有一個很重要的問題,就是到底是根據一個什么樣的情況來判斷一個節點死亡down掉了?在分布式系統中這些都是有監控者來判斷的,但是監控者也很難判定其他的節點的狀態,唯一一個可靠的途徑就是心跳,所以Zookeeper也是使用心跳來判斷客戶端是否仍然活着。
使用ZooKeeper來做Leader HA基本都是同樣的方式:
- 每個節點都嘗試注冊一個象征Leader的臨時節點,其他沒有注冊成功的則成為follower,並且通過watch機制 (這里有介紹) 監控着leader所創建的臨時節點;
- Zookeeper通過內部心跳機制來確定leader的狀態,一旦Leader出現意外Zookeeper能很快獲悉並且通知其他的follower,其他flower在之后作出相關反應,這樣就完成了一個切換。這種模式也是比較通用的模式,基本大部分都是這樣實現的。
但是這里面有個很嚴重的問題,如果注意不到會導致短暫的時間內系統出現腦裂。因為心跳出現超時可能是Leader掛了,但是也可能是Zookeeper節點之間網絡出現了問題,導致Leader假死的情況。
Leader其實並未死掉,但是與ZooKeeper之間的網絡出現問題導致Zookeeper認為其掛掉了然后通知其他節點進行切換,這樣follower中就有一個成為了Leader。
但是原本的Leader並未死掉,這時候client也獲得Leader切換的消息,仍然會有一些延時,Zookeeper通訊需要一個一個通知。
這時候整個系統在混亂中,很有可能有一部分client已經通知到了連接到新的Leader上去了,而有的client仍然連接在老的Leader上。
如果同時有兩個client需要對Leader的同一個數據更新,並且剛好這兩個client此刻分別連接在新老的Leader上,就會出現很嚴重問題。
這里做下小總結:
- 假死:由於心跳超時(網絡原因導致的)認為Leader死了,但其實leader還存活着;
- 腦裂:由於假死會發起新的Leader選舉,選舉出一個新的Leader,但舊的Leader網絡又通了,導致出現了兩個Leader ,有的客戶端連接到老的Leader,而有的客戶端則連接到新的leader。
Zookeeper集群中的"腦裂"場景說明
對於一個集群,想要提高這個集群的可用性,通常會采用多機房部署,比如現在有一個由6台zkServer所組成的一個集群,部署在了兩個機房:

正常情況下,此集群只會有一個Leader,那么如果機房之間的網絡斷了之后,兩個機房內的zkServer還是可以相互通信的。如果不考慮過半機制,那么就會出現每個機房內部都將選出一個Leader。

這就相當於原本一個集群,被分成了兩個集群,出現了兩個"大腦",這就是所謂的"腦裂"現象。
對於這種情況,其實也可以看出來,原本應該是統一的一個集群對外提供服務的,現在變成了兩個集群同時對外提供服務,如果過了一會,斷了的網絡突然聯通了,那么此時就會出現問題了。兩個集群剛剛都對外提供服務了,數據該怎么合並,數據沖突怎么解決等等問題。
剛剛在說明腦裂場景時有一個前提條件就是沒有考慮過半機制,所以實際上Zookeeper集群中是不會輕易出現腦裂問題的,原因就在於過半機制。
Zookeeper腦裂是什么原因導致的?
主要原因是Zookeeper集群和Zookeeper client判斷超時並不能做到完全同步,也就是說可能一前一后,如果是集群先於client發現,那就會出現上面的情況。
同時,在發現並切換后通知各個客戶端也有先后快慢。一般出現這種情況的幾率很小,需要Leader節點與Zookeeper集群網絡斷開,但是與其他集群角色之間的網絡沒有問題,還要滿足上面那些情況,但是一旦出現就會引起很嚴重的后果,數據不一致。
Zookeeper是如何解決"腦裂"問題的?
要解決Split-Brain腦裂的問題,一般有下面幾種種方法:
- Quorums (法定人數) 方式: 比如3個節點的集群,Quorums = 2, 也就是說集群可以容忍1個節點失效,這時候還能選舉出1個lead,集群還可用。比如4個節點的集群,它的Quorums = 3,Quorums要超過3,相當於集群的容忍度還是1,如果2個節點失效,那么整個集群還是無效的。這是Zookeeper防止"腦裂"默認采用的方法;
- Redundant communications (冗余通信)方式:集群中采用多種通信方式,防止一種通信方式失效導致集群中的節點無法通信。
- Fencing (共享資源) 方式:比如能看到共享資源就表示在集群中,能夠獲得共享資源的鎖的就是Leader,看不到共享資源的,就不在集群中。
- 仲裁機制方式;
- 啟動磁盤鎖定方式。
要想避免Zookeeper"腦裂"情況其實也很簡單,在follower節點切換的時候不在檢查到老的Leader節點出現問題后馬上切換,而是在休眠一段足夠的時間,確保老的leader已經獲知變更並且做了相關的shutdown清理工作了,然后再注冊成為master就能避免這類問題了。
這個休眠時間一般定義為與Zookeeper定義的超時時間就夠了,但是這段時間內系統可能是不可用的,但是相對於數據不一致的后果來說還是值得的。
1)ZooKeeper默認采用了Quorums這種方式來防止"腦裂"現象
即只有集群中超過半數節點投票才能選舉出Leader。
這樣的方式可以確保Leader的唯一性,要么選出唯一的一個Leader,要么選舉失敗。在zookeeper中Quorums作用如下:
- 集群中最少的節點數用來選舉Leader保證集群可用;
- 通知客戶端數據已經安全保存前集群中最少數量的節點數已經保存了該數據。一旦這些節點保存了該數據,客戶端將被通知已經安全保存了,可以繼續其他任務。而集群中剩余的節點將會最終也保存了該數據。
假設某個Leader假死,其余的followers選舉出了一個新的Leader。這時,舊的Leader復活並且仍然認為自己是Leader,這個時候它向其他followers發出寫請求也是會被拒絕的。
因為每當新Leader產生時,會生成一個epoch標號(標識當前屬於那個Leader的統治時期),這個epoch是遞增的,followers如果確認了新的Leader存在,知道其epoch,就會拒絕epoch小於現任Leader epoch的所有請求。
那有沒有follower不知道新的Leader存在呢?有可能,但肯定不是大多數,否則新Leader無法產生。Zookeeper的寫也遵循quorum機制,因此,得不到大多數支持的寫是無效的,舊Leader即使各種認為自己是leader,依然沒有什么作用。
Zookeeper除了可以采用上面默認的Quorums方式來避免出現"腦裂",還可以可采用下面的預防措施:
2)添加冗余的心跳線,例如雙線條線,盡量減少“裂腦”發生機會
3)啟用磁盤鎖
正在服務一方鎖住共享磁盤,"裂腦"發生時,讓對方完全"搶不走"共享磁盤資源。但使用鎖磁盤也會有一個不小的問題,如果占用共享盤的一方不主動"解鎖",另一方就永遠得不到共享磁盤。
現實中假如服務節點突然死機或崩潰,就不可能執行解鎖命令。后備節點也就接管不了共享資源和應用服務。於是有人在HA中設計了"智能"鎖。即正在服務的一方只在發現心跳線全部斷開(察覺不到對端)時才啟用磁盤鎖。平時就不上鎖了。
4)設置仲裁機制
例如設置參考IP(如網關IP),當心跳線完全斷開時,2個節點都各自ping一下 參考IP,不通則表明斷點就出在本端,不僅"心跳"、還兼對外"服務"的本端網絡鏈路斷了,即使啟動(或繼續)應用服務也沒有用了,那就主動放棄競爭,讓能夠ping通參考IP的一端去起服務。
更保險一些,ping不通參考IP的一方干脆就自我重啟,以徹底釋放有可能還占用着的那些共享資源。
ZK的過半機制
在領導者選舉的過程中,如果某台zkServer獲得了超過半數的選票,則此zkServer就可以成為Leader了。
舉個簡單的例子:如果現在集群中有5台zkServer,那么half=5/2=2,那么也就是說,領導者選舉的過程中至少要有三台zkServer投了同一個zkServer,才會符合過半機制,才能選出來一個Leader。
那么Zookeeper選舉的過程中為什么一定要有一個過半機制驗證?
因為這樣不需要等待所有zkServer都投了同一個zkServer就可以選舉出來一個Leader了。這樣比較快,所以叫快速領導者選舉算法。
Zookeeper過半機制中為什么是大於,而不是大於等於?
這就是跟腦裂問題有關系了。比如回到上文出現腦裂問題的場景 (如上圖1):
當機房中間的網絡斷掉之后,機房1內的三台服務器會進行領導者選舉,但是此時過半機制的條件是 "節點數 > 3",也就是說至少要4台zkServer才能選出來一個Leader。
所以對於機房1來說它不能選出一個Leader,同樣機房2也不能選出一個Leader,這種情況下整個集群當機房間的網絡斷掉后,整個集群將沒有Leader。
而如果過半機制的條件是 "節點數 >= 3",那么機房1和機房2都會選出一個Leader,這樣就出現了腦裂。這就可以解釋為什么過半機制中是大於而不是大於等於,目的就是為了防止腦裂。
如果假設我們現在只有5台機器,也部署在兩個機房:

此時過半機制的條件是 "節點數 > 2",也就是至少要3台服務器才能選出一個Leader。
此時機房件的網絡斷開了,對於機房1來說是沒有影響的,Leader依然還是Leader;對於機房2來說是選不出來Leader的,此時整個集群中只有一個Leader。
因此總結得出,有了過半機制,對於一個Zookeeper集群來說,要么沒有Leader,要么只有1個Leader,這樣Zookeeper也就能避免了腦裂問題。
ZK的過半機制一定程度上也減少了腦裂情況的出現,起碼不會出現三個leader同時。ZK中的Epoch機制(時鍾)每次選舉都是遞增+1,當通信時需要判斷epoch是否一致,小於自己的則拋棄,大於自己則重置自己,等於則選舉;
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
過半選舉算法
ZK投票處理策略
投票信息包含 :所選舉leader的Serverid,Zxid,SelectionEpoch
- Epoch判斷,自身logicEpoch與SelectionEpoch判斷:大於、小於、等於。
- 優先檢查ZXID。ZXID比較大的服務器優先作為Leader。
- 如果ZXID相同,那么就比較myid。myid較大的服務器作為Leader服務器。
ZK中有三種選舉算法,分別是LeaderElection,FastLeaderElection,AuthLeaderElection,FastLeaderElection和AuthLeaderElection是類似的選舉算法,唯一區別是后者加入了認證信息, FastLeaderElection比LeaderElection更高效,后續的版本只保留FastLeaderElection。
理解:
在集群環境下多個節點啟動,ZK首先需要在多個節點中選出一個節點作為leader並處於Leading狀態,這樣就面臨一個選舉問題,同時選舉規則是什么樣的。“過半選舉算法”:投票選舉中獲得票數過半的節點勝出,即狀態從looking變為leading,效率更高。
官網資料描述:Clustered (Multi-Server) Setup,如下圖:

As long as a majority of the ensemble are up, the service will be available. Because Zookeeper requires a majority, it is best to use an odd number of machines. For example, with four machines ZooKeeper can only handle the failure of a single machine; if two machines fail, the remaining two machines do not constitute a majority. However, with five machines ZooKeeper can handle the failure of two machines.
以5台服務器講解思路:
- 服務器1啟動,此時只有它一台服務器啟動了,它發出去的Vote沒有任何響應,所以它的選舉狀態一直是LOOKING狀態;
- 服務器2啟動,它與最開始啟動的服務器1進行通信,互相交換自己的選舉結果,由於兩者都沒有歷史數據,所以id值較大的服務器2勝出,但是由於沒有達到超過半數以上的服務器都同意選舉它(這個例子中的半數以上是3),所以服務器1,2還是繼續保持LOOKING狀態.
- 服務器3啟動,根據前面的理論,分析有三台服務器選舉了它,服務器3成為服務器1,2,3中的老大,所以它成為了這次選舉的leader.
- 服務器4啟動,根據前面的分析,理論上服務器4應該是服務器1,2,3,4中最大的,但是由於前面已經有半數以上的服務器選舉了服務器3,所以它只能接收當小弟的命了.
- 服務器5啟動,同4一樣,當小弟.
假設5台中掛了2台(3、4),其中leader也掛掉:
leader和follower間有檢查心跳,需要同步數據 Leader節點掛了,整個Zookeeper集群將暫停對外服務,進入新一輪Leader選舉
1)服務器1、2、5發現與leader失聯,狀態轉為looking,開始新的投票 2)服務器1、2、5分別開始投票並廣播投票信息,自身Epoch自增; 3) 服務器1、2、5分別處理投票,判斷出leader分別廣播 4)根據投票處理邏輯會選出一台(2票過半) 5)各自服務器重新變更為leader、follower狀態 6)重新提供服務
源碼解析:
URL: FastLeaderElection
/**
* Starts a new round of leader election. Whenever our QuorumPeer
* changes its state to LOOKING, this method is invoked, and it
* sends notifications to all other peers.
*/
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
self.start_fle = Time.currentElapsedTime();
try {
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = minNotificationInterval;
synchronized (this) {
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
sendNotifications();
SyncedLearnerTracker voteSet;
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if (n == null) {
if (manager.haveDelivered()) {
sendNotifications();
} else {
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout * 2;
notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
} else if (validVoter(n.sid) && validVoter(n.leader)) {
/*
* Only proceed if the vote comes from a replica in the current or next
* voting view for a replica in the current or next voting view.
*/
switch (n.state) {
case LOOKING:
if (getInitLastLoggedZxid() == -1) {
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) {
LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
break;
}
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Adding vote: from=" + n.sid
+ ", proposed leader=" + n.leader
+ ", proposed zxid=0x" + Long.toHexString(n.zxid)
+ ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// don't care about the version if it's in LOOKING state
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
if (voteSet.hasAllQuorums()) {
// Verify if there is any change in the proposed leader
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: {}", n.sid);
break;
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
if (n.electionEpoch == logicalclock.get()) {
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
setPeerState(n.leader, voteSet);
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify that
* a majority are following the same leader.
*/
outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized (this) {
logicalclock.set(n.electionEpoch);
setPeerState(n.leader, voteSet);
}
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecoginized: " + n.state + " (n.state), " + n.sid + " (n.sid)");
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if (self.jmxLeaderElectionBean != null) {
MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
}
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch)
|| ((newEpoch == curEpoch)
&& ((newZxid > curZxid)
|| ((newZxid == curZxid)
&& (newId > curId)))));
歸納
在日常的ZK運維時需要注意以上場景在極端情況下出現問題,特別是腦裂的出現,可以采用:
過半選舉策略下部署原則:
- 服務器群部署要單數,如:3、5、7、...,單數是最容易選出leader的配置量。
- ZK允許節點最大損失數,原則就是“保證過半選舉正常”,多了就是浪費。
詳細的算法邏輯是很復雜要考慮很多情況,其中有個Epoch的概念(自增長),分為:LogicEpoch和ElectionEpoch,每次投票都有判斷每個投票周期是否一致等等。
