Zookeeper JAVA API的使用


0. 前言

  zookeeper安裝及使用  http://www.cnblogs.com/rocky-fang/p/7880309.html

 

1. 開發環境配置

1.1 idea創建一個maven工程

1.2 pom配置jar

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.rocky.learn</groupId>
    <artifactId>zk</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.1</version>
        </dependency>

    </dependencies>

</project>
View Code

 

2. API使用

2.1 連接Zookeep Server

code

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/12.
 */
public class ZookeeperTestConnection implements Watcher {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public void process(WatchedEvent event) {
        System.out.println("receive the event:"+event);
        if(Event.KeeperState.SyncConnected == event.getState())
            countDownLatch.countDown();
    }
    public static final String ADDRESS = "192.168.1.8:2181";

    public static void main(String[] args) throws IOException {
        ZooKeeper zooKeeper = new ZooKeeper(ADDRESS, 5000, new ZookeeperTestConnection());
        System.out.println(zooKeeper.getState());
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("zookeeper session established");
    }
}
View Code

控制台

2.2 創建節點

2.2.1 同步創建

code

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/12.
 */
public class ZookeeperTestCreateNodeSync implements Watcher {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final String ADDRESS = "192.168.1.8:2181";
    private static final String PREFIX = "/mytest-sync-create-";
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper zooKeeper = new ZooKeeper(ADDRESS, 5000, new ZookeeperTestCreateNodeSync());
        System.out.println("state:"+zooKeeper.getState());
        countDownLatch.await();
        String path1 = zooKeeper.create(PREFIX, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode:"+ path1);
        String path2 = zooKeeper.create(PREFIX, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("success create znode:"+ path2);
        zooKeeper.close();
    }

    public void process(WatchedEvent event) {
        //連上了
        if(Event.KeeperState.SyncConnected == event.getState())
            countDownLatch.countDown();
    }
}
View Code

控制台

2.2.2 異步創建

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/12.
 */
public class ZookeeperTestCreateNodeAsync implements Watcher {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final String ADDRESS = "192.168.1.8:2181";
    private static final String PREFIX_ASYNC = "/mytest-async-create-";
    public static void main(String[] args) throws IOException, InterruptedException {
        ZooKeeper zooKeeper = new ZooKeeper(ADDRESS, 5000, new ZookeeperTestCreateNodeAsync());
        System.out.println("state:"+zooKeeper.getState());
        countDownLatch.await();
        zooKeeper.create(PREFIX_ASYNC, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                new IStringCallBack(), "my test text...1");
        zooKeeper.create(PREFIX_ASYNC, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                new IStringCallBack(), "my test text...2");
        zooKeeper.create(PREFIX_ASYNC, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                new IStringCallBack(), "my test text...3");
        Thread.sleep(Integer.MAX_VALUE);
    }
    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected == event.getState())
            countDownLatch.countDown();
    }

}
    class IStringCallBack implements AsyncCallback.StringCallback {
        public void processResult(int rc, String path, Object ctx, String name) {
            System.out.println("rc:"+rc+",path:"+path+",ctx:"+ctx+"name,"+name);
        }
    }
View Code

控制台

說明:同步需要關注接口異常,異步接口不會反回異常,而是在回調函數中通過result code體現

 

2.3 創建節點

只允許刪除葉子節點,即如果節點包含子節點,則必須先刪除子節點才能刪除本節點。

2.3.1 同步方式

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/12.
 */
public class ZookeeperTestDeleteNodeSync implements Watcher {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final String ADDRESS = "192.168.1.8:2181";
    private static final String PREFIX_SYNC = "/mytest-sync-delete-";

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper zooKeeper = new ZooKeeper(ADDRESS, 5000, new ZookeeperTestDeleteNodeSync());
        countDownLatch.await();
        zooKeeper.create(PREFIX_SYNC, "mydelete".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zooKeeper.create(PREFIX_SYNC + "/c1", "mydelete".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        try {
            zooKeeper.delete(PREFIX_SYNC, -1);
        } catch (Exception e) {
            System.out.println("faile to delete path:"+PREFIX_SYNC);
        }
        zooKeeper.delete(PREFIX_SYNC + "/c1", -1);
        System.out.println("success to delete /c1");
        zooKeeper.delete(PREFIX_SYNC , -1);
        Thread.sleep(Integer.MAX_VALUE);

    }
    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected == event.getState()){
            if(Event.EventType.None == event.getType() && null == event.getPath())
                countDownLatch.countDown();
        }
    }
}
View Code

控制台

2.3.2 異步方式

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/12.
 */
public class ZookeeperTestDeleteNodeASync implements Watcher {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final String ADDRESS = "192.168.1.8:2181";
    private static final String PREFIX_SYNC = "/mytest-async-delete-";

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper zooKeeper = new ZooKeeper(ADDRESS, 5000, new ZookeeperTestDeleteNodeASync());
        countDownLatch.await();
        zooKeeper.create(PREFIX_SYNC, "mydelete".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zooKeeper.create(PREFIX_SYNC + "/c1", "mydelete".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        zooKeeper.delete(PREFIX_SYNC , -1, new IVoidCallback(), null);
        zooKeeper.delete(PREFIX_SYNC + "/c1", -1, new IVoidCallback(), null);
        zooKeeper.delete(PREFIX_SYNC , -1, new IVoidCallback(), null);

        System.out.println("success to delete /c1");
        zooKeeper.delete(PREFIX_SYNC , -1);
        Thread.sleep(Integer.MAX_VALUE);

    }
    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected == event.getState()){
            if(Event.EventType.None == event.getType() && null == event.getPath())
                countDownLatch.countDown();
        }
    }


}
class IVoidCallback implements AsyncCallback.VoidCallback {
    public void processResult(int rc, String path, Object ctx) {
        System.out.println(rc + "::" + path + "::" +ctx);
    }
}
View Code

控制台

 

 2.4 獲取子節點

2.4.1 同步方式

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Id;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/13.
 */
public class ZookeeperTestGetChildrenNodeSync implements Watcher {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final String ADDRESS = "192.168.1.8:2181";
    private static final String PREFIX_SYNC = "/mytest-sync-getChild-";
    private static   ZooKeeper zooKeeper ;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper(ADDRESS, 5000, new ZookeeperTestGetChildrenNodeSync());
        countDownLatch.await();
        zooKeeper.create(PREFIX_SYNC, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zooKeeper.create(PREFIX_SYNC + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println(zooKeeper.getChildren(PREFIX_SYNC, true));
        zooKeeper.create(PREFIX_SYNC + "/c2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println(zooKeeper.getChildren(PREFIX_SYNC, true));
        Thread.sleep(1000);
        zooKeeper.create(PREFIX_SYNC + "/c3", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println(zooKeeper.getChildren(PREFIX_SYNC, true));
        Thread.sleep(Integer.MAX_VALUE);

    }
    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected == event.getState()){
            if(Event.EventType.None == event.getType() && null == event.getPath()){
                countDownLatch.countDown();
            }else if(Event.EventType.NodeChildrenChanged == event.getType()){
                try {
                    System.out.println("get Child:" + zooKeeper.getChildren(event.getPath(), true));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
View Code

控制台

Watcher通知是一次性的,即一旦觸發一次通知后,該Watcher就失效了,因此客戶端需要反復注冊Watcher,即程序中在process里面又注冊了Watcher,zooKeeper.getData(path,watch,stat), 第二個參數值設為true則添加一次監聽, 否則,將無法獲取c3節點的創建而導致子節點變化的事件。

2.4.2 異步方式

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/13.
 */
public class ZookeeperTestGetChildrenNodeASync implements Watcher {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final String ADDRESS = "192.168.1.8:2181";
    private static final String PREFIX_SYNC = "/mytest-async-getChild-";
    private static   ZooKeeper zooKeeper ;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper(ADDRESS, 5000, new ZookeeperTestGetChildrenNodeASync());
        countDownLatch.await();
        zooKeeper.create(PREFIX_SYNC, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zooKeeper.create(PREFIX_SYNC + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        zooKeeper.getChildren(PREFIX_SYNC, true, new IChildren2Callback(), null);
        zooKeeper.create(PREFIX_SYNC + "/c2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        zooKeeper.getChildren(PREFIX_SYNC, true, new IChildren2Callback(), null);
        Thread.sleep(1000);
        zooKeeper.create(PREFIX_SYNC + "/c3", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        zooKeeper.getChildren(PREFIX_SYNC, true, new IChildren2Callback(), null);
        Thread.sleep(Integer.MAX_VALUE);

    }
    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected == event.getState()){
            if(Event.EventType.None == event.getType() && null == event.getPath()){
                countDownLatch.countDown();
            }else if(Event.EventType.NodeChildrenChanged == event.getType()){
                try {
                    System.out.println("get Child:" + zooKeeper.getChildren(event.getPath(), true));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

}
class IChildren2Callback implements  AsyncCallback.Children2Callback {
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        System.out.println(rc + "::" + path + "::" +ctx +"::" + children + "::" + stat);
    }

}
View Code

控制台

 

2.5 節點的數據獲取

2.5.1 同步方式

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/13.
 */
public class ZookeeperTestGetNodeDataSync implements Watcher {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final String ADDRESS = "10.0.40.10:2181";
    private static final String PREFIX_SYNC = "/mytest-sync-getData4-";
    private static   ZooKeeper zooKeeper ;
    private  static final Stat stat = new Stat();
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper(ADDRESS, 5000, new ZookeeperTestGetNodeDataSync());
        countDownLatch.await();
        zooKeeper.create(PREFIX_SYNC, "hellodata".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("the data of node:" + new String(zooKeeper.getData(PREFIX_SYNC, true, stat)));
        System.out.println("czxid::"+stat.getCzxid()+",mzxid::" + stat.getMzxid() + ",version::" +  stat.getVersion());
        zooKeeper.setData(PREFIX_SYNC, "hello2data".getBytes(), -1);
        Thread.sleep(Integer.MAX_VALUE);

    }
    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected == event.getState()){
            if(Event.EventType.None == event.getType() && null == event.getPath()){
                countDownLatch.countDown();
            }else if(Event.EventType.NodeDataChanged == event.getType()){
                try {
                    System.out.println("the data of:" +  event.getPath() + " is::" + new String(zooKeeper.getData(event.getPath(), true, stat)));
                    System.out.println("watch czxid::"+stat.getCzxid()+",mzxid::" + stat.getMzxid() + ",version::" +  stat.getVersion());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
View Code

控制台

 

2.5.2 異步方式

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/13.
 */
public class ZookeeperTestGetNodeDataASync implements Watcher {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final String ADDRESS = "10.0.40.10:2181";
    private static final String PREFIX_SYNC = "/mytest-async-getData8-";
    private static   ZooKeeper zooKeeper ;
    private  static final Stat stat = new Stat();
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper(ADDRESS, 5000, new ZookeeperTestGetNodeDataASync());
        countDownLatch.await();
        zooKeeper.create(PREFIX_SYNC, "hello7data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zooKeeper.getData(PREFIX_SYNC, true, new IDataCallback(), null);
        for(int i=0; i<3; i++){
            zooKeeper.setData(PREFIX_SYNC, "hello6data".getBytes(), -1);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected == event.getState()){
            if(Event.EventType.None == event.getType() && null == event.getPath()){
                countDownLatch.countDown();
            }else if(Event.EventType.NodeDataChanged == event.getType()){
                try {
                    zooKeeper.getData(PREFIX_SYNC, true, new IDataCallback(), null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

}
class IDataCallback implements AsyncCallback.DataCallback {
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        System.out.println("watch rc::" + rc + ", path::" + path + ", ctx::" + ctx +", data::" + new String(data));
        System.out.println("watch czxid::"+stat.getCzxid()+",mzxid::" + stat.getMzxid() + ",version::" +  stat.getVersion());
    }
}
View Code

控制台

 

2.6 更新數據

在更新數據時,setData方法存在一個version參數,其用於指定節點的數據版本,表明本次更新操作是針對指定的數據版本進行的,但是,在getData方法中,並沒有提供根據指定數據版本來獲取數據的接口,那么,這里為何要指定數據更新版本呢,這里方便理解,可以等效於CAS(compare and swap),對於值V,每次更新之前都會比較其值是否是預期值A,只有符合預期,才會將V原子化地更新到新值B。Zookeeper的setData接口中的version參數可以對應預期值,表明是針對哪個數據版本進行更新,假如一個客戶端試圖進行更新操作,它會攜帶上次獲取到的version值進行更新,而如果這段時間內,Zookeeper服務器上該節點的數據已經被其他客戶端更新,那么其數據版本也會相應更新,而客戶端攜帶的version將無法匹配,無法更新成功,因此可以有效地避免分布式更新的並發問題

2.6.1 同步方式

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/13.
 */
public class ZookeeperTestSetNodeDataSync implements Watcher {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final String ADDRESS = "10.0.40.10:2181";
    private static final String PREFIX_SYNC = "/mytest-sync-setData2-";
    private static   ZooKeeper zooKeeper ;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper(ADDRESS, 5000, new ZookeeperTestSetNodeDataSync());
        countDownLatch.await();
        zooKeeper.create(PREFIX_SYNC, "hello6data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("the data of node:" + new String(zooKeeper.getData(PREFIX_SYNC, true, null)));
        Stat stat = zooKeeper.setData(PREFIX_SYNC, "hello6data".getBytes(), -1);
        System.out.println("czxid::"+stat.getCzxid()+",mzxid::" + stat.getMzxid() + ",version::" +  stat.getVersion());
        Stat stat2 = zooKeeper.setData(PREFIX_SYNC, "hello6data".getBytes(), stat.getVersion());
        System.out.println("czxid::"+stat2.getCzxid()+",mzxid::" + stat2.getMzxid() + ",version::" +  stat2.getVersion());
        try {
            zooKeeper.setData(PREFIX_SYNC, "hello6data".getBytes(), stat.getVersion());
        } catch (Exception e) {
            System.out.println("exception: " + e);
        }
        Thread.sleep(Integer.MAX_VALUE);

    }
    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected == event.getState()){
            if(Event.EventType.None == event.getType() && null == event.getPath()){
                countDownLatch.countDown();
            }else if(Event.EventType.NodeDataChanged == event.getType()){
                try {
                    System.out.println("watch the data of:" +  event.getPath() + " is::" + new String(zooKeeper.getData(event.getPath(), true, null)));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
View Code

控制台

2.6.2  異步方式

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/13.
 */
public class ZookeeperTestSetNodeDataASync implements Watcher {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final String ADDRESS = "10.0.40.10:2181";
    private static final String PREFIX_SYNC = "/mytest-async-setData4-";
    private static   ZooKeeper zooKeeper ;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper(ADDRESS, 5000, new ZookeeperTestSetNodeDataASync());
        countDownLatch.await();
        zooKeeper.create(PREFIX_SYNC, "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        zooKeeper.setData(PREFIX_SYNC, "hello2".getBytes(), -1, new StatCallback(), null);
        Thread.sleep(Integer.MAX_VALUE);

    }
    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected == event.getState()){
            if(Event.EventType.None == event.getType() && null == event.getPath()){
                countDownLatch.countDown();
            }
        }
    }

}
class StatCallback implements AsyncCallback.StatCallback {
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        System.out.println("rc:"+rc + ", path:" + path + ",ctx:" + ctx + ", stat:" + stat);
    }
}
View Code

控制台

rc(ResultCode)為0,表明成功更新節點數據。

 

2.7 檢查節點是否存在

2.7.1 同步方式

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/13.
 */
public class ZookeeperTestExistSync implements Watcher {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final String ADDRESS = "10.0.40.10:2181";
    private static final String PREFIX_SYNC = "/mytest-sync-exist-";
    private static   ZooKeeper zooKeeper ;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper(ADDRESS, 5000, new ZookeeperTestExistSync());
        countDownLatch.await();
        zooKeeper.exists(PREFIX_SYNC, true);
        zooKeeper.create(PREFIX_SYNC, "hello6data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        zooKeeper.setData(PREFIX_SYNC, "hello6data".getBytes(), -1);
        zooKeeper.delete(PREFIX_SYNC, -1);
        Thread.sleep(Integer.MAX_VALUE);

    }
    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected == event.getState()){
            try {
                if(Event.EventType.None == event.getType() && null == event.getPath()){
                    countDownLatch.countDown();
                }else if(Event.EventType.NodeDataChanged == event.getType()){
                    System.out.println("node " +  event.getPath() + " changed." );
                    zooKeeper.exists(event.getPath(), true);
                }else if(Event.EventType.NodeCreated == event.getType()){
                    System.out.println("node " + event.getPath() + "  created.");
                    zooKeeper.exists(event.getPath(), true);
                }else if(Event.EventType.NodeDeleted == event.getType()){
                    System.out.println("node " + event.getPath() + " deleted.");
                    zooKeeper.exists(event.getPath(), true);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}
View Code

控制台

 2.7.2 異步方式

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/13.
 */
public class ZookeeperTestExistASync implements Watcher {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final String ADDRESS = "10.0.40.10:2181";
    private static final String PREFIX_SYNC = "/mytest-async-exist8-";
    private static   ZooKeeper zooKeeper ;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper(ADDRESS, 5000, new ZookeeperTestExistASync());
        countDownLatch.await();
        zooKeeper.exists(PREFIX_SYNC, true, new IStaCallback(), null);
        zooKeeper.create(PREFIX_SYNC, "111".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zooKeeper.setData(PREFIX_SYNC, "222".getBytes(), -1);
        //判斷存在 並加監聽
        zooKeeper.exists(PREFIX_SYNC + "/c2", true, new IStaCallback(), null);
        zooKeeper.create(PREFIX_SYNC + "/c2", "111".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zooKeeper.delete(PREFIX_SYNC + "/c2", -1);
//        zooKeeper.exists(PREFIX_SYNC , true, new IStaCallback(), null);
        //上面修改數據 那步 對PREFIX_SYNC添加了一個監聽
        zooKeeper.delete(PREFIX_SYNC, -1);
        Thread.sleep(Integer.MAX_VALUE);

    }
    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected == event.getState()){
            try {
                if(Event.EventType.None == event.getType() && null == event.getPath()){
                    countDownLatch.countDown();
                }else if(Event.EventType.NodeDataChanged == event.getType()){
                    System.out.println("node " +  event.getPath() + " changed." );
                    zooKeeper.exists(event.getPath(), true, new IStaCallback(), null);
                }else if(Event.EventType.NodeCreated == event.getType()){
                    System.out.println("node " + event.getPath() + "  created.");
                    zooKeeper.exists(event.getPath(), true, new IStaCallback(), null);
                }else if(Event.EventType.NodeDeleted == event.getType()){
                    System.out.println("node " + event.getPath() + " deleted.");
                    zooKeeper.exists(event.getPath(), true, new IStaCallback(), null);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }

}
class IStaCallback implements AsyncCallback.StatCallback {
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        System.out.println("rc:" + rc + ",path: " + path + ",ctx:" + ctx + ", stat:" + stat);
    }
}
View Code

控制台

rc:-101,path: /mytest-async-exist8-,ctx:null, stat:null
node /mytest-async-exist8-  created.
rc:0,path: /mytest-async-exist8-,ctx:null, stat:8590337352,8590337352,1526279675044,1526279675044,0,0,0,0,3,0,8590337352

node /mytest-async-exist8- changed.
rc:0,path: /mytest-async-exist8-,ctx:null, stat:8590337352,8590337353,1526279675044,1526279675095,1,0,0,0,3,0,8590337352

rc:-101,path: /mytest-async-exist8-/c2,ctx:null, stat:null
node /mytest-async-exist8-/c2  created.
rc:0,path: /mytest-async-exist8-/c2,ctx:null, stat:8590337354,8590337354,1526279675128,1526279675128,0,0,0,0,3,0,8590337354

node /mytest-async-exist8-/c2 deleted.
rc:-101,path: /mytest-async-exist8-/c2,ctx:null, stat:null
node /mytest-async-exist8- deleted.
rc:-101,path: /mytest-async-exist8-,ctx:null, stat:null
View Code

 rc:-101 節點不存在去

 

2.8 權限相關

zookeeper使用ACL機制來實現權限控制, ACL機制主要分為3個方面,權限模式,權限ID和權限

2.8.1 權限模式

 1) IP 

  ip模式是指權限針對這個ip而設置的,比如"ip:192.168.0.6",即允許這個ip訪問數據節點

   2) digest

   digest模式是最常用的一種模式,形如"username:password"的方式。

 3) world

     該模式對所有用戶開放

   4) super

    超級管理員模式。需要在zkServer.sh中配置,形如"super:password" ,需要重啟服務器

2.8.2 權限ID

   根據不同模式 授權給的不同對象, ip模式為ip地址, digest模式為username

2.8.2 權限

即允許的操作 CRUD等

2.8.4 刪除節點+權限

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/13.
 */
public class ZookeeperTestAcl implements Watcher {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final String ADDRESS = "10.0.40.10:2181";
    private static final String PREFIX_SYNC = "/mytest-sync-acl-";
    private static   ZooKeeper zooKeeper ;
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        try {
            zooKeeper = new ZooKeeper(ADDRESS, 5000, new ZookeeperTestAcl());
            countDownLatch.await();
            zooKeeper.addAuthInfo("digest", "true".getBytes());
            zooKeeper.create(PREFIX_SYNC, "111".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
            zooKeeper.create(PREFIX_SYNC+"/c2", "222".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);
            ZooKeeper zk2 = new ZooKeeper(ADDRESS, 5000, null);
            try {
                zk2.delete(PREFIX_SYNC+"/c2", -1);
            } catch (Exception e) {
                System.out.println("delete exception: " + e);
            }
            ZooKeeper zk3 = new ZooKeeper(ADDRESS, 5000, null);
            zk3.addAuthInfo("digest", "true".getBytes());
            zk3.delete(PREFIX_SYNC+"/c2", -1);
            System.out.println("delete mytest-sync-acl-/c2 success");
            ZooKeeper zk4 = new ZooKeeper(ADDRESS, 5000, null);
            zk4.delete(PREFIX_SYNC, -1);
            System.out.println("delete mytest-sync-acl- success");
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            System.out.println("exception ::" +e);
        }

    }
    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected == event.getState()){
            if(Event.EventType.None == event.getType() && null == event.getPath()){
                countDownLatch.countDown();
            }
        }
    }
}
View Code

控制台

說明:

可以看到,第一次我們使用無權限的zk2去刪除,顯然會報錯,第二次我們使用帶權限的zk3去操作,子節點被刪除成功,但是當我們使用zk4去執行刪除操作的時候並沒有指定任何權限,依然能夠刪除其父節點,說明zk在進行刪除操作的時候,其權限的作用范圍是其子節點。也就是說,當我們對一個節點添加了權限之后我們依然可以隨意刪除該節點但是對於這個節點的子節點,就必須擁有相應的權限才能刪除。而且zk原生api不支持遞歸刪除,即在存在子節點的情況下,不允許刪除其父節點。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM