7.5 zookeeper客戶端curator的基本使用 + zkui


使用zookeeper原生API實現一些復雜的東西比較麻煩。所以,出現了兩款比較好的開源客戶端,對zookeeper的原生API進行了包裝:zkClient和curator。后者是Netflix出版的,必屬精品,也是最好用的zk的開源客戶端。

一  curator基本API使用

引入依賴:

1         <dependency>
2             <groupId>org.apache.curator</groupId>
3             <artifactId>curator-framework</artifactId>
4             <version>2.12.0</version>
5         </dependency>

該依賴引入后,默認引入的zookeeper版本是3.4.8。

注意:不要引入>=3.0.0的curator-framework,默認引入的zookeeper版本是3.5.x(該版本還不穩定),目前測試起來還是有點問題的。

完整代碼:

 1 package com.hulk.curator;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 import org.apache.curator.framework.CuratorFrameworkFactory;
 5 import org.apache.curator.framework.api.BackgroundCallback;
 6 import org.apache.curator.framework.api.CuratorEvent;
 7 import org.apache.curator.retry.ExponentialBackoffRetry;
 8 import org.apache.zookeeper.CreateMode;
 9 import org.apache.zookeeper.data.Stat;
10 
11 import java.util.concurrent.Executors;
12 
13 public class CuratorTest {
14     private static CuratorFramework client = CuratorFrameworkFactory.builder()
15             .connectString("10.211.55.4:2181")
16             .sessionTimeoutMs(50000)
17             .connectionTimeoutMs(30000)
18             .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
19 
20     public static void main(String[] args) throws Exception {
21         /**
22          * 創建會話
23          */
24         client.start();
25 
26         /**
27          * 創建節點
28          * 注意:
29          * 1 除非指明創建節點的類型,默認是持久節點
30          * 2 ZooKeeper規定:所有非葉子節點都是持久節點,所以遞歸創建出來的節點,只有最后的數據節點才是指定類型的節點,其父節點是持久節點
31          */
32         client.create().forPath("/China");//創建一個初始內容為空的節點
33         client.create().forPath("/America", "zhangsan".getBytes());
34         client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");//創建一個初始內容為空的臨時節點
35         client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());//遞歸創建,/Russia是持久節點
36 
37         /**
38          * 異步創建節點
39          * 注意:如果自己指定了線程池,那么相應的操作就會在線程池中執行,如果沒有指定,那么就會使用Zookeeper的EventThread線程對事件進行串行處理
40          */
41         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
42             @Override
43             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
44                 System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
45                                    + ",type:" + event.getType());
46             }
47         }, Executors.newFixedThreadPool(10)).forPath("/async-curator-my");
48 
49         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
50             @Override
51             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
52                 System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
53                                    + ",type:" + event.getType());
54             }
55         }).forPath("/async-curator-zookeeper");
56 
57         /**
58          * 獲取節點內容
59          */
60         byte[] data = client.getData().forPath("/America");
61         System.out.println(new String(data));
62         byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America"); //傳入一個舊的stat變量,來存儲服務端返回的最新的節點狀態信息
63         System.out.println(new String(data2));
64         /**
65          * 更新數據
66          */
67         Stat stat = client.setData().forPath("/America");
68         client.setData().withVersion(4).forPath("/America", "lisi".getBytes());
69 
70         /**
71          * 刪除節點
72          */
73         client.delete().forPath("/China");//只能刪除葉子節點
74         client.delete().deletingChildrenIfNeeded().forPath("/Russia");//刪除一個節點,並遞歸刪除其所有子節點
75         client.delete().withVersion(5).forPath("/America");//強制指定版本進行刪除
76         client.delete().guaranteed().forPath("/America");//注意:由於一些網絡原因,上述的刪除操作有可能失敗,使用guaranteed(),如果刪除失敗,會記錄下來,只要會話有效,就會不斷的重試,直到刪除成功為止
77 
78         Thread.sleep(Integer.MAX_VALUE);
79     }
80 }

 

1  創建會話

curator創建會話有兩種方式,推薦流式API。

1 CuratorFramework client = CuratorFrameworkFactory.builder()
2             .connectString("10.211.55.4:2181")
3             .sessionTimeoutMs(50000)
4             .connectionTimeoutMs(30000)
5             .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

參數:

  • connectString:zk的server地址,多個server之間使用英文逗號分隔開
  • connectionTimeoutMs:連接超時時間,如上是30s,默認是15s
  • sessionTimeoutMs:會話超時時間,如上是50s,默認是60s
  • retryPolicy:失敗重試策略
    • ExponentialBackoffRetry:構造器含有三個參數 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
      • baseSleepTimeMs:初始的sleep時間,用於計算之后的每次重試的sleep時間,
        • 計算公式:當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
      • maxRetries:最大重試次數
      • maxSleepMs:最大sleep時間,如果上述的當前sleep計算出來比這個大,那么sleep用這個時間
    • 其他,查看org.apache.curator.RetryPolicy接口的實現類

此時會話還沒創建,使用如下代碼創建會話:

1 client.start();

start()會阻塞到會話創建成功為止。

 

2  創建節點

2.1  同步創建

1         client.create().forPath("/China");//創建一個初始內容為空的節點
2         client.create().forPath("/America", "zhangsan".getBytes());
3         client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");//創建一個初始內容為空的臨時節點
4         client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());//遞歸創建,/Russia是持久節點

注意:

  • 除非指明創建節點的類型,默認是持久節點
  • ZooKeeper規定:所有非葉子節點都是持久節點,所以遞歸創建出來的節點,只有最后的數據節點才是指定類型的節點,其父節點是持久節點
  • creatingParentsIfNeeded():可以實現遞歸創建

2.2  異步創建

 1         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
 2             @Override
 3             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
 4                 System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
 5                                    + ",type:" + event.getType());
 6             }
 7         }, Executors.newFixedThreadPool(10)).forPath("/async-curator-my");
 8 
 9         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
10             @Override
11             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
12                 System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
13                                    + ",type:" + event.getType());
14             }
15         }).forPath("/async-curator-zookeeper");

注意:

  • 在curator中所有異步操作,都使用org.apache.curator.framework.api.BackgroundCallback接口的實現類完成
  • 如果在BackgroundCallback中自己指定了線程池,那么相應的操作就會在線程池中執行,如果沒有指定,那么就會使用Zookeeper的EventThread線程對事件進行串行處理,所以上述的兩個輸出分別如下:
    當前線程:pool-3-thread-1,code:0,type:CREATE
    當前線程:main-EventThread,code:0,type:CREATE

     

3  獲取節點內容

1         byte[] data = client.getData().forPath("/America");
2         System.out.println(new String(data));
3         byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America"); //傳入一個舊的stat變量,來存儲服務端返回的最新的節點狀態信息
4         System.out.println(new String(data2));

4  獲取節點子節點列表

1 List<String> children = client.getChildren().forPath("/Russia");

 

5  更新數據

1         Stat stat = client.setData().forPath("/America");
2         client.setData().withVersion(4).forPath("/America", "lisi".getBytes());

注意:

  • version版本號還是為了實現CAS並發處理,也會強制某個線程必須更新相應的版本的數據

 

6  刪除節點

1         client.delete().forPath("/China");//只能刪除葉子節點
2         client.delete().deletingChildrenIfNeeded().forPath("/Russia");//刪除一個節點,並遞歸刪除其所有子節點
3         client.delete().withVersion(5).forPath("/America");//強制指定版本進行刪除
4         client.delete().guaranteed().forPath("/America");

注意:

  • deletingChildrenIfNeeded()實現級聯刪除
  • guaranteed()由於一些網絡原因,上述的刪除操作有可能失敗,使用guaranteed(),如果刪除失敗,會記錄下來,只要會話有效,就會不斷的重試,直到刪除成功為止

 

二  curator實現事件監聽

引入兩個依賴:

 1         <dependency>
 2             <groupId>org.apache.curator</groupId>
 3             <artifactId>curator-framework</artifactId>
 4             <version>2.12.0</version>
 5         </dependency>
 6         <dependency>
 7             <groupId>org.apache.curator</groupId>
 8             <artifactId>curator-recipes</artifactId>
 9             <version>2.12.0</version>
10         </dependency>

給出全部代碼:

 1 package com.hulk.curator;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 import org.apache.curator.framework.CuratorFrameworkFactory;
 5 import org.apache.curator.framework.recipes.cache.NodeCache;
 6 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 7 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 8 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 9 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
10 import org.apache.curator.retry.ExponentialBackoffRetry;
11 
12 /**
13  * 事件監聽器
14  */
15 public class CuratorWatcherTest {
16     private static CuratorFramework client = CuratorFrameworkFactory.builder()
17             .connectString("10.211.55.4:2181")
18             .sessionTimeoutMs(50000)
19             .connectionTimeoutMs(30000)
20             .retryPolicy(new ExponentialBackoffRetry(1000, 3))
21             .build();
22 
23     public static void main(String[] args) throws Exception {
24         /**
25          * 創建會話
26          */
27         client.start();
28         client.create().creatingParentsIfNeeded().forPath("/book/computer","java".getBytes());
29         /**
30          * 監聽指定節點本身的變化,包括節點本身的創建和節點本身數據的變化
31          */
32         NodeCache nodeCache = new NodeCache(client,"/book/computer");
33         nodeCache.getListenable().addListener(new NodeCacheListener() {
34             @Override
35             public void nodeChanged() throws Exception {
36                 System.out.println("新的節點數據:" + new String(nodeCache.getCurrentData().getData()));
37             }
38         });
39         nodeCache.start(true);
40 
41         client.setData().forPath("/book/computer","c++".getBytes());
42         /**
43          * 監聽子節點變化情況
44          * 1 新增子節點
45          * 2 刪除子節點
46          * 3 子節點數據變更
47          */
48         PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/book13",true);
49         pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
50             @Override
51             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
52                 switch (event.getType()){
53                     case CHILD_ADDED:
54                         System.out.println("新增子節點:" + event.getData().getPath());
55                         break;
56                     case CHILD_UPDATED:
57                         System.out.println("子節點數據變化:" + event.getData().getPath());
58                         break;
59                     case CHILD_REMOVED:
60                         System.out.println("刪除子節點:" + event.getData().getPath());
61                         break;
62                     default:
63                         break;
64                 }
65             }
66         });
67         pathChildrenCache.start();
68 
69         client.create().forPath("/book13");
70 
71         client.create().forPath("/book13/car", "bmw".getBytes());
72 
73         client.setData().forPath("/book13/car", "audi".getBytes());
74 
75         client.delete().forPath("/book13/car");
76     }
77 }

curator的事件監聽分為:

  • NodeCache:對節點本身的監聽
    • 監聽節點本身的創建
    • 監聽節點本身的數據的變化
  • PathChildrenCache:對節點的子節點的監聽
    • 監聽新增子節點
    • 監聽刪除子節點
    • 監聽子節點數據變化

注意

  • PathChildrenCache只會監聽指定節點的一級子節點,不會監聽節點本身(例如:“/book13”),也不會監聽子節點的子節點(例如,“/book13/car/color”)

 

三  zkui

zk的操作我們一般可以登上zk所在的機器,然后執行“sh zkCli.sh”,之后執行一些命令,但是由於這樣始終效率低下,這里推薦一款比較好用的zk的ui界面:zkui。

假設我們要在10.211.55.5機器上安裝該程序。

1  下載打包

1 git clone https://github.com/DeemOpen/zkui.git
2 cd zkui/
3 mvn clean install

通過上述的操作,在zkui/target目錄下我們會生成一個fatjar:zkui-2.0-SNAPSHOT-jar-with-dependencies.jar,在啟動這個jar之前先要進行相關配置。

2  配置zkui

1 cp config.cfg target/
2 vi config.cfg
3 修改內容如下,其他不變:
4 zkServer=10.211.55.5:2181

注意:需要將配置文件config.cfg與fatjar放在同一個目錄下。

3  啟動zkui

之后進入target/目錄下,執行:

1 nohup java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &

4  瀏覽器訪問

瀏覽器訪問“http://10.211.55.5:9090”,之后在登錄頁面輸入用戶名密碼:admin/manager進行登錄。(可以去config.cfg進行配置)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM