一、基本介绍
1 、PERSISTENT (0, false, false),
持久节点:节点创建后,会一直存在,不会因客户端会话失效而删除;
2、PERSISTENT_SEQUENTIAL (2, false, true),
持久顺序节点:基本特性与持久节点一致,创建节点的过程中,zookeeper会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名;
3、EPHEMERAL (1, true, false),
临时节点:客户端会话失效或连接关闭后,该节点会被自动删除,且不能再临时节点下面创建子节点,否则报如下错(org.apache.zookeeper.KeeperException$NoChildrenForEphemeralsException: KeeperErrorCode = NoChildrenForEphemerals for /node/child);
4、EPHEMERAL_SEQUENTIAL (3, true, true);
临时顺序节点:基本特性与临时节点一致,创建节点的过程中,zookeeper会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名;
二、JavaApi的操作
1)maven引入

<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.12</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> </dependencies>
2)Java基本节点的操作

public class ZkClientOperator { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.112.131:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 10000;//ms /** * 主要测试增加 临时节点、持久化节点,以及读取内容,删除节点操作 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { //ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME); ZkClient zkc = new ZkClient(CONNECT_ADDR, SESSION_OUTTIME); //1. create and delete方法 zkc.createEphemeral("/temp"); zkc.createPersistent("/super/c1", true); Thread.sleep(10000); zkc.delete("/temp"); zkc.deleteRecursive("/super"); //2. 设置path和data 并且读取子节点和每个节点的内容 zkc.createPersistent("/super", "1234"); zkc.createPersistent("/super/c1", "c1内容"); zkc.createPersistent("/super/c2", "c2内容"); List<String> list = zkc.getChildren("/super"); for(String p : list){ System.out.println(p); String rp = "/super/" + p; String data = zkc.readData(rp); System.out.println("节点为:" + rp + ",内容为: " + data); } //3. 更新和判断节点是否存在 zkc.writeData("/super/c1", "新内容"); System.out.println(zkc.readData("/super/c1").toString()); System.out.println(zkc.exists("/super/c1")); // 4.递归删除/super内容 zkc.deleteRecursive("/super"); } }
3)测试watcher事件

public class ZooKeeperWatcher implements Watcher { /** 定义原子变量 */ AtomicInteger seq = new AtomicInteger(); /** 定义session失效时间 */ private static final int SESSION_TIMEOUT = 10000; /** zookeeper服务器地址 */ private static final String CONNECTION_ADDR = "192.168.112.131:2181"; /** zk父路径设置 */ private static final String PARENT_PATH = "/testWatch"; /** zk子路径设置 */ private static final String CHILDREN_PATH = "/testWatch/children"; /** 进入标识 */ private static final String LOG_PREFIX_OF_MAIN = "【Main】"; /** zk变量 */ private ZooKeeper zk = null; /** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */ private CountDownLatch connectedSemaphore = new CountDownLatch(1); /** * 创建ZK连接 * @param connectAddr ZK服务器地址列表 * @param sessionTimeout Session超时时间 */ public void createConnection(String connectAddr, int sessionTimeout) { this.releaseConnection(); try { zk = new ZooKeeper(connectAddr, sessionTimeout, this); System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器"); connectedSemaphore.await(); } catch (Exception e) { e.printStackTrace(); } } /** * 关闭ZK连接 */ public void releaseConnection() { if (this.zk != null) { try { this.zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 创建节点 * @param path 节点路径 * @param data 数据内容 * @return */ public boolean createPath(String path, String data) { try { //设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控) this.zk.exists(path, true); System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " + this.zk.create( /**路径*/ path, /**数据*/ data.getBytes(), /**所有可见*/ ZooDefs.Ids.OPEN_ACL_UNSAFE, /**永久存储*/ CreateMode.PERSISTENT ) + ", content: " + data); } catch (Exception e) { e.printStackTrace(); return false; } return true; } /** * 读取指定节点数据内容 * @param path 节点路径 * @return */ public String readData(String path, boolean needWatch) { try { return new String(this.zk.getData(path, needWatch, null)); } catch (Exception e) { e.printStackTrace(); return ""; } } /** * 更新指定节点数据内容 * @param path 节点路径 * @param data 数据内容 * @return */ public boolean writeData(String path, String data) { try { System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " + this.zk.setData(path, data.getBytes(), -1)); } catch (Exception e) { e.printStackTrace(); } return false; } /** * 删除指定节点 * * @param path * 节点path */ public void deleteNode(String path) { try { this.zk.delete(path, -1); System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path); } catch (Exception e) { e.printStackTrace(); } } /** * 判断指定节点是否存在 * @param path 节点路径 */ public Stat exists(String path, boolean needWatch) { try { return this.zk.exists(path, needWatch); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 获取子节点 * @param path 节点路径 */ private List<String> getChildren(String path, boolean needWatch) { try { return this.zk.getChildren(path, needWatch); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 删除所有节点 */ public void deleteAllTestPath() { if(this.exists(CHILDREN_PATH, false) != null){ this.deleteNode(CHILDREN_PATH); } if(this.exists(PARENT_PATH, false) != null){ this.deleteNode(PARENT_PATH); } } /** * 收到来自Server的Watcher通知后的处理。 */ @Override public void process(WatchedEvent event) { System.out.println("进入 process 。。。。。event = " + event); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } if (event == null) { return; } // 连接状态 Watcher.Event.KeeperState keeperState = event.getState(); // 事件类型 Watcher.Event.EventType eventType = event.getType(); // 受影响的path String path = event.getPath(); String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】"; System.out.println(logPrefix + "收到Watcher通知"); System.out.println(logPrefix + "连接状态:\t" + keeperState.toString()); System.out.println(logPrefix + "事件类型:\t" + eventType.toString()); if (Event.KeeperState.SyncConnected == keeperState) { // 成功连接上ZK服务器 if (Event.EventType.None == eventType) { System.out.println(logPrefix + "成功连接上ZK服务器"); connectedSemaphore.countDown(); } //创建节点 if (Event.EventType.NodeCreated == eventType) { System.out.println(logPrefix + "节点创建"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } this.exists(path, true); } //更新节点 else if (Event.EventType.NodeDataChanged == eventType) { System.out.println(logPrefix + "节点数据更新"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(logPrefix + "数据内容: " + this.readData(PARENT_PATH, true)); } //更新子节点 else if (Event.EventType.NodeChildrenChanged == eventType) { System.out.println(logPrefix + "子节点变更"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(logPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true)); } //删除节点 else if (Event.EventType.NodeDeleted == eventType) { System.out.println(logPrefix + "节点 " + path + " 被删除"); } } else if (Watcher.Event.KeeperState.Disconnected == keeperState) { System.out.println(logPrefix + "与ZK服务器断开连接"); } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) { System.out.println(logPrefix + "权限检查失败"); } else if (Watcher.Event.KeeperState.Expired == keeperState) { System.out.println(logPrefix + "会话失效"); } System.out.println("--------------------------------------------"); } public static void main(String[] args) throws Exception { //建立watcher ZooKeeperWatcher zkWatch = new ZooKeeperWatcher(); //创建连接 zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT); //System.out.println(zkWatch.zk.toString()); Thread.sleep(1000); // 清理节点 zkWatch.deleteAllTestPath(); if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) { // 读取数据,在操作节点数据之前先调用zookeeper的getData()方法是为了可以watch到对节点的操作。watch是一次性的, // 也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次。 System.out.println("---------------------- read parent ----------------------------"); // zkWatch.readData(PARENT_PATH, true); // 更新数据 zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + ""); /** 读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated,而不会输出NodeChildrenChanged, 也就是说创建子节点时没有watch。 如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT_PATH, ture)只会在创建c1时watch,输出c1的NodeChildrenChanged, 而不会输出创建c2时的NodeChildrenChanged,如果watch到c2的NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法, 其中path="/p/c1" */ System.out.println("---------------------- read children path ----------------------------"); zkWatch.getChildren(PARENT_PATH, true); Thread.sleep(1000); // 创建子节点,同理如果想要watch到NodeChildrenChanged状态,需要调用getChildren(CHILDREN_PATH, true) zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + ""); Thread.sleep(1000); zkWatch.readData(CHILDREN_PATH, true); zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + ""); } Thread.sleep(1000); // 清理节点 zkWatch.deleteAllTestPath(); Thread.sleep(1000); zkWatch.releaseConnection(); } }
三、Curator框架的的基本测试
1)基本操作

package cn.enjoy.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorListener; import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.util.List; import static com.sun.xml.internal.ws.dump.LoggingDumpTube.Position.Before; /** * 测试Apache Curator框架的基本用法 */ public class OperatorTest { //ZooKeeper服务地址 private static final String SERVER = "127.0.0.1:2181"; //会话超时时间 private final int SESSION_TIMEOUT = 30000; //连接超时时间 private final int CONNECTION_TIMEOUT = 5000; //创建连接实例 private CuratorFramework client = null; /** * baseSleepTimeMs:初始的重试等待时间 * maxRetries:最多重试次数 * <p> * <p> * ExponentialBackoffRetry:重试一定次数,每次重试时间依次递增 * RetryNTimes:重试N次 * RetryOneTime:重试一次 * RetryUntilElapsed:重试一定时间 */ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); @org.junit.Before public void init() { //创建 CuratorFrameworkImpl实例 client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy); //启动 client.start(); } /** * 测试创建节点 * * @throws Exception */ @Test public void testCreate() throws Exception { //创建永久节点 client.create().forPath("/curator", "/curator data".getBytes()); //创建永久有序节点 client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/curator_sequential", "/curator_sequential data".getBytes()); //创建临时节点 client.create().withMode(CreateMode.EPHEMERAL) .forPath("/curator/ephemeral", "/curator/ephemeral data".getBytes()); //创建临时有序节点 client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath("/curator/ephemeral_path1", "/curator/ephemeral_path1 data".getBytes()); } /** * 测试检查某个节点是否存在 * * @throws Exception */ @Test public void testCheck() throws Exception { Stat stat1 = client.checkExists().forPath("/curator"); Stat stat2 = client.checkExists().forPath("/curator2"); System.out.println("'/curator'是否存在: " + (stat1 != null ? true : false)); System.out.println("'/curator2'是否存在: " + (stat2 != null ? true : false)); } /** * 测试异步设置节点数据 * * @throws Exception */ @Test public void testSetDataAsync() throws Exception { //创建监听器 CuratorListener listener = new CuratorListener() { @Override public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println(event.getPath()); } }; //添加监听器 client.getCuratorListenable().addListener(listener); //异步设置某个节点数据 client.setData().inBackground().forPath("/curator", "sync".getBytes()); //为了防止单元测试结束从而看不到异步执行结果,因此暂停10秒 Thread.sleep(10000); } /** * 测试另一种异步执行获取通知的方式 * * @throws Exception */ @Test public void testSetDataAsyncWithCallback() throws Exception { BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println(event.getPath()); } }; //异步设置某个节点数据 client.setData().inBackground(callback).forPath("/curator", "/curator modified data with Callback".getBytes()); //为了防止单元测试结束从而看不到异步执行结果,因此暂停10秒 Thread.sleep(10000); } /** * 测试删除节点 * * @throws Exception */ @Test public void testDelete() throws Exception { //创建测试节点 client.create().orSetData().creatingParentsIfNeeded() .forPath("/curator/del_key1", "/curator/del_key1 data".getBytes()); client.create().orSetData().creatingParentsIfNeeded() .forPath("/curator/del_key2", "/curator/del_key2 data".getBytes()); client.create().forPath("/curator/del_key2/test_key", "test_key data".getBytes()); //删除该节点 client.delete().forPath("/curator/del_key1"); //级联删除子节点 client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator/del_key2"); } /* * 测试事务管理:碰到异常,事务会回滚 * @throws Exception */ @Test public void testTransaction() throws Exception { //定义几个基本操作 CuratorOp createOp = client.transactionOp().create() .forPath("/curator/one_path", "some data".getBytes()); CuratorOp setDataOp = client.transactionOp().setData() .forPath("/curator", "other data".getBytes()); CuratorOp deleteOp = client.transactionOp().delete() .forPath("/curator"); //事务执行结果 List<CuratorTransactionResult> results = client.transaction() .forOperations(createOp, setDataOp, deleteOp); //遍历输出结果 for (CuratorTransactionResult result : results) { System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType()); } } }
2) curator监听器的测试

public class EventTest { //ZooKeeper服务地址 private static final String SERVER = "192.168.30.10:2181"; //会话超时时间 private final int SESSION_TIMEOUT = 30000; //连接超时时间 private final int CONNECTION_TIMEOUT = 5000; //创建连接实例 private CuratorFramework client = null; /** * baseSleepTimeMs:初始的重试等待时间 * maxRetries:最多重试次数 * * * ExponentialBackoffRetry:重试一定次数,每次重试时间依次递增 * RetryNTimes:重试N次 * RetryOneTime:重试一次 * RetryUntilElapsed:重试一定时间 */ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); @org.junit.Before public void init(){ //创建 CuratorFrameworkImpl实例 client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy); //启动 client.start(); } /** * * @描述:第一种监听器的添加方式: 对指定的节点进行添加操作 * 仅仅能监控指定的本节点的数据修改,删除 操作 并且只能监听一次 --->不好 */ @Test public void TestListenterOne() throws Exception{ client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes()); // 注册观察者,当节点变动时触发 byte[] data = client.getData().usingWatcher(new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("获取 test 节点 监听器 : " + event); } }).forPath("/test"); client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes()); Thread.sleep(1000); client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes()); Thread.sleep(1000); System.out.println("节点数据: "+ new String(data)); Thread.sleep(10000); } /** * * @描述:第二种监听器的添加方式: Cache 的三种实现 * Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。 * 产生的事件会传递给注册的PathChildrenCacheListener。 * Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。 * Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。 */ //1.path Cache 连接 路径 是否获取数据 //能监听所有的字节点 且是无限监听的模式 但是 指定目录下节点的子节点不再监听 @Test public void setListenterTwoOne() throws Exception{ ExecutorService pool = Executors.newCachedThreadPool(); PathChildrenCache childrenCache = new PathChildrenCache(client, "/test", true); PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("开始进行事件分析:-----"); ChildData data = event.getData(); switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED : "+ data.getPath() +" 数据:"+ data.getData()); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED : "+ data.getPath() +" 数据:"+ data.getData()); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED : "+ data.getPath() +" 数据:"+ data.getData()); break; case INITIALIZED: System.out.println("INITIALIZED : "+ data.getPath() +" 数据:"+ data.getData()); break; default: break; } } }; childrenCache.getListenable().addListener(childrenCacheListener); System.out.println("Register zk watcher successfully!"); childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); //创建一个节点 client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes()); client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","enjoy".getBytes()); Thread.sleep(1000); client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node02","deer".getBytes()); Thread.sleep(1000); client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node02","demo".getBytes()); Thread.sleep(1000); client.delete().forPath("/test/node02"); Thread.sleep(10000); } //2.Node Cache 监控本节点的变化情况 连接 目录 是否压缩 //监听本节点的变化 节点可以进行修改操作 删除节点后会再次创建(空节点) @Test public void setListenterTwoTwo() throws Exception{ ExecutorService pool = Executors.newCachedThreadPool(); //设置节点的cache final NodeCache nodeCache = new NodeCache(client, "/test", false); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("the test node is change and result is :"); System.out.println("path : "+nodeCache.getCurrentData().getPath()); System.out.println("data : "+new String(nodeCache.getCurrentData().getData())); System.out.println("stat : "+nodeCache.getCurrentData().getStat()); } }); nodeCache.start(); client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes()); Thread.sleep(1000); client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","enjoy".getBytes()); Thread.sleep(10000); } //3.Tree Cache // 监控 指定节点和节点下的所有的节点的变化--无限监听 可以进行本节点的删除(不在创建) @Test public void TestListenterTwoThree() throws Exception{ ExecutorService pool = Executors.newCachedThreadPool(); //设置节点的cache TreeCache treeCache = new TreeCache(client, "/test"); //设置监听器和处理过程 treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { ChildData data = event.getData(); if(data !=null){ switch (event.getType()) { case NODE_ADDED: System.out.println("NODE_ADDED : "+ data.getPath() +" 数据:"+ new String(data.getData())); break; case NODE_REMOVED: System.out.println("NODE_REMOVED : "+ data.getPath() +" 数据:"+ new String(data.getData())); break; case NODE_UPDATED: System.out.println("NODE_UPDATED : "+ data.getPath() +" 数据:"+ new String(data.getData())); break; default: break; } }else{ System.out.println( "data is null : "+ event.getType()); } } }); //开始监听 treeCache.start(); //创建一个节点 client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes()); Thread.sleep(1000); client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","enjoy".getBytes()); Thread.sleep(1000); client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","deer".getBytes()); Thread.sleep(1000); client.create().orSetData().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/test/node02/node02_2","deer".getBytes()); Thread.sleep(10000); } }