Zookeeper的应用场景中配置中心,其中看到watch机制
zookeeper作为一款成熟的分布式协调框架,订阅-发布功能是很重要的一个。所谓订阅功能,其实说白了就是观察者模式。观察者会订阅一些感兴趣的主题,然后这些主题一旦变化了,就会自动通知到这些观察者。
zookeeper的订阅发布也就是watch机制,是一个轻量级的设计。因为它采用了一种推拉结合的模式。一旦服务端感知主题变了,那么只会发送一个事件类型和节点信息给关注的客户端,而不会包括具体的变更内容,所以事件本身是轻量级的,这就是所谓的“推”部分。然后,收到变更通知的客户端需要自己去拉变更的数据,这就是“拉”部分。watche机制分为添加数据和监听节点。
Curator在这方面做了优化,Curator引入了Cache的概念用来实现对ZooKeeper服务器端进行事件监听。Cache是Curator对事件监听的包装,其对事件的监听可以近似看做是一个本地缓存视图和远程ZooKeeper视图的对比过程。而且Curator会自动的再次监听,我们就不需要自己手动的重复监听了。
Curator中的cache共有三种
-
NodeCache(监听和缓存根节点变化)
-
PathChildrenCache(监听和缓存子节点变化)
-
TreeCache(监听和缓存根节点变化和子节点变化)
下面我们分别对三种cache详解
-
NodeCache是用来监听节点的数据变化的,当监听的节点的数据发生变化的时候就会回调对应的函数。
-
1 //创建重试策略
2 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1); 3 //创建客户端
4 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy); 5 //开启客户端
6 client.start(); 7 System.out.println("连接成功"); 8 //创建节点数据监听对象
9 final NodeCache nodeCache = new NodeCache(client, "/hello"); 10 //开始缓存
11 /**
12 * 参数为true:可以直接获取监听的节点,System.out.println(nodeCache.getCurrentData());为ChildData{path='/aa', stat=607,765,1580205779732,1580973376268,2,1,0,0,5,1,608 13 , data=[97, 98, 99, 100, 101]} 14 * 参数为false:不可以获取监听的节点,System.out.println(nodeCache.getCurrentData());为null 15 */
16 nodeCache.start(true); 17 System.out.println(nodeCache.getCurrentData()); 18 //添加监听对象
19 nodeCache.getListenable().addListener(new NodeCacheListener() { 20 //如果节点数据有变化,会回调该方法
21 public void nodeChanged() throws Exception { 22 String data = new String(nodeCache.getCurrentData().getData()); 23 System.out.println("数据Watcher:路径=" + nodeCache.getCurrentData().getPath() 24 + ":data=" + data); 25 } 26 }); 27 System.in.read();
测试
修改节点数据
控制台显示
-
PathChildrenCache是用来监听指定节点 的子节点变化情况
-
1 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1); 2 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy); 3 client.start(); 4 //监听指定节点的子节点变化情况包括新增子节点 子节点数据变更 和子节点删除 5 //true表示用于配置是否把节点内容缓存起来,如果配置为true,客户端在接收到节点列表变更的同时,也能够获取到节点的数据内容(即:event.getData().getData())ͺ如果为false 则无法取到数据内容(即:event.getData().getData())
6 PathChildrenCache childrenCache = new PathChildrenCache(client,"/hello",true); 7 /**
8 * NORMAL: 普通启动方式, 在启动时缓存子节点数据 9 * POST_INITIALIZED_EVENT:在启动时缓存子节点数据,提示初始化 10 * BUILD_INITIAL_CACHE: 在启动时什么都不会输出 11 * 在官方解释中说是因为这种模式会在start执行执行之前先执行rebuild的方法,而rebuild的方法不会发出任何事件通知。 12 */
13 childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); 14 System.out.println(childrenCache.getCurrentData()); 15 //添加监听
16 childrenCache.getListenable().addListener(new PathChildrenCacheListener() { 17 @Override 18 public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { 19 if(event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED){ 20 System.out.println("子节点更新"); 21 System.out.println("节点:"+event.getData().getPath()); 22 System.out.println("数据" + new String(event.getData().getData())); 23 }else if(event.getType() == PathChildrenCacheEvent.Type.INITIALIZED ){ 24 System.out.println("初始化操作"); 25 }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED ){ 26 System.out.println("删除子节点"); 27 System.out.println("节点:"+event.getData().getPath()); 28 System.out.println("数据" + new String(event.getData().getData())); 29 }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ){ 30 System.out.println("添加子节点"); 31 System.out.println("节点:"+event.getData().getPath()); 32 System.out.println("数据" + new String(event.getData().getData())); 33 }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED ){ 34 System.out.println("连接失效"); 35 }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED ){ 36 System.out.println("重新连接"); 37 }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST ){ 38 System.out.println("连接失效后稍等一会儿执行"); 39 } 40 } 41 }); 42 System.in.read(); // 使线程阻塞
-
TreeCache有点像上面两种Cache的结合体,NodeCache能够监听自身节点的数据变化(或者是创建该节点),PathChildrenCache能够监听自身节点下的子节点的变化,而TreeCache既能够监听自身节点的变化、也能够监听子节点的变化。
-
1 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1); 2 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy); 3 client.start(); 4 TreeCache treeCache = new TreeCache(client,"/hello"); 5 treeCache.start(); 6 System.out.println(treeCache.getCurrentData("/hello")); 7 treeCache.getListenable().addListener(new TreeCacheListener() { 8 @Override 9 public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { 10 if(event.getType() == TreeCacheEvent.Type.NODE_ADDED){ 11 System.out.println(event.getData().getPath() + "节点添加"); 12 }else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED){ 13 System.out.println(event.getData().getPath() + "节点移除"); 14 }else if(event.getType() == TreeCacheEvent.Type.NODE_UPDATED){ 15 System.out.println(event.getData().getPath() + "节点修改"); 16 }else if(event.getType() == TreeCacheEvent.Type.INITIALIZED){ 17 System.out.println("初始化完成"); 18 }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_SUSPENDED){ 19 System.out.println("连接过时"); 20 }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_RECONNECTED){ 21 System.out.println("重新连接"); 22 }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_LOST){ 23 System.out.println("连接过时一段时间"); 24 } 25 } 26 }); 27 System.in.read();
1:Zookeeper的数据结构(树型结构)
2:节点的分类(4个)
-
持久性(带序号、不带序号)
-
临时性(带序号、不带序号)
3:客户端命令(创建、查询、修改、删除)
4:Zookeeper的java的api介绍(创建、查询、修改、删除)
-
Curator的客户端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 3000, 3000, retryPolicy);
5:Zookeeper的watch机制
-
NodeCache
-
PathChildrenCache
-
TreeCache(监听和缓存根几点变化和子节点变化)(重点)