初學zookeeper--自定義事件監聽


zk有四種節點類型:

持久節點,持久順序節點,臨時節點,臨時順序節點。

自定義監聽事件時,在節點的創建,修改,刪除的方法第一行都需要加入是否監聽的一個方法:

//開啟監聽的方法。第二個參數表示是否開啟監聽
zk.exists(path, true);

 

zk自定義監聽:

package com.kf.zkDemo;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
 * 自定義zk的事件通知
 * @author kf
 *
 */
public class MyWatch implements Watcher{
    //定義鏈接地址
    private static String ADDRESS = "127.0.0.1:2181";
    //超時時間
    private static int TIMEOUT = 2000;
    
    ZooKeeper zk;
    //阻塞用戶線程,用戶必須等待連接成功
    private CountDownLatch countDownLatch = new CountDownLatch(1);
    
    public void createZkConnection(String address, int timeout){
        //第三個參數是事件通知。這里用的是本類,自定義的事件通知
        try {
            zk = new ZooKeeper(address, timeout, this);
            countDownLatch.countDown();
        } catch (IOException e) {
            e.printStackTrace();
        }
    } 
    /**
     * 創建節點內容
     * @param path
     * @param data
     * @return
     */
    public boolean createNode(String path, String data){
        //第三個參數表示權限的,這里開放所有權限,不限制服務器
        //第四個參數表示節點的類型。用的持久節點
        try {
            //開啟監聽的方法。第二個參數表示是否開啟監聽
            zk.exists(path, true);
            String result = zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("創建節點成功!節點為:"+path+",值為:"+data);
            System.out.println("創建結果為:"+result);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 修改節點內容
     * @param path
     * @param data
     * @return
     */
    public boolean updateNode(String path, String data){
        //第三個參數表示權限的,這里開放所有權限,不限制服務器
        //第四個參數表示節點的類型。用的持久節點
        try {
            //開啟監聽的方法。第二個參數表示是否開啟監聽
            zk.exists(path, true);
            //第三個參數表示版本號。  zk的數據版本默認從0開始,每次修改都會加1.   -1嚴格來說屬於不合法的版本號。表示從最新版本進行更新
            Stat result = zk.setData(path, data.getBytes(), -1);
            System.out.println("修改節點成功!節點為:"+path+",修改后為值為:"+data);
            System.out.println("修改節點成功,result:"+result);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    
    public boolean deleteNode(String path){
        //第二個參數表示版本號
        try {
            //開啟監聽的方法。第二個參數表示是否開啟監聽
            zk.exists(path, true);
            //第二個參數表示版本號。 zk的數據版本默認從0開始,每次修改都會加1.   -1嚴格來說屬於不合法的版本號。表示從最新版本進行更新
            zk.delete(path, -1);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    
    

    public void process(WatchedEvent event) {
        System.out.println("事件通知開始前");
        //事件路徑
        String path = event.getPath();
        //事件狀態  即  連接不連接
        KeeperState state = event.getState();
        //事件類型
        EventType type = event.getType();
        System.out.println("事件路徑"+path+",事件狀態"+state+",事件類型"+type);
        if(KeeperState.SyncConnected == state){
            //事件類型  None表示是連接類型
            if(EventType.None == type){
                System.out.println("連接類型");
                //連接上zk服務器后放開信號量
                countDownLatch.countDown();
                System.out.println("=====ZK連接成功=====");
            }else if(EventType.NodeCreated == type){
                System.out.println("=====新增節點成功=====path:"+path);
            }else if(EventType.NodeDataChanged == type){
                System.out.println("=====修改節點成功=====path:"+path);
            }else if(EventType.NodeDeleted == type){
                System.out.println("=====刪除節點成功=====path:"+path);
            }
        }
        System.out.println("事件通知開始后");
    }
    
    
    public static void main(String[] args) {
        MyWatch w = new MyWatch();
        w.createZkConnection(ADDRESS, TIMEOUT);
        //w.createNode("/zk01", "zk01-value");
        //w.updateNode("/zk01", "zk01-value2");
        w.deleteNode("/zk01");
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM