zookeeper 的由來:
分布式系統的很多難題,都是由於缺少協調機制造成的。在分布式協調這塊做得比較好的,有 Google 的 Chubby 以及 Apache 的 Zookeeper。Google Chubby 是一個分布式鎖服務,通過 Google Chubby 來解決分布式協作、Master 選舉等與分布式鎖服務相關的問題。
Zookeeper 也是類似,因為當時在雅虎內部的很多系統都需要依賴一個系統來進行分布式協調,但是谷歌的Chubby是不開源的,所以后來雅虎基於 Chubby 的思想開發了 zookeeper,並捐贈給了 Apache。
zookeeper解決了什么問題:
-
zookeeper是一個精簡的文件系統。這點它和hadoop有點像,但是zookeeper這個文件系統是管理小文件的,而hadoop是管理超大文件的。
-
zookeeper提供了豐富的“構件”,這些構件可以實現很多協調數據結構和協議的操作。例如:分布式隊列、分布式鎖以及一組同級節點的“領導者選舉”算法。
-
zookeeper是高可用的,它本身的穩定性是相當之好,分布式集群完全可以依賴zookeeper集群的管理,利用zookeeper避免分布式系統的單點故障的問題。
-
zookeeper采用了松耦合的交互模式。這點在zookeeper提供分布式鎖上表現最為明顯,zookeeper可以被用作一個約會機制,讓參入的進程不在了解其他進程的(或網絡)的情況下能夠彼此發現並進行交互,參入的各方甚至不必同時存在,只要在zookeeper留下一條消息,在該進程結束后,另外一個進程還可以讀取這條信息,從而解耦了各個節點之間的關系。
-
zookeeper為集群提供了一個共享存儲庫,集群可以從這里集中讀寫共享的信息,避免了每個節點的共享操作編程,減輕了分布式系統的開發難度。
-
zookeeper的設計采用的是觀察者的設計模式,zookeeper主要是負責存儲和管理大家關心的數據,然后接受觀察者的注冊,一旦這些數據的狀態發生變化,Zookeeper 就將負責通知已經在 Zookeeper 上注冊的那些觀察者做出相應的反應,從而實現集群中類似 Master/Slave 管理模式。
- 。。。。。
數據節點:
在 ZooKeeper中,每個數據節點都是有生命周期的,其生命周期的長短取決於數據節點的節點類型。在 ZooKeeper中,節點類型可以分為持久節點(PERSISTENT)、臨時節點(EPHEMERAL)和順序節點(SEQUENTIAL)三大類,具體在節點創建過程中,通過組合使用,可以生成以下四種組合型節點類型:
- 持久節點(PERSISTENT):持久節點是 ZooKeeper中最常見的一種節點類型。所謂持久節點,是指該數據節點被創建后,就會一直存在於 ZooKeeper服務器上,直到有刪除操作來主動清除這個節點。
- 持久順序節點(PERSISTENT SEQUENTIAL):持久順序節點的基本特性和持久節點是一致的,額外的特性表現在順序性上。在ZooKeeper中,每個父節點都會為它的第一級子節點維護一份順序,用於記錄下每個子節點創建的先后順序。基於這個順序特性,在創建子節點的時候,可以設置這個標記,那么在創建節點過程中, ZooKeeper會自動為給定節點名加上一個數字后綴,作為一個新的、完整的節點名。另外需要注意的是,這個數字后綴的上限是整型的最大值。
- 臨時節點(EPHEMERAL):和持久節點不同的是,臨時節點的生命周期和客戶端的會話綁定在一起,也就是說,如果客戶端會話失效,那么這個節點就會被自動清理掉。注意,這里提到的是客戶端會話失效,而非TCP連接斷開。另外, ZooKeeper規定了不能基於臨時節點來創建子節點,即臨時節點只能作為葉子節點。
- 臨時順序節點(EPHEMERAL SEQUENTIAL):臨時順序節點的基本特性和臨時節點也是一致的,同樣是在臨時節點的基礎上,添加了順序的特性。
如果自己設計一個類似 zookeeper 這個中間件,我們需要考慮到什么呢?:
1. 防止單點故障
如果要防止 zookeeper 這個中間件的單點故障,那就勢必要做集群。而且這個集群如果要滿足高性能要求的話,還得是一個高性能高可用的集群。高性能意味着這個集群能夠分擔客戶端的請求流量,高可用意味着集群中的某一個節點宕機以后,不影響整個集群的數據和繼續提供服務的可能性。結論: 所以這個中間件需要考慮到集群,而且這個集群還需要分攤客戶端的請求流量,實現服務的高性能。
2. 接着上面那個結論再來思考,如果要滿足這樣的一個高性能集群,我們最直觀的想法應該是,每個節點都能接收到請求,並且每個節點的數據都必須要保持一致。要實現各個節點的數據一致性,就勢必要一個 leader 節點負責協調和數據同步操作。這個我想大家都知道,如果在這樣一個集群中沒有 leader 節點,每個節點都可以接收所有請求,那么這個集群的數據同步的復雜度是非常大。結論:所以這個集群中涉及到數據同步以及會存在leader 節點
3.繼續思考,如何在這些節點中選舉出 leader 節點,以及leader 掛了以后,如何恢復呢?結論:所以 zookeeper 用了基於 paxos 理論所衍生出來的 ZAB 協議
.4. leader 節點如何和其他節點保證數據一致性,並且要求是強一致的。在分布式系統中,每一個機器節點雖然都能夠明確知道自己進行的事務操作過程是成功和失敗,但是卻無法直接獲取其他分布式節點的操作結果。所以當一個事務操作涉及到跨節點的時候,就需要用到分布式事務,分布式事務的數據一致性協議有 2PC 協議和3PC 協議。
Zookeeper 集群角色:
Leader 角色:Leader 服務器是整個 zookeeper 集群的核心,主要的工作任務有兩項1. 事務請求的唯一調度和處理者,保證集群事物處理的順序性2. 集群內部各服務器的調度者
Follower 角色:Follower 角色的主要職責是1. 處理客戶端非事務請求、轉發事務請求給 leader 服務器2. 參與事物請求 Proposal 的投票(需要半數以上服務器通過才能通知 leader commit 數據; Leader 發起的提案,要求 Follower 投票)3. 參與 Leader 選舉的投票
Observer 角色:Observer 是 zookeeper3.3 開始引入的一個全新的服務器角色,從字面來理解,該角色充當了觀察者的角色。觀察 zookeeper 集群中的最新狀態變化並將這些狀態變化同步到 observer 服務器上。Observer 的工作原理與follower 角色基本一致,而它和 follower 角色唯一的不同在於 observer 不參與任何形式的投票,包括事務請求Proposal的投票和leader選舉的投票。簡單來說,observer服務器只提供非事務請求服務,通常在於不影響集群事物處理能力的前提下提升集群非事務處理的能力
zookeeper 的集群:
如上圖,在 zookeeper 中,客戶端會隨機連接到 zookeeper 集群中的一個節點,如果是讀請求,就直接從當前節點中讀取數據,如果是寫請求,那么請求會被轉發給 leader 提交事務,然后 leader 會廣播事務,只要有超過半數節點寫入成功,那么寫請求就會被提交(類 2PC 事務)
通常 zookeeper 是由 2n+1 台 server 組成,每個 server 都知道彼此的存在。對於 2n+1 台 server,只要有 n+1 台(大多數)server 可用,整個系統保持可用。我們已經了解到,一個 zookeeper 集群如果要對外提供可用的服務,那么集群中必須要有過半的機器正常工作並且彼此之間能夠正常通信,基於這個特性,如果向搭建一個能夠允許 F 台機器down 掉的集群,那么就要部署 2*F+1 台服務器構成的zookeeper 集群。因此 3 台機器構成的 zookeeper 集群,能夠在掛掉一台機器后依然正常工作。一個 5 台機器集群的服務,能夠對 2 台機器down掉的情況下進行容災。如果一台由 6 台服務構成的集群,同樣只能掛掉 2 台機器。因此,5 台和 6 台在容災能力上並沒有明顯優勢,反而增加了網絡通信負擔。系統啟動時,集群中的 server 會選舉出一台server 為 Leader,其它的就作為 follower(這里先不考慮observer 角色)。
之所以要滿足這樣一個等式,是因為一個節點要成為集群中的 leader,需要有超過及群眾過半數的節點支持,這個涉及到 leader 選舉算法。同時也涉及到事務請求的提交投票。
所有事務請求必須由一個全局唯一的服務器來協調處理,這個服務器就是 Leader 服務器,其他的服務器就是follower。leader 服務器把客戶端的失去請求轉化成一個事務 Proposal(提議),並把這個 Proposal 分發給集群中的所有 Follower 服務器。之后 Leader 服務器需要等待所有Follower 服務器的反饋,一旦超過半數的 Follower 服務器進行了正確的反饋,那么 Leader 就會再次向所有的Follower 服務器發送 Commit 消息,要求各個 follower 節點對前面的一個 Proposal 進行提交;
ZAB協議:
ZAB(Zookeeper Atomic Broadcast) 協議是為分布式協調服務 ZooKeeper 專門設計的一種支持崩潰恢復的原子廣播協議。在 ZooKeeper 中,主要依賴 ZAB 協議來實現分布式數據一致性,基於該協議,ZooKeeper 實現了一種主備模式的系統架構來保持集群中各個副本之間的數據一致性。ZAB 協議包含兩種基本模式,分別是
1. 崩潰恢復
2. 原子廣播
當整個集群在啟動時,或者當 leader 節點出現網絡中斷、崩潰等情況時,ZAB 協議就會進入恢復模式並選舉產生新的 Leader,當 leader 服務器選舉出來后,並且集群中有過半的機器和該 leader 節點完成數據同步后(同步指的是數據同步,用來保證集群中過半的機器能夠和 leader 服務器的數據狀態保持一致),ZAB 協議就會退出恢復模式。當集群中已經有過半的 Follower 節點完成了和 Leader 狀態同步以后,那么整個集群就進入了消息廣播模式。這個時候,在 Leader 節點正常工作時,啟動一台新的服務器加入到集群,那這個服務器會直接進入數據恢復模式,和 leader 節點進行數據同步。同步完成后即可正常對外提供非事務請求的處理。
消息廣播的實現原理 :
消息廣播的過程實際上是一個簡化版本的二階段提交過程。如下圖:
1. leader 接收到消息請求后,將消息賦予一個全局唯一的64 位自增 id,叫:zxid,通過 zxid 的大小比較既可以實現因果有序這個特征
2. leader 為每個 follower 准備了一個 FIFO 隊列(通過 TCP協議來實現,以實現了全局有序這一個特點)將帶有 zxid的消息作為一個提案(proposal)分發給所有的 follower
3. 當 follower 接收到 proposal,先把 proposal 寫到磁盤,寫入成功以后再向 leader 回復一個 ack
4. 當 leader 接收到合法數量(超過半數節點)的 ACK 后,leader 就會向這些 follower 發送 commit 命令,同時會在本地執行該消息
5. 當 follower 收到消息的 commit 命令以后,會提交該消息。
leader 的投票過程,不需要 Observer 的 ack,也就是Observer 不需要參與投票過程,但是 Observer 必須要同步 Leader 的數據從而在處理請求的時候保證數據的一致性
崩潰恢復(數據恢復):
ZAB 協議的這個基於原子廣播協議的消息廣播過程,在正常情況下是沒有任何問題的,但是一旦 Leader 節點崩潰,或者由於網絡問題導致 Leader 服務器失去了過半的Follower 節點的聯系(leader 失去與過半 follower 節點聯系,可能leader 節點和 follower 節點之間產生了網絡分區,那么此時的 leader 不再是合法的 leader 了),那么就會進入到崩潰恢復模式。在 ZAB 協議中,為了保證程序的正確運行,整個恢復過程結束后需要選舉出一個新的Leader為了使 leader 掛了后系統能正常工作,需要解決以下兩個問題
1. 已經被處理的消息不能丟失:當 leader 收到合法數量 follower 的 ACKs 后,就向各個 follower 廣播 COMMIT 命令,同時也會在本地執行 COMMIT 並向連接的客戶端返回「成功」。但是如果在各個 follower 在收到 COMMIT 命令前leader就掛了,導致剩下的服務器並沒有執行都這條消息。leader 對事務消息發起 commit 操作,但是該消息在follower1 上執行了,但是 follower2 還沒有收到 commit,就已經掛了,而實際上客戶端已經收到該事務消息處理成功的回執了。所以在 zab 協議下需要保證所有機器都要執行這個事務消息
2. 被丟棄的消息不能再次出現:當 leader 接收到消息請求生成 proposal 后就掛了,其他 follower 並沒有收到此 proposal,因此經過恢復模式重新選了 leader 后,這條消息是被跳過的。 此時,之前掛了的 leader 重新啟動並注冊成了follower,他保留了被跳過消息的 proposal 狀態,與整個系統的狀態是不一致的,需要將其刪除。
ZAB 協議需要滿足上面兩種情況,就必須要設計一個leader 選舉算法:能夠確保已經被 leader 提交的事務Proposal能夠提交、同時丟棄已經被跳過的事務Proposal。針對這個要求
1. 如果 leader 選舉算法能夠保證新選舉出來的 Leader 服務器擁有集群中所有機器最高編號(ZXID 最大)的事務Proposal,那么就可以保證這個新選舉出來的 Leader 一定具有已經提交的提案。因為所有提案被 COMMIT 之前必須有超過半數的 follower ACK,即必須有超過半數節點的服務器的事務日志上有該提案的 proposal,因此,只要有合法數量的節點正常工作,就必然有一個節點保存了所有被 COMMIT 消息的 proposal 狀態另外一個,zxid 是 64 位,高 32 位是 epoch 編號,每經過一次 Leader 選舉產生一個新的 leader,新的 leader 會將epoch 號+1,低 32 位是消息計數器,每接收到一條消息這個值+1,新 leader 選舉后這個值重置為 0.這樣設計的好處在於老的 leader 掛了以后重啟,它不會被選舉為 leader,因此此時它的 zxid 肯定小於當前新的 leader。當老的leader 作為 follower 接入新的 leader 后,新的 leader 會讓它將所有的擁有舊的 epoch 號的未被 COMMIT 的proposal 清除
關於 ZXID :
zxid,也就是事務 id,為了保證事務的順序一致性,zookeeper 采用了遞增的事務 id 號(zxid)來標識事務。所有的提議(proposal)都在被提出的時候加上了 zxid。實現中 zxid 是一個 64 位的數字,它高 32 位是 epoch(ZAB 協議通過epoch 編號來區分 Leader 周期變化的策略)用來標識 leader 關系是否改變,每次一個 leader 被選出來,它都會有一個新的epoch=(原來的 epoch+1),標識當前屬於那個 leader 的統治時期。低 32 位用於遞增計數。epoch:可以理解為當前集群所處的年代或者周期,每個leader 就像皇帝,都有自己的年號,所以每次改朝換代,leader 變更之后,都會在前一個年代的基礎上加 1。這樣就算舊的 leader 崩潰恢復之后,也沒有人聽他的了,因為follower 只聽從當前年代的 leader 的命令。epoch 的變化大家可以做一個簡單的實驗,
1. 啟動一個 zookeeper 集群。
2. 在 /"dataDir"/zookeeper/VERSION-2 路 徑 下 會 看 到 一 個currentEpoch 文件。文件中顯示的是當前的 epoch
3. 把 leader 節點停機,這個時候在看 currentEpoch 會有變化。 隨着每次選舉新的 leader,epoch 都會發生變化
leader 選舉:
Leader 選舉會分兩個過程啟動的時候的 leader 選舉、 leader 崩潰的時候的的選舉服務器啟動時的 leader 選舉每個節點啟動的時候狀態都是 LOOKING,處於觀望狀態,接下來就開始進行選主流程進行 Leader 選舉,至少需要兩台機器,我們選取 3 台機器組成的服務器集群為例。在集群初始化階段,當有一台服務器 Server1 啟動時,它本身是無法進行和完成 Leader 選舉,當第二台服務器 Server2 啟動時,這個時候兩台機器可以相互通信,每台機器都試圖找到 Leader,於是進入 Leader 選舉過程。選舉過程如下:
(1) 每個 Server 發出一個投票。由於是初始情況,Server1和 Server2 都會將自己作為 Leader 服務器來進行投票,每次投票會包含所推舉的服務器的 myid 和 ZXID、epoch,使用(myid, ZXID,epoch)來表示,此時 Server1的投票為(1, 0),Server2 的投票為(2, 0),然后各自將這個投票發給集群中其他機器。
(2) 接受來自各個服務器的投票。集群的每個服務器收到投票后,首先判斷該投票的有效性,如檢查是否是本輪投票(epoch)、是否來自LOOKING狀態的服務器。
(3) 處理投票。針對每一個投票,服務器都需要將別人的投票和自己的投票進行 PK,PK 規則如下
i. 優先檢查 ZXID。ZXID 比較大的服務器優先作為Leader
ii. 如果 ZXID 相同,那么就比較 myid。myid 較大的服務器作為 Leader 服務器。
對於 Server1 而言,它的投票是(1, 0),接收 Server2的投票為(2, 0),首先會比較兩者的 ZXID,均為 0,再比較 myid,此時 Server2 的 myid 最大,於是更新自己的投票為(2, 0),然后重新投票,對於 Server2 而言,它不需要更新自己的投票,只是再次向集群中所有機器發出上一次投票信息即可。
(4) 統計投票。每次投票后,服務器都會統計投票信息,判斷是否已經有過半機器接受到相同的投票信息,對於 Server1、Server2 而言,都統計出集群中已經有兩台機器接受了(2, 0)的投票信息,此時便認為已經選出了 Leader。
(5) 改變服務器狀態。一旦確定了 Leader,每個服務器就會更新自己的狀態,如果是 Follower,那么就變更為FOLLOWING,如果是 Leader,就變更為 LEADING。
運行過程中的 leader 選舉:
當集群中的 leader 服務器出現宕機或者不可用的情況時,那么整個集群將無法對外提供服務,而是進入新一輪的Leader 選舉,服務器運行期間的 Leader 選舉和啟動時期的 Leader 選舉基本過程是一致的。
(1) 變更狀態。Leader 掛后,余下的非 Observer 服務器都會將自己的服務器狀態變更為 LOOKING,然后開始進入 Leader 選舉過程。
(2) 每個 Server 會發出一個投票。在運行期間,每個服務器上的 ZXID 可能不同,此時假定 Server1 的 ZXID 為123,Server3的ZXID為122;在第一輪投票中,Server1和 Server3 都會投自己,產生投票(1, 123),(3, 122),然后各自將投票發送給集群中所有機器。接收來自各個服務器的投票。與啟動時過程相同。
(3) 處理投票。與啟動時過程相同,此時,Server1 將會成為 Leader。
(4) 統計投票。與啟動時過程相同。
(5) 改變服務器的狀態。與啟動時過程相同
Leader 選舉源碼分析:
有了理論基礎以后,我們先讀一下源碼(zookeeper-3.4.12),看看他的實現邏輯。首先我們需要知道源碼入口,也就是Zookeeper啟動的主類: QuorumPeerMain 類的 main 方法開始:
public static void main(String[] args) { QuorumPeerMain main = new QuorumPeerMain(); try {//初始化主要邏輯 main.initializeAndRun(args); } //...異常捕獲 LOG.info("Exiting normally"); System.exit(0); }
進入 main.initializeAndRun(args) 可以看到:
protected void initializeAndRun(String[] args) throws ConfigException, IOException { QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { config.parse(args[0]); } // 啟動后台定時任務異步執行清除任務,刪除垃圾數據 // Start and schedule the the purge task DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); //判斷是集群還是單機 if (args.length == 1 && config.servers.size() > 0) { // 集群 runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone //單機 ZooKeeperServerMain.main(args); } }
進入 runFromConfig():
public void runFromConfig(QuorumPeerConfig config) throws IOException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } LOG.info("Starting quorum peer"); try {// 初始化NIOServerCnxnFactory ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); // 邏輯主線程 進行投票,選舉 quorumPeer = getQuorumPeer(); // 進入一系列的配置 quorumPeer.setQuorumPeers(config.getServers()); quorumPeer.setTxnFactory(new FileTxnSnapLog( new File(config.getDataLogDir()), new File(config.getDataDir()))); quorumPeer.setElectionType(config.getElectionAlg()); quorumPeer.setMyid(config.getServerId()); //配置 myid quorumPeer.setTickTime(config.getTickTime()); quorumPeer.setInitLimit(config.getInitLimit()); quorumPeer.setSyncLimit(config.getSyncLimit()); quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); quorumPeer.setCnxnFactory(cnxnFactory); quorumPeer.setQuorumVerifier(config.getQuorumVerifier()); // 為客戶端提供寫的server 即2181訪問端口的訪問功能 quorumPeer.setClientPortAddress(config.getClientPortAddress()); quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); quorumPeer.setLearnerType(config.getPeerType()); quorumPeer.setSyncEnabled(config.getSyncEnabled()); // sets quorum sasl authentication configurations quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl); if(quorumPeer.isQuorumSaslAuthEnabled()){ quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl); quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl); quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal); quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext); quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext); } quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); // 初始化的工作 quorumPeer.initialize(); // 啟動主線程,QuorumPeer 重寫了 Thread.start 方法 quorumPeer.start(); quorumPeer.join();//使得線程之間的並行執行變為串行執行 } catch (InterruptedException e) { // warn, but generally this is ok LOG.warn("Quorum Peer interrupted", e); } }
調用 QuorumPeer 的 start 方法:
@Override public synchronized void start() { //載入本地DB數據 主要還是epoch loadDataBase(); //啟動ZooKeeperThread線程 cnxnFactory.start(); //啟動leader選舉線程 startLeaderElection(); super.start(); }
loadDataBase() 主要是從本地文件中恢復數據,以及獲取最新的 zxid。
private void loadDataBase() { File updating = new File(getTxnFactory().getSnapDir(), UPDATING_EPOCH_FILENAME); try {//載入本地數據 zkDb.loadDataBase(); // load the epochs 加載ZXID long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid; // 根據zxid的高32位是epoch號,低32位是事務id進行抽離epoch號 long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); try {//從${data}/version-2/currentEpochs文件中加載當前的epoch號 currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); //從 zxid中提取的epoch比文件里的epoch要大的話,並且沒有正在修改epoch if (epochOfZxid > currentEpoch && updating.exists()) { setCurrentEpoch(epochOfZxid);//設置位大的epoch if (!updating.delete()) { throw new IOException("Failed to delete " + updating.toString()); } } } // ........ //如果如果還比他大 拋出異常 if (epochOfZxid > currentEpoch) { throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid); } try {//再比較 acceptedEpoch acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME); } // ........ if (acceptedEpoch < currentEpoch) { throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch)); } // ....... }
接下去就是初始化選舉算法 leaderElection:
synchronized public void startLeaderElection() { try { // 根據myid zxid epoch 3個選舉參數創建Voto 對象,准備選舉 currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } catch(IOException e) { RuntimeException re = new RuntimeException(e.getMessage()); re.setStackTrace(e.getStackTrace()); throw re; } for (QuorumServer p : getView().values()) { if (p.id == myid) { myQuorumAddr = p.addr; break; } } if (myQuorumAddr == null) { throw new RuntimeException("My id " + myid + " not in the peer list"); } if (electionType == 0) {//如果是這個選舉策略,代表 LeaderElection選舉策略 try {//創建 UDP Socket udpSocket = new DatagramSocket(myQuorumAddr.getPort()); responder = new ResponderThread(); responder.start(); } catch (SocketException e) { throw new RuntimeException(e); } }//根據類型創建選舉算法 this.electionAlg = createElectionAlgorithm(electionType); }
進入選舉算法的初始化 createElectionAlgorithm():配置選舉算法,選舉算法有 3 種,可以通過在 zoo.cfg 里面進行配置,默認是 FastLeaderElection 選舉
protected Election createElectionAlgorithm(int electionAlgorithm){ Election le=null; // 選擇選舉策略 //TODO: use a factory rather than a switch switch (electionAlgorithm) { case 0: le = new LeaderElection(this); break; case 1: le = new AuthFastLeaderElection(this); break; case 2: le = new AuthFastLeaderElection(this, true); break; case 3://Leader選舉IO負責類 qcm = createCnxnManager(); QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ // 啟動已綁定端口的選舉線程,等待其他服務器連接 listener.start(); //基於 TCP的選舉算法 le = new FastLeaderElection(this, qcm); } else { LOG.error("Null listener when initializing cnx manager"); } break; default: assert false; } return le; }
繼續看 FastLeaderElection 的初始化動作,主要初始化了業務層的發送隊列和接收隊列 :
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){ this.stop = false; this.manager = manager; starter(self, manager); } // *********************************************** private void starter(QuorumPeer self, QuorumCnxManager manager) { this.self = self; proposedLeader = -1; proposedZxid = -1; // 投票 發送隊列 阻塞 sendqueue = new LinkedBlockingQueue<ToSend>(); // 投票 接受隊列 阻塞 recvqueue = new LinkedBlockingQueue<Notification>(); this.messenger = new Messenger(manager); }
然后再 Messager 的構造函數里 初始化發送和接受兩個線程並且啟動線程。
Messenger(QuorumCnxManager manager) {//異步決策 // 創建 vote 發送線程 this.ws = new WorkerSender(manager); Thread t = new Thread(this.ws,"WorkerSender[myid=" + self.getId() + "]"); t.setDaemon(true); t.start();//啟動 // 創建 vote 接受線程 this.wr = new WorkerReceiver(manager); t = new Thread(this.wr,"WorkerReceiver[myid=" + self.getId() + "]"); t.setDaemon(true); t.start();//啟動 }
然后再回到 QuorumPeer.java。 FastLeaderElection 初始化完成以后,調用 super.start(),最終運行 QuorumPeer 的run 方法
public void run() { setName("QuorumPeer" + "[myid=" + getId() + "]" + cnxnFactory.getLocalAddress()); // 省略通過JMX初始化。來監控一些屬性的代碼 try { // Main loop 主循環 while (running) { switch (getPeerState()) { case LOOKING: //LOOKING 狀態,則進入選舉 if (Boolean.getBoolean("readonlymode.enabled")) { // 創建 ReadOnlyZooKeeperServer,但是不立即啟動 final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer( logFactory, this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb); //通過 Thread 異步解耦 Thread roZkMgr = new Thread() { public void run() { try { // lower-bound grace period to 2 secs sleep(Math.max(2000, tickTime)); if (ServerState.LOOKING.equals(getPeerState())) { roZk.startup(); } // ....... } }; try {//啟動 roZkMgr.start(); setBCVote(null); // 通過策略模式來決定當前用那個算法選舉 setCurrentVote(makeLEStrategy().lookForLeader()); // ......... } else { try { setBCVote(null); setCurrentVote(makeLEStrategy().lookForLeader()); //........ } break; //**************************************************************************** case OBSERVING: // Observing 針對 Observer角色的節點 try { LOG.info("OBSERVING"); setObserver(makeObserver(logFactory)); observer.observeLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e ); } finally { observer.shutdown(); setObserver(null); setPeerState(ServerState.LOOKING); } break; //***************************************************************************** case FOLLOWING:// 從節點狀態 try { LOG.info("FOLLOWING"); setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { follower.shutdown(); setFollower(null); setPeerState(ServerState.LOOKING); } break; //********************************************************************** case LEADING: // leader 節點 LOG.info("LEADING"); try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } setPeerState(ServerState.LOOKING); } break; } } } // .......... }
由於是剛剛啟動,是 LOOKING 狀態。所以走第一條分支。調用 setCurrentVote(makeLEStrategy().lookForLeader());,最終根據上一步選擇的策略應該運行 FastLeaderElection 中的選舉算法,看一下 lookForLeader();
//開始選舉 Leader public Vote lookForLeader() throws InterruptedException { // ...省略一些代碼 try { // 收到的投票 HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); // 存儲選舉結果 HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>(); int notTimeout = finalizeWait; // AtomicLong logicalclock = new AtomicLong(); synchronized(this){ logicalclock.incrementAndGet(); // 增加邏輯時鍾 // 修改自己的zxid epoch updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); // 發送投票 // Loop in which we exchange notifications until we find a leader while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ // 主循環 直到選舉出leader /* * Remove next notification from queue, times out after 2 times * the termination time */ //從IO進程里面 獲取投票結果,自己的投票也在里面 Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS); // 如果沒有獲取到足夠的通知久一直發送自己的選票,也就是持續進行選舉 if(n == null){ // 如果空了 就繼續發送 直到選舉出leader if(manager.haveDelivered()){ sendNotifications(); } else { // 消息沒發出去,可能其他集群沒啟動 繼續嘗試連接 manager.connectAll(); } /// 延長超時時間 int tmpTimeOut = notTimeout*2; notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); LOG.info("Notification time out: " + notTimeout); } // 收到投票消息 查看是否屬於本集群內的消息 else if(self.getVotingView().containsKey(n.sid)) { switch (n.state) {// 判斷收到消息的節點狀態 case LOOKING: // If notification > current, replace and send messages out // 判斷epoch 是否大於 logicalclock ,如是,則是新一輪選舉 if (n.electionEpoch > logicalclock.get()) { logicalclock.set(n.electionEpoch); // 更新本地logicalclock recvset.clear(); // 清空接受隊列 // 一次性比較 myid epoch zxid 看此消息是否勝出 if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, //此方法看下面代碼 getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { //投票結束修改票據為 leader票據 updateProposal(n.leader, n.zxid, n.peerEpoch); } else {//否則票據不變 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); // 繼續廣播票據,讓其他節點知道我現在的投票 //如果是epoch小於當前 忽略 } else if (n.electionEpoch < logicalclock.get()) { break; //如果 epoch 相同 跟上面一樣的比較 更新票據 廣播票據 } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } // 把最終票據放進接受隊列 用來做最后判斷 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); // 判斷選舉是否結束 默認算法是否超過半數同意 見下面代碼 if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { // 一直等待 notification 到達 直到超時就返回null // final static int finalizeWait = 200; while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ recvqueue.put(n); break; } } // 確定 leader if (n == null) { // 修改狀態 self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); //返回最終投票結果 Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); leaveInstance(endVote); return endVote; } } break; // 如果收到的選票狀態 不是LOOKING 比如剛剛加入已經選舉好的集群 // Observer 不參與選舉 case OBSERVING: LOG.debug("Notification from observer: " + n.sid); break; case FOLLOWING: case LEADING: // 判斷 epoch 是否相同 if(n.electionEpoch == logicalclock.get()){ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); // 投票是否結束 結束的話確認leader 是否有效 // 如果結束 修改自己的投票並且返回 if(ooePredicate(recvset, outofelection, n)) { self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } } //在加入一個已建立的集群之前,確認大多數人都在跟隨同一個Leader。 outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); if(ooePredicate(outofelection, outofelection, n)) { synchronized(this){ logicalclock.set(n.electionEpoch); self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); } Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } break; default: LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)", n.state, n.sid); break; } } else { LOG.warn("Ignoring notification from non-cluster member " + n.sid); } } return null; } // ....... }
以上代碼就是整個選舉的核心。
- 首先更新logicalclock並通過 updateProposal 修改自己的選票信息,並且通過 sendNotifications 進行發送選票。
- 進入主循環進行本輪投票。
- 從recvqueue隊列中獲取一個投票信息,如果沒有獲取到足夠的選票通知一直發送自己的選票,也就是持續進行選舉,否則進入步驟4。
- 判斷投票信息中的選舉狀態:
- LOOKING狀態:
- 如果對方的Epoch大於本地的logicalclock,則更新本地的logicalclock並清空本地投票信息統計箱recvset,並將自己作為候選和投票中的leader進行比較,選擇大的作為新的投票,然后廣播出去,否則進入下面步驟2。
- 如果對方的Epoch小於本地的logicalclock,則忽略對方的投票,重新進入下一輪選舉流程,否則進入下面步驟3。
- 如果對方的Epoch等於本地的logicalclock,則比較當前本地被推選的leader和投票中的leader,選擇大的作為新的投票,然后廣播出去。
- 把對方的投票信息保存到本地投票統計箱recvset中,判斷當前被選舉的leader是否在投票中占了大多數(大於一半的server數量),如果是則需再等待finalizeWait時間(從recvqueue繼續poll投票消息)看是否有人修改了leader的候選,如果有則再將該投票信息再放回recvqueue中並重新開始下一輪循環,否則確定角色,結束選舉。
- OBSERVING狀態:不參與選舉。
- FOLLOWING/LEADING:
- 如果對方的Epoch等於本地的logicalclock,把對方的投票信息保存到本地投票統計箱recvset中,判斷對方的投票信息是否在recvset中占大多數並且確認自己確實為leader,如果是則確定角色,結束選舉,否則進入下面步驟2。
- 將對方的投票信息放入本地統計不參與投票信息箱outofelection中,判斷對方的投票信息是否在outofelection中占大多數並且確認自己確實為leader,如果是則更新logicalclock為當前epoch,並確定角色,結束選舉,否則進入下一輪選舉。
- LOOKING狀態:
上述選舉中是通過獲取到選票,其中根據選票中的3大元素跟本地進行比對。進入 totalOrderPredicate :
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" + Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid)); if(self.getQuorumVerifier().getWeight(newId) == 0){ return false; } /*如果以下三種情況之一成立,則返回true: * 1-選票中epoch更高 * 2-選票中epoch與當前epoch相同,但新zxid更高 * 3-選票中epoch與當前epoch相同,新zxid與當前zxid相同服務器id更高。 */ //這里判斷規則很明顯,先比較epoch 如果相等比較 zxid 繼續想等繼續比較 myid 大的為leader return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }
選票方法,便利選票集合,查看是否有人選票超過一半,其實就是判斷是否選出了leader:
protected boolean termPredicate(HashMap<Long, Vote> votes,Vote vote) { HashSet<Long> set = new HashSet<Long>(); /* * First make the views consistent. Sometimes peers will have * different zxids for a server depending on timing. */ // 遍歷接收到的集合 把符合當前投票的 放入 Set for (Map.Entry<Long,Vote> entry : votes.entrySet()) { if (vote.equals(entry.getValue())){ set.add(entry.getKey()); } } // 統計票據,看是否過半 return self.getQuorumVerifier().containsQuorum(set); }
到這里為止,Leader選舉就結束了。我們再來看看消息如何廣播,看 sendNotifications:
private void sendNotifications() { for (QuorumServer server : self.getVotingView().values()) {// 循環發送 long sid = server.id; // 准備發送的消息實體 ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING, sid, proposedEpoch); if(LOG.isDebugEnabled()){ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" + Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) + " (n.round), " + sid + " (recipient), " + self.getId() + " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)"); } sendqueue.offer(notmsg); // 使用offer 添加到隊列 會被sendWorker線程消費 } }
FastLeaderElection 選舉過程:
其實在這個投票過程中就涉及到幾個類,FastLeaderElection:FastLeaderElection 實現了 Election 接口,實現各服務器之間基於 TCP 協議進行選舉Notification:內部類,Notification 表示收到的選舉投票信息(其他服務器發來的選舉投票信息),其包含了被選舉者的 id、zxid、選舉周期等信息ToSend:ToSend表示發送給其他服務器的選舉投票信息,也包含了被選舉者的 id、zxid、選舉周期等信息Messenger : Messenger 包 含 了 WorkerReceiver 和WorkerSender 兩個內部類;WorkerReceiver 實現了 Runnable 接口,是選票接收器。其會不斷地從 QuorumCnxManager 中獲取其他服務器發來的選舉消息,並將其轉換成一個選票,然后保存到recvqueue 中WorkerSender 也實現了 Runnable 接口,為選票發送器,其會不斷地從 sendqueue 中獲取待發送的選票,並將其傳遞到底層 QuorumCnxManager 中
至於 Zookeeper是真么接受請求的,其實我們可以看一下代碼:
@Override public synchronized void start() { //載入本地DB數據 主要還是epoch
loadDataBase(); //啟動ZooKeeperThread線程
cnxnFactory.start(); //啟動leader選舉線程
startLeaderElection(); super.start(); }
其中 cnxnFactory.start(); 就是啟動了服務端的接受請求的線程,默認實現有兩個 NIO 及 Netty:
至於怎么設置請看如下代碼:
//這個是我們需要配置的屬性key
public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";
static public ServerCnxnFactory createFactory() throws IOException { String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY); if (serverCnxnFactoryName == null) {//默認是NIO serverCnxnFactoryName = NIOServerCnxnFactory.class.getName(); } try {//這里配置的類即Netty ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName) .getDeclaredConstructor().newInstance(); LOG.info("Using {} as server connection factory", serverCnxnFactoryName); return serverCnxnFactory; } catch (Exception e) { IOException ioe = new IOException("Couldn't instantiate "
+ serverCnxnFactoryName); ioe.initCause(e); throw ioe; } }
至於接下去的接受請求這里就不展開了,無非是通過NIO 或者Netty去處理請求。操作節點。