Zookeeper的watch机制


1.watch机制

Zookeeper的应用场景中配置中心,其中看到watch机制

 

 

1.1 什么是watch机制

zookeeper作为一款成熟的分布式协调框架,订阅-发布功能是很重要的一个。所谓订阅功能,其实说白了就是观察者模式。观察者会订阅一些感兴趣的主题,然后这些主题一旦变化了,就会自动通知到这些观察者。

zookeeper的订阅发布也就是watch机制,是一个轻量级的设计。因为它采用了一种推拉结合的模式。一旦服务端感知主题变了,那么只会发送一个事件类型和节点信息给关注的客户端,而不会包括具体的变更内容,所以事件本身是轻量级的,这就是所谓的“推”部分。然后,收到变更通知的客户端需要自己去拉变更的数据,这就是“拉”部分。watche机制分为添加数据和监听节点。

Curator在这方面做了优化,Curator引入了Cache的概念用来实现对ZooKeeper服务器端进行事件监听。Cache是Curator对事件监听的包装,其对事件的监听可以近似看做是一个本地缓存视图和远程ZooKeeper视图的对比过程。而且Curator会自动的再次监听,我们就不需要自己手动的重复监听了。

Curator中的cache共有三种

  • NodeCache(监听和缓存根节点变化)

  • PathChildrenCache(监听和缓存子节点变化)

  • TreeCache(监听和缓存根节点变化和子节点变化)

下面我们分别对三种cache详解

1.2. NodeCache

  • 介绍

    NodeCache是用来监听节点的数据变化的,当监听的节点的数据发生变化的时候就会回调对应的函数。

  • 增加监听

 1 //创建重试策略
 2 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);  3 //创建客户端
 4 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy);  5 //开启客户端
 6 client.start();  7 System.out.println("连接成功");  8 //创建节点数据监听对象
 9 final NodeCache nodeCache = new NodeCache(client, "/hello"); 10 //开始缓存
11 /**
12  * 参数为true:可以直接获取监听的节点,System.out.println(nodeCache.getCurrentData());为ChildData{path='/aa', stat=607,765,1580205779732,1580973376268,2,1,0,0,5,1,608 13 , data=[97, 98, 99, 100, 101]} 14  * 参数为false:不可以获取监听的节点,System.out.println(nodeCache.getCurrentData());为null 15  */
16 nodeCache.start(true); 17 System.out.println(nodeCache.getCurrentData()); 18 //添加监听对象
19 nodeCache.getListenable().addListener(new NodeCacheListener() { 20     //如果节点数据有变化,会回调该方法
21     public void nodeChanged() throws Exception { 22         String data = new String(nodeCache.getCurrentData().getData()); 23         System.out.println("数据Watcher:路径=" + nodeCache.getCurrentData().getPath() 24                 + ":data=" + data); 25  } 26 }); 27 System.in.read();

 测试

修改节点数据

 

 

控制台显示

1.3. PathChildrenCache

  • 介绍

    PathChildrenCache是用来监听指定节点 的子节点变化情况

  • 增加监听

 1 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);  2 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy);  3 client.start();  4 //监听指定节点的子节点变化情况包括͹新增子节点 子节点数据变更 和子节点删除  5 //true表示用于配置是否把节点内容缓存起来,如果配置为true,客户端在接收到节点列表变更的同时,也能够获取到节点的数据内容(即:event.getData().getData())ͺ如果为false 则无法取到数据内容(即:event.getData().getData())
 6 PathChildrenCache childrenCache = new PathChildrenCache(client,"/hello",true);  7 /**
 8  * NORMAL: 普通启动方式, 在启动时缓存子节点数据  9  * POST_INITIALIZED_EVENT:在启动时缓存子节点数据,提示初始化 10  * BUILD_INITIAL_CACHE: 在启动时什么都不会输出 11  * 在官方解释中说是因为这种模式会在start执行执行之前先执行rebuild的方法,而rebuild的方法不会发出任何事件通知。 12  */
13 childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); 14 System.out.println(childrenCache.getCurrentData()); 15 //添加监听
16 childrenCache.getListenable().addListener(new PathChildrenCacheListener() { 17  @Override 18     public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { 19         if(event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED){ 20             System.out.println("子节点更新"); 21             System.out.println("节点:"+event.getData().getPath()); 22             System.out.println("数据" + new String(event.getData().getData())); 23         }else if(event.getType() == PathChildrenCacheEvent.Type.INITIALIZED ){ 24             System.out.println("初始化操作"); 25         }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED ){ 26             System.out.println("删除子节点"); 27             System.out.println("节点:"+event.getData().getPath()); 28             System.out.println("数据" + new String(event.getData().getData())); 29         }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ){ 30             System.out.println("添加子节点"); 31             System.out.println("节点:"+event.getData().getPath()); 32             System.out.println("数据" + new String(event.getData().getData())); 33         }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED ){ 34             System.out.println("连接失效"); 35         }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED ){ 36             System.out.println("重新连接"); 37         }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST ){ 38             System.out.println("连接失效后稍等一会儿执行"); 39  } 40  } 41 }); 42 System.in.read(); // 使线程阻塞

1.4. TreeCache

  • 介绍

    TreeCache有点像上面两种Cache的结合体,NodeCache能够监听自身节点的数据变化(或者是创建该节点),PathChildrenCache能够监听自身节点下的子节点的变化,而TreeCache既能够监听自身节点的变化、也能够监听子节点的变化。

  • 添加监听

 1 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);  2 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy);  3 client.start();  4 TreeCache treeCache = new TreeCache(client,"/hello");  5 treeCache.start();  6 System.out.println(treeCache.getCurrentData("/hello"));  7 treeCache.getListenable().addListener(new TreeCacheListener() {  8  @Override  9     public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { 10             if(event.getType() == TreeCacheEvent.Type.NODE_ADDED){ 11                 System.out.println(event.getData().getPath() + "节点添加"); 12             }else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED){ 13                 System.out.println(event.getData().getPath() + "节点移除"); 14             }else if(event.getType() == TreeCacheEvent.Type.NODE_UPDATED){ 15                 System.out.println(event.getData().getPath() + "节点修改"); 16             }else if(event.getType() == TreeCacheEvent.Type.INITIALIZED){ 17                 System.out.println("初始化完成"); 18             }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_SUSPENDED){ 19                 System.out.println("连接过时"); 20             }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_RECONNECTED){ 21                 System.out.println("重新连接"); 22             }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_LOST){ 23                 System.out.println("连接过时一段时间"); 24  } 25  } 26 }); 27 System.in.read();

【小结】

1:Zookeeper的数据结构(树型结构)

2:节点的分类(4个)

  • 持久性(带序号、不带序号)

  • 临时性(带序号、不带序号)

3:客户端命令(创建、查询、修改、删除)

4:Zookeeper的java的api介绍(创建、查询、修改、删除)

  • Curator的客户端

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 3000, 3000, retryPolicy);

5:Zookeeper的watch机制

  • NodeCache

  • PathChildrenCache

  • TreeCache(监听和缓存根几点变化和子节点变化)(重点)


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM