Reference: http://www.cnblogs.com/wuxl360/p/5817549.html
本節本來是要介紹ZooKeeper的實現原理,但是ZooKeeper的原理比較復雜,它涉及到了paxos算法、Zab協議、通信協議等相關知 識,理解起來比較抽象所以還需要借助一些應用場景,來幫我們理解。由於內容比較多,一口氣吃不成胖子,得慢慢來一步一個腳印,因此我對后期 ZooKeeper的學習規划如下:
第一階段:
|---理解ZooKeeper的應用
|---ZooKeeper是什么
|---ZooKeeper能干什么
|---ZooKeeper 怎么使用
第二階段:
|---理解ZooKeeper原理准備
|---了解paxos
|---理解 zab原理
|---理解選舉/同步流程
第三階段:
|---深入ZooKeeper原理
|---分析源碼
|---嘗試開發分布式應用
由於內容較多,而且理解較為復雜,所以每個階段分開來學習和介紹,那么本文主要介紹的的是第一階段,該階段一般 應該放在前面介紹,但感覺像一些ZooKeeper應用案例,如果沒有一定的ZooKeeper基礎,理解起來也比較抽象, 所以放在這介紹。大家可以對比一下前面的應用程序,來對比理解一下前面的那些應用到底用到ZooKeeper的那些功能,來進一步理解ZooKeeper 的實現理念,由於網上關於這方面的介紹比較多,如果一些可愛的博友對該內容已經比較了解,那么您可以不用往下看了,繼續下一步學習。
一、ZooKeeper產生背景
1.1 分布式的發展
分布式這個概念我想大家並不陌生,但真正實戰開始還要從google說起,很早以前在實驗室中分布式被人提出,可是說是計算機內入行較為復雜學習較為困難的技術,並且市場也並不成熟,因此大規模的商業應用一直未成出現,但從Google 發布了MapReduce 和DFS 以及Bigtable的論文之后,分布式在計算機界的格局就發生了變化,從架構上實現了分布式的難題,並且成熟的應用在了海量數據存儲和計算上,其集群的規模也是當前世界上最為龐大的。
以DFS 為基礎的分布式計算框架和key、value 數據高效的解決運算的瓶頸, 而且開發人員不用再寫復雜的分布式程序,只要底層框架完備開發人員只要用較少的代碼就可以完成分布式程序的開發,這使得開發人員只需要關注業務邏輯的即 可。Google 在業界技術上的領軍地位,讓業界望塵莫及的技術實力,IT 因此也是對Google 所退出的技術十分推崇。在最近幾年中分布式則是成為了海量數據存儲以及計算、高並發、高可靠性、高可用性的解決方案。
1.2 ZooKeeper的產生
眾所周知通常分布式架構都是中心化的設計,就是一個主控機連接多個處理節點。 問題可以從這里考慮,當主控機失效時,整個系統則就無法訪問了,所以保證系統的高可用性是非常關鍵之處,也就是要保證主控機的高可用性。分布式鎖就是一個 解決該問題的較好方案,多主控機搶一把鎖。在這里我們就涉及到了我們的重點Zookeeper。
ZooKeeper是什么,chubby 我想大家都不會陌生的,chubby 是實現Google 的一個分布式鎖的實現,運用到了paxos 算法解決的一個分布式事務管理的系統。Zookeeper 就是雅虎模仿強大的Google chubby 實現的一套分布式鎖管理系統。同時,Zookeeper 分布式服務框架是Apache Hadoop的一個子項目,它是一個針對大型分布式系統的可靠協調系統,它主要是用來解決分布式應用中經常遇到的一些數據管理問題,可以高可靠的維護元數據。提供的功能包括:配置維護、名字服務、分布式同步、組服務等。ZooKeeper的設計目標就是封裝好復雜易出錯的關鍵服務,將簡單易用的接口和性能高效、功能穩定的系統提供給用戶。
1.3 ZooKeeper的使用
Zookeeper 作為一個分布式的服務框架,主要用來解決分布式集群中應用系統的一致性問題,它能提供基於類似於文件系統的目錄節點樹方式的數據存儲,但是 Zookeeper 並不是用來專門存儲數據的,它的作用主要是用來維護和監控你存儲的數據的狀態變化。通過監控這些數據狀態的變化,從而可以達到基於數據的集群管理,后面將 會詳細介紹 Zookeeper 能夠解決的一些典型問題。
注意一下這里的"數據"是有限制的:
(1) 從數據大小來看:我們知道ZooKeeper的數據存儲在一個叫ReplicatedDataBase 的 數據庫中,該數據是一個內存數據庫,既然是在內存當中,我就應該知道該數據量就應該不會太大,這一點上就與hadoop的HDFS有了很大的區 別,HDFS的數據主要存儲在磁盤上,因此數據存儲主要是HDFS的事,而ZooKeeper主要是協調功能,並不是用來存儲數據的。
(2) 從數據類型來看:正如前面所說的,ZooKeeper的數據在內存中,由於內存空間的限制,那么我們就不能在上面隨心所欲的存儲數據,所以ZooKeeper存儲的數據都是我們所關心的數據而且數據量還不能太大,而且還會根據我們要以實現的功能來選擇相應的數據。簡單來說,干什么事存什么數據,ZooKeeper所實現的一切功能,都是由ZK節點的性質和該節點所關聯的數據實現的,至於關聯什么數據那就要看你干什么事了。
例如:
① 集群管理:利用臨時節點特性,節點關聯的是機器的主機名、IP地址等相關信息,集群單點故障也屬於該范疇。
② 統一命名:主要利用節點的唯一性和目錄節點樹結構。
③ 配置管理:節點關聯的是配置信息。
④ 分布式鎖:節點關聯的是要競爭的資源。
二、ZooKeeper應用場景
ZooKeeper是一個高可用的分布式數據管理與系統協調框架。基於對Paxos算法的實現,使該框架保證了分布式環境中數據的強一致性,也正是 基於這樣的特性,使得zookeeper能夠應用於很多場景。需要注意的是,ZK並不是生來就為這些場景設計,都是后來眾多開發者根據框架的特性,摸索出 來的典型使用方法。因此,我們也可以根據自己的需要來設計相應的場景實現。正如前文所提到的,ZooKeeper 實現的任何功能都離不開ZooKeeper的數據結構,任何功能的實現都是利用"Znode結構特性+節點關聯的數據"來實現的,好吧那么我們就看一下ZooKeeper數據結構有哪些特性。ZooKeeper數據結構如下圖所示:
圖2.1 ZooKeeper數據結構
Zookeeper 這種數據結構有如下這些特點:
① 每個子目錄項如 NameService 都被稱作為 znode,這個 znode 是被它所在的路徑唯一標識,如 Server1 這個 znode 的標識為 /NameService/Server1;
② znode 可以有子節點目錄,並且每個 znode 可以存儲數據,注意EPHEMERAL 類型的目錄節點不能有子節點目錄;
③ znode 是有版本的,每個 znode 中存儲的數據可以有多個版本,也就是一個訪問路徑中可以存儲多份數據;
④ znode 可以是臨時節點,一旦創建這個 znode 的客戶端與服務器失去聯系,這個 znode 也將自動刪除,Zookeeper 的客戶端和服務器通信采用長連接方式,每個客戶端和服務器通過心跳來保持連接,這個連接狀態稱為 session,如果 znode 是臨時節點,這個 session 失效,znode 也就刪除了;
⑤ znode 的目錄名可以自動編號,如 App1 已經存在,再創建的話,將會自動命名為 App2;
⑥ znode 可以被監控,包括這個目錄節點中存儲的數據的修改,子節點目錄的變化等,一旦變化可以通知設置監控的客戶端,這個是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基於這個特性實現的。
2.1數據發布與訂閱
(1) 典型場景描述
發布與訂閱即所謂的配置管理,顧名思義就是將數據發布到ZK節點上,供訂閱者動態獲取數據,實現配置信息的集中式管理和動態更新。例如全局的配置信息,地址列表等就非常適合使用。集中式的配置管理在應用集群中是非常常見的,一般商業公司內部都會實現一套集中的配置管理中心,應對不同的應用集群對於共享各自配置的需求,並且在配置變更時能夠通知到集群中的每一個機器。
(2) 應用
① 索引信息和集群中機器節點狀態存放在ZK的一些指定節點,供各個客戶端訂閱使用。
② 系統日志(經過處理后的)存儲,這些日志通常2-3天后被清除。
③ 應用中用到的一些配置信息集中管理,在應用啟動的時候主動來獲取一次,並且在節點上注冊一個Watcher,以后每次配置有更新,實時通知到應用,獲取最新配置信息。
④ 業務邏輯中需要用到的一些全局變量,比如一些消息中間件的消息隊列通常有個offset,這個offset存放在zk上,這樣集群中每個發送者都能知道當前的發送進度。
⑤ 系統中有些信息需要動態獲取,並且還會存在人工手動去修改這個信息。以前通常是暴露出接口,例如JMX接口,有了ZK后,只要將這些信息存放到ZK節點上即可。
(3) 應用舉例
例如:同一個應用系統需要多台 PC Server 運行,但是它們運行的應用系統的某些配置項是相同的,如果要修改這些相同的配置項,那么就必須同時修改每台運行這個應用系統的 PC Server,這樣非常麻煩而且容易出錯。將配置信息保存在 Zookeeper 的某個目錄節點中,然后將所有需要修改的應用機器監控配置信息的狀態,一旦配置信息發生變化,每台應用機器就會收到 Zookeeper 的通知,然后從 Zookeeper 獲取新的配置信息應用到系統中。ZooKeeper配置管理服務如下圖所示:
圖2.2 配置管理結構圖
Zookeeper很容易實現這種集中式的配置管理,比如將所需要的配置信息放到/Configuration 節點上,集群中所有機器一啟動就會通過Client對/Configuration這個節點進行監控【zk.exist("/Configuration″,true)】,並且實現Watcher回調方法process(),那么在zookeeper上/Configuration節點下數據發生變化的時候,每個機器都會收到通知,Watcher回調方法將會被執行,那么應用再取下數據即可【zk.getData("/Configuration″,false,null)】。
2.2統一命名服務(Name Service)
(1) 場景描述
分布式應用中,通常需要有一套完整的命名規則,既能夠產生唯一的名稱又便於人識別和記住,通常情況下用樹形的名稱結構是一個理想的選擇,樹形的名稱 結構是一個有層次的目錄結構,既對人友好又不會重復。說到這里你可能想到了 JNDI,沒錯 Zookeeper 的 Name Service 與 JNDI 能夠完成的功能是差不多的,它們都是將有層次的目錄結構關聯到一定資源上,但是Zookeeper的Name Service 更加是廣泛意義上的關聯,也許你並不需要將名稱關聯到特定資源上,你可能只需要一個不會重復名稱,就像數據庫中產生一個唯一的數字主鍵一樣。
(2) 應用
在分布式系統中,通過使用命名服務,客戶端應用能夠根據指定的名字來獲取資源服務的地址,提供者等信息。被命名的實體通常可以是集群中的機器,提供的服務地址,進程對象等等,這些我們都可以統稱他們為名字(Name)。其中較為常見的就是一些分布式服務框架中的服務地址列表。 通過調用ZK提供的創建節點的API,能夠很容易創建一個全局唯一的path,這個path就可以作為一個名稱。Name Service 已經是Zookeeper 內置的功能,你只要調用 Zookeeper 的 API 就能實現。如調用 create 接口就可以很容易創建一個目錄節點。
(3) 應用舉例
阿里開源的分布式服務框架Dubbo中使用ZooKeeper來作為其命名服務,維護全局的服務地址列表。在Dubbo實現中: 服務提供者在啟動的時候,向ZK上的指定節點/dubbo/${serviceName}/providers目錄下寫入自己的URL地址,這個操作就完成了服務的發布。 服務消費者啟 動的時候,訂閱/dubbo/${serviceName}/providers目錄下的提供者URL地址, 並向/dubbo/${serviceName} /consumers目錄下寫入自己的URL地址。 注意,所有向ZK上注冊的地址都是臨時節點,這樣就能夠保證服務提供者和消費者能夠自動感應資源的變化。 另外,Dubbo還有針對服務粒度的監控,方法是訂閱/dubbo/${serviceName}目錄下所有提供者和消費者的信息。
2.3分布通知/協調(Distribution of notification/coordination)
(1) 典型場景描述
ZooKeeper中特有watcher注冊與異步通知機制,能夠很好的實現分布式環境下不同系統之間的通知與協調,實現對數據變更的實時處理。使用方法通常是不同系統都對ZK上同一個znode進行注冊,監聽znode的變化(包括znode本身內容及子節點的),其中一個系統update了znode,那么另一個系統能夠收到通知,並作出相應處理。
(2) 應用
① 另一種心跳檢測機制:檢測系統和被檢測系統之間並不直接關聯起來,而是通過ZK上某個節點關聯,大大減少系統耦合。
② 另一種系統調度模式:某系統由控制台和推送系統兩部分組成,控制台的職責是控制推送系統進行相應的推送工作。管理人員在控制台作的一些操作,實際上是修改了ZK上某些節點的狀態,而ZK就把這些變化通知給他們注冊Watcher的客戶端,即推送系統,於是,作出相應的推送任務。
③ 另一種工作匯報模式:一些類似於任務分發系統,子任務啟動后,到ZK來注冊一個臨時節點,並且定時將自己的進度進行匯報(將進度寫回這個臨時節點),這樣任務管理者就能夠實時知道任務進度。
總之,使用zookeeper來進行分布式通知和協調能夠大大降低系統之間的耦合。
2.4分布式鎖(Distribute Lock)
(1) 場景描述
分布式鎖,這個主要得益於ZooKeeper為我們保證了數據的強一致性,即用戶只要完全相信每時每刻,zk集群中任意節點(一個zk server)上的相同znode的數據是一定是相同的。鎖服務可以分為兩類,一個是保持獨占,另一個是控制時序。
保持獨占,就是所有試圖來獲取這個鎖的客戶端,最終只有一個可以成功獲得這把 鎖。通常的做法是把ZK上的一個znode看作是一把鎖,通過create znode的方式來實現。所有客戶端都去創建 /distribute_lock 節點,最終成功創建的那個客戶端也即擁有了這把鎖。
控制時序,就是所有試圖來獲取這個鎖的客戶端,最終都是會被安排執行,只是有 個全局時序了。做法和上面基本類似,只是這里 /distribute_lock 已經預先存在,客戶端在它下面創建臨時有序節點。Zk的父節點(/distribute_lock)維持一份sequence,保證子節點創建的時序性, 從而也形成了每個客戶端的全局時序。
(2) 應用
共享鎖在同一個進程中很容易實現,但是在跨進程或者在不同 Server 之間就不好實現了。Zookeeper 卻很容易實現這個功能,實現方式也是需要獲得鎖的 Server 創建一個 EPHEMERAL_SEQUENTIAL 目錄節點,然后調用 getChildren方法獲取當前的目錄節點列表中最小的目錄節點是不是就是自己創建的目錄節點,如果正是自己創建的,那么它就獲得了這個鎖,如果不是那么它就調用 exists(String path, boolean watch) 方法並監控 Zookeeper 上目錄節點列表的變化,一直到自己創建的節點是列表中最小編號的目錄節點,從而獲得鎖,釋放鎖很簡單,只要刪除前面它自己所創建的目錄節點就行了。
圖 2.3 ZooKeeper實現Locks的流程圖
代碼清單1 TestMainClient 代碼
package org.zk.leader.election; import org.apache.log4j.xml.DOMConfigurator; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; /** * TestMainClient * <p/> * Author By: sunddenly工作室 * Created Date: 2014-11-13 */ public class TestMainClient implements Watcher { protected static ZooKeeper zk = null; protected static Integer mutex; int sessionTimeout = 10000; protected String root; public TestMainClient(String connectString) { if(zk == null){ try { String configFile = this.getClass().getResource("/").getPath()+"org/zk/leader/election/log4j.xml"; DOMConfigurator.configure(configFile); System.out.println("創建一個新的連接:"); zk = new ZooKeeper(connectString, sessionTimeout, this); mutex = new Integer(-1); } catch (IOException e) { zk = null; } } } synchronized public void process(WatchedEvent event) { synchronized (mutex) { mutex.notify(); } } }
清單 2 Locks 代碼
package org.zk.locks; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Stat; import org.zk.leader.election.TestMainClient; import java.util.Arrays; import java.util.List; /** * locks * <p/> * Author By: sunddenly工作室 * Created Date: 2014-11-13 16:49:40 */ public class Locks extends TestMainClient { public static final Logger logger = Logger.getLogger(Locks.class); String myZnode; public Locks(String connectString, String root) { super(connectString); this.root = root; if (zk != null) { try { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { logger.error(e); } catch (InterruptedException e) { logger.error(e); } } } void getLock() throws KeeperException, InterruptedException{ List<String> list = zk.getChildren(root, false); String[] nodes = list.toArray(new String[list.size()]); Arrays.sort(nodes); if(myZnode.equals(root+"/"+nodes[0])){ doAction(); } else{ waitForLock(nodes[0]); } } void check() throws InterruptedException, KeeperException { myZnode = zk.create(root + "/lock_" , new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); getLock(); } void waitForLock(String lower) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true); if(stat != null){ mutex.wait(); } else{ getLock(); } } @Override public void process(WatchedEvent event) { if(event.getType() == Event.EventType.NodeDeleted){ System.out.println("得到通知"); super.process(event); doAction(); } } /** * 執行其他任務 */ private void doAction(){ System.out.println("同步隊列已經得到同步,可以開始執行后面的任務了"); } public static void main(String[] args) { String connectString = "localhost:2181"; Locks lk = new Locks(connectString, "/locks"); try { lk.check(); } catch (InterruptedException e) { logger.error(e); } catch (KeeperException e) { logger.error(e); } } }
2.5 集群管理(Cluster Management)
(1) 典型場景描述
集群機器監控:
這通常用於那種對集群中機器狀態,機器在線率有較高要求的場景,能夠快速對集群中機器變化作出響應。這樣的場景中,往往有一個監控系統,實時檢測集 群機器是否存活。過去的做法通常是:監控系統通過某種手段(比如ping)定時檢測每個機器,或者每個機器自己定時向監控系統匯報"我還活着"。 這種做法可行,但是存在兩個比較明顯的問題:
① 集群中機器有變動的時候,牽連修改的東西比較多。
② 有一定的延時。
利用ZooKeeper中兩個特性,就可以實施另一種集群機器存活性監控系統:
① 客戶端在節點 x 上注冊一個Watcher,那么如果 x 的子節點變化了,會通知該客戶端。
② 創建EPHEMERAL類型的節點,一旦客戶端和服務器的會話結束或過期,那么該節點就會消失。
Master選舉:
Master選舉則是zookeeper中最為經典的使用場景了,在分布式環境中,相同的業務應用分布在不同的機器上,有些業務邏輯,例如一些耗時的計算,網絡I/O處,往往只需要讓整個集群中的某一台機器進行執行,其余機器可以共享這個結果,這樣可以大大減少重復勞動,提高性能,於是這個master選舉便是這種場景下的碰到的主要問題。
利用ZooKeeper中兩個特性,就可以實施另一種集群中Master選舉:
① 利用ZooKeeper的強一致性,能夠保證在分布式高並發情況下節點創建的全局唯一性,即:同時有多個客戶端請求創建 /Master 節點,最終一定只有一個客戶端請求能夠創建成功。利用這個特性,就能很輕易的在分布式環境中進行集群選舉了。
②另外,這種場景演化一下,就是動態Master選舉。這就要用到 EPHEMERAL_SEQUENTIAL類型節點的特性了,這樣每個節點會自動被編號。允許所有請求都能夠創建成功,但是得有個創建順序,每次選取序列號最小的那個機器作為Master 。
(2) 應用
在搜索系統中,如果集群中每個機器都生成一份全量索引,不僅耗時,而且不能保證彼此間索引數據一致。因此讓集群中的Master來迚行全量索引的生 成,然后同步到集群中其它機器。另外,Master選丼的容災措施是,可以隨時迚行手動挃定master,就是說應用在zk在無法獲取master信息 時,可以通過比如http方式,向一個地方獲取master。 在Hbase中,也是使用ZooKeeper來實現動態HMaster的選舉。在Hbase實現中,會在ZK上存儲一些ROOT表的地址和HMaster 的地址,HRegionServer也會把自己以臨時節點(Ephemeral)的方式注冊到Zookeeper中,使得HMaster可以隨時感知到各 個HRegionServer的存活狀態,同時,一旦HMaster出現問題,會重新選丼出一個HMaster來運行,從而避免了HMaster的單點問 題的存活狀態,同時,一旦HMaster出現問題,會重新選丼出一個HMaster來運行,從而避免了HMaster的單點問題。
(3) 應用舉例
集群監控:
應用集群中,我們常常需要讓每一個機器知道集群中或依賴的其他某一個集群中哪些機器是活着的,並且在集群機器因為宕機,網絡斷鏈等原因能夠不在人工 介入的情況下迅速通知到每一個機器,Zookeeper 能夠很容易的實現集群管理的功能,如有多台 Server 組成一個服務集群,那么必須要一個"總管"知道當前集群中每台機器的服務狀態,一旦有機器不能提供服務,集群中其它集群必須知道,從而做出調整重新分配服 務策略。同樣當增加集群的服務能力時,就會增加一台或多台 Server,同樣也必須讓"總管"知道,這就是ZooKeeper的集群監控功能。
圖2.4 集群管理結構圖
比如我在zookeeper服務器端有一個znode叫/Configuration,那么集群中每一個機器啟動的時候都去這個節點下創建一個EPHEMERAL類型的節點,比如server1創建/Configuration /Server1,server2創建/Configuration /Server1,然后Server1和Server2都watch /Configuration 這個父節點,那么也就是這個父節點下數據或者子節點變化都會通知對該節點進行watch的客戶端。因為EPHEMERAL類型節點有一個很重要的特性,就 是客戶端和服務器端連接斷掉或者session過期就會使節點消失,那么在某一個機器掛掉或者斷鏈的時候,其對應的節點就會消 失,然后集群中所有對/Configuration進行watch的客戶端都會收到通知,然后取得最新列表即可。
Master選舉:
Zookeeper 不僅能夠維護當前的集群中機器的服務狀態,而且能夠選出一個"總管",讓這個總管來管理集群,這就是 Zookeeper 的另一個功能 Leader Election。Zookeeper 如何實現 Leader Election,也就是選出一個 Master Server。和前面的一樣每台 Server 創建一個 EPHEMERAL 目錄節點,不同的是它還是一個 SEQUENTIAL 目錄節點,所以它是個 EPHEMERAL_SEQUENTIAL 目錄節點。之所以它是 EPHEMERAL_SEQUENTIAL 目錄節點,是因為我們可以給每台 Server 編號,我們可以選擇當前是最小編號的 Server 為 Master,假如這個最小編號的 Server 死去,由於是 EPHEMERAL 節點,死去的 Server 對應的節點也被刪除,所以當前的節點列表中又出現一個最小編號的節點,我們就選擇這個節點為當前 Master。這樣就實現了動態選擇 Master,避免了傳統意義上單 Master 容易出現單點故障的問題。
清單 3 Leader Election代碼
package org.zk.leader.election; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Stat; import java.net.InetAddress; import java.net.UnknownHostException; /** * LeaderElection * <p/> * Author By: sunddenly工作室 * Created Date: 2014-11-13 */ public class LeaderElection extends TestMainClient { public static final Logger logger = Logger.getLogger(LeaderElection.class); public LeaderElection(String connectString, String root) { super(connectString); this.root = root; if (zk != null) { try { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { logger.error(e); } catch (InterruptedException e) { logger.error(e); } } } void findLeader() throws InterruptedException, UnknownHostException, KeeperException { byte[] leader = null; try { leader = zk.getData(root + "/leader", true, null); } catch (KeeperException e) { if (e instanceof KeeperException.NoNodeException) { logger.error(e); } else { throw e; } } if (leader != null) { following(); } else { String newLeader = null; byte[] localhost = InetAddress.getLocalHost().getAddress(); try { newLeader = zk.create(root + "/leader", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (KeeperException e) { if (e instanceof KeeperException.NodeExistsException) { logger.error(e); } else { throw e; } } if (newLeader != null) { leading(); } else { mutex.wait(); } } } @Override public void process(WatchedEvent event) { if (event.getPath().equals(root + "/leader") && event.getType() == Event.EventType.NodeCreated) { System.out.println("得到通知"); super.process(event); following(); } } void leading() { System.out.println("成為領導者"); } void following() { System.out.println("成為組成員"); } public static void main(String[] args) { String connectString = "localhost:2181"; LeaderElection le = new LeaderElection(connectString, "/GroupMembers"); try { le.findLeader(); } catch (Exception e) { logger.error(e); } } }
2.6 隊列管理
Zookeeper 可以處理兩種類型的隊列:
① 當一個隊列的成員都聚齊時,這個隊列才可用,否則一直等待所有成員到達,這種是同步隊列。
② 隊列按照 FIFO 方式進行入隊和出隊操作,例如實現生產者和消費者模型。
(1) 同步隊列用 Zookeeper 實現的實現思路如下:
創建一個父目錄 /synchronizing,每個成員都監控標志(Set Watch)位目錄 /synchronizing/start 是否存在,然后每個成員都加入這個隊列,加入隊列的方式就是創建 /synchronizing/member_i 的臨時目錄節點,然后每個成員獲取 / synchronizing 目錄的所有目錄節點,也就是 member_i。判斷 i 的值是否已經是成員的個數,如果小於成員個數等待 /synchronizing/start 的出現,如果已經相等就創建 /synchronizing/start。
用下面的流程圖更容易理解:
圖 2.5 同步隊列流程圖
清單 4 Synchronizing 代碼
package org.zk.queue; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.List; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; import org.zk.leader.election.TestMainClient; /** * Synchronizing * <p/> * Author By: sunddenly工作室 * Created Date: 2014-11-13 */ public class Synchronizing extends TestMainClient { int size; String name; public static final Logger logger = Logger.getLogger(Synchronizing.class); /** * 構造函數 * * @param connectString 服務器連接 * @param root 根目錄 * @param size 隊列大小 */ Synchronizing(String connectString, String root, int size) { super(connectString); this.root = root; this.size = size; if (zk != null) { try { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (KeeperException e) { logger.error(e); } catch (InterruptedException e) { logger.error(e); } } try { name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString()); } catch (UnknownHostException e) { logger.error(e); } } /** * 加入隊列 * * @return * @throws KeeperException * @throws InterruptedException */ void addQueue() throws KeeperException, InterruptedException{ zk.exists(root + "/start",true); zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); synchronized (mutex) { List<String> list = zk.getChildren(root, false); if (list.size() < size) { mutex.wait(); } else { zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } } @Override public void process(WatchedEvent event) { if(event.getPath().equals(root + "/start") && event.getType() == Event.EventType.NodeCreated){ System.out.println("得到通知"); super.process(event); doAction(); } } /** * 執行其他任務 */ private void doAction(){ System.out.println("同步隊列已經得到同步,可以開始執行后面的任務了"); } public static void main(String args[]) { //啟動Server String connectString = "localhost:2181"; int size = 1; Synchronizing b = new Synchronizing(connectString, "/synchronizing", size); try{ b.addQueue(); } catch (KeeperException e){ logger.error(e); } catch (InterruptedException e){ logger.error(e); } } }
(2) FIFO 隊列用 Zookeeper 實現思路如下:
實現的思路也非常簡單,就是在特定的目錄下創建 SEQUENTIAL 類型的子目錄 /queue_i,這樣就能保證所有成員加入隊列時都是有編號的,出隊列時通過 getChildren( ) 方法可以返回當前所有的隊列中的元素,然后消費其中最小的一個,這樣就能保證 FIFO。
下面是生產者和消費者這種隊列形式的示例代碼
清單 5 FIFOQueue 代碼
import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Stat; import java.nio.ByteBuffer; import java.util.List; /** * FIFOQueue * <p/> * Author By: sunddenly工作室 * Created Date: 2014-11-13 */ public class FIFOQueue extends TestMainClient{ public static final Logger logger = Logger.getLogger(FIFOQueue.class); /** * Constructor * * @param connectString * @param root */ FIFOQueue(String connectString, String root) { super(connectString); this.root = root; if (zk != null) { try { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (KeeperException e) { logger.error(e); } catch (InterruptedException e) { logger.error(e); } } } /** * 生產者 * * @param i * @return */ boolean produce(int i) throws KeeperException, InterruptedException{ ByteBuffer b = ByteBuffer.allocate(4); byte[] value; b.putInt(i); value = b.array(); zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return true; } /** * 消費者 * * @return * @throws KeeperException * @throws InterruptedException */ int consume() throws KeeperException, InterruptedException{ int retvalue = -1; Stat stat = null; while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); if (list.size() == 0) { mutex.wait(); } else { Integer min = new Integer(list.get(0).substring(7)); for(String s : list){ Integer tempValue = new Integer(s.substring(7)); if(tempValue < min) min = tempValue; } byte[] b = zk.getData(root + "/element" + min,false, stat); zk.delete(root + "/element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } } @Override public void process(WatchedEvent event) { super.process(event); } public static void main(String args[]) { //啟動Server TestMainServer.start(); String connectString = "localhost:"+TestMainServer.CLIENT_PORT; FIFOQueue q = new FIFOQueue(connectString, "/app1"); int i; Integer max = new Integer(5); System.out.println("Producer"); for (i = 0; i < max; i++) try{ q.produce(10 + i); } catch (KeeperException e){ logger.error(e); } catch (InterruptedException e){ logger.error(e); } for (i = 0; i < max; i++) { try{ int r = q.consume(); System.out.println("Item: " + r); } catch (KeeperException e){ i--; logger.error(e); } catch (InterruptedException e){ logger.error(e); } } } }
三、ZooKeeper實際應用
假設我們的集群有:
(1) 20個搜索引擎的服務器:每個負責總索引中的一部分的搜索任務。
① 搜索引擎的服務器中的15個服務器現在提供搜索服務。
② 5個服務器正在生成索引。
這20個搜索引擎的服務器,經常要讓正在提供搜索服務的服務器停止提供服務開始生成索引,或生成索引的服務器已經把索引生成完成可以搜索提供服務了。
(2) 一個總服務器:負責向這20個搜索引擎的服務器發出搜索請求並合並結果集。
(3) 一個備用的總服務器:負責當總服務器宕機時替換總服務器。
(4) 一個web的cgi:向總服務器發出搜索請求。
使用Zookeeper可以保證:
(1) 總服務器:自動感知有多少提供搜索引擎的服務器,並向這些服務器發出搜索請求。
(2) 備用的總服務器:宕機時自動啟用備用的總服務器。
(3) web的cgi:能夠自動地獲知總服務器的網絡地址變化。
(4) 實現如下:
① 提供搜索引擎的服務器都在Zookeeper中創建znode,zk.create("/search/nodes/node1", "hostname".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);
② 總服務器可以從Zookeeper中獲取一個znode的子節點的列表,zk.getChildren("/search/nodes", true);
③ 總服務器遍歷這些子節點,並獲取子節點的數據生成提供搜索引擎的服務器列表;
④ 當總服務器接收到子節點改變的事件信息,重新返回第二步;
⑤ 總服務器在Zookeeper中創建節點,zk.create("/search/master", "hostname".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);
⑥ 備用的總服務器監控Zookeeper中的"/search/master"節點。當這個znode的節點數據改變時,把自己啟動變成總服務器,並把自己的網絡地址數據放進這個節點。
⑦ web的cgi從Zookeeper中"/search/master"節點獲取總服務器的網絡地址數據,並向其發送搜索請求。
⑧ web的cgi監控Zookeeper中的"/search/master"節點,當這個znode的節點數據改變時,從這個節點獲取總服務器的網絡地址數據,並改變當前的總服務器的網絡地址。