ZK節點介紹和基本操作


一、基本介紹
1 、PERSISTENT (0, false, false),  
          持久節點:節點創建后,會一直存在,不會因客戶端會話失效而刪除;
2、PERSISTENT_SEQUENTIAL (2, false, true),  
         持久順序節點:基本特性與持久節點一致,創建節點的過程中,zookeeper會在其名字后自動追加一個單調增長的數字后綴,作為新的節點名;  
3、EPHEMERAL (1, true, false),  
        臨時節點:客戶端會話失效或連接關閉后,該節點會被自動刪除,且不能再臨時節點下面創建子節點,否則報如下錯(org.apache.zookeeper.KeeperException$NoChildrenForEphemeralsException: KeeperErrorCode = NoChildrenForEphemerals for /node/child);
 
 4、EPHEMERAL_SEQUENTIAL (3, true, true);
         臨時順序節點:基本特性與臨時節點一致,創建節點的過程中,zookeeper會在其名字后自動追加一個單調增長的數字后綴,作為新的節點名;  
    
    二、JavaApi的操作
  1)maven引入
   
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.12</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.10</version>
    </dependency>


    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.0.0</version>
    </dependency>
</dependencies>
View Code

 2)Java基本節點的操作

 

public class ZkClientOperator {

    /** zookeeper地址 */
    static final String CONNECT_ADDR = "192.168.112.131:2181";
    /** session超時時間 */
    static final int SESSION_OUTTIME = 10000;//ms

    /**
     * 主要測試增加 臨時節點、持久化節點,以及讀取內容,刪除節點操作
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
       //ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);
        ZkClient zkc = new ZkClient(CONNECT_ADDR, SESSION_OUTTIME);

        //1. create and delete方法
        zkc.createEphemeral("/temp");
        zkc.createPersistent("/super/c1", true);

        Thread.sleep(10000);
        zkc.delete("/temp");
        zkc.deleteRecursive("/super");

        //2. 設置path和data 並且讀取子節點和每個節點的內容
        zkc.createPersistent("/super", "1234");
        zkc.createPersistent("/super/c1", "c1內容");
        zkc.createPersistent("/super/c2", "c2內容");

        List<String> list = zkc.getChildren("/super");
        for(String p : list){
            System.out.println(p);
            String rp = "/super/" + p;
            String data = zkc.readData(rp);
            System.out.println("節點為:" + rp + ",內容為: " + data);
        }

        //3. 更新和判斷節點是否存在
        zkc.writeData("/super/c1", "新內容");
        System.out.println(zkc.readData("/super/c1").toString());
        System.out.println(zkc.exists("/super/c1"));

//    4.遞歸刪除/super內容
        zkc.deleteRecursive("/super");
    }
}
View Code

3)測試watcher事件

 

public class ZooKeeperWatcher implements Watcher  {


    /** 定義原子變量 */
    AtomicInteger seq = new AtomicInteger();
    /** 定義session失效時間 */
    private static final int SESSION_TIMEOUT = 10000;
    /** zookeeper服務器地址 */
    private static final String CONNECTION_ADDR = "192.168.112.131:2181";
    /** zk父路徑設置 */
    private static final String PARENT_PATH = "/testWatch";
    /** zk子路徑設置 */
    private static final String CHILDREN_PATH = "/testWatch/children";
    /** 進入標識 */
    private static final String LOG_PREFIX_OF_MAIN = "【Main】";
    /** zk變量 */
    private ZooKeeper zk = null;
    /** 信號量設置,用於等待zookeeper連接建立之后 通知阻塞程序繼續向下執行 */
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);


    /**
     * 創建ZK連接
     * @param connectAddr ZK服務器地址列表
     * @param sessionTimeout Session超時時間
     */
    public void createConnection(String connectAddr, int sessionTimeout) {
        this.releaseConnection();
        try {
            zk = new ZooKeeper(connectAddr, sessionTimeout, this);
            System.out.println(LOG_PREFIX_OF_MAIN + "開始連接ZK服務器");
            connectedSemaphore.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * 關閉ZK連接
     */
    public void releaseConnection() {
        if (this.zk != null) {
            try {
                this.zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * 創建節點
     * @param path 節點路徑
     * @param data 數據內容
     * @return
     */
    public boolean createPath(String path, String data) {
        try {
            //設置監控(由於zookeeper的監控都是一次性的所以 每次必須設置監控)
            this.zk.exists(path, true);
            System.out.println(LOG_PREFIX_OF_MAIN + "節點創建成功, Path: " +
                    this.zk.create(    /**路徑*/
                            path,
                            /**數據*/
                            data.getBytes(),
                            /**所有可見*/
                            ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            /**永久存儲*/
                            CreateMode.PERSISTENT ) +
                    ", content: " + data);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * 讀取指定節點數據內容
     * @param path 節點路徑
     * @return
     */
    public String readData(String path, boolean needWatch) {
        try {
            return new String(this.zk.getData(path, needWatch, null));
        } catch (Exception e) {
            e.printStackTrace();
            return "";
        }
    }

    /**
     * 更新指定節點數據內容
     * @param path 節點路徑
     * @param data 數據內容
     * @return
     */
    public boolean writeData(String path, String data) {
        try {
            System.out.println(LOG_PREFIX_OF_MAIN + "更新數據成功,path:" + path + ", stat: " +
                    this.zk.setData(path, data.getBytes(), -1));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    /**
     * 刪除指定節點
     *
     * @param path
     *            節點path
     */
    public void deleteNode(String path) {
        try {
            this.zk.delete(path, -1);
            System.out.println(LOG_PREFIX_OF_MAIN + "刪除節點成功,path:" + path);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 判斷指定節點是否存在
     * @param path 節點路徑
     */
    public Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 獲取子節點
     * @param path 節點路徑
     */
    private List<String> getChildren(String path, boolean needWatch) {
        try {
            return this.zk.getChildren(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 刪除所有節點
     */
    public void deleteAllTestPath() {
        if(this.exists(CHILDREN_PATH, false) != null){
            this.deleteNode(CHILDREN_PATH);
        }
        if(this.exists(PARENT_PATH, false) != null){
            this.deleteNode(PARENT_PATH);
        }
    }

    /**
     * 收到來自Server的Watcher通知后的處理。
     */
    @Override
    public void process(WatchedEvent event) {

        System.out.println("進入 process 。。。。。event = " + event);

        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        if (event == null) {
            return;
        }


        // 連接狀態
        Watcher.Event.KeeperState keeperState = event.getState();
        // 事件類型
        Watcher.Event.EventType eventType = event.getType();
        // 受影響的path
        String path = event.getPath();

        String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";

        System.out.println(logPrefix + "收到Watcher通知");
        System.out.println(logPrefix + "連接狀態:\t" + keeperState.toString());
        System.out.println(logPrefix + "事件類型:\t" + eventType.toString());


        if (Event.KeeperState.SyncConnected == keeperState) {
            // 成功連接上ZK服務器
            if (Event.EventType.None == eventType) {
                System.out.println(logPrefix + "成功連接上ZK服務器");
                connectedSemaphore.countDown();
            }
            //創建節點
             if (Event.EventType.NodeCreated == eventType) {
                System.out.println(logPrefix + "節點創建");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.exists(path, true);
            }
            //更新節點
            else if (Event.EventType.NodeDataChanged == eventType) {
                System.out.println(logPrefix + "節點數據更新");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(logPrefix + "數據內容: " + this.readData(PARENT_PATH, true));
            }
            //更新子節點
            else if (Event.EventType.NodeChildrenChanged == eventType) {
                System.out.println(logPrefix + "子節點變更");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(logPrefix + "子節點列表:" + this.getChildren(PARENT_PATH, true));
            }
            //刪除節點
            else if (Event.EventType.NodeDeleted == eventType) {
                System.out.println(logPrefix + "節點 " + path + " 被刪除");
            }
        }
        else if (Watcher.Event.KeeperState.Disconnected == keeperState) {
            System.out.println(logPrefix + "與ZK服務器斷開連接");
        }
        else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {
            System.out.println(logPrefix + "權限檢查失敗");
        }
        else if (Watcher.Event.KeeperState.Expired == keeperState) {
            System.out.println(logPrefix + "會話失效");
        }
        System.out.println("--------------------------------------------");

    }


    public static void main(String[] args) throws Exception {

        //建立watcher
        ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();
        //創建連接
        zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
        //System.out.println(zkWatch.zk.toString());

        Thread.sleep(1000);

        // 清理節點
        zkWatch.deleteAllTestPath();

        if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {


            // 讀取數據,在操作節點數據之前先調用zookeeper的getData()方法是為了可以watch到對節點的操作。watch是一次性的,
            // 也就是說,如果第二次又重新調用了setData()方法,在此之前需要重新調用一次。
             System.out.println("---------------------- read parent ----------------------------");

          //  zkWatch.readData(PARENT_PATH, true);
            // 更新數據
           zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");

            /** 讀取子節點,設置對子節點變化的watch,如果不寫該方法,則在創建子節點是只會輸出NodeCreated,而不會輸出NodeChildrenChanged,
             也就是說創建子節點時沒有watch。
             如果是遞歸的創建子節點,如path="/p/c1/c2"的話,getChildren(PARENT_PATH, ture)只會在創建c1時watch,輸出c1的NodeChildrenChanged,
             而不會輸出創建c2時的NodeChildrenChanged,如果watch到c2的NodeChildrenChanged,則需要再調用一次getChildren(String path, true)方法,
             其中path="/p/c1"
             */
            System.out.println("---------------------- read children path ----------------------------");
            zkWatch.getChildren(PARENT_PATH, true);

           Thread.sleep(1000);

            // 創建子節點,同理如果想要watch到NodeChildrenChanged狀態,需要調用getChildren(CHILDREN_PATH, true)
            zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");

            Thread.sleep(1000);

            zkWatch.readData(CHILDREN_PATH, true);
            zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
        }

        Thread.sleep(1000);
        // 清理節點
        zkWatch.deleteAllTestPath();
        Thread.sleep(1000);
        zkWatch.releaseConnection();
    }


}
View Code
三、Curator框架的的基本測試
  1)基本操作
  
package cn.enjoy.curator;


import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;




import java.util.List;


import static com.sun.xml.internal.ws.dump.LoggingDumpTube.Position.Before;


/**
* 測試Apache Curator框架的基本用法
*/
public class OperatorTest {
    //ZooKeeper服務地址
    private static final String SERVER = "127.0.0.1:2181";
    //會話超時時間
    private final int SESSION_TIMEOUT = 30000;
    //連接超時時間
    private final int CONNECTION_TIMEOUT = 5000;
    //創建連接實例
    private CuratorFramework client = null;


    /**
     * baseSleepTimeMs:初始的重試等待時間
     * maxRetries:最多重試次數
     * <p>
     * <p>
     * ExponentialBackoffRetry:重試一定次數,每次重試時間依次遞增
     * RetryNTimes:重試N次
     * RetryOneTime:重試一次
     * RetryUntilElapsed:重試一定時間
     */
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);


    @org.junit.Before
    public void init() {
        //創建 CuratorFrameworkImpl實例
        client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);


        //啟動
        client.start();
    }


    /**
     * 測試創建節點
     *
     * @throws Exception
     */
    @Test
    public void testCreate() throws Exception {
        //創建永久節點
        client.create().forPath("/curator", "/curator data".getBytes());
        //創建永久有序節點
        client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/curator_sequential", "/curator_sequential data".getBytes());


        //創建臨時節點
        client.create().withMode(CreateMode.EPHEMERAL)
                .forPath("/curator/ephemeral", "/curator/ephemeral data".getBytes());


        //創建臨時有序節點
        client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath("/curator/ephemeral_path1", "/curator/ephemeral_path1 data".getBytes());
    }




    /**
     * 測試檢查某個節點是否存在
     *
     * @throws Exception
     */
    @Test
    public void testCheck() throws Exception {
        Stat stat1 = client.checkExists().forPath("/curator");
        Stat stat2 = client.checkExists().forPath("/curator2");


        System.out.println("'/curator'是否存在: " + (stat1 != null ? true : false));
        System.out.println("'/curator2'是否存在: " + (stat2 != null ? true : false));
    }


    /**
     * 測試異步設置節點數據
     *
     * @throws Exception
     */
    @Test
    public void testSetDataAsync() throws Exception {
        //創建監聽器
        CuratorListener listener = new CuratorListener() {
            @Override
            public void eventReceived(CuratorFramework client, CuratorEvent event)
                    throws Exception {
                System.out.println(event.getPath());
            }
        };
        //添加監聽器
        client.getCuratorListenable().addListener(listener);
        //異步設置某個節點數據
        client.setData().inBackground().forPath("/curator", "sync".getBytes());
        //為了防止單元測試結束從而看不到異步執行結果,因此暫停10秒
        Thread.sleep(10000);
    }
    /**
     * 測試另一種異步執行獲取通知的方式
     *
     * @throws Exception
     */
    @Test
    public void testSetDataAsyncWithCallback() throws Exception {
        BackgroundCallback callback = new BackgroundCallback() {


            @Override
            public void processResult(CuratorFramework client, CuratorEvent event)
                    throws Exception {
                System.out.println(event.getPath());
            }
        };
        //異步設置某個節點數據
        client.setData().inBackground(callback).forPath("/curator", "/curator modified data with Callback".getBytes());


        //為了防止單元測試結束從而看不到異步執行結果,因此暫停10秒
        Thread.sleep(10000);
    }
    /**
     * 測試刪除節點
     *
     * @throws Exception
     */
    @Test
    public void testDelete() throws Exception {
        //創建測試節點
        client.create().orSetData().creatingParentsIfNeeded()
                .forPath("/curator/del_key1", "/curator/del_key1 data".getBytes());
        client.create().orSetData().creatingParentsIfNeeded()
                .forPath("/curator/del_key2", "/curator/del_key2 data".getBytes());
        client.create().forPath("/curator/del_key2/test_key", "test_key data".getBytes());
        //刪除該節點
        client.delete().forPath("/curator/del_key1");
        //級聯刪除子節點
        client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator/del_key2");
    }
    
    /*
     * 測試事務管理:碰到異常,事務會回滾
     * @throws Exception
     */
    @Test
    public void testTransaction() throws Exception {
        //定義幾個基本操作
        CuratorOp createOp = client.transactionOp().create()
                .forPath("/curator/one_path", "some data".getBytes());


        CuratorOp setDataOp = client.transactionOp().setData()
                .forPath("/curator", "other data".getBytes());


        CuratorOp deleteOp = client.transactionOp().delete()
                .forPath("/curator");


        //事務執行結果
        List<CuratorTransactionResult> results = client.transaction()
                .forOperations(createOp, setDataOp, deleteOp);


        //遍歷輸出結果
        for (CuratorTransactionResult result : results) {
            System.out.println("執行結果是: " + result.getForPath() + "--" + result.getType());
        }
    }
}
View Code

 2) curator監聽器的測試

  

 
public class EventTest {


    //ZooKeeper服務地址
    private static final String SERVER = "192.168.30.10:2181";


    //會話超時時間
    private final int SESSION_TIMEOUT = 30000;


    //連接超時時間
    private final int CONNECTION_TIMEOUT = 5000;


    //創建連接實例
    private CuratorFramework client = null;


    /**
     * baseSleepTimeMs:初始的重試等待時間
     * maxRetries:最多重試次數
     *
     *
     * ExponentialBackoffRetry:重試一定次數,每次重試時間依次遞增
     * RetryNTimes:重試N次
     * RetryOneTime:重試一次
     * RetryUntilElapsed:重試一定時間
     */
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);


    @org.junit.Before
    public void init(){
        //創建 CuratorFrameworkImpl實例
        client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);


        //啟動
        client.start();
    }


    /**
     *
     * @描述:第一種監聽器的添加方式: 對指定的節點進行添加操作
     * 僅僅能監控指定的本節點的數據修改,刪除 操作 並且只能監聽一次 --->不好
     */


    @Test
    public  void TestListenterOne() throws Exception{
        client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());


        // 注冊觀察者,當節點變動時觸發
        byte[] data = client.getData().usingWatcher(new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("獲取 test 節點 監聽器 : " + event);
            }
        }).forPath("/test");


        client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());
        Thread.sleep(1000);
        client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());
        Thread.sleep(1000);
        System.out.println("節點數據: "+ new String(data));
        Thread.sleep(10000);
    }




    /**
     *
     * @描述:第二種監聽器的添加方式: Cache 的三種實現
     *   Path Cache:監視一個路徑下1)孩子結點的創建、2)刪除,3)以及結點數據的更新。
     *                  產生的事件會傳遞給注冊的PathChildrenCacheListener。
     *  Node Cache:監視一個結點的創建、更新、刪除,並將結點的數據緩存在本地。
     *  Tree Cache:Path Cache和Node Cache的“合體”,監視路徑下的創建、更新、刪除事件,並緩存路徑下所有孩子結點的數據。
     */


    //1.path Cache  連接  路徑  是否獲取數據
    //能監聽所有的字節點 且是無限監聽的模式 但是 指定目錄下節點的子節點不再監聽
    @Test
    public void setListenterTwoOne() throws Exception{
        ExecutorService pool = Executors.newCachedThreadPool();
        PathChildrenCache childrenCache = new PathChildrenCache(client, "/test", true);
        PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("開始進行事件分析:-----");
                ChildData data = event.getData();
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("CHILD_ADDED : "+ data.getPath() +"  數據:"+ data.getData());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("CHILD_REMOVED : "+ data.getPath() +"  數據:"+ data.getData());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("CHILD_UPDATED : "+ data.getPath() +"  數據:"+ data.getData());
                        break;
                    case INITIALIZED:
                        System.out.println("INITIALIZED : "+ data.getPath() +"  數據:"+ data.getData());
                        break;
                    default:
                        break;
                }
            }
        };
        childrenCache.getListenable().addListener(childrenCacheListener);
        System.out.println("Register zk watcher successfully!");
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);


        //創建一個節點
        client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());


        client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","enjoy".getBytes());
        Thread.sleep(1000);
        client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node02","deer".getBytes());
        Thread.sleep(1000);
        client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node02","demo".getBytes());
        Thread.sleep(1000);
        client.delete().forPath("/test/node02");
        Thread.sleep(10000);
    }


    //2.Node Cache  監控本節點的變化情況   連接 目錄 是否壓縮
    //監聽本節點的變化  節點可以進行修改操作  刪除節點后會再次創建(空節點)
    @Test
    public void setListenterTwoTwo() throws Exception{
        ExecutorService pool = Executors.newCachedThreadPool();
        //設置節點的cache
        final NodeCache nodeCache = new NodeCache(client, "/test", false);
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("the test node is change and result is :");
                System.out.println("path : "+nodeCache.getCurrentData().getPath());
                System.out.println("data : "+new String(nodeCache.getCurrentData().getData()));
                System.out.println("stat : "+nodeCache.getCurrentData().getStat());
            }
        });
        nodeCache.start();


        client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());
        Thread.sleep(1000);
        client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","enjoy".getBytes());
        Thread.sleep(10000);
    }
    //3.Tree Cache
    // 監控 指定節點和節點下的所有的節點的變化--無限監聽  可以進行本節點的刪除(不在創建)
    @Test
    public void TestListenterTwoThree() throws Exception{
        ExecutorService pool = Executors.newCachedThreadPool();
        //設置節點的cache
        TreeCache treeCache = new TreeCache(client, "/test");
        //設置監聽器和處理過程
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                ChildData data = event.getData();
                if(data !=null){
                    switch (event.getType()) {
                        case NODE_ADDED:
                            System.out.println("NODE_ADDED : "+ data.getPath() +"  數據:"+ new String(data.getData()));
                            break;
                        case NODE_REMOVED:
                            System.out.println("NODE_REMOVED : "+ data.getPath() +"  數據:"+ new String(data.getData()));
                            break;
                        case NODE_UPDATED:
                            System.out.println("NODE_UPDATED : "+ data.getPath() +"  數據:"+ new String(data.getData()));
                            break;


                        default:
                            break;
                    }
                }else{
                    System.out.println( "data is null : "+ event.getType());
                }
            }
        });
        //開始監聽
        treeCache.start();


        //創建一個節點
        client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());


        Thread.sleep(1000);
        client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","enjoy".getBytes());
        Thread.sleep(1000);
        client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","deer".getBytes());


        Thread.sleep(1000);
        client.create().orSetData().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/test/node02/node02_2","deer".getBytes());
        Thread.sleep(10000);
    }


}
View Code

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM