針對ZooKeeper的會話創建,節點創建、刪除,數據讀取、更新,權限控制等API進行簡單的驗證。
1、新建maven工程,添加依賴
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> <scope>test</scope> </dependency>
2、新建單元測試進行驗證
/** * 軟件版權:流沙~~ * 修改日期 修改人員 修改說明 * ========= =========== ===================== * 2020/1/14 liusha 新增 * ========= =========== ===================== */ package com.sand.zookeeper; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.junit.Test; import java.util.List; import java.util.concurrent.CountDownLatch; /** * 功能說明:ZooKeeper Api 測試類 * 開發人員:@author liusha * 開發日期:2020/1/14 9:58 * 功能描述:會話創建,節點創建、刪除,數據讀取、更新,權限控制等 */ public class ZooKeeperApi implements Watcher { private static Stat stat = new Stat(); private static ZooKeeper zooKeeper = null; private static final String host = "127.0.0.1:2181"; private static CountDownLatch countDownLatch = new CountDownLatch(1); private static final String hosts = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"; /** * ZooKeeper CreateMode節點類型說明: * 1.PERSISTENT:持久型 * 2.PERSISTENT_SEQUENTIAL:持久順序型 * 3.EPHEMERAL:臨時型 * 4.EPHEMERAL_SEQUENTIAL:臨時順序型 * <p> * 1、2種類型客戶端斷開后不會消失 * 3、4種類型客戶端斷開后超時時間內沒有新的連接節點將會消失 */ /** * ZooKeeper ZooDefs.Ids權限類型說明: * OPEN_ACL_UNSAFE:完全開放的ACL,任何連接的客戶端都可以操作該屬性znode * CREATOR_ALL_ACL:只有創建者才有ACL權限 * READ_ACL_UNSAFE:只能讀取ACL */ /** * ZooKeeper EventType事件類型說明: * NodeCreated:節點創建 * NodeDataChanged:節點的數據變更 * NodeChildrenChanged:子節點的數據變更 * NodeDeleted:子節點刪除 */ /** * ZooKeeper KeeperState狀態類型說明: * Disconnected:連接失敗 * SyncConnected:連接成功 * AuthFailed:認證失敗 * Expired:會話過期 * None:初始狀態 */ /** * 接收事件通知 * * @param event 事件通知 */ @Override public void process(WatchedEvent event) { System.out.println("Receive WatchedEvent:" + event); try { if (Event.KeeperState.SyncConnected == event.getState()) { System.out.println("通知:會話連接成功"); if (Event.EventType.None == event.getType() && null == event.getPath()) { System.out.println("進入會話初始狀態"); // 釋放所有等待的線程 countDownLatch.countDown(); } else if (event.getType() == Event.EventType.NodeCreated) { System.out.println("節點創建通知:" + event.getPath()); zooKeeper.exists(event.getPath(), true); } else if (event.getType() == Event.EventType.NodeDataChanged) { System.out.println("節點的數據變更通知:" + new String(zooKeeper.getData(event.getPath(), true, stat))); System.out.println("czxid=" + stat.getCzxid() + ",mzxid=" + stat.getMzxid() + ",version=" + stat.getVersion()); zooKeeper.exists(event.getPath(), true); } else if (event.getType() == Event.EventType.NodeChildrenChanged) { System.out.println("子節點的數據變更通知:" + zooKeeper.getChildren(event.getPath(), true)); zooKeeper.exists(event.getPath(), true); } else if (event.getType() == Event.EventType.NodeDeleted) { System.out.println("節點刪除通知:" + event.getPath()); zooKeeper.exists(event.getPath(), true); } else { System.out.println("未知事件通知類型:" + event.getType()); zooKeeper.exists(event.getPath(), true); } } else if (Event.KeeperState.Disconnected == event.getState()) { System.out.println("通知:會話連接失敗"); } else if (Event.KeeperState.AuthFailed == event.getState()) { System.out.println("通知:會話認證失敗"); } else if (Event.KeeperState.Expired == event.getState()) { System.out.println("通知:會話過期"); } else { System.out.println("未知的通知狀態:" + event.getState()); } } catch (Exception e) { e.printStackTrace(); } } /** * 創建會話(最基礎的實例) * * @throws Exception Exception */ @Test public void constructor_usage_simple() throws Exception { zooKeeper = new ZooKeeper(hosts, 5000, new ZooKeeperApi()); System.out.println("ZooKeeper.state:" + zooKeeper.getState()); // 所有線程執行完畢 countDownLatch.await(); System.out.println("ZooKeeper session會話創建完成。"); } /** * 創建會話(可復用sessionId的實例) * * @throws Exception Exception */ @Test public void constructor_usage_SID_PWD() throws Exception { zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi()); System.out.println("ZooKeeper.state:" + zooKeeper.getState()); // 所有線程執行完畢 countDownLatch.await(); long sessionId = zooKeeper.getSessionId(); byte[] sessionPasswd = zooKeeper.getSessionPasswd(); System.out.println(String.format("首次獲取sessionId:%s,sessionPasswd:%s", sessionId, sessionPasswd)); // 使用不正確的sessionId zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi(), 1L, "123".getBytes()); System.out.println("ZooKeeper.state err session:" + zooKeeper.getState()); // 使用正確的sessionId zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi(), sessionId, sessionPasswd); System.out.println("ZooKeeper.state session:" + zooKeeper.getState()); Thread.sleep(Integer.MAX_VALUE); } /** * 創建節點(同步) * * @throws Exception Exception */ @Test public void create_API_sync() throws Exception { String path = "/zk-create-znode-test-"; zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi()); System.out.println("ZooKeeper.state:" + zooKeeper.getState()); // 所有線程執行完畢 countDownLatch.await(); String path1 = zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("節點創建成功:" + path1); String path2 = zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("節點創建成功:" + path2); } /** * 創建節點(異步) * 同步接口創建節點時需要考慮接口拋出異常的情況, * 異步接口的異常體現在回調函數的ResultCode響應碼中,比同步接口更健壯。 * * @throws Exception Exception */ @Test public void create_API_async() throws Exception { String path = "/zk-create-znode-test-"; zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi()); System.out.println("ZooKeeper.state:" + zooKeeper.getState()); // 所有線程執行完畢 countDownLatch.await(); zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new CreateCallBack(), "ZooKeeper async create znode."); zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new CreateCallBack(), "ZooKeeper async create znode."); zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new CreateCallBack(), "ZooKeeper async create znode."); Thread.sleep(Integer.MAX_VALUE); } /** * 創建節點異步回調 */ class CreateCallBack implements AsyncCallback.StringCallback { /** * @param rc 服務端響應碼 0:接口調用成功,-4:客戶端與服務端連接已斷開,-110:指定節點已存在,-112:會話已過期 * @param path 調用接口時傳入的節點路徑(原樣輸出) * @param ctx 調用接口時傳入的ctx值(原樣輸出) * @param name 實際在服務端創建的節點名 */ @Override public void processResult(int rc, String path, Object ctx, String name) { System.out.println("創建結果:rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",name=" + name); switch (rc) { case 0: System.out.println("節點創建成功:" + name); break; case -4: System.out.println("客戶端與服務端連接已斷開"); break; case -110: System.out.println("指定節點已存在"); break; case -112: System.out.println("會話已過期"); break; default: System.out.println("服務端響應碼" + rc + "未知"); break; } } } /** * 刪除節點(同步) * 注:只允許刪除葉子節點,不能直接刪除根節點 * * @throws Exception Exception */ @Test public void delete_API_sync() throws Exception { String path = "/zk-delete-znode-test"; zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi()); System.out.println("ZooKeeper state:" + zooKeeper.getState()); // 所有線程執行完畢 countDownLatch.await(); zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zooKeeper.delete(path, -1); Thread.sleep(Integer.MAX_VALUE); } /** * 刪除節點(異步) * 注:只允許刪除葉子節點,不能直接刪除根節點 * * @throws Exception Exception */ @Test public void delete_API_async() throws Exception { String path = "/zk-delete-znode-test"; zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi()); System.out.println("ZooKeeper state:" + zooKeeper.getState()); // 所有線程執行完畢 countDownLatch.await(); zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zooKeeper.delete(path, -1, new DeleteCallBack(), "ZooKeeper async delete znode"); Thread.sleep(Integer.MAX_VALUE); } /** * 刪除節點異步回調 */ class DeleteCallBack implements AsyncCallback.VoidCallback { /** * @param rc 服務端響應碼 0:接口調用成功,-4:客戶端與服務端連接已斷開,-110:指定節點已存在,-112:會話已過期 * @param path 調用接口時傳入的節點路徑(原樣輸出) * @param ctx 調用接口時傳入的ctx值(原樣輸出) */ @Override public void processResult(int rc, String path, Object ctx) { System.out.println("刪除結果:rc=" + rc + ",path=" + path + ",ctx=" + ctx); switch (rc) { case 0: System.out.println("節點刪除成功"); break; case -4: System.out.println("客戶端與服務端連接已斷開"); break; case -112: System.out.println("會話已過期"); break; default: System.out.println("服務端響應碼" + rc + "未知"); break; } } } /** * 獲取子節點(同步) * * @throws Exception Exception */ @Test public void getChildren_API_sync() throws Exception { String path = "/zk-getChildren-sync-test"; zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi()); System.out.println("ZooKeeper state:" + zooKeeper.getState()); // 所有線程執行完畢 countDownLatch.await(); zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zooKeeper.create(path + "/children1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); List<String> childrenList = zooKeeper.getChildren(path, true); System.out.println("獲取子節點:" + childrenList); zooKeeper.create(path + "/children2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); Thread.sleep(Integer.MAX_VALUE); } /** * 獲取子節點(異步) * * @throws Exception Exception */ @Test public void getChildren_API_async() throws Exception { String path = "/zk-getChildren-async-test"; zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi()); System.out.println("ZooKeeper state:" + zooKeeper.getState()); // 所有線程執行完畢 countDownLatch.await(); zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zooKeeper.create(path + "/children1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zooKeeper.getChildren(path, true, new ChildrenCallBack(), "異步獲取子節點"); zooKeeper.create(path + "/children2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); Thread.sleep(Integer.MAX_VALUE); } /** * 獲取子節點異步回調 */ class ChildrenCallBack implements AsyncCallback.Children2Callback { /** * @param rc 服務端響應碼 0:接口調用成功,-4:客戶端與服務端連接已斷開,-110:指定節點已存在,-112:會話已過期 * @param path 調用接口時傳入的節點路徑(原樣輸出) * @param ctx 調用接口時傳入的ctx值(原樣輸出) * @param childrenList 子節點列表 * @param stat 節點狀態,由服務器端響應的新stat替換 */ @Override public void processResult(int rc, String path, Object ctx, List<String> childrenList, Stat stat) { System.out.println("獲取結果:rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",childrenList=" + childrenList + ",stat=" + stat); switch (rc) { case 0: System.out.println("子節點獲取成功:" + childrenList); break; case -4: System.out.println("客戶端與服務端連接已斷開"); break; case -112: System.out.println("會話已過期"); break; default: System.out.println("服務端響應碼" + rc + "未知"); break; } } } /** * 獲取節點數據(同步) * 更新節點數據(同步) * * @throws Exception Exception */ @Test public void getData_API_sync() throws Exception { String path = "/zk-getData-sync-test"; zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi()); System.out.println("ZooKeeper state:" + zooKeeper.getState()); // 所有線程執行完畢 countDownLatch.await(); zooKeeper.create(path, "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("節點數據:" + new String(zooKeeper.getData(path, true, stat))); System.out.println("czxid=" + stat.getCzxid() + ",mzxid=" + stat.getMzxid() + ",version=" + stat.getVersion()); zooKeeper.setData(path, "test".getBytes(), -1); Thread.sleep(Integer.MAX_VALUE); } /** * 獲取節點數據(異步) * 更新節點數據(同步) * * @throws Exception Exception */ @Test public void getData_API_async() throws Exception { String path = "/zk-getData-async-test"; zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi()); System.out.println("ZooKeeper state:" + zooKeeper.getState()); // 所有線程執行完畢 countDownLatch.await(); zooKeeper.create(path, "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zooKeeper.getData(path, true, new DataCallBack(), "異步獲取節點數據"); System.out.println("czxid=" + stat.getCzxid() + ",mzxid=" + stat.getMzxid() + ",version=" + stat.getVersion()); Stat stat1 = zooKeeper.setData(path, "test".getBytes(), -1); System.out.println("czxid=" + stat1.getCzxid() + ",mzxid=" + stat1.getMzxid() + ",version=" + stat1.getVersion()); Stat stat2 = zooKeeper.setData(path, "test123".getBytes(), stat1.getVersion()); System.out.println("czxid=" + stat2.getCzxid() + ",mzxid=" + stat2.getMzxid() + ",version=" + stat2.getVersion()); try { zooKeeper.setData(path, "test123456".getBytes(), stat1.getVersion()); } catch (KeeperException e) { System.out.println("Error Code:" + e.code() + "," + e.getMessage()); } Thread.sleep(Integer.MAX_VALUE); } /** * 獲取節點數據異步回調 */ class DataCallBack implements AsyncCallback.DataCallback { /** * @param rc 服務端響應碼 0:接口調用成功,-4:客戶端與服務端連接已斷開,-110:指定節點已存在,-112:會話已過期 * @param path 調用接口時傳入的節點路徑(原樣輸出) * @param ctx 調用接口時傳入的ctx值(原樣輸出) * @param data 節點數據 * @param stat 節點狀態,由服務器端響應的新stat替換 */ @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { System.out.println("獲取結果:rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",data=" + new String(data) + ",stat=" + stat); System.out.println("czxid=" + stat.getCzxid() + ",mzxid=" + stat.getMzxid() + ",version=" + stat.getVersion()); switch (rc) { case 0: System.out.println("節點數據獲取成功:" + new String(data)); break; case -4: System.out.println("客戶端與服務端連接已斷開"); break; case -112: System.out.println("會話已過期"); break; default: System.out.println("服務端響應碼" + rc + "未知"); break; } } } /** * 更新節點數據(異步) * * @throws Exception Exception */ @Test public void setData_API_async() throws Exception { String path = "/zk-setData-test"; zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi()); System.out.println("ZooKeeper state:" + zooKeeper.getState()); // 所有線程執行完畢 countDownLatch.await(); zooKeeper.exists(path, true); zooKeeper.create(path, "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zooKeeper.setData(path, "test123456".getBytes(), -1, new StatCallBack(), "異步更新節點數據"); Thread.sleep(Integer.MAX_VALUE); } /** * 更新節點數據異步回調 */ class StatCallBack implements AsyncCallback.StatCallback { /** * @param rc 服務端響應碼 0:接口調用成功,-4:客戶端與服務端連接已斷開,-110:指定節點已存在,-112:會話已過期 * @param path 調用接口時傳入的節點路徑(原樣輸出) * @param ctx 調用接口時傳入的ctx值(原樣輸出) * @param stat 節點狀態,由服務器端響應的新stat替換 */ @Override public void processResult(int rc, String path, Object ctx, Stat stat) { System.out.println("更新結果:rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",stat=" + stat); System.out.println("czxid=" + stat.getCzxid() + ",mzxid=" + stat.getMzxid() + ",version=" + stat.getVersion()); switch (rc) { case 0: System.out.println("節點數據設置成功"); break; case -4: System.out.println("客戶端與服務端連接已斷開"); break; case -112: System.out.println("會話已過期"); break; default: System.out.println("服務端響應碼" + rc + "未知"); break; } } } /** * 權限控制 * * @throws Exception Exception */ @Test public void auth_control_API() throws Exception { String path = "/zk-setData-test"; zooKeeper = new ZooKeeper(host, 5000, new ZooKeeperApi()); zooKeeper.addAuthInfo("digest", "zoo:true".getBytes()); zooKeeper.create(path, "init".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL); // 1)無權限信息訪問 // ZooKeeper zooKeeper1 = new ZooKeeper(host, 5000, new ZooKeeperApi()); // System.out.println("訪問結果:" + new String(zooKeeper1.getData(path, true, stat))); // 2)錯誤權限信息訪問 // ZooKeeper zooKeeper2 = new ZooKeeper(host, 5000, new ZooKeeperApi()); // zooKeeper2.addAuthInfo("digest", "zoo:false".getBytes()); // System.out.println("訪問結果:" + new String(zooKeeper2.getData(path, true, stat))); // 3)正確權限信息訪問 ZooKeeper zooKeeper3 = new ZooKeeper(host, 5000, new ZooKeeperApi()); zooKeeper3.addAuthInfo("digest", "zoo:true".getBytes()); System.out.println("訪問結果:" + new String(zooKeeper3.getData(path, true, stat))); Thread.sleep(Integer.MAX_VALUE); } }