curator操作zookeeper


使用zookeeper原生API實現一些復雜的東西比較麻煩。所以,出現了兩款比較好的開源客戶端,對zookeeper的原生API進行了包裝:zkClient和curator。后者是Netflix出版的,必屬精品,也是最好用的zk的開源客戶端。

一  curator基本API使用

推薦博客:https://www.cnblogs.com/java-zhao/p/7350945.html

https://www.cnblogs.com/nevermorewang/p/5815602.html    強烈推薦

pom

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
</dependency>

創建客戶端

private static CuratorFramework client = CuratorFrameworkFactory.builder()
       .authorization("digest","admin:123".getBytes()) //創建客戶端的時候授權 .connectString(
"ip:2181") .sessionTimeoutMs(50000) .connectionTimeoutMs(30000) .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

創建節點

      /**
          * 創建會話
          */
         client.start();
 
         /**
          * 創建節點
          * 注意:
          * 1 除非指明創建節點的類型,默認是持久節點
          * 2 ZooKeeper規定:所有非葉子節點都是持久節點,所以遞歸創建出來的節點,只有最后的數據節點才是指定類型的節點,其父節點是持久節點
          */
         client.create().forPath("/China");//創建一個初始內容為空的節點
         client.create().forPath("/America", "zhangsan".getBytes());
         client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");//創建一個初始內容為空的臨時節點
         client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());//遞歸創建,/Russia是持久節點
         
         
         /**
          * 異步創建節點
          * 注意:如果自己指定了線程池,那么相應的操作就會在線程池中執行,如果沒有指定,那么就會使用Zookeeper的EventThread線程對事件進行串行處理
          */
         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
             @Override
             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                 System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
                                    + ",type:" + event.getType());
             }
         }, Executors.newFixedThreadPool(10)).forPath("/async-curator-my");
 
         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
             @Override
             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                 System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
                                    + ",type:" + event.getType());
             }
         }).forPath("/async-curator-zookeeper");

獲取節點數據

Stat stat = new Stat();
byte[] bytes = zk.curator.getData().storingStatIn(stat).forPath("/liuzhonghua");
System.out.println("路徑/liuzhonghua的數據是:"+new String(bytes));
System.out.println("路徑/liuzhonghua的數據的版本是:"+stat.getVersion());

 路徑/liuzhonghua的數據是:lzh
 路徑/liuzhonghua的數據的版本是:0

 

更新節點

client.setData().withVersion(4).forPath("/America", "lisi".getBytes());

刪除節點

      /**
          * 刪除節點
          */
         client.delete().forPath("/China");//只能刪除葉子節點
         client.delete().deletingChildrenIfNeeded().forPath("/Russia");//刪除一個節點,並遞歸刪除其所有子節點
         client.delete().withVersion(5).forPath("/America");//強制指定版本進行刪除
         client.delete().guaranteed().forPath("/America");//注意:由於一些網絡原因,上述的刪除操作有可能失敗,使用guaranteed(),如果刪除失敗,會記錄下來,只要會話有效,就會不斷的重試,直到刪除成功為止
    

創建Acl權限

        ArrayList<ACL> acls = new ArrayList<ACL>();
        Id id1=new Id("digest", DigestAuthenticationProvider.generateDigest("admin1:123"));
        Id id2=new Id("digest", DigestAuthenticationProvider.generateDigest("admin2:123"));
        acls.add(new ACL(ZooDefs.Perms.ADMIN,id1));
        acls.add(new ACL(ZooDefs.Perms.CREATE,id2));
        acls.add(new ACL(ZooDefs.Perms.ADMIN | ZooDefs.Perms.READ,id2));
        zk.curator.create().creatingParentsIfNeeded().withACL(acls).forPath("/liuzhonghua001/003","lzh".getBytes());

結果:

[zk: localhost:2181(CONNECTED) 5] getAcl /liuzhonghua001/003
'digest,'admin1:CC9El/l/qyakTrK/mNREkrSikQ0=
: a
'digest,'admin2:iZICqg+4cn1ouEHqGPuzGrop/M4=
: c
'digest,'admin2:iZICqg+4cn1ouEHqGPuzGrop/M4=
: ra
[zk: localhost:2181(CONNECTED) 6]

 設置Acl權限

 ArrayList<ACL> acls = new ArrayList<ACL>();
        Id id1=new Id("digest", DigestAuthenticationProvider.generateDigest("admin1:123"));
        Id id2=new Id("digest", DigestAuthenticationProvider.generateDigest("admin2:123"));
        acls.add(new ACL(ZooDefs.Perms.ADMIN,id1));
        acls.add(new ACL(ZooDefs.Perms.CREATE,id2));
        acls.add(new ACL(ZooDefs.Perms.ADMIN | ZooDefs.Perms.READ,id2));

zk.curator.setACL().withACL(acls).forPath("/liuzhonghua001/003");

 

curator實現事件監聽

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

對指定節點進行監聽(監聽指定節點本身的變化,包括節點本身的創建和節點本身數據的變化)

NodeCache nodeCache = new NodeCache(client,"/lzh");
        nodeCache.getListenable().addListener(new NodeCacheListener() {
                                                  @Override
                                                  public void nodeChanged() throws Exception {
                                                      System.out.println("新的節點數據:" + new String(nodeCache.getCurrentData().getData()));
                                                  }
                                              });
nodeCache.start(true);

監聽子節點變化情況(1 新增子節點,2刪除子節點,3數據改變)

/**
         * 監聽子節點變化情況
         * 1 新增子節點
         * 2 刪除子節點
         * 3 子節點數據變更
         */
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/lzh",true);
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()){
                    case CHILD_ADDED:
                        System.out.println("新增子節點:" + event.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("子節點數據變化:" + event.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("刪除子節點:" + event.getData().getPath());
                        break;
                    default:
                        break;
                }
            }
        });
        pathChildrenCache.start();
  • PathChildrenCache只會監聽指定節點的一級子節點,不會監聽節點本身(例如:“/book13”),也不會監聽子節點的子節點(例如,“/book13/car/color”)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM