使用zookeeper原生API實現一些復雜的東西比較麻煩。所以,出現了兩款比較好的開源客戶端,對zookeeper的原生API進行了包裝:zkClient和curator。后者是Netflix出版的,必屬精品,也是最好用的zk的開源客戶端。
一 curator基本API使用
引入依賴:
1 <dependency> 2 <groupId>org.apache.curator</groupId> 3 <artifactId>curator-framework</artifactId> 4 <version>2.12.0</version> 5 </dependency>
該依賴引入后,默認引入的zookeeper版本是3.4.8。
注意:不要引入>=3.0.0的curator-framework,默認引入的zookeeper版本是3.5.x(該版本還不穩定),目前測試起來還是有點問題的。
完整代碼:
1 package com.hulk.curator; 2 3 import org.apache.curator.framework.CuratorFramework; 4 import org.apache.curator.framework.CuratorFrameworkFactory; 5 import org.apache.curator.framework.api.BackgroundCallback; 6 import org.apache.curator.framework.api.CuratorEvent; 7 import org.apache.curator.retry.ExponentialBackoffRetry; 8 import org.apache.zookeeper.CreateMode; 9 import org.apache.zookeeper.data.Stat; 10 11 import java.util.concurrent.Executors; 12 13 public class CuratorTest { 14 private static CuratorFramework client = CuratorFrameworkFactory.builder() 15 .connectString("10.211.55.4:2181") 16 .sessionTimeoutMs(50000) 17 .connectionTimeoutMs(30000) 18 .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); 19 20 public static void main(String[] args) throws Exception { 21 /** 22 * 創建會話 23 */ 24 client.start(); 25 26 /** 27 * 創建節點 28 * 注意: 29 * 1 除非指明創建節點的類型,默認是持久節點 30 * 2 ZooKeeper規定:所有非葉子節點都是持久節點,所以遞歸創建出來的節點,只有最后的數據節點才是指定類型的節點,其父節點是持久節點 31 */ 32 client.create().forPath("/China");//創建一個初始內容為空的節點 33 client.create().forPath("/America", "zhangsan".getBytes()); 34 client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");//創建一個初始內容為空的臨時節點 35 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());//遞歸創建,/Russia是持久節點 36 37 /** 38 * 異步創建節點 39 * 注意:如果自己指定了線程池,那么相應的操作就會在線程池中執行,如果沒有指定,那么就會使用Zookeeper的EventThread線程對事件進行串行處理 40 */ 41 client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() { 42 @Override 43 public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { 44 System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode() 45 + ",type:" + event.getType()); 46 } 47 }, Executors.newFixedThreadPool(10)).forPath("/async-curator-my"); 48 49 client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() { 50 @Override 51 public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { 52 System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode() 53 + ",type:" + event.getType()); 54 } 55 }).forPath("/async-curator-zookeeper"); 56 57 /** 58 * 獲取節點內容 59 */ 60 byte[] data = client.getData().forPath("/America"); 61 System.out.println(new String(data)); 62 byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America"); //傳入一個舊的stat變量,來存儲服務端返回的最新的節點狀態信息 63 System.out.println(new String(data2)); 64 /** 65 * 更新數據 66 */ 67 Stat stat = client.setData().forPath("/America"); 68 client.setData().withVersion(4).forPath("/America", "lisi".getBytes()); 69 70 /** 71 * 刪除節點 72 */ 73 client.delete().forPath("/China");//只能刪除葉子節點 74 client.delete().deletingChildrenIfNeeded().forPath("/Russia");//刪除一個節點,並遞歸刪除其所有子節點 75 client.delete().withVersion(5).forPath("/America");//強制指定版本進行刪除 76 client.delete().guaranteed().forPath("/America");//注意:由於一些網絡原因,上述的刪除操作有可能失敗,使用guaranteed(),如果刪除失敗,會記錄下來,只要會話有效,就會不斷的重試,直到刪除成功為止 77 78 Thread.sleep(Integer.MAX_VALUE); 79 } 80 }
1 創建會話
curator創建會話有兩種方式,推薦流式API。
1 CuratorFramework client = CuratorFrameworkFactory.builder() 2 .connectString("10.211.55.4:2181") 3 .sessionTimeoutMs(50000) 4 .connectionTimeoutMs(30000) 5 .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
參數:
- connectString:zk的server地址,多個server之間使用英文逗號分隔開
- connectionTimeoutMs:連接超時時間,如上是30s,默認是15s
- sessionTimeoutMs:會話超時時間,如上是50s,默認是60s
- retryPolicy:失敗重試策略
- ExponentialBackoffRetry:構造器含有三個參數 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
- baseSleepTimeMs:初始的sleep時間,用於計算之后的每次重試的sleep時間,
- 計算公式:當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
- maxRetries:最大重試次數
- maxSleepMs:最大sleep時間,如果上述的當前sleep計算出來比這個大,那么sleep用這個時間
- baseSleepTimeMs:初始的sleep時間,用於計算之后的每次重試的sleep時間,
- 其他,查看org.apache.curator.RetryPolicy接口的實現類
- ExponentialBackoffRetry:構造器含有三個參數 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
此時會話還沒創建,使用如下代碼創建會話:
1 client.start();
start()會阻塞到會話創建成功為止。
2 創建節點
2.1 同步創建
1 client.create().forPath("/China");//創建一個初始內容為空的節點 2 client.create().forPath("/America", "zhangsan".getBytes()); 3 client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");//創建一個初始內容為空的臨時節點 4 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());//遞歸創建,/Russia是持久節點
注意:
- 除非指明創建節點的類型,默認是持久節點
- ZooKeeper規定:所有非葉子節點都是持久節點,所以遞歸創建出來的節點,只有最后的數據節點才是指定類型的節點,其父節點是持久節點
- creatingParentsIfNeeded():可以實現遞歸創建
2.2 異步創建
1 client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() { 2 @Override 3 public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { 4 System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode() 5 + ",type:" + event.getType()); 6 } 7 }, Executors.newFixedThreadPool(10)).forPath("/async-curator-my"); 8 9 client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() { 10 @Override 11 public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { 12 System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode() 13 + ",type:" + event.getType()); 14 } 15 }).forPath("/async-curator-zookeeper");
注意:
- 在curator中所有異步操作,都使用org.apache.curator.framework.api.BackgroundCallback接口的實現類完成
- 如果在BackgroundCallback中自己指定了線程池,那么相應的操作就會在線程池中執行,如果沒有指定,那么就會使用Zookeeper的EventThread線程對事件進行串行處理,所以上述的兩個輸出分別如下:
當前線程:pool-3-thread-1,code:0,type:CREATE 當前線程:main-EventThread,code:0,type:CREATE
3 獲取節點內容
1 byte[] data = client.getData().forPath("/America"); 2 System.out.println(new String(data)); 3 byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America"); //傳入一個舊的stat變量,來存儲服務端返回的最新的節點狀態信息 4 System.out.println(new String(data2));
4 獲取節點子節點列表
1 List<String> children = client.getChildren().forPath("/Russia");
5 更新數據
1 Stat stat = client.setData().forPath("/America"); 2 client.setData().withVersion(4).forPath("/America", "lisi".getBytes());
注意:
- version版本號還是為了實現CAS並發處理,也會強制某個線程必須更新相應的版本的數據
6 刪除節點
1 client.delete().forPath("/China");//只能刪除葉子節點 2 client.delete().deletingChildrenIfNeeded().forPath("/Russia");//刪除一個節點,並遞歸刪除其所有子節點 3 client.delete().withVersion(5).forPath("/America");//強制指定版本進行刪除 4 client.delete().guaranteed().forPath("/America");
注意:
- deletingChildrenIfNeeded()實現級聯刪除
- guaranteed()由於一些網絡原因,上述的刪除操作有可能失敗,使用guaranteed(),如果刪除失敗,會記錄下來,只要會話有效,就會不斷的重試,直到刪除成功為止
二 curator實現事件監聽
引入兩個依賴:
1 <dependency> 2 <groupId>org.apache.curator</groupId> 3 <artifactId>curator-framework</artifactId> 4 <version>2.12.0</version> 5 </dependency> 6 <dependency> 7 <groupId>org.apache.curator</groupId> 8 <artifactId>curator-recipes</artifactId> 9 <version>2.12.0</version> 10 </dependency>
給出全部代碼:
1 package com.hulk.curator; 2 3 import org.apache.curator.framework.CuratorFramework; 4 import org.apache.curator.framework.CuratorFrameworkFactory; 5 import org.apache.curator.framework.recipes.cache.NodeCache; 6 import org.apache.curator.framework.recipes.cache.NodeCacheListener; 7 import org.apache.curator.framework.recipes.cache.PathChildrenCache; 8 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; 9 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; 10 import org.apache.curator.retry.ExponentialBackoffRetry; 11 12 /** 13 * 事件監聽器 14 */ 15 public class CuratorWatcherTest { 16 private static CuratorFramework client = CuratorFrameworkFactory.builder() 17 .connectString("10.211.55.4:2181") 18 .sessionTimeoutMs(50000) 19 .connectionTimeoutMs(30000) 20 .retryPolicy(new ExponentialBackoffRetry(1000, 3)) 21 .build(); 22 23 public static void main(String[] args) throws Exception { 24 /** 25 * 創建會話 26 */ 27 client.start(); 28 client.create().creatingParentsIfNeeded().forPath("/book/computer","java".getBytes()); 29 /** 30 * 監聽指定節點本身的變化,包括節點本身的創建和節點本身數據的變化 31 */ 32 NodeCache nodeCache = new NodeCache(client,"/book/computer"); 33 nodeCache.getListenable().addListener(new NodeCacheListener() { 34 @Override 35 public void nodeChanged() throws Exception { 36 System.out.println("新的節點數據:" + new String(nodeCache.getCurrentData().getData())); 37 } 38 }); 39 nodeCache.start(true); 40 41 client.setData().forPath("/book/computer","c++".getBytes()); 42 /** 43 * 監聽子節點變化情況 44 * 1 新增子節點 45 * 2 刪除子節點 46 * 3 子節點數據變更 47 */ 48 PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/book13",true); 49 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { 50 @Override 51 public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { 52 switch (event.getType()){ 53 case CHILD_ADDED: 54 System.out.println("新增子節點:" + event.getData().getPath()); 55 break; 56 case CHILD_UPDATED: 57 System.out.println("子節點數據變化:" + event.getData().getPath()); 58 break; 59 case CHILD_REMOVED: 60 System.out.println("刪除子節點:" + event.getData().getPath()); 61 break; 62 default: 63 break; 64 } 65 } 66 }); 67 pathChildrenCache.start(); 68 69 client.create().forPath("/book13"); 70 71 client.create().forPath("/book13/car", "bmw".getBytes()); 72 73 client.setData().forPath("/book13/car", "audi".getBytes()); 74 75 client.delete().forPath("/book13/car"); 76 } 77 }
curator的事件監聽分為:
- NodeCache:對節點本身的監聽
- 監聽節點本身的創建
- 監聽節點本身的數據的變化
- PathChildrenCache:對節點的子節點的監聽
- 監聽新增子節點
- 監聽刪除子節點
- 監聽子節點數據變化
注意:
- PathChildrenCache只會監聽指定節點的一級子節點,不會監聽節點本身(例如:“/book13”),也不會監聽子節點的子節點(例如,“/book13/car/color”)
三 zkui
zk的操作我們一般可以登上zk所在的機器,然后執行“sh zkCli.sh”,之后執行一些命令,但是由於這樣始終效率低下,這里推薦一款比較好用的zk的ui界面:zkui。
假設我們要在10.211.55.5機器上安裝該程序。
1 下載打包
1 git clone https://github.com/DeemOpen/zkui.git 2 cd zkui/ 3 mvn clean install
通過上述的操作,在zkui/target目錄下我們會生成一個fatjar:zkui-2.0-SNAPSHOT-jar-with-dependencies.jar,在啟動這個jar之前先要進行相關配置。
2 配置zkui
1 cp config.cfg target/ 2 vi config.cfg 3 修改內容如下,其他不變: 4 zkServer=10.211.55.5:2181
注意:需要將配置文件config.cfg與fatjar放在同一個目錄下。
3 啟動zkui
之后進入target/目錄下,執行:
1 nohup java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &
4 瀏覽器訪問
瀏覽器訪問“http://10.211.55.5:9090”,之后在登錄頁面輸入用戶名密碼:admin/manager進行登錄。(可以去config.cfg進行配置)