ZooKeeper全面介紹


ZooKeeper簡介
ZooKeeper是分布式服務框架,主要是用來解決分布式應用中經常遇到的一些數據管理問題,如: 統一命名服務、狀態同步服務、集群管理、分布式應用配置項的管理等等。
 
ZooKeeper是Apache的子項目,之前是Hadoop項目的一部分,使用Java實現,最新的版本可以通過官網  http://hadoop.apache.org/zookeeper/來獲取。
 
ZooKeeper基本概念
角色
ZooKeeper中的角色主要有以下三類,如下表所示:
Open in new window
 
系統模型如圖所示:
Open in new window
 
設計目的
1. 最終一致性:client不論連接到哪個Server,展示給它都是同一個視圖,這是ZooKeeper最重要的性能。
2. 可靠性:具有簡單、健壯、良好的性能,如果消息m被到一台服務器接受,那么它將被所有的服務器接受。
3. 實時性:ZooKeeper保證客戶端將在一個時間間隔范圍內獲得服務器的更新信息,或者服務器失效的信息。但由於網絡延時等原因,ZooKeeper不能保證兩個客戶端能同時得到剛更新的數據,如果需要最新數據,應該在讀數據之前調用sync()接口。
4. 等待無關(wait-free):慢的或者失效的client不得干預快速的client的請求,使得每個client都能有效的等待。
5. 原子性:更新只能成功或者失敗,沒有中間狀態。
6. 順序性:包括全局有序和偏序兩種:全局有序是指如果在一台服務器上消息a在消息b前發布,則在所有Server上消息a都將在消息b前被發布;偏序是指如果一個消息b在消息a后被同一個發送者發布,a必將排在b前面。
 
ZooKeeper工作原理
ZooKeeper的核心是廣播,這個機制保證了各個Server之間的同步。實現這個機制的協議叫做Zab協議。Zab協議有兩種模式,它們分別是恢復模式(選主)和廣播模式(同步)。當服務啟動或者在領導者崩潰后,Zab就進入了恢復模式,當領導者被選舉出來,且大多數Server完成了和leader的狀態同步以后,恢復模式就結束了。狀態同步保證了leader和Server具有相同的系統狀態。為了保證事務的順序一致性,ZooKeeper采用了遞增的事務id號 (zxid)來標識事務。所有的提議(proposal)都在被提出的時候加上了zxid。實現中zxid是一個64位的數字,它高32位是epoch用 來標識leader關系是否改變,每次一個leader被選出來,它都會有一個新的epoch,標識當前屬於那個leader的統治時期。低32位用於遞增計數。
每個Server在工作過程中有三種狀態:
LOOKING:當前Server不知道leader是誰,正在搜尋。
LEADING:當前Server即為選舉出來的leader。
FOLLOWING:leader已經選舉出來,當前Server與之同步。
 
選主流程
當 leader崩潰或者leader失去大多數的follower,這時候zk進入恢復模式,恢復模式需要重新選舉出一個新的leader,讓所有的 Server都恢復到一個正確的狀態。Zk的選舉算法有兩種:一種是基於basic paxos實現的,另外一種是基於fast paxos算法實現的。系統默認的選舉算法為fast paxos。
basic paxos流程:
1 .選舉線程由當前Server發起選舉的線程擔任,其主要功能是對投票結果進行統計,並選出推薦的Server;
2 .選舉線程首先向所有Server發起一次詢問(包括自己);
3 .選舉線程收到回復后,驗證是否是自己發起的詢問(驗證zxid是否一致),然后獲取對方的id(myid),並存儲到當前詢問對象列表中,最后獲取對方提議的leader相關信息(id,zxid),並將這些信息存儲到當次選舉的投票記錄表中;
4. 收到所有Server回復以后,就計算出zxid最大的那個Server,並將這個Server相關信息設置成下一次要投票的Server;
5. 線程將當前zxid最大的Server設置為當前Server要推薦的Leader,如果此時獲勝的Server獲得n/2 + 1的Server票數, 設置當前推薦的leader為獲勝的Server,將根據獲勝的Server相關信息設置自己的狀態,否則,繼續這個過程,直到leader被選舉出來。
通 過流程分析我們可以得出:要使Leader獲得多數Server的支持,則Server總數必須是奇數2n+1,且存活的Server的數目不得少於 n+1.每個Server啟動后都會重復以上流程。在恢復模式下,如果是剛從崩潰狀態恢復的或者剛啟動的server還會從磁盤快照中恢復數據和會話信 息,zk會記錄事務日志並定期進行快照,方便在恢復時進行狀態恢復。
選主的具體流程圖如下所示:
Open in new window
 
 
fast paxos流程是在選舉過程中,某Server首先向所有Server提議自己要成為leader,當其它Server收到提議以后,解決epoch和 zxid的沖突,並接受對方的提議,然后向對方發送接受提議完成的消息,重復這個流程,最后一定能選舉出Leader。其流程圖如下所示:
Open in new window
 
同步流程
選完leader以后,zk就進入狀態同步過程。
1. leader等待server連接;
2 .Follower連接leader,將最大的zxid發送給leader;
3 .Leader根據follower的zxid確定同步點;
4 .完成同步后通知follower 已經成為uptodate狀態;
5 .Follower收到uptodate消息后,又可以重新接受client的請求進行服務了。
流程圖如下所示:
Open in new window
 
工作流程
 
Leader工作流程
Leader主要有三個功能:
1 .恢復數據;
2 .維持與Learner的心跳,接收Learner請求並判斷Learner的請求消息類型;
3 .Learner的消息類型主要有PING消息、REQUEST消息、ACK消息、REVALIDATE消息,根據不同的消息類型,進行不同的處理。
PING 消息是指Learner的心跳信息;REQUEST消息是Follower發送的提議信息,包括寫請求及同步請求;ACK消息是Follower的對提議 的回復,超過半數的Follower通過,則commit該提議;REVALIDATE消息是用來延長SESSION有效時間。
Leader的工作流程簡圖如下所示:
Open in new window
 
 
Follower工作流程
Follower主要有四個功能:
1. 向Leader發送請求(PING消息、REQUEST消息、ACK消息、REVALIDATE消息);
2 .接收Leader消息並進行處理;
3 .接收Client的請求,如果為寫請求,發送給Leader進行投票;
4 .返回Client結果。
Follower的消息循環處理如下幾種來自Leader的消息:
1 .PING消息: 心跳消息;
2 .PROPOSAL消息:Leader發起的提案,要求Follower投票;
3 .COMMIT消息:服務器端最新一次提案的信息;
4 .UPTODATE消息:表明同步完成;
5 .REVALIDATE消息:根據Leader的REVALIDATE結果,關閉待revalidate的session還是允許其接受消息;
6 .SYNC消息:返回SYNC結果到客戶端,這個消息最初由客戶端發起,用來強制得到最新的更新。
Follower的工作流程簡圖如下所示:
Open in new window
 

數據模型

Zookeeper 會維護一個具有層次關系的數據結構,它非常類似於一個標准的文件系統。
圖 1 Zookeeper 數據結構 
 
Zookeeper 這種數據結構有如下這些特點:
1. 每個子目錄項如 NameService 都被稱作為 znode,這個 znode 是被它所在的路徑唯一標識,如 Server1 這個 znode 的標識為 /NameService/Server1(圖上有一個錯誤,大家自行發現吧:-))。
2. znode 可以有子節點目錄,並且每個 znode 可以存儲數據,注意 EPHEMERAL 類型的目錄節點不能有子節點目錄。
3. znode 是有版本的,每個 znode 中存儲的數據可以有多個版本,也就是一個訪問路徑中可以存儲多份數據。
4. znode 可以是臨時節點,一旦創建這個 znode 的客戶端與服務器失去聯系,這個 znode 也將自動刪除,Zookeeper 的客戶端和服務器通信采用長連接方式,每個客戶端和服務器通過心跳來保持連接,這個連接狀態稱為 session,如果 znode 是臨時節點,這個 session 失效,znode 也就刪除了。
5. znode 的目錄名可以自動編號,如 App1 已經存在,再創建的話,將會自動命名為 App2。
6. znode 可以被監控,包括這個目錄節點中存儲的數據的修改,子節點目錄的變化等,一旦變化可以通知設置監控的客戶端,這個是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基於這個特性實現的,后面在典型的應用場景中會有實例介紹。
 
ZooKeeper安裝配置
ZooKeeper的安裝模式分為三種,分別為:單機模式(stand-alone)、集群模式和集群偽分布模式(略)。
 
單機模式
下載ZooKeeper的安裝包之后, 解壓到合適目錄. 進入ZooKeeper目錄下的conf子目錄, 創建zoo.cfg(或者直接改名zoo_sample.cfg):
 
 tickTime=2000 
 dataDir=D:/devtools/zookeeper-3.2.2/build 
 clientPort=2181 
 
  • tickTime:這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。
  • dataDir:顧名思義就是 Zookeeper 保存數據的目錄,默認情況下,Zookeeper 將寫數據的日志文件也保存在這個目錄里。
  • clientPort:這個端口就是客戶端連接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求。
 
當這些配置項配置好后,你現在就可以啟動 Zookeeper 了,啟動后要檢查 Zookeeper 是否已經在服務,可以通過 netstat -ano 命令查看是否有你配置的 clientPort 端口號在監聽服務。
 
集群模式
Zookeeper 不僅可以單機提供服務,同時也支持多機組成集群來提供服務。實際上 Zookeeper 還支持另外一種偽集群的方式,也就是可以在一台物理機上運行多個 Zookeeper 實例,下面將介紹集群模式的安裝和配置。
 
Zookeeper 的集群模式的安裝和配置也不是很復雜,所要做的就是增加幾個配置項。集群模式除了上面的三個配置項還要增加下面幾個配置項:
 
 initLimit=5 
 syncLimit=2 
 server.1=192.168.211.1:2888:3888 
 server.2=192.168.211.2:2888:3888 


 
  • initLimit:這個配置項是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集群中連接到 Leader 的 Follower 服務器)初始化連接時最長能忍受多少個心跳時間間隔數。當已經超過 10 個心跳的時間(也就是 tickTime)長度后 Zookeeper 服務器還沒有收到客戶端的返回信息,那么表明這個客戶端連接失敗。總的時間長度就是 5*2000=10 秒
  • syncLimit:這個配置項標識 Leader 與 Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是 2*2000=4 秒
  • server.A=B:C:D:其中 A 是一個數字,表示這個是第幾號服務器;B 是這個服務器的 ip 地址;C 表示的是這個服務器與集群中的 Leader 服務器交換信息的端口;D 表示的是萬一集群中的 Leader 服務器掛了,需要一個端口來重新進行選舉,選出一個新的 Leader,而這個端口就是用來執行選舉時服務器相互通信的端口。如果是偽集群的配置方式,由於 B 都是一樣,所以不同的 Zookeeper 實例通信端口號不能一樣,所以要給它們分配不同的端口號。
 
除了修改 zoo.cfg 配置文件,集群模式下還要配置一個文件 myid,這個文件在 dataDir 目錄下,這個文件里面就有一個數據就是 A 的值,Zookeeper 啟動時會讀取這個文件,拿到里面的數據與 zoo.cfg 里面的配置信息比較從而判斷到底是那個 server。
 
ZooKeeper常用命令
ZooKeeper服務命令:
1. 啟動ZK服務: ./zkServer.sh start
2. 查看ZK服務狀態: ./zkServer.sh status
3. 停止ZK服務: ./zkServer.sh stop
4. 重啟ZK服務: ./zkServer.sh restart
 
ZooKeeper客戶端命令:
ZooKeeper 命令行工具類似於Linux的shell環境,使用它可以對ZooKeeper進行訪問,數據創建,數據修改等操作. 使用 zkCli.sh -server 127.0.0.1:2181 連接到 ZooKeeper 服務,連接成功后,系統會輸出 ZooKeeper 的相關環境以及配置信息。
命令行工具的一些簡單操作如下:
1. 顯示根目錄下、文件: ls / 使用 ls 命令來查看當前 ZooKeeper 中所包含的內容
2. 顯示根目錄下、文件: ls2 / 查看當前節點數據並能看到更新次數等數據
3. 創建文件,並設置初始內容: create /zk "test" 創建一個新的 znode節點“ zk ”以及與它關聯的字符串
4. 獲取文件內容: get /zk 確認 znode 是否包含我們所創建的字符串
5. 修改文件內容: set /zk "zkbak" 對 zk 所關聯的字符串進行設置
6. 刪除文件: delete /zk 將剛才創建的 znode 刪除
7. 退出客戶端: quit
8. 幫助命令: help
 
ZooKeeper 常用四字命令:
ZooKeeper 支持某些特定的四字命令字母與其的交互。它們大多是查詢命令,用來獲取 ZooKeeper 服務的當前狀態及相關信息。用戶在客戶端可以通過 telnet 或 nc 向 ZooKeeper 提交相應的命令
1. 可以通過命令:echo stat|nc 127.0.0.1 2181 來查看哪個節點被選擇作為follower或者leader
2. 使用echo ruok|nc 127.0.0.1 2181 測試是否啟動了該Server,若回復imok表示已經啟動。
3. echo dump| nc 127.0.0.1 2181 ,列出未經處理的會話和臨時節點。
4. echo kill | nc 127.0.0.1 2181 ,關掉server
5. echo conf | nc 127.0.0.1 2181 ,輸出相關服務配置的詳細信息。
6. echo cons | nc 127.0.0.1 2181 ,列出所有連接到服務器的客戶端的完全的連接 / 會話的詳細信息。
7. echo envi |nc 127.0.0.1 2181 ,輸出關於服務環境的詳細信息(區別於 conf 命令)。
8. echo reqs | nc 127.0.0.1 2181 ,列出未經處理的請求。
9. echo wchs | nc 127.0.0.1 2181 ,列出服務器 watch 的詳細信息。
10. echo wchc | nc 127.0.0.1 2181 ,通過 session 列出服務器 watch 的詳細信息,它的輸出是一個與 watch 相關的會話的列表。
11. echo wchp | nc 127.0.0.1 2181 ,通過路徑列出服務器 watch 的詳細信息。它輸出一個與 session 相關的路徑。
 
ZooKeeper的Java客戶端API
客戶端要連接 Zookeeper 服務器可以通過創建 org.apache.zookeeper. ZooKeeper 的一個實例對象,然后調用這個類提供的接口來和服務器交互。
 
前面說了 ZooKeeper 主要是用來維護和監控一個目錄節點樹中存儲的數據的狀態,所以我們能夠操作 ZooKeeper 的節點也和操作目錄節點樹大體一樣,如創建一個目錄節點,給某個目錄節點設置數據,獲取某個目錄節點的所有子目錄節點,給某個目錄節點設置權限和監控這個目錄節點的狀態變化。
 
類org.apache.zookeeper. ZooKeeper 的方法如下表所示:
 
String create(String path, byte[] data, List<ACL> acl,CreateMode createMode) 創建一個給定的目錄節點 path, 並給它設置數據,CreateMode 標識有四種形式的目錄節點,分別是 PERSISTENT:持久化目錄節點,這個目錄節點存儲的數據不會丟失;PERSISTENT_SEQUENTIAL:順序自動編號的目錄節點,這種目錄節點會根據當前已近存在的節點數自動加 1,然后返回給客戶端已經成功創建的目錄節點名;EPHEMERAL:臨時目錄節點,一旦創建這個節點的客戶端與服務器端口也就是 session 超時,這種節點會被自動刪除;EPHEMERAL_SEQUENTIAL:臨時自動編號節點
Stat exists(String path, boolean watch) 判斷某個 path 是否存在,並設置是否監控這個目錄節點,這里的 watcher 是在創建 ZooKeeper 實例時指定的 watcher,exists方法還有一個重載方法,可以指定特定的watcher
Stat exists(String path,Watcher watcher) 重載方法,這里給某個目錄節點設置特定的 watcher,Watcher 在 ZooKeeper 是一個核心功能,Watcher 可以監控目錄節點的數據變化以及子目錄的變化,一旦這些狀態發生變化,服務器就會通知所有設置在這個目錄節點上的 Watcher,從而每個客戶端都很快知道它所關注的目錄節點的狀態發生變化,而做出相應的反應
void delete(String path, int version) 刪除 path 對應的目錄節點,version 為 -1 可以匹配任何版本,也就刪除了這個目錄節點所有數據
List<String>getChildren(String path, boolean watch) 獲取指定 path 下的所有子目錄節點,同樣 getChildren方法也有一個重載方法可以設置特定的 watcher 監控子節點的狀態
Stat setData(String path, byte[] data, int version) 給 path 設置數據,可以指定這個數據的版本號,如果 version 為 -1 怎可以匹配任何版本
byte[] getData(String path, boolean watch, Stat stat) 獲取這個 path 對應的目錄節點存儲的數據,數據的版本等信息可以通過 stat 來指定,同時還可以設置是否監控這個目錄節點數據的狀態
voidaddAuthInfo(String scheme, byte[] auth) 客戶端將自己的授權信息提交給服務器,服務器將根據這個授權信息驗證客戶端的訪問權限。
Stat setACL(String path,List<ACL> acl, int version) 給某個目錄節點重新設置訪問權限,需要注意的是 Zookeeper 中的目錄節點權限不具有傳遞性,父目錄節點的權限不能傳遞給子目錄節點。目錄節點 ACL 由兩部分組成:perms 和 id。
Perms 有 ALL、READ、WRITE、CREATE、DELETE、ADMIN 幾種 
而 id 標識了訪問目錄節點的身份列表,默認情況下有以下兩種:
ANYONE_ID_UNSAFE = new Id("world", "anyone") 和 AUTH_IDS = new Id("auth", "") 分別表示任何人都可以訪問和創建者擁有訪問權限。
List<ACL>getACL(String path,Stat stat) 獲取某個目錄節點的訪問權限列表
 
除了以上這些列出的方法之外還有一些重載方法,如都提供了一個回調類的重載方法以及可以設置特定 Watcher 的重載方法,具體的方法可以參考 org.apache.zookeeper. ZooKeeper 類的 API 說明。
 
下面給出基本的操作 ZooKeeper 的示例代碼,這樣你就能對 ZooKeeper 有直觀的認識了。下面的清單包括了創建與 ZooKeeper 服務器的連接以及最基本的數據操作:
// 創建一個與服務器的連接
 ZooKeeper zk = new ZooKeeper("localhost:" + CLIENT_PORT, 
        ClientBase.CONNECTION_TIMEOUT, new Watcher() { 
            // 監控所有被觸發的事件
            public void process(WatchedEvent event) { 
                System.out.println("已經觸發了" + event.getType() + "事件!"); 
            } 
        }); 
 // 創建一個目錄節點
 zk.create("/testRootPath", "testRootData".getBytes(), Ids.OPEN_ACL_UNSAFE,
   CreateMode.PERSISTENT); 
 // 創建一個子目錄節點
 zk.create("/testRootPath/testChildPathOne", "testChildDataOne".getBytes(),
   Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); 
 System.out.println(new String(zk.getData("/testRootPath",false,null))); 
 // 取出子目錄節點列表
 System.out.println(zk.getChildren("/testRootPath",true)); 
 // 修改子目錄節點數據
 zk.setData("/testRootPath/testChildPathOne","modifyChildDataOne".getBytes(),-1); 
 System.out.println("目錄節點狀態:["+zk.exists("/testRootPath",true)+"]"); 
 // 創建另外一個子目錄節點
 zk.create("/testRootPath/testChildPathTwo", "testChildDataTwo".getBytes(), 
   Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); 
 System.out.println(new String(zk.getData("/testRootPath/testChildPathTwo",true,null))); 
 // 刪除子目錄節點
 zk.delete("/testRootPath/testChildPathTwo",-1); 
 zk.delete("/testRootPath/testChildPathOne",-1); 
 // 刪除父目錄節點
 zk.delete("/testRootPath",-1); 
 // 關閉連接
 zk.close();
 
輸出的結果如下:
已經觸發了 None 事件!
 testRootData 
 [testChildPathOne] 
目錄節點狀態:[5,5,1281804532336,1281804532336,0,1,0,0,12,1,6] 
已經觸發了 NodeChildrenChanged 事件!
 testChildDataTwo 
已經觸發了 NodeDeleted 事件!
已經觸發了 NodeDeleted 事件!

 
當對目錄節點監控狀態打開時,一旦目錄節點的狀態發生變化,Watcher 對象的 process 方法就會被調用。
 
直接使用zk的api實現業務功能比較繁瑣。因為要處理session loss,session expire等異常,在發生這些異常后進行重連。又因為ZK的watcher是一次性的,如果要基於wather實現發布/訂閱模式,還要自己包裝一下,將一次性訂閱包裝成持久訂閱。另外如果要使用抽象級別更高的功能,比如分布式鎖,leader選舉等,還要自己額外做很多事情。這里介紹下ZK的兩個第三方客戶端包裝小工具,可以分別解決上述小問題。
 
  • I0Itec-zkClient
zkClient主要做了兩件事情。一件是在session loss和session expire時自動創建新的ZooKeeper實例進行重連。另一件是將一次性watcher包裝為持久watcher。后者的具體做法是簡單的在watcher回調中,重新讀取數據的同時再注冊相同的watcher實例。
 
ZkConnection 類: 對zookeeper API的簡單分裝,提供了鏈接zookeeper server和數據CRUD的操作;此類實現了IZkConnection接口,通常情況下,如果I0Itec-zkclient不能滿足需要的時候,我 們可以重寫ZkConnection即可.ZkClient類: 核心類,也是開發者需要直接使用的類,它內部維護了zookeeper的鏈接管理和Event處理邏輯等,同時也暴露了zookeeper znode的CRUD方法列表.IZkChildListener接口: znode 子節點事件偵聽器,當ZkClient接收到某個path節點變更或者子節點變更事件時,會觸發lisntener.IZkDataListener接 口:IZkStateListener接口: 當zookeeper客戶端狀態變更時,觸發.

zkClient簡單的使用樣例如下:

public static void testzkClient(final String serverList) {
     ZkClient zkClient4subChild = new ZkClient(serverList);
     zkClient4subChild.subscribeChildChanges(PATH, new IZkChildListener() {
          @Override
          public void handleChildChange(String parentPath, List currentChilds) throws Exception {
               System.out.println(prefix() + "clildren of path " + parentPath + ":" + currentChilds);
          }
     });
}
 
上面是訂閱children變化,下面是訂閱數據變化
ZkClient zkClient4subData = new ZkClient(serverList);
zkClient4subData.subscribeDataChanges(PATH, new IZkDataListener() {
     @Override
     public void handleDataChange(String dataPath, Object data) throws Exception {
          System.out.println(prefix() + "Data of " + dataPath + " has changed");
     }
     @Override
     public void handleDataDeleted(String dataPath) throws Exception {
          System.out.println(prefix() + dataPath + " has deleted");
     }
});
 
訂閱連接狀態的變化:
ZkClient zkClient4subStat = new ZkClient(serverList);
zkClient4subStat.subscribeStateChanges(new IZkStateListener() {
     @Override public void handleNewSession() throws Exception {
          System.out.println(prefix() + "handleNewSession()");
     }
     @Override
     public void handleStateChanged(KeeperState stat) throws Exception {
          System.out.println(prefix() + "handleStateChanged,stat:" + stat);
     }
});

下面表格列出了寫操作與ZK內部產生的事件的對應關系:

   event For “/path” event For “/path/child”
 
create(“/path”) EventType.NodeCreated NA
delete(“/path”) EventType.NodeDeleted NA
setData(“/path”) EventType.NodeDataChanged NA
create(“/path/child”) EventType.NodeChildrenChanged EventType.NodeCreated
delete(“/path/child”) EventType.NodeChildrenChanged EventType.NodeDeleted
setData(“/path/child”) NA EventType.NodeDataChanged

而ZK內部的寫事件與所觸發的watcher的對應關系如下:

event For “/path” defaultWatcher exists
(“/path”)
getData
(“/path”)
getChildren
(“/path”)
 
EventType.None
EventType.NodeCreated    
EventType.NodeDeleted   √(不正常)  
EventType.NodeDataChanged    
EventType.NodeChildrenChanged      

綜合上面兩個表,我們可以總結出各種寫操作可以觸發哪些watcher,如下表所示:

   “/path” “/path/child”    exists getData getChildren exists getData getChildren
 
 
create(“/path”)        
delete(“/path”)      
setData(“/path”)        
create(“/path/child”)      
delete(“/path/child”)    
setData(“/path/child”)        

如果發生session close、authFail和invalid,那么所有類型的wather都會被觸發。

zkClient除了做了一些便捷包裝之外,對watcher使用做了一點增強。比如subscribeChildChanges實際上是通過exists和getChildren關注了兩個事件。這樣當create(“/path”)時,對應path上通過getChildren注冊的listener也會被調用。另外subscribeDataChanges實際上只是通過exists注冊了事件。因為從上表可以看到,對於一個更新,通過exists和getData注冊的watcher要么都會觸發,要么都不會觸發。

zkClient地址:https://github.com/sgroschupf/zkclient
Maven工程中使用zkClient需要加的依賴:

    <dependency>
        <groupId>zkclient</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.1</version>
    </dependency>
  • menagerie
menagerie基於Zookeeper實現了java.util.concurrent包的一個分布式版本。這個封裝是更大粒度上對各種分布式一致性使用場景的抽象。其中最基礎和常用的是一個分布式鎖的實現:
org.menagerie.locks.ReentrantZkLock,通過ZooKeeper的全局有序的特性和EPHEMERAL_SEQUENTIAL類型znode的支持,實現了分布式鎖。具體做法是:不同的client上每個試圖獲得鎖的線程,都在相同的basepath下面創建一個EPHEMERAL_SEQUENTIAL的node。EPHEMERAL表示要創建的是臨時znode,創建連接斷開時會自動刪除; SEQUENTIAL表示要自動在傳入的path后面綴上一個自增的全局唯一后綴,作為最終的path。因此對不同的請求ZK會生成不同的后綴,並分別返回帶了各自后綴的path給各個請求。因為ZK全局有序的特性,不管client請求怎樣先后到達,在ZKServer端都會最終排好一個順序,因此自增后綴最小的那個子節點,就對應第一個到達ZK的有效請求。然后client讀取basepath下的所有子節點和ZK返回給自己的path進行比較,當發現自己創建的sequential node的后綴序號排在第一個時,就認為自己獲得了鎖;否則的話,就認為自己沒有獲得鎖。這時肯定是有其他並發的並且是沒有斷開的client/線程先創建了node。

基於分布式鎖,還實現了其他業務場景,比如leader選舉:
public static void leaderElectionTest() {
ZkSessionManager zksm = new DefaultZkSessionManager(“ZK-host-ip:2181″, 5000);
LeaderElector elector = new ZkLeaderElector(“/leaderElectionTest”, zksm, Ids.OPEN_ACL_UNSAFE);
if (elector.nominateSelfForLeader()) {
System.out.println(“Try to become the leader success!”);
}
}

java.util.concurrent包下面的其他接口實現,也主要是基於ReentrantZkLock的,比如ZkHashMap實現了ConcurrentMap。具體請參見menagerie的API文檔

menagerie地址:https://github.com/openUtility/menagerie
Maven工程中使用menagerie需要加的依賴:

    <dependency>
        <groupId>org.menagerie</groupId>
        <artifactId>menagerie</artifactId>
        <version>1.1-SNAPSHOT</version>
    </dependency>
 
ZooKeeper 典型的應用場景
 
Zookeeper 從設計模式角度來看,是一個基於觀察者模式設計的分布式服務管理框架,它負責存儲和管理大家都關心的數據,然后接受觀察者的注冊,一旦這些數據的狀態發生變化,Zookeeper 就將負責通知已經在 Zookeeper 上注冊的那些觀察者做出相應的反應,從而實現集群中類似 Master/Slave 管理模式,關於 Zookeeper 的詳細架構等內部細節可以閱讀 Zookeeper 的源碼。
 
下面詳細介紹這些典型的應用場景,也就是 Zookeeper 到底能幫我們解決那些問題?下面將給出答案。
 
統一命名服務(Name Service)
 
分布式應用中,通常需要有一套完整的命名規則,既能夠產生唯一的名稱又便於人識別和記住,通常情況下用樹形的名稱結構是一個理想的選擇,樹形的名稱結構是一個有層次的目錄結構,既對人友好又不會重復。說到這里你可能想到了 JNDI,沒錯 Zookeeper 的 Name Service 與 JNDI 能夠完成的功能是差不多的,它們都是將有層次的目錄結構關聯到一定資源上,但是 Zookeeper 的 Name Service 更加是廣泛意義上的關聯,也許你並不需要將名稱關聯到特定資源上,你可能只需要一個不會重復名稱,就像數據庫中產生一個唯一的數字主鍵一樣。
 
Name Service 已經是 Zookeeper 內置的功能,你只要調用 Zookeeper 的 API 就能實現。如調用 create 接口就可以很容易創建一個目錄節點。
 
配置管理(Configuration Management)
 
配置的管理在分布式應用環境中很常見,例如同一個應用系統需要多台 PC Server 運行,但是它們運行的應用系統的某些配置項是相同的,如果要修改這些相同的配置項,那么就必須同時修改每台運行這個應用系統的 PC Server,這樣非常麻煩而且容易出錯。
 
像這樣的配置信息完全可以交給 Zookeeper 來管理,將配置信息保存在 Zookeeper 的某個目錄節點中,然后將所有需要修改的應用機器監控配置信息的狀態,一旦配置信息發生變化,每台應用機器就會收到 Zookeeper 的通知,然后從 Zookeeper 獲取新的配置信息應用到系統中。
 
圖 2. 配置管理結構圖 
 
集群管理(Group Membership)
 
Zookeeper 能夠很容易的實現集群管理的功能,如有多台 Server 組成一個服務集群,那么必須要一個“總管”知道當前集群中每台機器的服務狀態,一旦有機器不能提供服務,集群中其它集群必須知道,從而做出調整重新分配服務策略。同樣當增加集群的服務能力時,就會增加一台或多台 Server,同樣也必須讓“總管”知道。
 
Zookeeper 不僅能夠幫你維護當前的集群中機器的服務狀態,而且能夠幫你選出一個“總管”,讓這個總管來管理集群,這就是 Zookeeper 的另一個功能 Leader Election。
 
它們的實現方式都是在 Zookeeper 上創建一個 EPHEMERAL 類型的目錄節點,然后每個 Server 在它們創建目錄節點的父目錄節點上調用  getChildren( String path, boolean watch) 方法並設置 watch 為 true,由於是 EPHEMERAL 目錄節點,當創建它的 Server 死去,這個目錄節點也隨之被刪除,所以 Children 將會變化,這時  getChildren上的 Watch 將會被調用,所以其它 Server 就知道已經有某台 Server 死去了。新增 Server 也是同樣的原理。
 
Zookeeper 如何實現 Leader Election,也就是選出一個 Master Server。和前面的一樣每台 Server 創建一個 EPHEMERAL 目錄節點,不同的是它還是一個 SEQUENTIAL 目錄節點,所以它是個 EPHEMERAL_SEQUENTIAL 目錄節點。之所以它是 EPHEMERAL_SEQUENTIAL 目錄節點,是因為我們可以給每台 Server 編號,我們可以選擇當前是最小編號的 Server 為 Master,假如這個最小編號的 Server 死去,由於是 EPHEMERAL 節點,死去的 Server 對應的節點也被刪除,所以當前的節點列表中又出現一個最小編號的節點,我們就選擇這個節點為當前 Master。這樣就實現了動態選擇 Master,避免了傳統意義上單 Master 容易出現單點故障的問題。
 
圖 3. 集群管理結構圖 
 
 
這部分的示例代碼如下,完整的代碼請看附件:
                        
 void findLeader() throws InterruptedException { 
        byte[] leader = null; 
        try { 
            leader = zk.getData(root + "/leader", true, null); 
        } catch (Exception e) { 
            logger.error(e); 
        } 
        if (leader != null) { 
            following(); 
        } else { 
            String newLeader = null; 
            try { 
                byte[] localhost = InetAddress.getLocalHost().getAddress(); 
                newLeader = zk.create(root + "/leader", localhost, 
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); 
            } catch (Exception e) { 
                logger.error(e); 
            } 
            if (newLeader != null) { 
                leading(); 
            } else { 
                mutex.wait(); 
            } 
        } 
    } 

 
共享鎖(Locks)
 
共享鎖在同一個進程中很容易實現,但是在跨進程或者在不同 Server 之間就不好實現了。Zookeeper 卻很容易實現這個功能,實現方式也是需要獲得鎖的 Server 創建一個 EPHEMERAL_SEQUENTIAL 目錄節點,然后調用  getChildren方法獲取當前的目錄節點列表中最小的目錄節點是不是就是自己創建的目錄節點,如果正是自己創建的,那么它就獲得了這個鎖,如果不是那么它就調用 exists( String path, boolean watch) 方法並監控 Zookeeper 上目錄節點列表的變化,一直到自己創建的節點是列表中最小編號的目錄節點,從而獲得鎖,釋放鎖很簡單,只要刪除前面它自己所創建的目錄節點就行了。
 
圖 4. Zookeeper 實現 Locks 的流程圖 
 
                         
 void getLock() throws KeeperException, InterruptedException{ 
        List<String> list = zk.getChildren(root, false); 
        String[] nodes = list.toArray(new String[list.size()]); 
        Arrays.sort(nodes); 
        if(myZnode.equals(root+"/"+nodes[0])){ 
            doAction(); 
        } 
        else{ 
            waitForLock(nodes[0]); 
        } 
    } 
    void waitForLock(String lower) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true); 
        if(stat != null){ 
            mutex.wait(); 
        } 
        else{ 
            getLock(); 
        } 
    } 

 
隊列管理
 
Zookeeper 可以處理兩種類型的隊列:
(1)當一個隊列的成員都聚齊時,這個隊列才可用,否則一直等待所有成員到達,這種是同步隊列。
(2)隊列按照 FIFO 方式進行入隊和出隊操作,例如實現生產者和消費者模型。
 
同步隊列用 Zookeeper 實現的實現思路如下:
 
創建一個父目錄 /synchronizing,每個成員都監控標志(Set Watch)位目錄 /synchronizing/start 是否存在,然后每個成員都加入這個隊列,加入隊列的方式就是創建 /synchronizing/member_i 的臨時目錄節點,然后每個成員獲取 / synchronizing 目錄的所有目錄節點,也就是 member_i。判斷 i 的值是否已經是成員的個數,如果小於成員個數等待 /synchronizing/start 的出現,如果已經相等就創建 /synchronizing/start。
 
用下面的流程圖更容易理解:
圖 5. 同步隊列流程圖 
 
同步隊列的關鍵代碼如下:
void addQueue() throws KeeperException, InterruptedException{ 
        zk.exists(root + "/start",true); 
        zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, 
        CreateMode.EPHEMERAL_SEQUENTIAL); 
        synchronized (mutex) { 
            List<String> list = zk.getChildren(root, false); 
            if (list.size() < size) { 
                mutex.wait(); 
            } else { 
                zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,
                 CreateMode.PERSISTENT); 
            } 
        } 
 } 

 
當隊列沒滿是進入 wait(),然后會一直等待 Watch 的通知,Watch 的代碼如下:
public void process(WatchedEvent event) { 
        if(event.getPath().equals(root + "/start") &&
         event.getType() == Event.EventType.NodeCreated){ 
            System.out.println("得到通知"); 
            super.process(event); 
            doAction(); 
        } 
    } 

 
FIFO 隊列用 Zookeeper 實現思路如下:
 
實現的思路也非常簡單,就是在特定的目錄下創建 SEQUENTIAL 類型的子目錄 /queue_i,這樣就能保證所有成員加入隊列時都是有編號的,出隊列時通過 getChildren( ) 方法可以返回當前所有的隊列中的元素,然后消費其中最小的一個,這樣就能保證 FIFO。
 
下面是生產者和消費者這種隊列形式的示例代碼:
boolean produce(int i) throws KeeperException, InterruptedException{ 
        ByteBuffer b = ByteBuffer.allocate(4); 
        byte[] value; 
        b.putInt(i); 
        value = b.array(); 
        zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                    CreateMode.PERSISTENT_SEQUENTIAL); 
        return true; 
    } 

 
int consume() throws KeeperException, InterruptedException{ 
        int retvalue = -1; 
        Stat stat = null; 
        while (true) { 
            synchronized (mutex) { 
                List<String> list = zk.getChildren(root, true); 
                if (list.size() == 0) { 
                    mutex.wait(); 
                } else { 
                    Integer min = new Integer(list.get(0).substring(7)); 
                    for(String s : list){ 
                        Integer tempValue = new Integer(s.substring(7)); 
                        if(tempValue < min) min = tempValue; 
                    } 
                    byte[] b = zk.getData(root + "/element" + min,false, stat); 
                    zk.delete(root + "/element" + min, 0); 
                    ByteBuffer buffer = ByteBuffer.wrap(b); 
                    retvalue = buffer.getInt(); 
                    return retvalue; 
                } 
            } 
        } 
 } 

 
 
參考資料
分布式服務框架 Zookeeper—管理分布式環境中的數據- http://netcome.iteye.com/blog/1474255
zookeeper工作原理、安裝配置、工具命令簡介- http://www.cnblogs.com/kunpengit/p/4045334.html
I0Itec-zkClient小結- http://san-yun.iteye.com/blog/1977454
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM