總結:
what:分布式應用的協調服務。用於對分布式系統進行節點管理、leader選舉、配置管理等。zookeeper自身也是個分布式系統,有多個節點,具備一致性(借助Paxos算法)、容錯性、高可用等特點。
原理:從設計模式角度來看,是一個基於觀察者模式的分布式應用的管理框架,它負責存儲和管理大家都關心的數據,然后接受觀察者的注冊,一旦這些數據的狀態發生變化,Zookeeper 就將負責通知已經在 Zookeeper 上注冊的那些觀察者做出相應的反應。
主要概念:znode
zookeeper中的數據是以樹形結構組織的,類似於樹形文件系統。樹中的節點稱為znode,znode有持久型和臨時型兩個類型,具體來說有四種:
持久型:PERSISTENT、PERSISTENT_SEQUENTIAL
臨時型:EPHEMERAL、EPHEMERAL_SEQUENTIAL
臨時型的znode在創建者與zookeeper連接斷開時(客戶端主動斷開、服務端宕機導致被動斷開等)會被自動刪除,持久型的則不會;帶SEQUENTIAL后綴的與不帶的相比區別在於前者類型的znode寫入時會自動在末尾加編號,編號遞增,客戶端可以根據編號排序來推斷事件的順序或實現一些功能(如leader選舉、實現讀寫鎖等)。
主要應用場景:
(均為通過利用 zookeeper znode及znode變化通知機制 來實現。對於前者,要么利用“zk中節點不能重名”的特性、要么利用節點順序做文章。臨時節點用得多)
配置管理/命名管理:用於存儲所管理的分布式系統的配置數據(如節點的地址等),這些數據被系統的各節點共享。zookeeper自身的配置數據也存儲在樹形文件系統中。
節點管理(membership):各節點連接zookeeper后往同一個目錄(如/kafka/nodes)下創建一個EPHEMERAL類型的zonde,值為當前節點的id(如將ip:port作為id);並監控該目錄節點。這樣每個節點都可知道系統中其他節點的存在,當節點增減時也能收到通知從而感知到系統的節點的變化。
leader選舉(master/slave選舉、replication leader選舉):與節點選管理類似,只不過進一步地,各節點從節點列表中按定好的規則選出一個作為leader。如改為創建EPHEMERAL_SEQUENTIAL類型的節點,並將編號最小創建者的作為leader。
分布式鎖:(創建臨時節點,這樣在連接斷開時節點自動刪除)
分布式排他鎖:
非公平鎖:與leader選舉類似,按預定的規則選出的znode(如將編號最小的znode)的創建者作為鎖的獲得者,其他的創建者等待並監聽共同父目錄的children變化事件。當然,由於編號是按創建順序遞增的,故若規則為選編號最小者,則是公平的。
公平鎖:先寫先得——往同一個znode下創建多個znode時,會按寫入的先后順序存儲,故每次都讓最靠前的znode的創建者獲得鎖即可。
也可利用“zk中的節點不能重名的特性”來實現排他鎖:各客戶端創建同一EPHEMERAL znode,只會有一個創建成功,成功者獲得鎖,失敗者等待並監聽該znode的刪除事件,流程圖:
屏障(barrier):與節點管理類似,只不過各節點等監控到集群中節點數達到預期數理時才開始接下來的工作。
讀/寫鎖:各客戶端往同一目錄下創建EPHEMERAL_SEQUENTIAL節點(寫或讀節點,結果如"read_1"、"write_2"、"read_3"),然后獲取目錄下的znode列表並按序號升序排序.
讀鎖:若當前客戶端創建的znode前無寫znode則獲得讀鎖,否則等待並監聽該寫節點的刪除事件
寫鎖:若當前客戶端創建的znode前無節點(即為第一個節點)則獲得鎖,否則等待並監聽前一節點的刪除事件
讀鎖: 寫鎖:
入門簡介,不妨見:https://mp.weixin.qq.com/s/jr6pUFurUdBBoKGf9sY-JA
=============以下為正文=============
Apache Zookeeper 由 Apache Hadoop 的 Zookeeper 子項目發展而來,現已經成為 Apache 的頂級項目,它是一個開放源碼的分布式應用程序協調服務,是Google Chubby的一個開源實現。它是一個為分布式應用提供一致性服務的組件,提供的功能包括:配置管理,名字服務,提供分布式同步、隊列管理、集群管理等。
使用場景(即上述的功能):典型應用場景篇一,典型應用場景篇二
原理:Zookeeper 從設計模式角度來看,是一個基於觀察者模式設計的分布式服務管理框架,它負責存儲和管理大家都關心的數據,然后接受觀察者的注冊,一旦這些數據的狀態發生變化,Zookeeper 就將負責通知已經在 Zookeeper 上注冊的那些觀察者做出相應的反應,從而實現集群中類似 Master/Slave 管理模式。
1、安裝
Zookeeper有兩種運行模式:
1.1、獨立模式(standalone mode)
(具體參考:http://blog.csdn.net/csfreebird/article/details/44006453)
只運行在一台服務器上,適合測試環境。
1.2、復制模式(replicated mode)
運行於一個集群上,適合生產環境,這個計算機集群被稱為一個“集合體”(ensemble)。
Zookeeper通過復制來實現高可用性,只要集合體中半數以上的機器處於可用狀態,它就能夠保證服務繼續。這跟Zookeeper的復制策略有關:Zookeeper確保對znode樹的每一個修改都會被復制到集合體中超過半數的機器上。
由於ZooKeeper集群,會有一個Leader負責管理和協調其他集群服務器,因此服務器的數量通常都是奇數,例如3,5,7...等,這樣2n+1的數量的服務器就可以允許最多n台服務器的失效。
(安裝可參考:http://blog.csdn.net/csfreebird/article/details/44007295)
這里以zookeeper-3.4.8為例:
1.2.1、下載解壓
解壓后目錄結構為:
. ├── bin ├── build.xml ├── CHANGES.txt ├── conf ├── contrib ├── dist-maven ├── docs ├── ivysettings.xml ├── ivy.xml ├── lib ├── LICENSE.txt ├── NOTICE.txt ├── README_packaging.txt ├── README.txt ├── recipes ├── src ├── zookeeper-3.4.8.jar ├── zookeeper-3.4.8.jar.asc ├── zookeeper-3.4.8.jar.md5 └── zookeeper-3.4.8.jar.sha1
1.2.2、配置文件
進入conf目錄, cp zoo_sample.cfg zoo.cfg ,修改配置文件zoo.cfg為如下:

# The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/usr/local/zookeeper/zookeeper-3.4.8/zk_data # the port at which the clients will connect clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 server.1=192.168.6.131:2888:3888 server.2=192.168.6.132:2888:3888 server.3=192.168.6.133:2888:3888
各配置項的含義:

1. tickTime = 2000:Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,每隔tickTime時間就會發送一個心跳。
2. initLimit: 配置 Zookeeper 接受客戶端(此客戶端不是用戶連接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集群中連接到 Leader 的 Follower 服務器)初始化連接時最長能忍受多少個心跳時間間隔數。當已超過initLimit個tickTime長度后 Zookeeper 服務器還沒有收到客戶端的返回信息,則表明客戶端連接失敗。總的時間長度就是 initLimit * tickTime 秒。
3. syncLimit: 配置 Leader 與 Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是 syncLimit * tickTime 秒
4. dataDir: Zookeeper 保存數據的目錄,默認情況下,Zookeeper 將寫數據的日志文件也保存在這個目錄里。
5. dataLogDir:若沒提供的話則用dataDir。zookeeper的持久化都存儲在這兩個目錄里。dataLogDir里是放到的順序日志(WAL)。而dataDir里放的是內存數據結構的snapshot,便於快速恢復。為了達到性能最大化,一般建議把dataDir和dataLogDir分到不同的磁盤上,以充分利用磁盤順序寫的特性。
6. clientPort:Zookeeper服務器監聽的端口,以接受客戶端的訪問請求。
7. server.A=B:C:D:其中 A 是一個數字,表示這個是第幾號服務器;B 是這個服務器的 ip 地址;C 表示的是這個服務器與集群中的 Leader 服務器交換信息的端口;D 表示的是萬一集群中的 Leader 服務器掛了,需要一個端口來重新進行選舉,選出一個新的 Leader,此端口就是用來執行選舉時服務器相互通信的端口。如果是偽集群的配置方式,由於 B 都是一樣,所以不同的 Zookeeper 實例通信端口號不能一樣,所以要給它們分配不同的端口號。
每個節點的配置文件都一樣。
更多可參考Zookeeper的配置
1.2.3、添加myid文件
除了修改 zoo.cfg 配置文件,集群模式下還要配置一個文件 myid,這個文件在 上述dataDir 指定的目錄下,這個文件里面就只有一個數據就是 A 的值,Zookeeper 啟動時會讀取這個文件,拿到里面的數據與 zoo.cfg 里面的配置信息比較從而判斷到底是那個 server。
1.2.4、啟動/結束
bin/zkServer.sh start|start-foreground|stop|restart|status|upgrade|print-cmd ,啟動后默認在bin下產生日志文件zookeeper.out
1. 啟動ZK服務: bin/zkServer.sh start 2. 查看ZK服務狀態: bin/zkServer.sh status 3. 停止ZK服務: bin/zkServer.sh stop 4. 重啟ZK服務: bin/zkServer.sh restart
(啟動后在zoo.cfg所指定的dataDir下有version-2文件夾(下有log.1文件)和zookeeper_server.pid文件)
啟動后可以借助下面的命令行客戶端看是否能連上以確定是否成功啟動,也可使用四字命令(如 echo stat|netcat localhost 2181 )快速確定節點狀態
2、使用
2.1、Shell客戶端
2.1.1 zk命令
命令行客戶端連接ZooKeeper: ./bin/zkCli.sh -server localhost:2181 ,連接成功后,會輸出 ZooKeeper 的相關環境以及配置信息。並能進行一些操作:
1. 顯示根目錄下、文件: ls / 使用 ls 命令來查看當前 ZooKeeper 中所包含的內容 2. 顯示根目錄下、文件: ls2 / 查看當前節點數據並能看到更新次數等數據 3. 創建文件並設置初始內容:create /zk "test" 創建一個新的 znode節點“ zk ”以及與它關聯的字符串 4. 獲取文件內容: get /zk 確認 znode 是否包含我們所創建的字符串 5. 修改文件內容: set /zk "zkbak" 對 zk 所關聯的字符串進行設置 6. 刪除文件: delete /zk 將剛才創建的 znode 刪除 7. 刪除目錄(即使非空):rmr 目錄 如可用於刪除Kafka的某個Consumer Group 8. 退出客戶端: quit 9. 幫助命令: help
客戶端連接后,用get / 命令可以發現此時只有zookeeper一項;如果此Zookeeper用於對Kafka或JStorm等提供服務,則還會有相應的其他目錄。
2.1.2 四字命令
此外,也可通過四字命令更方便地獲取服務端信息, 四字命令的用法為 echo 四字命令|netcat localhost 2181 ,常用的四字命令如下:
- conf:輸出Zookeeper服務器配置的詳細信息
- cons:輸出所有連接到服務器的客戶端的完全的連接/會話的詳細信息。包括“接收/發送”的包數量、會話ID、操作延遲、最后的操作執行等
- dump:輸出未經處理的會話和臨時節點
- envi:輸出關於服務器運行環境的詳細信息
- reqs:輸出未經處理的請求
- ruok:測試服務是否處於正確狀態。若是則會返回“imok”,否則不做任何反應
- stat:輸出關於性能和連接的客戶端的列表。(通過此命令也可查看節點是leader還是follower)
- wchs:輸出服務器watch的詳細信息
- wchc:通過session列出服務器watch的詳細信息,它的輸出是一個與watch相關的會話的列表
- wchp:通過路徑列出服務器watch的詳細信息,它輸出一個與session相關的路徑
- mntr:輸出一些Zookeeper運行時信息,通過對這些返回結果的解析可以達到監控效果
2.2、Java客戶端
使用Java進行Zookeeper的CRUD操作,示例如下:

1 package cn.edu.buaa.act.test.TestZookeeper; 2 3 import java.io.IOException; 4 5 import org.apache.zookeeper.CreateMode; 6 import org.apache.zookeeper.KeeperException; 7 import org.apache.zookeeper.Watcher; 8 import org.apache.zookeeper.ZooDefs.Ids; 9 import org.apache.zookeeper.ZooKeeper; 10 11 /** 12 * 此類包含兩個主要的 ZooKeeper 函數,分別為 createZKInstance ()和 ZKOperations ()。<br> 13 * <br> 14 * 15 * (1)createZKInstance ()函數負責對 ZooKeeper 實例 zk 進行初始化。 ZooKeeper 類有兩個構造函數,我們這里使用 16 * “ ZooKeeper ( String connectString, , int sessionTimeout, , Watcher watcher 17 * )”對其進行初始化。因此,我們需要提供初始化所需的,連接字符串信息,會話超時時間,以及一個 watcher 實例。<br> 18 * <br> 19 * 20 * (2)ZKOperations ()函數是我們所定義的對節點的一系列操作。它包括:創建 ZooKeeper 21 * 節點、查看節點、修改節點數據、查看修改后節點數據、刪除節點、查看節點是否存在。另外,需要注意的是:在創建節點的時候,需要提供節點的名稱、數據、 22 * 權限以及節點類型。此外,使用 exists 函數時,如果節點不存在將返回一個 null 值。 23 */ 24 public class ZookeeperTest { 25 26 // 會話超時時間,設置為與系統默認時間一致 27 28 private static final int SESSION_TIMEOUT = 30000; 29 30 // 創建 ZooKeeper 實例 31 32 ZooKeeper zk; 33 34 // 創建 Watcher 實例 35 36 Watcher wh = new Watcher() { 37 38 public void process(org.apache.zookeeper.WatchedEvent event) { 39 System.out.println("event=" + event.toString()); 40 } 41 42 }; 43 44 // 初始化 ZooKeeper 實例 45 46 private void createZKInstance() throws IOException { 47 zk = new ZooKeeper("192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181", ZookeeperTest.SESSION_TIMEOUT, 48 this.wh); 49 } 50 51 private void ZKOperations() throws IOException, InterruptedException, KeeperException { 52 53 System.out.println("\n 創建 ZooKeeper 節點 (znode : zoo2, 數據: myData2 ,權限: OPEN_ACL_UNSAFE ,節點類型: Persistent"); 54 zk.create("/zoo2", "myData2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 55 56 System.out.println("\n 查看是否創建成功: "); 57 System.out.println(new String(zk.getData("/zoo2", false, null))); 58 59 System.out.println("\n 修改節點數據 "); 60 zk.setData("/zoo2", "shenlan211314".getBytes(), -1); 61 62 System.out.println("\n 查看是否修改成功: "); 63 System.out.println(new String(zk.getData("/zoo2", false, null))); 64 65 System.out.println("\n 刪除節點 "); 66 zk.delete("/zoo2", -1); 67 68 System.out.println("\n 查看節點是否被刪除: "); 69 System.out.println(" 節點狀態: [" + zk.exists("/zoo2", false) + "]"); 70 71 } 72 73 private void ZKClose() throws InterruptedException { 74 zk.close(); 75 } 76 77 public static void main(String[] args) throws IOException, InterruptedException, KeeperException { 78 ZookeeperTest dm = new ZookeeperTest(); 79 dm.createZKInstance(); 80 dm.ZKOperations(); 81 dm.ZKClose(); 82 } 83 84 }
相關Maven依賴:
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.2</version>
</dependency>
3、Zookeeper dashboard
GitHub Zookeeper UI Dashboard
4、進階
4.1 數據及日志維護
Zookeeper的數據文件存放在配置中指定的dataDir中,每個數據文件名都以snapshot開頭,每個數據文件為zookeeper某個時刻數據全量快照。在zookeeper中,對數據的更新操作,包括創建節點、更新節點內容、刪除節點都會記錄事務日志;客戶端對ZK的更新操作都是永久的,不可回退的。為做到這點,ZK會將每次更新操作以事務日志的形式寫入磁盤,寫入成功后才會給予客戶端響應。zookeeper在完成若干次事務日志(snapCount)后會生成一次快照,把當前zk中的所有節點的狀態以文件的形式dump到硬盤中,生成一個snapshot文件。這里的事務次數是可以配置,默認是100000個。
Zookeeper的日志包括兩個部分,一部分是系統日志,另一部分是事務日志。
系統日志使用log4j進行管理,conf目錄中有一個log4j配置文件,該配置文件默認沒有打開滾動輸出,需要用戶自己配置,具體請參看log4j介紹。
事務日志默認存放在dataDir中,當然可以使用dataLogDir指定存放的位置。正常運行過程中,針對所有更新操作,在返回客戶端“更新成功”的響應前,ZK會確保已經將本次更新操作的事務日志寫到磁盤上,只有這樣,整個更新操作才會生效。每觸發一次數據快照,就會生成一個新的事務日志。
默認情況下,zk不會自動清理數據文件和日志文件,因此需要管理員自己清理。我們可以使用ZK的工具類PurgeTxnLog進行清理,當然,我們也可以寫腳本自己維護,同時可以使用工具慢慢清理,避免占用大量IO。清理腳本如下:
務必注意:
如果長時間不清理,切忌同一時間使用rm命令同時刪除大量文件,這樣會造成IO利用率瞬間飆升,zookeeper的連接會出現斷連或session超時,影響現網業務使用。
另外,對於每一個數據文件,它都是某一時刻的完整快照,我們可以定時將該文件備份,方便對數據進行還原或將zookeeper直接遷移到另外一個集群。
4.2 數據清理
具體詳見:http://nileader.blog.51cto.com/1381108/932156
4.3 可視化事務日志
具體詳見:http://nileader.blog.51cto.com/1381108/926753
Zookeeper運行久了產生的日志和事務數據非常大,在我們的實踐中甚至由於從沒清理導致磁盤滿了,雖然Zookeeper節點沒死,但ZK UI監控上顯示出“This instance is not serviceing requests”。由於Zookeeper不能提供服務,JStorm節點就死了,Kafka節點也無法提供服務(沒死),清理數據后就好了。
上述參考資料里列出了幾種方法,我們采取定時自動清理的做法(每次留10個),寫如下腳本,加入crontab定期執行:
#snapshot file dir dataDir=/usr/local/zookeeper/zookeeper-3.4.8/zk_data/version-2 #tran log dir dataLogDir=/usr/local/zookeeper/zookeeper-3.4.8/zk_data/version-2 #zk log dir #Leave 10 files count=10 count=$[$count+1] ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f
4.4 權限控制
具體參見:使用super身份對有權限的節點進行操作 和 ZooKeeper權限控制
4.5 Zookeeper應用場景及實踐
經典應用:
1)NameService
不多說,本身就是一個共享的樹形文件系統
2)配置管理
不同的process watch 自己的配置znode, 當配置變化可以收到通知
3)集群membership管理
集群節點下,每個server對應一個EPHEMERAL的節點,節點名就是server name/ IP,當它和zookeeper的心跳斷了,對應的節點被刪除,並且所有集群內的節點可以收到新的child List,保證每個server都知道進群內的其他server
4) master選舉
和3)類似,但是節點換成EPHEMERAL_SEQUENTIAL的,大家都約定以當前編號最小的server做為master。當master掛了,每個server都會被通知到一份新的server 列表,大家都把編號最小的那個作為master
5) 分布式獨占鎖
類似Java synchronized的語義 在約定的鎖節點上創建EPHEMERAL_SEQUENTIAL節點作為等待隊列,如果編號最小的就是自己就獲得鎖,否則wait()住。獲得鎖的process通過刪除自己的節點釋放鎖,這樣每個等待的process會得到通知,在事件處理函數里判斷自己是不是最小的編號,如果是則喚醒之前wait住的線程,這樣本進程就獲得了鎖
6)barrier屏障
意思是所有相關的process都到達齊了再往下執行。實現方法,也是在barrier節點創建EPHEMERAL_SEQUENTIAL的節點,每個process都會得到變化后的children list,當大小為barrier要求數的時候,各個process繼續執行。 zookeeper作為分布式process協調的中介,本身是一個單點failure點,所以它本身也做為一個集群,通常部署2n + 1台, 客戶端連接不同的zookeeper server,但是得到視圖是一樣的,這點是zookeeper內部保證的
Zookeeper客戶端主要有原生客戶端、zclient、Apache Curator等。
以下代碼使用Zookeeper原生的客戶端(Zookeeper源碼里),推薦使用更方便的客戶端 Curator
4.5.1 配置管理

1 package cn.edu.buaa.act.test.TestZookeeper; 2 3 import java.io.IOException; 4 5 import org.apache.zookeeper.CreateMode; 6 import org.apache.zookeeper.KeeperException; 7 import org.apache.zookeeper.Watcher; 8 import org.apache.zookeeper.ZooKeeper; 9 import org.apache.zookeeper.ZooDefs.Ids; 10 11 /** 12 * <h1>配置管理</h1><br/> 13 * Zookeeper很容易實現集中式的配置管理:<br/> 14 * 15 * <p> 16 * 比如將APP1的所有配置配置到/APP1 17 * node下,APP1所有機器一啟動就對/APP1這個節點進行監控(zk.exist("/APP1",true)),並且實現回調方法Watcher. 18 * </p> 19 * 20 * <p> 21 * 那么在zookeeper上/APP1 22 * znode節點下數據發生變化的時候,每個機器都會收到通知,Watcher方法將會被執行,那么應用再取下數據即可(zk.getData("/APP1", 23 * false,null)); 24 * </p> 25 * 26 * @author zsm Email: qzdhzsm@163.com 27 * @date Sep 13, 2016 8:31:08 PM 28 * @version 1.0 29 * @parameter 30 * @since 31 * @return 32 * 33 */ 34 public class ManageConfiguration { 35 private static final int SESSION_TIMEOUT = 30000; 36 private static final String zookeeperAddr = "192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181"; 37 private static final String znodePath = "/zsm_dbIP"; 38 39 private ZooKeeper zk; 40 41 public ManageConfiguration() throws KeeperException, InterruptedException, IOException { 42 Watcher wh = new Watcher() { 43 public void process(org.apache.zookeeper.WatchedEvent event) { 44 try { 45 System.out.println("db ip changed,new value:" + new String(zk.getData(znodePath, true, null))); 46 } catch (KeeperException e) { 47 // TODO Auto-generated catch block 48 e.printStackTrace(); 49 } catch (InterruptedException e) { 50 // TODO Auto-generated catch block 51 e.printStackTrace(); 52 } 53 } 54 }; 55 56 // 初始化 ZooKeeper 實例,初始化時提供的Watcher就會觸發一次 57 zk = new ZooKeeper(zookeeperAddr, SESSION_TIMEOUT, wh); 58 59 { 60 // 為了確保節點存在,不存在則先創建 61 if (zk.exists(znodePath, false) == null) { 62 zk.create(znodePath, "192.168.0.7".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 63 System.out.println("節點不存在,新建節點:" + znodePath + "=" + new String(zk.getData(znodePath, true, null))); 64 } 65 } 66 67 } 68 69 public static void main(String[] args) throws InterruptedException, KeeperException, IOException { 70 new ManageConfiguration(); 71 while (true) { 72 ; 73 } 74 } 75 }
4.5.2 集群管理

1 package cn.edu.buaa.act.test.TestZookeeper; 2 3 import java.io.IOException; 4 5 import org.apache.zookeeper.CreateMode; 6 import org.apache.zookeeper.KeeperException; 7 import org.apache.zookeeper.Watcher; 8 import org.apache.zookeeper.ZooDefs.Ids; 9 import org.apache.zookeeper.ZooKeeper; 10 11 /** 12 * <h1>集群管理</h1> <br/> 13 * 14 * <pre> 15 * 在 Zookeeper 上創建一個 EPHEMERAL 類型的節點,然后每個 Server 在它們創建目錄節點的父目錄節點上調用getChildren(String path, boolean watch) 方法並設置 watch 為 true 16 * </pre> 17 * 18 * <pre> 19 * 由於是 EPHEMERAL 目錄節點,當創建它的 Server 死去,這個目錄節點也隨之被刪除,所以 Children 將會變化,這時 20 * getChildren上的 Watch 將會被調用,所以其它 Server 就知道已經有某台 Server 死去了。新增 Server 同理。 21 * </pre> 22 * 23 * <pre> 24 * 若創建的是EPHEMERAL_SEQUENTIAL 節點,節點名會自動被append上一個唯一編號。可以在節點加入或刪除觸發wh時取編號最小的作為Leader,實現master選舉 25 * </pre> 26 * 27 * @author zsm 28 * @date Sep 13, 2016 8:31:08 PM 29 * @version 1.0 30 * @parameter 31 * @since 32 * @return 33 */ 34 public class ManageCluster { 35 private static final int SESSION_TIMEOUT = 30000; 36 private static final String zookeeperAddr = "192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181"; 37 private static final String znodePath = "/zsm_Nodes"; 38 39 private ZooKeeper zk; 40 41 public ManageCluster(String thisNodeID) throws IOException, KeeperException, InterruptedException { 42 43 Watcher wh = new Watcher() { 44 public void process(org.apache.zookeeper.WatchedEvent event) { 45 // System.out.println("*********wh1:event=" + event.toString() + 46 // " ********"); 47 try { 48 System.out.println("節點變化,現有所有節點:" + zk.getChildren(znodePath, true)); 49 } catch (KeeperException e) { 50 // TODO Auto-generated catch block 51 e.printStackTrace(); 52 } catch (InterruptedException e) { 53 // TODO Auto-generated catch block 54 e.printStackTrace(); 55 } 56 } 57 }; 58 59 // 初始化 ZooKeeper 實例,初始化時提供的Watcher就會觸發一次 60 zk = new ZooKeeper(zookeeperAddr, SESSION_TIMEOUT, wh); 61 62 {// 根節點不存在的話創建PERSISTENT根節點 63 if (zk.exists(znodePath, null) == null) { 64 zk.create(znodePath, "root node".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 65 } 66 } 67 68 // 創建EPHEMERAL_SEQUENTIAL節點 69 zk.create(znodePath + "/" + thisNodeID, thisNodeID.getBytes(), Ids.OPEN_ACL_UNSAFE, 70 CreateMode.EPHEMERAL_SEQUENTIAL); 71 72 // 在父節點getChildren,true 73 zk.getChildren(znodePath, true); 74 75 } 76 77 public static void main(String[] args) throws IOException, KeeperException, InterruptedException { 78 // TODO Auto-generated method stub 79 if (args.length < 0) { 80 System.out.println("args length is <0."); 81 System.exit(1); 82 } 83 new ManageCluster(args[0]); 84 while (true) { 85 ; 86 } 87 } 88 89 }
4.5.3 分布式同步鎖、隊列等

1 package cn.edu.buaa.act.test.TestZookeeper; 2 3 import java.io.IOException; 4 import java.util.Arrays; 5 import java.util.List; 6 7 import org.apache.zookeeper.CreateMode; 8 import org.apache.zookeeper.KeeperException; 9 import org.apache.zookeeper.Watcher; 10 import org.apache.zookeeper.ZooDefs.Ids; 11 import org.apache.zookeeper.ZooKeeper; 12 13 /** 14 * <pre> 15 * 分布式共享鎖,與通過Zookeeper管理Cluster類似,只是每次觸發Watcher后的操作,取現有所有節點后有下一步操作: 若最小節點等於當前節點, 16 * 則獲得鎖. 17 * 18 * <pre/> 19 * 20 * <pre> 21 * 如果所有節點的nodeid都一樣,則各節點優先級一樣,先進先得到鎖;若不同節點的nodeid不能一樣.則nodeid越小優先級越大 22 * 23 * <pre/> 24 * 25 * @author zsm 26 * @date 2016年9月18日 下午10:15:24 27 * @version 1.0 28 * @parameter 29 * @since 30 * @return 31 */ 32 public class T3_ManageLocks { 33 private static final int SESSION_TIMEOUT = 5000; 34 private static final String zookeeperAddr = "192.168.6.131:2181,192.168.6.132:2181,192.168.6.133:2181"; 35 private static final String znodePath = "/zsm_Locks"; 36 37 private ZooKeeper zk; 38 private static String thisNodeID; 39 40 public T3_ManageLocks(String thisNodeID) throws IOException, KeeperException, InterruptedException { 41 T3_ManageLocks.thisNodeID = thisNodeID; 42 43 Watcher wh = new Watcher() { 44 public void process(org.apache.zookeeper.WatchedEvent event) { 45 // System.out.println("*********wh1:event=" + event.toString() + 46 // " ********"); 47 try { 48 List<String> nodeListTmp = zk.getChildren(znodePath, true); 49 System.out.println("節點變化,現有所有節點:" + nodeListTmp); 50 51 String[] nodeList = nodeListTmp.toArray(new String[nodeListTmp.size()]); 52 Arrays.sort(nodeList); 53 54 String nodeIdWithLock = new String(zk.getData(znodePath + "/" + nodeList[0], null, null)); 55 System.out.println("curnode: " + T3_ManageLocks.thisNodeID + " node with lock: " + nodeIdWithLock); 56 if (nodeIdWithLock.equals(T3_ManageLocks.thisNodeID)) { 57 System.out.println("當前節點 " + nodeIdWithLock + " 獲得鎖"); 58 // do something then System.exit(0); 59 } 60 61 } catch (KeeperException e) { 62 // TODO Auto-generated catch block 63 e.printStackTrace(); 64 } catch (InterruptedException e) { 65 // TODO Auto-generated catch block 66 e.printStackTrace(); 67 } 68 } 69 }; 70 71 // 初始化 ZooKeeper 實例,初始化時提供的Watcher就會觸發一次 72 zk = new ZooKeeper(zookeeperAddr, SESSION_TIMEOUT, wh); 73 74 {// 根節點不存在的話創建PERSISTENT根節點 75 if (zk.exists(znodePath, null) == null) { 76 zk.create(znodePath, "root node".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 77 } 78 } 79 80 // 創建EPHEMERAL_SEQUENTIAL節點 81 zk.create(znodePath + "/" + thisNodeID, thisNodeID.getBytes(), Ids.OPEN_ACL_UNSAFE, 82 CreateMode.EPHEMERAL_SEQUENTIAL); 83 84 // 在父節點getChildren,true 85 // zk.getChildren(znodePath, true); 86 87 } 88 89 public static void main(String[] args) throws IOException, KeeperException, InterruptedException { 90 // TODO Auto-generated method stub 91 if (args.length < 0) { 92 System.out.println("args length is <0."); 93 System.exit(1); 94 } 95 new T3_ManageLocks(args[0]); 96 while (true) { 97 ; 98 } 99 } 100 }
5、參考資料
1、http://blog.csdn.net/csfreebird/article/details/43984425(安裝)
2、http://www.cnblogs.com/yuyijq/p/3424473.html(Zookeeper系列)
3、http://www.blogjava.net/BucketLi/archive/2010/12/21/341268.html(使用和原理探究)
4、http://data.qq.com/article?id=2863(全。騰訊大數據之Zookeeper運營經驗分享)