zk有四種節點類型:
持久節點,持久順序節點,臨時節點,臨時順序節點。
自定義監聽事件時,在節點的創建,修改,刪除的方法第一行都需要加入是否監聽的一個方法:
//開啟監聽的方法。第二個參數表示是否開啟監聽
zk.exists(path, true);
zk自定義監聽:
package com.kf.zkDemo; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * 自定義zk的事件通知 * @author kf * */ public class MyWatch implements Watcher{ //定義鏈接地址 private static String ADDRESS = "127.0.0.1:2181"; //超時時間 private static int TIMEOUT = 2000; ZooKeeper zk; //阻塞用戶線程,用戶必須等待連接成功 private CountDownLatch countDownLatch = new CountDownLatch(1); public void createZkConnection(String address, int timeout){ //第三個參數是事件通知。這里用的是本類,自定義的事件通知 try { zk = new ZooKeeper(address, timeout, this); countDownLatch.countDown(); } catch (IOException e) { e.printStackTrace(); } } /** * 創建節點內容 * @param path * @param data * @return */ public boolean createNode(String path, String data){ //第三個參數表示權限的,這里開放所有權限,不限制服務器 //第四個參數表示節點的類型。用的持久節點 try { //開啟監聽的方法。第二個參數表示是否開啟監聽 zk.exists(path, true); String result = zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("創建節點成功!節點為:"+path+",值為:"+data); System.out.println("創建結果為:"+result); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 修改節點內容 * @param path * @param data * @return */ public boolean updateNode(String path, String data){ //第三個參數表示權限的,這里開放所有權限,不限制服務器 //第四個參數表示節點的類型。用的持久節點 try { //開啟監聽的方法。第二個參數表示是否開啟監聽 zk.exists(path, true); //第三個參數表示版本號。 zk的數據版本默認從0開始,每次修改都會加1. -1嚴格來說屬於不合法的版本號。表示從最新版本進行更新 Stat result = zk.setData(path, data.getBytes(), -1); System.out.println("修改節點成功!節點為:"+path+",修改后為值為:"+data); System.out.println("修改節點成功,result:"+result); return true; } catch (Exception e) { e.printStackTrace(); return false; } } public boolean deleteNode(String path){ //第二個參數表示版本號 try { //開啟監聽的方法。第二個參數表示是否開啟監聽 zk.exists(path, true); //第二個參數表示版本號。 zk的數據版本默認從0開始,每次修改都會加1. -1嚴格來說屬於不合法的版本號。表示從最新版本進行更新 zk.delete(path, -1); return true; } catch (Exception e) { e.printStackTrace(); return false; } } public void process(WatchedEvent event) { System.out.println("事件通知開始前"); //事件路徑 String path = event.getPath(); //事件狀態 即 連接不連接 KeeperState state = event.getState(); //事件類型 EventType type = event.getType(); System.out.println("事件路徑"+path+",事件狀態"+state+",事件類型"+type); if(KeeperState.SyncConnected == state){ //事件類型 None表示是連接類型 if(EventType.None == type){ System.out.println("連接類型"); //連接上zk服務器后放開信號量 countDownLatch.countDown(); System.out.println("=====ZK連接成功====="); }else if(EventType.NodeCreated == type){ System.out.println("=====新增節點成功=====path:"+path); }else if(EventType.NodeDataChanged == type){ System.out.println("=====修改節點成功=====path:"+path); }else if(EventType.NodeDeleted == type){ System.out.println("=====刪除節點成功=====path:"+path); } } System.out.println("事件通知開始后"); } public static void main(String[] args) { MyWatch w = new MyWatch(); w.createZkConnection(ADDRESS, TIMEOUT); //w.createNode("/zk01", "zk01-value"); //w.updateNode("/zk01", "zk01-value2"); w.deleteNode("/zk01"); } }