Zookeeper客戶端Curator基本API


  在使用zookeper的時候一般不使用原生的API,Curator,解決了很多Zookeeper客戶端非常底層的細節開發工作,包括連接重連、反復注冊Watcher和NodeExistsException異常等等。

Curator包含了幾個包:

  • curator-framework:對zookeeper的底層api的一些封裝
  • curator-client:提供一些客戶端的操作,例如重試策略等
  • curator-recipes:封裝了一些高級特性,如:Cache事件監聽、選舉、分布式鎖、分布式計數器、分布式Barrier等

Maven依賴:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

 

 ====================基本的API==============

1.創建會話

(1)靜態工廠創建會話

源碼如下:

    public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
    {
        return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
    }
    /**
     * Create a new client
     *
     * @param connectString       list of servers to connect to
     * @param sessionTimeoutMs    session timeout
     * @param connectionTimeoutMs connection timeout
     * @param retryPolicy         retry policy to use
     * @return client
     */
    public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
    {
        return builder().
            connectString(connectString).
            sessionTimeoutMs(sessionTimeoutMs).
            connectionTimeoutMs(connectionTimeoutMs).
            retryPolicy(retryPolicy).
            build();
    }

 

測試代碼:采用4個參數的方法

    private static CuratorFramework getClient() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        return CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
    }

參數解釋:

connectString: 鏈接 URL

sessionTimeoutMs: 會話超時時間,單位毫秒,默認60000ms

connectionTimeoutMs:    連接創建超時時間,單位毫秒,默認60000ms

retryPolicy:  重試策略,內建有四種重試策略,也可以自行實現RetryPolicy接口

1.RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries) 
以sleepMsBetweenRetries的間隔重連,直到超過maxElapsedTimeMs的時間設置
2.RetryNTimes(int n, int sleepMsBetweenRetries) 
指定重連次數
3.RetryOneTime(int sleepMsBetweenRetry)
重連一次,簡單粗暴
4.ExponentialBackoffRetry
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) 
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) 
時間間隔 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))

 

(2)第二種方法:

    private static CuratorFramework getClient() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(URL)
                .sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build();
        return client;
    }

 

(3)創建包含隔離命名空間的會話(下面也基於這種方式)

  為了實現不同的Zookeeper業務之間的隔離,需要為每個業務分配一個獨立的命名空間(NameSpace),即指定一個Zookeeper的根路徑。如果設置了該值,那么該客戶端對Zookeeper上的數據節點的操作都是基於該目錄進行的。
    private static CuratorFramework getClient() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString(URL).sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("curator").build();
        return client;
    }

 

得到會話之后,調用   client.start();   即可啟動客戶端。

原來zookeper數據結構如下:

 

關閉會話的方法如下:(必須在開啟之后在關閉,否則會報非法狀態異常)

client.close();

或者:

CloseableUtils.closeQuietly(client);

 

 

2.創建節點

  這個可以遞歸創建父節點並不拋出異常;指定節點類型(臨時、順序、臨時永久),默認是永久

        CuratorFramework client = getClient();
        client.start();
        // 創建普通節點(默認是持久節點),內容為空
        client.create().forPath("/t1");
        // 創建普通節點(默認是持久節點)
        client.create().forPath("/t2", "123456".getBytes());
        // 創建永久順序節點
        client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/t3", "123456".getBytes());
        // 地櫃創建,如果父節點不存在也會創建
        client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                .forPath("/t4/t41/t411", "123456".getBytes());

 結果:

3.刪除節點

   也可以遞歸刪除。當然下面的多個流式接口可以隨意組合。

        CuratorFramework client = getClient();
        client.start();
        // 刪除子節點,只能刪除葉子節點
        client.delete().forPath("/t2");
        // 遞歸刪除
        client.delete().deletingChildrenIfNeeded().forPath("/t4/t41");
        // 指定版本進行刪除
        client.delete().withVersion(0).forPath("/t1");
        // 強制刪除。guaranteed()接口是一個保障措施,只要客戶端會話有效,那么Curator會在后台持續進行刪除操作,直到刪除節點成功。
        client.delete().guaranteed().forPath("/t30000000002");

 結果:

4.讀取節點數據---可以注冊監聽

        // 讀取數據不獲取stat
        byte[] forPath = client.getData().forPath("/t4");
        System.out.println(new String(forPath, "UTF-8"));

        // 讀取數據且獲取stat
        Stat stat = new Stat();
        byte[] forPath2 = client.getData().storingStatIn(stat).forPath("/t4");
        System.out.println(new String(forPath2, "UTF-8"));
        System.out.println(stat);

        // 注冊觀察者,當節點變動時觸發
        byte[] data = client.getData().usingWatcher(new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println(event.getType());
            }
        }).forPath("/t4");
        System.out.println("/t4: " + new String(data));

 5.更新數據節點數據

        // 更新數據,返回的是stat
        Stat forPath = client.setData().forPath("/t4", "data".getBytes());

        // 更新一個節點的數據內容,強制指定版本進行更新
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath("/t4");
        Stat forPath2 = client.setData().withVersion(stat.getVersion()).forPath("/t4", "data222".getBytes());

6. 檢查節點是否存在

        Stat forPath = client.checkExists().forPath("/t4");
        if (forPath != null) {
            System.out.println("exists");
        } else {
            System.out.println("not exists");
        }

7. 獲取某個節點的所有子節點路徑--這個獲取的是子節點的名稱且不帶/

        List<String> forPath = client.getChildren().forPath("/");
        System.out.println(forPath);

結果:

[t4]

8.事務

  允許作為一個原子操作進行提交。

        // inTransaction( )方法開啟一個ZooKeeper事務.可以復合create, setData, check, and/or delete 等操作然后調用commit()作為一個原子操作提交
        client.inTransaction().
            check().forPath("/t4").
            and().
            create().withMode(CreateMode.EPHEMERAL).forPath("/t3", "data".getBytes()).
            and().
            setData().forPath("/t3", "data2".getBytes()).
            and().
            commit();

9.  異步接口

  上面提到的創建、刪除、更新、讀取等方法都是同步的,Curator提供異步接口,引入了BackgroundCallback接口用於處理異步接口調用之后服務端返回的結果信息。
  BackgroundCallback接口中一個重要的回調值為CuratorEvent,里面包含事件類型、響應碼和節點的詳細信息。
CuratorEventType:

響應碼(#getResultCode())

如下:

        client.create().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                System.out.println(curatorEvent.getType() + "   " + curatorEvent.getResultCode());
            }
        }, Executors.newFixedThreadPool(2)).forPath("/t2", "測試值".getBytes());

注意:    如果#inBackground()方法不指定executor,那么會默認使用Curator的EventThread去進行異步處理。

 

 ====================監聽機制==============

 

   Curator的監聽實現是對zookeeper原生監聽方法的高級封裝,主要體現在兩點:監聽重復注冊,事件發生信息。而且監聽事件返回詳細的信息,如變動的節點信息,節點的value等。

  Curator 提供了3個接口:PathChildrenCacheListener、 NodeCache、TreeCache。三個接口都可以對一個不存在的節點進行監聽。

1.   PathChildrenCache 

  對指定的路徑節點的一級子目錄進行監聽,不對該節點的操作進行監聽,對其子目錄的節點進行增、刪、改的操作監聽。

  如果監聽的節點不存在會創建節點,如果節點是多級目錄會遞歸創建,節點刪除之后監聽事件會失效。

如下:

    /**
     * 
     * @param client
     * @throws Exception
     */
    private static void setListenterThreeOne(CuratorFramework client) throws Exception {
        PathChildrenCache childrenCache = new PathChildrenCache(client, "/t4", true);
        PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                ChildData data = event.getData();
                switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED : " + data.getPath() + "  數據:" + data.getData());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED : " + data.getPath() + "  數據:" + data.getData());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED : " + data.getPath() + "  數據:" + data.getData());
                    break;
                default:
                    break;
                }
            }
        };

        // 在注冊監聽器的時候,如果傳入此參數,當事件觸發時,邏輯由線程池處理。如果不傳會采用默認的線程池
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // childrenCache.getListenable().addListener(childrenCacheListener);
        childrenCache.getListenable().addListener(childrenCacheListener, pool);
        // 設置監聽模式
        childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
    }

 StartMode:初始化方式

  POST_INITIALIZED_EVENT:異步初始化。初始化后會觸發事件。如果節點不存在會創建節點。如果節點下面有子節點會觸發CHILD_ADDED事件。
  NORMAL:異步初始化。如果節點不存在會創建節點。如果節點下面有子節點會觸發CHILD_ADDED事件。
  BUILD_INITIAL_CACHE:同步初始化。如果節點不存在會創建節點。如果節點下面有子節點會觸發CHILD_ADDED事件。

2.NodeCache  

  對一個節點進行監聽,監聽事件包括指定的路徑節點的增、刪、改的操作。

  可以對一個不存在的節點進行監控,當節點創建之后會觸發對應事件;節點被刪除並且重建之后事件也仍然。

    // Node Cache 監控本節點的變化情況 連接 目錄 是否壓縮
    // 監聽本節點的變化 節點可以進行修改操作 刪除節點后會再次創建(空節點)
    private static void setListenterThreeTwo(CuratorFramework client) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        // 設置節點的cache
        final NodeCache nodeCache = new NodeCache(client, "/t5", false);
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("the test node is change and result is :");
                System.out.println("path : " + nodeCache.getCurrentData().getPath());
                System.out.println("data : " + new String(nodeCache.getCurrentData().getData()));
                System.out.println("stat : " + nodeCache.getCurrentData().getStat());
            }
        });
        nodeCache.start();
    }

3.TreeCache

   監控指定節點和節點下的所有的節點的變化--無限監聽 ,也就是可以監聽子孫目錄。

  可以對一個不存在的節點進行監控,當節點創建之后會觸發對應事件;節點被刪除並且重建之后事件也仍然。

    // 監控 指定節點和節點下的所有的節點的變化--無限監聽 
    private static void setListenterThreeThree(CuratorFramework client) throws Exception {
        // 設置節點的cache
        TreeCache treeCache = new TreeCache(client, "/t5");
        // 設置監聽器和處理過程
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                ChildData data = event.getData();
                if (data != null) {
                    switch (event.getType()) {
                    case NODE_ADDED:
                        System.out.println("NODE_ADDED : " + data.getPath() + "  數據:" + new String(data.getData()));
                        break;
                    case NODE_REMOVED:
                        System.out.println("NODE_REMOVED : " + data.getPath() + "  數據:" + new String(data.getData()));
                        break;
                    case NODE_UPDATED:
                        System.out.println("NODE_UPDATED : " + data.getPath() + "  數據:" + new String(data.getData()));
                        break;

                    default:
                        break;
                    }
                } else {
                    System.out.println("data is null : " + event.getType());
                }
            }
        });
        // 開始監聽
        treeCache.start();
    }

 

 補充:上面的三個接口在設置監聽器的時候都可以傳入自定義的線程池,也可不傳,如下:

        // 在注冊監聽器的時候,如果傳入此參數,當事件觸發時,邏輯由線程池處理。如果不傳會采用默認的線程池
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // childrenCache.getListenable().addListener(childrenCacheListener);
        childrenCache.getListenable().addListener(childrenCacheListener, pool);

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM