package com.zookeeper.watcher; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; public class ZKClient implements Watcher{ private static final Logger logger = Logger.getLogger(ZKClient.class); //定義session失效時間 private static final int SESSION_TIMEOUT = 10000; //zookeeper服務器地址 private static final String ZOOKEEPER_ADDRESS = "192.168.56.1:2181"; //ZooKeeper變量 private ZooKeeper zk = null; //定義原子變量 AtomicInteger seq = new AtomicInteger(); //信號量設置,用於等待zookeeper連接建立之后,通知阻塞程序繼續向下執行 private CountDownLatch connectedSemaphore = new CountDownLatch(1); public static void main(String[] args) throws InterruptedException { String parentPath= "/test"; //父節點 String childrenPath = "/test/children"; //子節點 ZKClient test = new ZKClient(); //創建鏈接 test.createConnection(); boolean isSuccess = test.createNode(parentPath, "abc", Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); if (isSuccess) { //讀取數據 String result = test.getNodeData(parentPath, true); logger.info("更新前數據:" + result); //更新數據 isSuccess = test.updateNode(parentPath, String.valueOf(System.currentTimeMillis())); if(isSuccess){ logger.info("更新后數據:" + test.getNodeData(parentPath, true)); } // 創建子節點 isSuccess = test.createNode(childrenPath, String.valueOf(System.currentTimeMillis()), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); if(isSuccess){ test.updateNode(childrenPath, String.valueOf(System.currentTimeMillis())); } //讀取子節點 List<String> childrenList = test.getChildren(parentPath, true); if(childrenList!=null && !childrenList.isEmpty()){ for(String children : childrenList){ System.out.println("子節點:" + children); } } } Thread.sleep(1000); //創建臨時有序子節點 test.createNode(childrenPath, String.valueOf(System.currentTimeMillis()), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); test.createNode(childrenPath, String.valueOf(System.currentTimeMillis()), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); test.createNode(childrenPath, String.valueOf(System.currentTimeMillis()), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); // 讀取子節點,並刪除 List<String> childrenList = test.getChildren(parentPath, true); if (childrenList != null && !childrenList.isEmpty()) { for (String children : childrenList) { System.out.println("子節點:" + children); test.deleteNode(parentPath + "/" + children); } } //刪除父節點 if (test.exists(childrenPath, false) != null) { test.deleteNode(childrenPath); } //釋放鏈接 Thread.sleep(1000); test.releaseConnection(); } /** * 創建節點 * * @param path 節點路徑 * @param data 數據內容 * @param acl 訪問控制列表 * @param createMode znode創建類型 * @return */ public boolean createNode(String path, String data, List<ACL> acl, CreateMode createMode) { try { //設置監控(由於zookeeper的監控都是一次性的,所以每次必須設置監控) exists(path, true); String resultPath = this.zk.create(path, data.getBytes(), acl, createMode); logger.info(String.format("節點創建成功,path: %s,data: %s", resultPath, data)); } catch (Exception e) { logger.error("節點創建失敗", e); return false; } return true; } /** * 更新指定節點數據內容 * * @param path 節點路徑 * @param data 數據內容 * @return */ public boolean updateNode(String path, String data) { try { Stat stat = this.zk.setData(path, data.getBytes(), -1); logger.info("更新節點數據成功,path:" + path + ", stat: " + stat); } catch (Exception e) { logger.error("更新節點數據失敗", e); return false; } return true; } /** * 刪除指定節點 * * @param path * 節點path */ public void deleteNode(String path) { try { this.zk.delete(path, -1); logger.info("刪除節點成功,path:" + path); } catch (Exception e) { logger.error("刪除節點失敗", e); } } /** * 讀取節點數據 * * @param path 節點路徑 * @param needWatch 是否監控這個目錄節點,這里的 watcher是在創建ZooKeeper實例時指定的watcher * @return */ public String getNodeData(String path, boolean needWatch) { try { Stat stat = exists(path, needWatch); if(stat != null){ return new String(this.zk.getData(path, needWatch, stat)); } } catch (Exception e) { logger.error("讀取節點數據內容失敗", e); } return null; } /** * 獲取子節點 * * @param path 節點路徑 * @param needWatch 是否監控這個目錄節點,這里的 watcher是在創建ZooKeeper實例時指定的watcher * @return */ public List<String> getChildren(String path, boolean needWatch) { try { return this.zk.getChildren(path, needWatch); } catch (Exception e) { logger.error("獲取子節點失敗", e); return null; } } /** * 判斷znode節點是否存在 * * @param path 節點路徑 * @param needWatch 是否監控這個目錄節點,這里的 watcher是在創建ZooKeeper實例時指定的watcher * @return */ public Stat exists(String path, boolean needWatch) { try { return this.zk.exists(path, needWatch); } catch (Exception e) { logger.error("判斷znode節點是否存在發生異常", e); } return null; } /** * 創建ZK連接 * * @param connectAddr * @param sessionTimeout */ public void createConnection() { this.releaseConnection(); try { zk = new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT, this); logger.info("開始連接ZK服務器..."); //zk連接未創建成功進行阻塞 connectedSemaphore.await(); } catch (Exception e) { logger.error("ZK連接創建失敗", e); } } /** * 關閉ZK連接 */ public void releaseConnection() { if (this.zk != null) { try { this.zk.close(); logger.info("ZK連接關閉成功"); } catch (InterruptedException e) { logger.error("ZK連接關閉失敗", e); } } } @Override public void process(WatchedEvent event) { logger.info("進入process()方法...event = " + event); if (event == null) { return; } KeeperState keeperState = event.getState(); // 連接狀態 EventType eventType = event.getType(); // 事件類型 String path = event.getPath(); // 受影響的path String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】"; logger.info(String.format("%s收到Watcher通知...", logPrefix)); logger.info(String.format("%s連接狀態:%s", logPrefix, keeperState)); logger.info(String.format("%s事件類型:%s", logPrefix, eventType)); logger.info(String.format("%s受影響的path:%s", logPrefix, path)); if (KeeperState.SyncConnected == keeperState) { if (EventType.None == eventType) { // 成功連接上ZK服務器 logger.info(logPrefix + "成功連接上ZK服務器..."); connectedSemaphore.countDown(); } else if (EventType.NodeCreated == eventType) { // 創建節點 logger.info(logPrefix + "節點創建"); this.exists(path, true); } else if (EventType.NodeDataChanged == eventType) { // 更新節點 logger.info(logPrefix + "節點數據更新"); logger.info(logPrefix + "數據內容: " + this.getNodeData(path, true)); } else if (EventType.NodeChildrenChanged == eventType) { // 更新子節點 logger.info(logPrefix + "子節點變更"); logger.info(logPrefix + "子節點列表:" + this.getChildren(path, true)); } else if (EventType.NodeDeleted == eventType) { // 刪除節點 logger.info(logPrefix + "節點 " + path + " 被刪除"); } } else if (KeeperState.Disconnected == keeperState) { logger.info(logPrefix + "與ZK服務器斷開連接"); } else if (KeeperState.AuthFailed == keeperState) { logger.info(logPrefix + "權限檢查失敗"); } else if (KeeperState.Expired == keeperState) { logger.info(logPrefix + "會話失效"); } } }