Curator Cache


1.Curator Cache 與原生ZooKeeper Wacher區別

   原生的ZooKeeper Wacher是一次性的:一個Wacher一旦觸發就會被移出,如果你想要反復使用Wacher,就要在Wacher被移除后重新注冊,使用起來很麻煩。使用Curator Cache 可以反復使用Wacher了。

2.Curator Cache 和Curator Wacher區別

 

2.相關類

Curator Cache主要提供了一下三組類,分別用於實現對節點的監聽,子節點的監聽和二者的混合:

  1.NodeCache,NodeCacheListener,ChildData,節點創建,節點數據內容變更,不能監聽節點刪除。

  2.PathChildrenCache,PathChildrenCacheListener,PathChildrenCacheEvent監聽指定節點的子節點的變更包括添加,刪除,子節點數據數據變更這三類。

  3.TreeCache,TreeCacheListener,TreeCacheEvent,TreeCacheSelector

 

3.節點監聽

package cn.francis.maven.hello.ZooKeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;

public class NodeCacheDemo {
  public static void main(String[]args) throws Exception{
      TestingServer server=null;
      CuratorFramework client=null;
      NodeCache nodeCache=null;
      String path="/francis/nodecache/a";
      
     
      try{ 
         server=new TestingServer();
         client= CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000,3));
         client.start();
         
         path=client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL).forPath(path,"init".getBytes());
       
         
         nodeCache=new NodeCache(client,path,false);
         nodeCache.start();
         addListener(nodeCache);
         
         
        
         client.setData().forPath(path,"hello".getBytes());
         Thread.sleep(1000);
         client.delete().deletingChildrenIfNeeded().forPath(path);
         Thread.sleep(Integer.MAX_VALUE);
         
      }catch(Exception e){
          e.printStackTrace();
      }finally{
    
         
          //這里因為是測試,沒有加他們。
          //CloseableUtils.closeQuietly(nodeCache);
         /// CloseableUtils.closeQuietly(client);
         // CloseableUtils.closeQuietly(server);
      }     
  }
  
  public static void addListener(NodeCache nodeCache){
      //監聽類型:節點是否存在,節點數據內容改變,不監聽節點刪除。
         nodeCache.getListenable().addListener(new NodeCacheListener(){
    
            @Override
            public void nodeChanged() throws Exception {
                // TODO Auto-generated method stub
                if(nodeCache.getCurrentData()!=null)
                    System.out.println("path:"+nodeCache.getCurrentData().getPath()+",data:"+new String(nodeCache.getCurrentData().getData()));
                
            }});
  }
}

 

在上面的代碼中首先創建了一個節點,然后創建用這個節點路徑來創建NodeCache,啟動NodeCache,添加NodeCacheListener。然后調用setData來修改節點數據。上面的代碼輸入如下:

path:/francis/nodecache/_c_d5be73ca-592c-4eda-b7c4-c8ec60ef39a8-a,data:hello

 

子節點監聽:

 PathChildrenCache的構造函數如下:

public PathChildrenCache(CuratorFramework client,
                         String path,
                         boolean cacheData)
Parameters:
client - the client
path - path to watch
cacheData - if true, node contents are cached in addition to the stat

其中最主要的是cacheData,如果為true,那么當對子節點調用setData時,PathChildrenCache會受到這個CHILD_UPDATED事件。

下面看一下demo:

package cn.francis.maven.hello.ZooKeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.mapred.join.Parser.TType;
import org.apache.zookeeper.CreateMode;

public class NodeCacheDemo {
  public static void main(String[]args) throws Exception{
      TestingServer server=null;
      CuratorFramework client=null;
      NodeCache nodeCache=null;
      String path="/francis/nodecache/b";
      
     
      try{ 
         server=new TestingServer();
         client= CuratorFrameworkFactory.newClient(server.getConnectString(),new ExponentialBackoffRetry(1000,3));
         client.start();
         
         //這里將第三個參數cacheData設置為true,這樣當對子節點調用setData時,會受到CHILDREN_UPDATE通知。
         PathChildrenCache childrenCache=new PathChildrenCache(client,path,true);
         
childrenCache.start(StartMode.POST_INITIALIZED_EVENT); childrenCache.getListenable().addListener(
new PathChildrenCacheListener(){ @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { // TODO Auto-generated method stub if(event.getType()==Type.INITIALIZED){ System.out.println("create"+event.getData().getPath()); }else if(event.getType()==Type.CHILD_ADDED){ System.out.println("create"+event.getData().getPath()); }else if(event.getType()==Type.CHILD_REMOVED){ System.out.println("remove:"+event.getData().getPath()); }else if(event.getType()==Type.CHILD_UPDATED){ //System.out.println("update:"+event.getData().getPath()); System.out.println("update:"+new String(event.getData().getData())); } }}); //創建父節點 System.out.println(client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes())); Thread.sleep(1000); //創建子節點 String childPath1=ZKPaths.makePath(path, "a"); childPath1=client.create().withMode(CreateMode.PERSISTENT).forPath(childPath1,"1".getBytes()); Thread.sleep(1000); //對子節點賦值 client.setData().forPath(childPath1,"aaa".getBytes()); Thread.sleep(1000); //刪除子節點 client.delete().forPath(childPath1); client.delete().deletingChildrenIfNeeded().forPath("/francis"); Thread.sleep(2000); }catch(Exception e){ e.printStackTrace(); }finally{ CloseableUtils.closeQuietly(nodeCache); CloseableUtils.closeQuietly(client); CloseableUtils.closeQuietly(server); } }

 

 

3.TreeNodeCache

   TreeNodeCache將NodeCache和PathChildrenCache功能結合到一起了。他不僅可以對子節點和父節點同時進行監聽。如下:

 

package cn.francis.maven.hello.ZooKeeper;

import java.util.Map;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.mapred.join.Parser.TType;
import org.apache.zookeeper.CreateMode;

public class NodeCacheDemo {
  public static void main(String[]args) throws Exception{
      TestingServer server=null;
      CuratorFramework client=null;
      NodeCache nodeCache=null;
      String path="/francis/nodecache/b";
      
     
      try{ 
         server=new TestingServer();
         client= CuratorFrameworkFactory.newClient(server.getConnectString(),new ExponentialBackoffRetry(1000,3));
         client.start();
         
    
         TreeCache treeNodeCache=new TreeCache(client,path);
         
         treeNodeCache.start();
         
         treeNodeCache.getListenable().addListener(new TreeCacheListener(){

            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                // TODO Auto-generated method stub
                switch(event.getType()){
                  case NODE_ADDED:
                      System.out.println("added:"+event.getData().getPath());
                      break;
                  case NODE_UPDATED:
                      System.out.println("updated:"+event.getData().getPath());
                      break;
                  case NODE_REMOVED: 
                      System.out.println("removed:"+event.getData().getPath());
                      break;
                  default:
                      System.out.println("other:"+event.getType());
                }
           }
             
         });
         
         

         
         //創建父節點
         client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());
         Thread.sleep(1000);
         
         //創建子節點
         String childPath1=ZKPaths.makePath(path, "a");
         childPath1=client.create().withMode(CreateMode.PERSISTENT).forPath(childPath1,"1".getBytes());
         Thread.sleep(1000);
         
         //對子節點賦值
         client.setData().forPath(childPath1,"aaa".getBytes());
         Thread.sleep(1000);

         //對子節點賦值
         client.setData().forPath(path,"aaa".getBytes());
         Thread.sleep(1000);

         //刪除子節點
         client.delete().forPath(childPath1);
         client.delete().deletingChildrenIfNeeded().forPath("/francis");
         
        
         Thread.sleep(2000);
         
      }catch(Exception e){
          e.printStackTrace();
      }finally{
    
         
          //這里因為是測試,沒有加他們。
          CloseableUtils.closeQuietly(nodeCache);
          CloseableUtils.closeQuietly(client);
          CloseableUtils.closeQuietly(server);
      }     
  }
}

 

輸出如下:

other:INITIALIZED
added:/francis/nodecache/b
added:/francis/nodecache/b/a
updated:/francis/nodecache/b/a
updated:/francis/nodecache/b
removed:/francis/nodecache/b/a
removed:/francis/nodecache/b


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM