(Original) 2.3 Using Curator


This is an original article. Please credit the source when reposting. Thank you.

Using Curator

1. Adding the jars. This walkthrough uses version 2.6.0. For a non-Maven project, download the jars and add them to the project's classpath.

 

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.6.0</version>
        </dependency>

 

2. RetryPolicy: retry strategies

  • ExponentialBackoffRetry: the sleep time between retries tends to grow with each retry, scaled from baseSleepTimeMs
    • ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
    • ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
      • baseSleepTimeMs: base sleep time between retries
      • maxRetries: maximum number of retries
      • maxSleepMs: upper limit on the sleep time of any single retry
  • RetryNTimes
    • RetryNTimes(int n, int sleepMsBetweenRetries)
      • n: number of retries
      • sleepMsBetweenRetries: sleep time between retries
  • RetryUntilElapsed
    • RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
      • maxElapsedTimeMs: maximum total time to keep retrying
      • sleepMsBetweenRetries: sleep time between retries
  • Other policies: BoundedExponentialBackoffRetry, RetryOneTime, SleepingRetry
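
To make "the sleep time grows with each retry" concrete, here is a minimal JDK-only sketch of a randomized exponential backoff. It follows my understanding of how ExponentialBackoffRetry picks its sleep time (a random multiple of baseSleepTimeMs whose upper bound roughly doubles on each retry, capped at maxSleepMs), so treat it as an approximation rather than the library's exact code. The names BackoffSketch, sleepMs and upperBoundMs are made up for this illustration and are not part of Curator.

```java
import java.util.Random;

// Hypothetical illustration class; not part of Curator.
final class BackoffSketch {

    // Largest sleep the given retry can produce before the maxSleepMs cap is applied.
    // retryCount is zero-based (0 is the first retry) and is assumed to be below 30.
    static long upperBoundMs(int baseSleepTimeMs, int retryCount) {
        return (long) baseSleepTimeMs * Math.max(1, (1 << (retryCount + 1)) - 1);
    }

    // Random sleep: base * (random integer in [1, 2^(retryCount+1) - 1]), capped at maxSleepMs.
    static long sleepMs(int baseSleepTimeMs, int retryCount, int maxSleepMs, Random random) {
        long sleep = (long) baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return Math.min(sleep, maxSleepMs);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int retry = 0; retry < 3; retry++) {
            System.out.println("retry " + retry + ": sleep " + sleepMs(1000, retry, 10_000, random)
                    + " ms (upper bound " + upperBoundMs(1000, retry) + " ms)");
        }
    }
}
```

With baseSleepTimeMs = 1000 the upper bounds for the first three retries are 1000, 3000 and 7000 ms, which is why a maxSleepMs cap is worth setting when maxRetries is large.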

3. Creating the ZooKeeper connection

  • Traditional style (factory method)

   Example: CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.117.128:2181",5000,5000,retryPolicy);

   API:

newClient(java.lang.String connectString, org.apache.curator.RetryPolicy retryPolicy)

newClient(java.lang.String connectString, int sessionTimeoutMs, int connectionTimeoutMs, org.apache.curator.RetryPolicy retryPolicy)
    • connectString: address of the ZooKeeper server(s)
    • retryPolicy: the retry policy to use
    • sessionTimeoutMs: session timeout in milliseconds
    • connectionTimeoutMs: connection timeout in milliseconds
  • Fluent (builder) style
curatorFramework = CuratorFrameworkFactory.builder()
                                .connectString("192.168.117.128:2181")
                                //.authorization() sets the access credentials, in the same way as with the native API
                                .sessionTimeoutMs(5000).connectionTimeoutMs(5000)
                                .retryPolicy(retryPolicy).build();
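  • Code example
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryUntilElapsed;

    // Shared client, used by all of the examples that follow
    private CuratorFramework curatorFramework;

    public void createSession() {
        //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);// base sleep time, max retries (the sleep grows with each retry)
        //RetryPolicy retryPolicy = new RetryNTimes(5,1000);// number of retries, sleep between retries
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000,1000);// max total retry time, sleep between retries
        //curatorFramework = CuratorFrameworkFactory.newClient("192.168.117.128:2181",5000,5000,retryPolicy);
        curatorFramework = CuratorFrameworkFactory.builder()
                                .connectString("192.168.117.128:2181")
                                //.authorization() sets the access credentials, in the same way as with the native API
                                .sessionTimeoutMs(5000).connectionTimeoutMs(5000)
                                .retryPolicy(retryPolicy).build();
        curatorFramework.start();// the client must be started before it is used
    }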

 

4. Creating a node

    public void createNode() throws Exception {
        createSession();
        String path = curatorFramework.create()
                .creatingParentsIfNeeded()// create any missing parent nodes automatically
                //.withACL() sets the ACL; ACLs are built in the same way as with the native API
                .withMode(CreateMode.PERSISTENT)// node type
                .forPath("/note_curator/02", "02".getBytes());
        System.out.println("path:"+path);
    }

For node types and ACL settings, see section 2.1, Using the native ZooKeeper API.
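
As a small illustration of what creatingParentsIfNeeded() has to take care of, the JDK-only sketch below lists the paths that must exist for a given target, from the top-level ancestor down to the node itself. ParentPaths and pathsToEnsure are hypothetical names used only here; the sketch models the idea and makes no claim about how Curator implements it internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of Curator.
final class ParentPaths {

    // Returns every ancestor of an absolute path, top-down, followed by the path itself.
    static List<String> pathsToEnsure(String path) {
        if (path == null || !path.startsWith("/") || path.length() < 2 || path.endsWith("/")) {
            throw new IllegalArgumentException("expected an absolute path such as /a/b: " + path);
        }
        List<String> result = new ArrayList<>();
        // Each '/' after the leading one ends the path of one ancestor.
        int slash = path.indexOf('/', 1);
        while (slash != -1) {
            result.add(path.substring(0, slash));
            slash = path.indexOf('/', slash + 1);
        }
        result.add(path);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(pathsToEnsure("/note_curator/02"));
    }
}
```

For the path used above, the list is /note_curator followed by /note_curator/02, so the parent has to be in place before the child can be created.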

 

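5. Deleting a node

    public void del() throws Exception {
        createSession();
        curatorFramework.delete()
                .guaranteed()// guaranteed delete: if the delete fails, Curator keeps retrying in the background until it succeeds
                .deletingChildrenIfNeeded()// delete every node below this one first, then the node itself
                .forPath("/note_curator");
    }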
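
ZooKeeper refuses to delete a node that still has children, which is why the children have to go first. The JDK-only sketch below computes a safe deletion order for a node and everything under it. DeleteOrderSketch and deletionOrder are hypothetical names, and the set of paths stands in for the real tree; this only illustrates the ordering, not Curator's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Hypothetical illustration class; not part of Curator.
final class DeleteOrderSketch {

    // Returns root and all of its descendants in an order that is safe to delete:
    // every node appears after all nodes below it, and root comes last.
    static List<String> deletionOrder(TreeSet<String> existingPaths, String root) {
        List<String> order = new ArrayList<>();
        // Descending string order visits "/a/b" before "/a", because a parent path
        // is a proper prefix of its descendants and therefore sorts before them.
        for (String path : existingPaths.descendingSet()) {
            if (path.equals(root) || path.startsWith(root + "/")) {
                order.add(path);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        TreeSet<String> paths = new TreeSet<>(Arrays.asList(
                "/note_curator", "/note_curator/01", "/note_curator/02", "/other"));
        System.out.println(deletionOrder(paths, "/note_curator"));
    }
}
```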

 

6. Getting child nodes

    public void getChildren() throws Exception {
        createSession();
        // Returns the names of the direct children of /note_curator
        List<String> children = curatorFramework.getChildren().forPath("/note_curator");
        System.out.println(children);
    }

 

7. Reading node data

public void getData() throws Exception {
        createSession();
        Stat stat = new Stat();
        byte[] u = curatorFramework.getData().storingStatIn(stat).forPath("/note_curator");
        System.out.println(new String(u));
        System.out.println(stat);
    }
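
Node data is just a byte array, and both getBytes() and new String(bytes) without arguments use the JVM's default charset, which can differ between the machine that writes a node and the one that reads it. A minimal JDK-only sketch of encoding and decoding with an explicit charset follows. NodeDataCodec, encode and decode are hypothetical helper names, and the null check is an assumption for nodes that were created without any data.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper class; not part of Curator.
final class NodeDataCodec {

    // Encode text for storage in a node, always as UTF-8.
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Decode node data as UTF-8; passes null through for nodes that carry no data.
    static String decode(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] stored = encode("shengke0815");
        System.out.println(decode(stored));
    }
}
```

In the examples in this article, that would mean passing NodeDataCodec.encode(...) to forPath and wrapping the result of getData() in NodeDataCodec.decode(...).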

 

8. Setting node data

    public void setData() throws Exception {
        createSession();
        curatorFramework.setData()
                //.withVersion(1) sets the expected version number (optimistic locking)
                .forPath("/note_curator/01", "shengke0815".getBytes());
    }
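
The optimistic-locking idea behind withVersion can be modelled without a ZooKeeper server. In ZooKeeper, a node's data version starts at 0, increases by one on every successful setData, and a write that names an expected version only succeeds if that version is still current (-1 means "any version"). The JDK-only sketch below imitates those rules in memory; VersionedNode is a hypothetical class for illustration, and it returns false where the real API would raise a bad-version error.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical in-memory model of a znode's data version; not part of Curator or ZooKeeper.
final class VersionedNode {
    private byte[] data;
    private int version; // 0 for a freshly created node

    VersionedNode(byte[] data) {
        this.data = data.clone();
    }

    // Writes newData only if expectedVersion matches the current version (-1 matches any).
    // Returns false on a mismatch, where the real server would report a bad version.
    synchronized boolean setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            return false;
        }
        data = newData.clone();
        version++;
        return true;
    }

    synchronized int version() {
        return version;
    }

    synchronized byte[] data() {
        return data.clone();
    }

    public static void main(String[] args) {
        VersionedNode node = new VersionedNode("01".getBytes(StandardCharsets.UTF_8));
        // Two writers both read version 0; only the first write with that version wins.
        System.out.println(node.setData("first".getBytes(StandardCharsets.UTF_8), 0));
        System.out.println(node.setData("second".getBytes(StandardCharsets.UTF_8), 0));
        System.out.println(node.version());
    }
}
```

The losing writer is expected to re-read the node (and its new version) and decide whether to retry, which is the usual optimistic-locking loop.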

 

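9. Checking whether a node exists

    public void exists() throws Exception {
        createSession();
        // checkExists() returns the node's Stat, or null if the node does not exist
        Stat s = curatorFramework.checkExists().forPath("/note_curator");
        System.out.println(s);
    }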

 

10. Setting node data with a callback

    ExecutorService executorService = Executors.newFixedThreadPool(5);// thread pool that runs the callback
    @Test
    public void setDataAsync() throws Exception {
        createSession();
        curatorFramework.setData().inBackground(new BackgroundCallback() {// callback invoked when the setData operation completes
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {

                System.out.println(curatorFramework.getZookeeperClient());
                System.out.println(curatorEvent.getResultCode());
                System.out.println(curatorEvent.getPath());
                System.out.println(curatorEvent.getContext());
            }
        },"shangxiawen",executorService).forPath("/note_curator","sksujer0815".getBytes());
        Thread.sleep(Integer.MAX_VALUE);// keep the test thread alive so that the callback has time to run
    }

API:

inBackground(org.apache.curator.framework.api.BackgroundCallback backgroundCallback, java.lang.Object o, java.util.concurrent.Executor executor);
    • backgroundCallback: your BackgroundCallback implementation
    • o: context object, available inside the callback through curatorEvent.getContext()
    • executor: the executor (thread pool) on which the callback runs
 

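11. Listening for changes to a node

    public void nodeListen() throws Exception {
        createSession();
        final NodeCache cache = new NodeCache(curatorFramework,"/note_curator");
        cache.start();
        cache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                ChildData current = cache.getCurrentData();
                if (current == null) {// null means the node does not exist (for example, it was deleted)
                    System.out.println("node does not exist");
                    return;
                }
                System.out.println(new String(current.getData()));
                System.out.println(current.getPath());
            }
        });

        Thread.sleep(Integer.MAX_VALUE);// keep the thread alive so that change events can be observed

    }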

 

12. Listening for changes to the list of child nodes

    public void nodeChildrenListen() throws Exception {
        createSession();
        // The third argument (cacheData = true) also caches each child's data, not only its path
        final PathChildrenCache cache = new PathChildrenCache(curatorFramework,"/note_curator",true);
        cache.start();
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                switch (pathChildrenCacheEvent.getType()){
                    case CHILD_ADDED:
                        System.out.println("add children");
                        System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
                        System.out.println(pathChildrenCacheEvent.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("remove children");
                        System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
                        System.out.println(pathChildrenCacheEvent.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("update children");
                        System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
                        System.out.println(pathChildrenCacheEvent.getData().getPath());
                        break;
                    default:// connection-state events carry no child data, so they are ignored here
                        break;
                }
            }
        });

        Thread.sleep(Integer.MAX_VALUE);// keep the thread alive so that child events can be observed

    }
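
To show what the three event types used above mean, the JDK-only sketch below compares two snapshots of a node's children (path mapped to data) and reports which events the difference corresponds to. This is a deliberate simplification for illustration: ChildDiffSketch and diff are hypothetical names, and the real PathChildrenCache works from ZooKeeper watches rather than by comparing maps like this.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical illustration class; not part of Curator.
final class ChildDiffSketch {

    // Compares two child snapshots (path -> data) and describes the difference
    // using the same event names as the listener above.
    static List<String> diff(Map<String, String> before, Map<String, String> after) {
        List<String> events = new ArrayList<>();
        // TreeMap copies give a stable, sorted iteration order.
        for (Map.Entry<String, String> entry : new TreeMap<>(after).entrySet()) {
            String path = entry.getKey();
            if (!before.containsKey(path)) {
                events.add("CHILD_ADDED " + path);
            } else if (!Objects.equals(before.get(path), entry.getValue())) {
                events.add("CHILD_UPDATED " + path);
            }
        }
        for (String path : new TreeMap<>(before).keySet()) {
            if (!after.containsKey(path)) {
                events.add("CHILD_REMOVED " + path);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        Map<String, String> before = new HashMap<>();
        before.put("/note_curator/01", "a");
        before.put("/note_curator/02", "b");
        Map<String, String> after = new HashMap<>();
        after.put("/note_curator/02", "c");
        after.put("/note_curator/03", "d");
        System.out.println(diff(before, after));
    }
}
```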

 

Next section: 3.1 ZooKeeper Applications - Master Election

