1. 概述
Zookeeper是Hadoop的一個子項目,它是分布式系統中的協調系統,可提供的服務主要有:配置服務、名字服務、分布式同步、組服務等。
它有如下的一些特點:
- 簡單
Zookeeper的核心是一個精簡的文件系統,它支持一些簡單的操作和一些抽象操作,例如,排序和通知。
- 豐富
Zookeeper的原語操作是很豐富的,可實現一些協調數據結構和協議。例如,分布式隊列、分布式鎖和一組同級別節點中的“領導者選舉”。
- 高可靠
Zookeeper支持集群模式,可以很容易的解決單點故障問題。
- 松耦合交互
不同進程間的交互不需要了解彼此,甚至可以不必同時存在,某進程在zookeeper中留下消息后,該進程結束后其它進程還可以讀這條消息。
- 資源庫
Zookeeper實現了一個關於通用協調模式的開源共享存儲庫,能使開發者免於編寫這類通用協議。
2. ZooKeeper的安裝
- 獨立模式安裝
Zookeeper的運行環境是需要java的,建議安裝oracle的java6.
可去官網下載一個穩定的版本,然后進行安裝:http://zookeeper.apache.org/
解壓后在zookeeper的conf目錄下創建配置文件zoo.cfg,里面的配置信息可參考統計目錄下的zoo_sample.cfg文件,我們這里配置為:
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/opt/zookeeper-data/ clientPort=2181
tickTime:指定了ZooKeeper的基本時間單位(以毫秒為單位);
initLimit:指定了啟動zookeeper時,zookeeper實例中的隨從實例同步到領導實例的初始化連接時間限制,超出時間限制則連接失敗(以tickTime為時間單位);
syncLimit:指定了zookeeper正常運行時,主從節點之間同步數據的時間限制,若超過這個時間限制,那么隨從實例將會被丟棄;
dataDir:zookeeper存放數據的目錄;
clientPort:用於連接客戶端的端口。
- 啟動一個本地的ZooKeeper實例
% zkServer.sh start
檢查ZooKeeper是否正在運行
echo ruok | nc localhost 2181
若是正常運行的話會打印“imok”。
3. ZooKeeper監控
- 遠程JMX配置
默認情況下,zookeeper是支持本地的jmx監控的。若需要遠程監控zookeeper,則需要進行進行如下配置。
默認的配置有這么一行:
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
咱們在$JMXLOCALONLY后邊添加jmx的相關參數配置:
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY -Djava.rmi.server.hostname=192.168.1.8 -Dcom.sun.management.jmxremote.port=1911 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false org.apache.zookeeper.server.quorum.QuorumPeerMain"
這樣就可以遠程監控了,可以用jconsole.exe或jvisualvm.exe等工具對其進行監控。
- 身份驗證
這里沒有配置驗證信息,如果需要請參見我的博文jvisualvm遠程監控tomcat:http://www.cnblogs.com/leocook/p/jvisualvmandtomcat.html
4. Zookeeper的存儲模型
Zookeeper的數據存儲采用的是結構化存儲,結構化存儲是沒有文件和目錄的概念,里邊的目錄和文件被抽象成了節點(node),zookeeper里可以稱為znode。Znode的層次結構如下圖:
最上邊的是根目錄,下邊分別是不同級別的子目錄。
5. Zookeeper客戶端的使用
- zkCli.sh
可使用./zkCli.sh -server localhost來連接到Zookeeper服務上。
使用ls /可查看根節點下有哪些子節點,可以雙擊Tab鍵查看更多命令。
- Java客戶端
可創建org.apache.zookeeper.ZooKeeper對象來作為zk的客戶端,注意,java api里創建zk客戶端是異步的,為防止在客戶端還未完成創建就被使用的情況,這里可以使用同步計時器,確保zk對象創建完成再被使用。
- C客戶端
可以使用zhandle_t指針來表示zk客戶端,可用zookeeper_init方法來創建。可在ZK_HOME\src\c\src\ cli.c查看部分示例代碼。
6. Zookeeper創建Znode
Znode有兩種類型:短暫的和持久的。短暫的znode在創建的客戶端與服務器端斷開(無論是明確的斷開還是故障斷開)連接時,該znode都會被刪除;相反,持久的znode則不會。

public class CreateGroup implements Watcher{ private static final int SESSION_TIMEOUT = 1000;//會話延時 private ZooKeeper zk = null; private CountDownLatch countDownLatch = new CountDownLatch(1);//同步計數器 public void process(WatchedEvent event) { if(event.getState() == KeeperState.SyncConnected){ countDownLatch.countDown();//計數器減一 } } /** * 創建zk對象 * 當客戶端連接上zookeeper時會執行process(event)里的countDownLatch.countDown(),計數器的值變為0,則countDownLatch.await()方法返回。 * @param hosts * @throws IOException * @throws InterruptedException */ public void connect(String hosts) throws IOException, InterruptedException { zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); countDownLatch.await();//阻塞程序繼續執行 } /** * 創建group * * @param groupName 組名 * @throws KeeperException * @throws InterruptedException */ public void create(String groupName) throws KeeperException, InterruptedException { String path = "/" + groupName; String createPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE/*允許任何客戶端對該znode進行讀寫*/, CreateMode.PERSISTENT/*持久化的znode*/); System.out.println("Created " + createPath); } /** * 關閉zk * @throws InterruptedException */ public void close() throws InterruptedException { if(zk != null){ try { zk.close(); } catch (InterruptedException e) { throw e; }finally{ zk = null; System.gc(); } } } }
這里我們使用了同步計數器CountDownLatch,在connect方法中創建執行了zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);之后,下邊接着調用了CountDownLatch對象的await方法阻塞,因為這是zk客戶端不一定已經完成了與服務端的連接,在客戶端連接到服務端時會觸發觀察者調用process()方法,我們在方法里邊判斷一下觸發事件的類型,完成連接后計數器減一,connect方法中解除阻塞。
還有兩個地方需要注意:這里創建的znode的訪問權限是open的,且該znode是持久化存儲的。
測試類如下:
public class CreateGroupTest { private static String hosts = "192.168.1.8"; private static String groupName = "zoo"; private CreateGroup createGroup = null; /** * init * @throws InterruptedException * @throws KeeperException * @throws IOException */ @Before public void init() throws KeeperException, InterruptedException, IOException { createGroup = new CreateGroup(); createGroup.connect(hosts); } @Test public void testCreateGroup() throws KeeperException, InterruptedException { createGroup.create(groupName); } /** * 銷毀資源 */ @After public void destroy() { try { createGroup.close(); createGroup = null; System.gc(); } catch (InterruptedException e) { e.printStackTrace(); } } }
由於zk對象的創建和銷毀代碼是可以復用的,所以這里我們把它分裝成了接口:

/** * 連接的觀察者,封裝了zk的創建等 * @author leo * */ public class ConnectionWatcher implements Watcher { private static final int SESSION_TIMEOUT = 5000; protected ZooKeeper zk = null; private CountDownLatch countDownLatch = new CountDownLatch(1); public void process(WatchedEvent event) { KeeperState state = event.getState(); if(state == KeeperState.SyncConnected){ countDownLatch.countDown(); } } /** * 連接資源 * @param hosts * @throws IOException * @throws InterruptedException */ public void connection(String hosts) throws IOException, InterruptedException { zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); countDownLatch.await(); } /** * 釋放資源 * @throws InterruptedException */ public void close() throws InterruptedException { if (null != zk) { try { zk.close(); } catch (InterruptedException e) { throw e; }finally{ zk = null; System.gc(); } } } }
7. Zookeeper刪除Znode
/** * 刪除分組 * @author leo * */ public class DeleteGroup extends ConnectionWatcher { public void delete(String groupName) { String path = "/" + groupName; try { List<String> children = zk.getChildren(path, false); for(String child : children){ zk.delete(path + "/" + child, -1); } zk.delete(path, -1);//版本號為-1, } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
zk.delete(path,version)方法的第二個參數是znode版本號,如果提供的版本號和znode版本號一致才會刪除這個znode,這樣可以檢測出對znode的修改沖突。通過將版本號設置為-1,可以繞過這個版本檢測機制,無論znode的版本號是什么,都會直接將其刪除。
測試類:

public class DeleteGroupTest { private static final String HOSTS = "192.168.1.137"; private static final String groupName = "zoo"; private DeleteGroup deleteGroup = null; @Before public void init() throws IOException, InterruptedException { deleteGroup = new DeleteGroup(); deleteGroup.connection(HOSTS); } @Test public void testDelete() throws IOException, InterruptedException, KeeperException { deleteGroup.delete(groupName); } @After public void destroy() throws InterruptedException { if(null != deleteGroup){ try { deleteGroup.close(); } catch (InterruptedException e) { throw e; }finally{ deleteGroup = null; System.gc(); } } } }
8. Zookeeper的相關操作
ZooKeeper中共有9中操作:
create:創建一個znode
delete:刪除一個znode
exists:測試一個znode
getACL,setACL:獲取/設置一個znode的ACL(權限控制)
getChildren:獲取一個znode的子節點
getData,setData:獲取/設置一個znode所保存的數據
sync:將客戶端的znode視圖與ZooKeeper同步
這里更新數據是必須要提供znode的版本號(也可以使用-1強制更新,這里可以執行前通過exists方法拿到znode的元數據Stat對象,然后從Stat對象中拿到對應的版本號信息),如果版本號不匹配,則更新會失敗。因此一個更新失敗的客戶端可以嘗試是否重試或執行其它操作。
9. ZooKeeper的API
ZooKeeper的api支持多種語言,在操作時可以選擇使用同步api還是異步api。同步api一般是直接返回結果,異步api一般是通過回調來傳送執行結果的,一般方法中有某參數是類AsyncCallback的內部接口,那么該方法應該就是異步調用,回調方法名為processResult。
10. 觀察觸發器
可以對客戶端和服務器端之間的連接設置觀察觸發器(后邊稱之為zookeeper的狀態觀察觸發器),也可以對znode設置觀察觸發器。
- 狀態觀察器
zk的整個生命周期如下:
可在創建zk對象時傳入一個觀察器,在完成CONNECTING狀態到CONNECTED狀態時,觀察器會觸發一個事件,該觸發的事件類型為NONE,通過event.getState()方法拿到事件狀態為SyncConnected。有一點需要注意的就是,在zk調用close方法時不會觸發任何事件,因為這類的顯示調用是開發者主動執行的,屬於可控的,不用使用事件通知來告知程序。這一塊在下篇博文還會詳細解說。
- 設置znode的觀察器
可以在讀操作exists、getChildren和getData上設置觀察,在執行寫操作create、delete和setData將會觸發觀察事件,當然,在執行寫的操作時,也可以選擇是否觸發znode上設置的觀察器,具體可查看相關的api。
當觀察的znode被創建、刪除或其數據被更新時,設置在exists上的觀察將會被觸發;
當觀察的znode被刪除或數據被更新時,設置在getData上的觀察將會被觸發;
當觀察的znode的子節點被創建、刪除或znode自身被刪除時,設置在getChildren上的觀察將會被觸發,可通過觀察事件的類型來判斷被刪除的是znode還是它的子節點。
對於NodeCreated和NodeDeleted根據路徑就能發現是哪個znode被寫;對於NodeChildrenChanged可根據getChildren來獲取新的子節點列表。
注意:在收到收到觸發事件到執行讀操作之間,znode的狀態可能會發生狀態,這點需要牢記。
至此,編寫簡單的zookeeper應該是可以的了,下篇博文咱們來深入探討zookeeper的相關知識。
參考地址:http://zookeeper.apache.org/doc/r3.4.6/
參考書籍:《hadoop權威指南》