Java操作zookeeper


Java操作zookeeper總共有三種方式:

1.原生的Java API

2.zkclient

3.curator

 第一種實現代碼:

pom.xml

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.8</version>
</dependency>

 示例的java代碼如下:

package zook;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;

/**
 * Walks through basic node operations with the native ZooKeeper Java client:
 * connect, register a watch, then create, read, update and delete one node.
 */
public class App {

    /** Charset for every String/byte[] conversion, so the stored data does not depend on the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Node the demo works on. Its parent is not created here, so "/top" has to exist already. */
    private static final String NODE_PATH = "/top/jinyong";

    /**
     * Runs the demo against the hard-coded server address.
     *
     * @param args unused
     * @throws IOException          if the client cannot be constructed
     * @throws InterruptedException if the thread is interrupted while waiting or sleeping
     * @throws KeeperException      if the server rejects one of the operations
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        String connStr = "192.168.126.128:2181";
        CountDownLatch countDown = new CountDownLatch(1);

        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() != KeeperState.SyncConnected) {
                    return;
                }
                System.err.println("eventType:"+event.getType());
                switch (event.getType()) {
                    case None:
                        // Session-level event: the connection is established, release main().
                        countDown.countDown();
                        break;
                    case NodeCreated:
                        System.out.println("listen:節點創建");
                        break;
                    case NodeChildrenChanged:
                        System.out.println("listen:子節點修改");
                        break;
                    default:
                        break;
                }
            }
        };

        ZooKeeper zookeeper = new ZooKeeper(connStr, 5000, watcher);
        try {
            // The constructor returns before the session is connected; wait for the watcher to signal it.
            countDown.await();

            // Register the watch explicitly before the node is created.
            zookeeper.exists(NODE_PATH, watcher);

            // Create the node.
            String result = zookeeper.create(NODE_PATH, "一生一世".getBytes(UTF_8),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println(result);

            Thread.sleep(10);

            // Read the node; "true" registers the default watcher passed to the constructor.
            byte[] bs = zookeeper.getData(NODE_PATH, true, null);
            result = new String(bs, UTF_8);
            System.out.println("創建節點后的數據是:" + result);

            // Update the node; version -1 matches any version.
            zookeeper.setData(NODE_PATH, "I love you".getBytes(UTF_8), -1);

            Thread.sleep(10);

            bs = zookeeper.getData(NODE_PATH, true, null);
            result = new String(bs, UTF_8);
            System.out.println("修改節點后的數據是:" + result);

            // Delete the node.
            zookeeper.delete(NODE_PATH, -1);
            System.out.println("節點刪除成功");
        } finally {
            // Always end the session so it does not linger on the server until it times out.
            zookeeper.close();
        }
    }

}
View Code

 說明:

1.會話連接是異步的,需要自己去處理。此處用的CountDownLatch

2.Watch是一次性的,觸發一次后就會失效,需要重復注冊,不然后續的事件就收不到通知。比如開始的zookeeper.exists("/top/jinyong", watcher);就是為了注冊監聽

3.開發的復雜性還是比較高的

4.不支持多級節點的遞歸刪除和創建(父節點不存在時不能直接創建子節點,存在子節點時也不能直接刪除父節點),需要自己去遞歸。后面有一個關於遞歸的示例。

第二種實現:

pom.xml

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

 示例的Java代碼如下:

package zook;

import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher.Event.KeeperState;

/**
 * Demonstrates the ZkClient wrapper: subscribing to data and child changes,
 * then creating, updating, reading and deleting a node.
 */
public class Client {

    /** Node the demo creates and deletes. */
    private static final String NODE_PATH = "/top/zhuzhu";

    /**
     * Runs the demo against the hard-coded server address.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        String connStr = "192.168.126.128:2181";
        ZkClient zk = new ZkClient(connStr);
        try {
            // Subscribe to data events of the node.
            zk.subscribeDataChanges(NODE_PATH, new IZkDataListener() {

                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    System.err.println("數據刪除:" + dataPath);
                }

                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                    System.err.println("數據修改:" + dataPath + "------" + data);
                }
            });

            // Subscribe to child events of the parent node.
            zk.subscribeChildChanges("/top", new IZkChildListener() {

                @Override
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    System.err.println("子節點發生變化:" + parentPath);
                    // The list can be null when the parent itself no longer exists.
                    if (currentChilds != null) {
                        currentChilds.forEach(child -> System.out.println("content:" + child));
                    }
                }
            });

            List<String> list = zk.getChildren("/");
            list.forEach(System.out::println);

            String res = zk.create(NODE_PATH, "I love you", CreateMode.PERSISTENT);
            System.out.println("創建節點/top/zhuzhu成功:" + res);

            zk.writeData(NODE_PATH, "forerver");
            System.out.println("修改節點/top/zhuzhu數據成功");

            res = zk.readData(NODE_PATH);
            System.out.println("節點數據:" + res);

            Thread.sleep(1000);

            zk.delete(NODE_PATH);
            System.out.println("刪除節點/top/zhuzhu成功");

            Thread.sleep(1000);

            System.out.println("------------------------------------------------");

            // Create and delete repeatedly; the pauses give the listeners time to print each event.
            for (int i = 0; i < 10; i++) {
                zk.create(NODE_PATH, "I love you", CreateMode.PERSISTENT);
                Thread.sleep(1000);
                zk.delete(NODE_PATH);
                Thread.sleep(1000);
            }
        } finally {
            // Release the connection and the client's event thread.
            zk.close();
        }
    }

}
View Code

 說明:

1.subscribe開頭的為注冊監聽的一些方法

2.addAuthInfo和setAcl為權限相關控制

3.普通使用這種方式還是值得推薦的

第三種實現:

pom.xml

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.11.0</version>
</dependency>

 示例的Java代碼如下:

package zook;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

/**
 * Demonstrates Apache Curator: a fluent client plus {@code PathChildrenCache}
 * listeners that report changes to the children of a node.
 */
public class Curator {

    /** Charset for every String/byte[] conversion, so the stored data does not depend on the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Node created and deleted by {@link #main(String[])}. */
    private static final String NODE_PATH = "/top/zhuzhu";

    /**
     * Watches the children of "/top" while creating, reading and deleting "/top/zhuzhu".
     * Blocks on standard input before the node is created.
     *
     * @param args unused
     * @throws Exception if any Curator operation fails
     */
    public static void main(String[] args) throws Exception {
        String connStr = "192.168.23.24:2181";
        try (CuratorFramework cur = CuratorFrameworkFactory.builder()
                .connectString(connStr)
                .connectionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build()) {
            cur.start(); // connect

            // Set up the child listener; the cache is closed before the client.
            try (PathChildrenCache cache = new PathChildrenCache(cur, "/top", true)) {
                cache.start();
                // Load the current children before the listener is attached.
                cache.rebuild();
                cache.getListenable().addListener((client, event) ->
                        System.err.println("節點發生變化:"+event.getType()));

                // Remove a leftover node from a previous run. Delete only when it exists:
                // deleting a missing node throws NoNodeException.
                Stat stat = cur.checkExists().forPath(NODE_PATH);
                if (stat != null) {
                    System.out.println("【/top/zhuzhu】節點存在,直接刪除");
                    cur.delete().forPath(NODE_PATH);
                }

                // Wait for input before continuing.
                System.in.read();

                System.out.println("准備創建【/top/zhuzhu】");
                cur.create().withMode(CreateMode.PERSISTENT)
                        .forPath(NODE_PATH, "love forever".getBytes(UTF_8));
                System.out.println("節點【/top/zhuzhu】創建成功");

                Thread.sleep(1000);

                byte[] bs = cur.getData().forPath(NODE_PATH);
                System.out.println("數據:"+new String(bs, UTF_8));

                Thread.sleep(1000);

                cur.delete().forPath(NODE_PATH);

                // Give the listener time to report the deletion before everything is closed.
                Thread.sleep(1000);
            }
        }
    }

    /**
     * Alternative demo of the cache-based listeners. Curator offers three of them:
     * <ul>
     *   <li>PathChildrenCache - watches creation, deletion and data updates of the children of a path</li>
     *   <li>NodeCache - watches creation, update and deletion of a single node</li>
     *   <li>TreeCache - PathChildrenCache and NodeCache combined; caches the data of all nodes below the path</li>
     * </ul>
     *
     * @param args unused
     * @throws Exception if any Curator operation fails
     */
    public static void main1(String[] args) throws Exception {
        String connStr = "192.168.23.24:2181";
        try (CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(connStr)
                .connectionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build()) {
            curatorFramework.start();

            // NodeCache variant, kept for reference:
            //
            // NodeCache cache=new NodeCache(curatorFramework,"/curator",false);
            // cache.start(true);
            //
            // cache.getListenable().addListener(()-> System.out.println("節點數據發生變化,變化后的結果" +
            //         ":"+new String(cache.getCurrentData().getData())));
            //
            // curatorFramework.setData().forPath("/curator","菲菲".getBytes());

            // PathChildrenCache variant.
            // Available start modes: NORMAL / BUILD_INITIAL_CACHE / POST_INITIALIZED_EVENT
            try (PathChildrenCache cache = new PathChildrenCache(curatorFramework, "/event", true)) {
                cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
                cache.rebuild();

                cache.getListenable().addListener((ignoredClient, event) -> {
                    switch (event.getType()) {
                        case CHILD_ADDED:
                            System.out.println("增加子節點");
                            break;
                        case CHILD_REMOVED:
                            System.out.println("刪除子節點");
                            break;
                        case CHILD_UPDATED:
                            System.out.println("更新子節點");
                            break;
                        default:
                            break;
                    }
                });

                // Earlier steps of the walkthrough, kept for reference:
                //
                // curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event","event".getBytes());
                // TimeUnit.SECONDS.sleep(1);
                // System.out.println("1");
                // curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event/event1","1".getBytes());
                // TimeUnit.SECONDS.sleep(1);
                // System.out.println("2");
                //
                // curatorFramework.setData().forPath("/event/event1","222".getBytes());
                // TimeUnit.SECONDS.sleep(1);
                // System.out.println("3");

                // The creating steps above are disabled, so the node may be missing; delete only when present.
                if (curatorFramework.checkExists().forPath("/event/event1") != null) {
                    curatorFramework.delete().forPath("/event/event1");
                }
                System.out.println("4");

                // Keep the process alive so the listener can print the events.
                System.in.read();
            }
        }
    }

}
View Code

 說明:

1.支持事務

2.支持Fluent(鏈式調用)寫法

3.開始測試時,多次啟動程序,程序一啟動就執行刪除節點,而監聽到的結果卻是新增;后來加了cache.rebuild();代碼就沒問題了。跟源碼可以看到,cache.start()有一個帶StartMode參數的重載,當模式為BUILD_INITIAL_CACHE時,內部也是調用了rebuild方法的。

4.功能還是比較強大的。高級功能都會用到這種方式

 

最后貼一個原生API的遞歸操作方式:

package zook;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;

public class ZookManager {
    ZooKeeper zookeeper = null;

    public ZookManager(String connStr) throws IOException, InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        zookeeper = new ZooKeeper(connStr, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == EventType.None) {
                    if (event.getState() == KeeperState.SyncConnected) {
                        latch.countDown();
                    } else {
                        System.out.println("連接失敗.");
                        latch.countDown();
                    }
                }
            }

        });
        latch.await();
    }

    /** 創建節點,不存在父節點將新增,如果節點已經存在將拋出異常 **/
    public String create(String path, String val) throws KeeperException, InterruptedException {
        if (!checkPath(path)) {
            return "";
        }

        String p = getParentPath(path);
        cycleCreate(p);

        String url = zookeeper.create(path, val.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        return url;
    }

    /** 設置節點的數據,如果節點不存在將新增該節點  **/
    public Stat setData(String path, String val) throws KeeperException, InterruptedException {
        if (!checkPath(path)) {
            return null;
        }

        cycleCreate(path);
        return zookeeper.setData(path, val.getBytes(), -1);
    }

    /** 刪除節點,如果存在子節點將遞歸刪除 
     * @throws InterruptedException 
     * @throws KeeperException **/
    public void delete(String path) throws KeeperException, InterruptedException {
        if (!checkPath(path)) {
            return;
        }

        List<String> chidren = zookeeper.getChildren(path, false);
        for (String p : chidren) {
            delete(path + "/" + p);
        }
        zookeeper.delete(path, -1);
    }

    private void cycleCreate(String path) throws KeeperException, InterruptedException {
        Stat stat = zookeeper.exists(path, null);
        if (stat == null) {
            String p = getParentPath(path);
            cycleCreate(p);// 遞歸
            // 創建
            zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    /**
     * 檢查目錄是否正確
     * @param path
     * @return
     */
    private boolean checkPath(String path) {
        if (!path.startsWith("/")) {
            System.err.println("路徑必須以/開頭:" + path);
            return false;
        }
        if (path.endsWith("/")) {
            System.err.println("路徑不能以/結尾:" + path);
            return false;
        }
        if (path.contains("//")) {
            System.err.println("路徑格式不對,存在連續的/:" + path);
            return false;
        }
        if (path.equals("/")) {
            System.err.println("路徑格式不對,只有一個/:" + path);
            return false;
        }
        return true;
    }

    /**
     * 獲得父級目錄
     * @param path /root/abc
     * @return
     */
    private String getParentPath(String path) {
        int index = path.lastIndexOf("/");
        return path.substring(0, index);
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZookManager zoo = new ZookManager("192.168.23.24:2181");
        zoo.setData("/top/enjoy/abc", "abc");
        zoo.setData("/top/enjoy/bbb", "bbb");
        zoo.setData("/top/enjoy/ccc", "ccc");
        System.out.println("成功新增");        
        zoo.delete("/top/enjoy");
        System.out.println("成功刪除");
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM