在Java中操作Zookeeper


 

依賴

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.6.0</version>
    </dependency>

 

 

連接到zkServer

    //連接字符串,zkServer的ip、port,如果是集群逗號分隔
    String connectStr = "192.168.1.9:2181";

    //zookeeper就是一個zkCli
    ZooKeeper zooKeeper = null;

    try {
     //初始次數為1。后面要在內部類中使用,三種寫法:1、寫成外部類成員變量,不用加final;2、作為函數局部變量,放在try外面,寫成final;3、寫在try中,不加final
       CountDownLatch countDownLatch = new CountDownLatch(1);
        //超時時間ms,監聽器
        zooKeeper = new ZooKeeper(connectStr, 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                //如果狀態變成已連接
                if (watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
                    System.out.println("連接成功");
                    //次數-1
                    countDownLatch.countDown();
                }
            }
        });
        //等待,次數為0時才會繼續往下執行。等待監聽器監聽到連接成功,才能操作zk
        countDownLatch.await();
    } catch (IOException | InterruptedException e) {
        e.printStackTrace();
    }


    //...操作zk。后面的demo都是寫在此處的


    //關閉連接
    try {
        zooKeeper.close();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

 

 

 

 

檢測節點是否存在

  // 檢測節點是否存在

    // 同步方式
    Stat exists = null;
    try {
        //如果存在,返回節點狀態Stat;如果不存在,返回null。第二個參數是watch
        exists = zooKeeper.exists("/mall",false);
    } catch (KeeperException | InterruptedException e) {
        e.printStackTrace();
    }
    if (exists==null){
        System.out.println("節點不存在");
    }
    else {
        System.out.println("節點存在");
    }


    //異步回調
    zooKeeper.exists("/mall",false, new AsyncCallback.StatCallback() {
        //第二個是path znode路徑,第三個是ctx 后面傳入實參,第四個是znode的狀態
        public void processResult(int i, String s, Object o, Stat stat) {
            //如果節點不存在,返回的stat是null
            if (stat==null){
                System.out.println("節點不存在");
            }
            else{
                System.out.println("節點存在");
            }
        }
    // 傳入ctx,Object類型
    },null);

操作后,服務端會返回處理結果,返回void、null也算處理結果。

同步指的是當前線程阻塞,等待服務端返回數據,收到返回的數據才繼續往下執行;

異步回調指的是,把對結果(返回的數據)的處理寫在回調函數中,當前線程不等待返回的數據,繼續往下執行,收到返回的數據時自動調用回調函數來處理。

 

如果處理返回數據的代碼之后的操作,不依賴返回數據、對返回數據的處理,那么可以把返回數據的處理寫成回調函數。

 

 

 

 

創建節點

    //創建節點

    //同步方式
    try {
        //數據要寫成byte[],不攜帶數據寫成null;默認acl權限使用ZooDefs.Ids.OPEN_ACL_UNSAFE;最后一個是節點類型,P是永久,E是臨時,S是有序
        zooKeeper.create("/mall", "abcd".getBytes(),  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("已創建節點/mall");
        //如果節點已存在,會拋出異常
    } catch (KeeperException | InterruptedException e) {
     System.out.println("創建節點/mall失敗,請檢查節點是否已存在"); e.printStackTrace(); }
//異步回調 zooKeeper.create("/mall", "abcd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.Create2Callback(){ //第二個path,第三個ctx,第四個節點狀態 public void processResult(int i, String s, Object o, String s1, Stat stat) { //回調方式不拋出異常,返回的stat是創建節點的狀態,如果節點已存在,返回的stat是null if (stat==null){ System.out.println("創建節點/mall失敗,請檢查節點是否已存在"); } else { System.out.println("節點創建成功"); } } //ctx實參 },null);

 

 

 

刪除節點

    //刪除節點

    //同步方式
    try {
        //第二個參數是版本號,-1表示可以是任何版本
        zooKeeper.delete("/mall1",-1);
        System.out.println("成功刪除節點/mall");
    } catch (InterruptedException | KeeperException e) {
        System.out.println("刪除節點/mall失敗");
        e.printStackTrace();
    }


    //異步回調
    zooKeeper.delete("/mall2", -1, new AsyncCallback.VoidCallback() {
        //第二個是path,第三個是ctx
        public void processResult(int i, String s, Object o) {
            
        }
    //
    //ctx實參
    },null);

delete()只能刪除沒有子節點的znode,如果該znode有子節點會拋出異常。

沒有提供遞歸刪除子節點的方法,如果要刪除帶有子節點的znode,需要自己實現遞歸刪除。可以先getChildren()獲取子節點列表,遍歷列表依次刪除子節點,再刪除父節點。

 

 

 

 

獲取子節點列表

  //獲取子節點列表,List<String>,比如/mall/user,/mall/order,返回的是["user"、"order"]

    //同步方式
    List<String> children = null;
    try {
        //第二個參數是watch
        children = zooKeeper.getChildren("/mall", false);
    } catch (KeeperException | InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("子節點列表:" + children);


    //異步
    zooKeeper.getChildren("/mall", false, new AsyncCallback.ChildrenCallback() {
        //第二個起依次是:path、ctx、返回的子節點列表
        public void processResult(int i, String s, Object o, List<String> list) {
            System.out.println("子節點列表:" + list);
        }
    //ctx實參
    }, null);

只獲取子節點,不獲取孫節點。

watch都是:可以寫boolean,要添加監聽就寫true,不監聽寫false;也可以寫Watcher對象,new一個Watcher對象表示要監聽,null表示不監聽。

 

 

 

 

獲取節點數據

    //獲取節點數據,返回byte[]

    //同步方式
    byte[] data = null;
    try {
        //第二個參數是watch,第三個是stat
        data = zooKeeper.getData("/mall", false, null);
    } catch (KeeperException | InterruptedException e) {
        e.printStackTrace();
    }
    //調用new String()時要判斷data是否為null,如果是null會拋NPE
    if (data==null){
        System.out.println("該節點沒有數據");
    }
    else{
        System.out.println("節點數據:"+new String(data));
    }


    //異步回調
    zooKeeper.getData("/mall", false, new AsyncCallback.DataCallback() {
        //第二個起依次是:path、ctx、返回的節點數據、節點狀態
        public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
            //不必判斷bytes是否是null,如果節點沒有數據,不會調用回調函數;執行到此,說明bytes不是null
            System.out.println("節點數據:" + new String(bytes) );
        }
        //ctx實參
    }, null);

 

 

 

設置|修改節點數據

  //設置|更新節點據

    //同步方式
    try {
        //最后一個參數是版本號
        zooKeeper.setData("/mall", "1234".getBytes(), -1);
        System.out.println("設置節點數據成功");
    } catch (KeeperException | InterruptedException e) {
        System.out.println("設置節點數據失敗");
        e.printStackTrace();
    }


    //異步回調
    zooKeeper.setData("/mall", "1234".getBytes(), -1, new AsyncCallback.StatCallback() {
        //第二個是path,第三個是ctx
        public void processResult(int i, String s, Object o, Stat stat) {

        }
    // ctx
    },null);

 

 

 

 

設置acl權限

  //設置acl權限
        
    //第一個參數指定權限,第二個是Id對象
    ACL acl = new ACL(ZooDefs.Perms.ALL, new Id("auth", "chy:abcd"));
    
    List<ACL> aclList = new ArrayList<>();
    aclList.add(acl);
    
    //如果List中只有一個ACL對象,也可以這樣寫
    //List<ACL> aclList = Collections.singletonList(auth);
        
    //驗證權限,需寫在設置權限之前。如果之前沒有設置權限,也需要先驗證本次即將設置的用戶
    zooKeeper.addAuthInfo("digest","chy:abcd".getBytes());

    
    //方式一  setAcl
    try {
        //第二個參數是List<ACL>,第三個參數是版本號
        zooKeeper.setACL("/mall", aclList, -1);
        System.out.println("設置權限成功");
    } catch (KeeperException | InterruptedException e) {
        System.out.println("設置權限失敗");
        e.printStackTrace();
    }

    
    //方式二 在創建節點時設置權限
    try {
        zooKeeper.create("/mall","abcd".getBytes(),aclList,CreateMode.PERSISTENT);
        System.out.println("已創建節點並設置權限");
    } catch (KeeperException | InterruptedException e) {
        e.printStackTrace();
    }

設置權限之后,連接zkServer進行操作時,都需要先驗證用戶。

此處未寫對應的異步回調。

 

 

 

查看acl權限

    //查看acl權限
        
    //設置權限之后,以后操作時需要先驗證用戶,一次session中驗證一次即可
    zooKeeper.addAuthInfo("digest","chy:abcd".getBytes());

    
    //同步方式
    try {
        List<ACL> aclList = zooKeeper.getACL("/mall", null);
        System.out.println("acl權限:"+aclList);
    } catch (KeeperException | InterruptedException e) {
        System.out.println("獲取acl權限失敗");
        e.printStackTrace();
    }


    //異步回調
    zooKeeper.getACL("/mall3", null, new AsyncCallback.ACLCallback() {
        //第二個起:path、ctx、獲取到的List<ACL>、節點狀態
        public void processResult(int i, String s, Object o, List<ACL> list, Stat stat) {
            //就算沒有手動設置acl權限,默認也是有值的
            System.out.println("acl權限:"+list);
        }
    //ctx實參
    },null);

 

 

 

 

添加監聽器

  //添加監聽  方式一
    try {
        CountDownLatch countDownLatch = new CountDownLatch(1);

        zooKeeper.getData("/mall", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                //watcher會監聽該節點所有的事件,不管發生什么事件都會調用process()來處理,需要先判斷一下事件類型
                if (watchedEvent.getType().equals(Event.EventType.NodeDataChanged)){
                    System.out.println("節點數據改變了");
                    //會一直監聽,如果只監聽一次數據改變,將下面這句代碼取消注釋即可
                    //countDownLatch.countDown();
                }
            }
        }, null);
        //默認watcher是一次性的,如果要一直監聽,需要借助CountDownLatch
        countDownLatch.await();
    } catch (KeeperException | InterruptedException e) {
        e.printStackTrace();
    }

ZooKeeper類的exists()、getData()、getChildren()方法都具有添加監聽的功能,用法類似。

 

watchedEvent.getType().equals(Event.EventType.NodeDataChanged)
watchedEvent.getState().equals(Event.KeeperState.SyncConnected)

getType是獲取事件類型,getState是獲取連接狀態。

 

上面這種方式,會遞歸監聽子孫節點,子孫節點的數據改變也算NodeDataChanged,子孫節點的創建|刪除也算NodeCreated|NodeDeleted。

 

 

   //添加監聽  方式二   
   try {
        CountDownLatch countDownLatch1 = new CountDownLatch(1);
        zooKeeper.addWatch("/mall", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType().equals(Event.EventType.NodeDataChanged)){
                    System.out.println("節點數據改變了");
                    //如果只監聽一次數據改變,將下面這句代碼注釋掉
                    //countDownLatch1.countDown();
                }
            }
        //監聽模式,PERSISTENT是不監聽子孫節點,PERSISTENT_RECURSIVE是遞歸監聽子孫節點
        }, AddWatchMode.PERSISTENT_RECURSIVE);
        countDownLatch1.await();
    } catch (KeeperException | InterruptedException e) {
        e.printStackTrace();
    }

 

 

countDownLatch1.await();要阻塞線程,最好啟動一條新線程來監聽。

只有設置了監聽的zkCli,該節點發生事件時才會收到zkServer的通知。

watch只保存在zkServer的內存中(zk依賴jdk,運行在jvm上,堆中的session對象),不持久化到硬盤,就是說設置的監聽只在本次會話期間有效,zkCli關閉連接,zkServer在指定時間后(默認連續沒有收到10個心跳),zkServer會自動刪除相關session,watcher丟失。

 

 

 

 

移除監聽

    //移除監聽 方式一
    try {
        zooKeeper.addWatch("/mall",null,AddWatchMode.PERSISTENT);
        System.out.println("已移除監聽");
    } catch (KeeperException | InterruptedException e) {
        e.printStackTrace();
    }

就是上面添加監聽的哪些方法,watch|watcher參數,如果是boolean類型,設置為false即可關閉監聽;如果是Watcher類型,可以設置null覆蓋掉之前設置的監聽。

 

 

    //移除監聽 方式二
    try {
        //第二個參數是Watcher,原來添加的那個Watcher監聽對象,不能是null
        //第三個參數指定要移除監聽的哪部分,Any是移除整個監聽,Data是移除對數據的監聽,Children是移除對子節點的遞歸監聽
        //最后一個參數指定未連接到zkServe時,是否移除本地監聽部分
        zooKeeper.removeWatches("/mall",watcher, Watcher.WatcherType.Children,true);
    } catch (InterruptedException | KeeperException e) {
        e.printStackTrace();
    }

監聽由2部分組成,一部分在zkServer上,事件發生時通知對應的zkCli;一部分在zkCli,收到zkServer的通知時做出一些處理。

最后一個參數指定未連接到zkServer,是否移除本地(zkCli)監聽部分,true——移除,false——不移除。

比如說沒有連接到zkServer,移除本地監聽,10個心跳內連上了zkServer,zkServer的監聽部分仍在,發生事件時仍會通知此zkCli,但zkCli本地監聽已經移除了,對通知不會做出處理。

 

 

第一種方式會移除整個監聽,不需要傳入監聽對象watcher;

第二種方式功能更全,可以指定移除監聽的哪個部分,但需要傳入watcher對象,添加監聽時要用一個變量來保存watcher對象。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM