在使用zookeper的時候一般不使用原生的API,Curator,解決了很多Zookeeper客戶端非常底層的細節開發工作,包括連接重連、反復注冊Watcher和NodeExistsException異常等等。
Curator包含了幾個包:
- curator-framework:對zookeeper的底層api的一些封裝
- curator-client:提供一些客戶端的操作,例如重試策略等
- curator-recipes:封裝了一些高級特性,如:Cache事件監聽、選舉、分布式鎖、分布式計數器、分布式Barrier等
Maven依賴:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency>
====================基本的API==============
1.創建會話
(1)靜態工廠創建會話
源碼如下:
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) { return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy); } /** * Create a new client * * @param connectString list of servers to connect to * @param sessionTimeoutMs session timeout * @param connectionTimeoutMs connection timeout * @param retryPolicy retry policy to use * @return client */ public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) { return builder(). connectString(connectString). sessionTimeoutMs(sessionTimeoutMs). connectionTimeoutMs(connectionTimeoutMs). retryPolicy(retryPolicy). build(); }
測試代碼:采用4個參數的方法
private static CuratorFramework getClient() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); return CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy); }
參數解釋:
connectString: 鏈接 URL
sessionTimeoutMs: 會話超時時間,單位毫秒,默認60000ms
connectionTimeoutMs: 連接創建超時時間,單位毫秒,默認60000ms
retryPolicy: 重試策略,內建有四種重試策略,也可以自行實現RetryPolicy接口
1.RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries) 以sleepMsBetweenRetries的間隔重連,直到超過maxElapsedTimeMs的時間設置 2.RetryNTimes(int n, int sleepMsBetweenRetries) 指定重連次數 3.RetryOneTime(int sleepMsBetweenRetry) 重連一次,簡單粗暴 4.ExponentialBackoffRetry ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) 時間間隔 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
(2)第二種方法:
private static CuratorFramework getClient() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.builder().connectString(URL) .sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build(); return client; }
(3)創建包含隔離命名空間的會話(下面也基於這種方式)
private static CuratorFramework getClient() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.builder().connectString(URL).sessionTimeoutMs(5000) .connectionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("curator").build(); return client; }
得到會話之后,調用 client.start(); 即可啟動客戶端。
原來zookeper數據結構如下:
關閉會話的方法如下:(必須在開啟之后在關閉,否則會報非法狀態異常)
client.close();
或者:
CloseableUtils.closeQuietly(client);
2.創建節點
這個可以遞歸創建父節點並不拋出異常;指定節點類型(臨時、順序、臨時永久),默認是永久
CuratorFramework client = getClient(); client.start(); // 創建普通節點(默認是持久節點),內容為空 client.create().forPath("/t1"); // 創建普通節點(默認是持久節點) client.create().forPath("/t2", "123456".getBytes()); // 創建永久順序節點 client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/t3", "123456".getBytes()); // 地櫃創建,如果父節點不存在也會創建 client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL) .forPath("/t4/t41/t411", "123456".getBytes());
結果:
3.刪除節點
也可以遞歸刪除。當然下面的多個流式接口可以隨意組合。
CuratorFramework client = getClient(); client.start(); // 刪除子節點,只能刪除葉子節點 client.delete().forPath("/t2"); // 遞歸刪除 client.delete().deletingChildrenIfNeeded().forPath("/t4/t41"); // 指定版本進行刪除 client.delete().withVersion(0).forPath("/t1"); // 強制刪除。guaranteed()接口是一個保障措施,只要客戶端會話有效,那么Curator會在后台持續進行刪除操作,直到刪除節點成功。 client.delete().guaranteed().forPath("/t30000000002");
結果:
4.讀取節點數據---可以注冊監聽
// 讀取數據不獲取stat byte[] forPath = client.getData().forPath("/t4"); System.out.println(new String(forPath, "UTF-8")); // 讀取數據且獲取stat Stat stat = new Stat(); byte[] forPath2 = client.getData().storingStatIn(stat).forPath("/t4"); System.out.println(new String(forPath2, "UTF-8")); System.out.println(stat); // 注冊觀察者,當節點變動時觸發 byte[] data = client.getData().usingWatcher(new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getType()); } }).forPath("/t4"); System.out.println("/t4: " + new String(data));
5.更新數據節點數據
// 更新數據,返回的是stat Stat forPath = client.setData().forPath("/t4", "data".getBytes()); // 更新一個節點的數據內容,強制指定版本進行更新 Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("/t4"); Stat forPath2 = client.setData().withVersion(stat.getVersion()).forPath("/t4", "data222".getBytes());
6. 檢查節點是否存在
Stat forPath = client.checkExists().forPath("/t4"); if (forPath != null) { System.out.println("exists"); } else { System.out.println("not exists"); }
7. 獲取某個節點的所有子節點路徑--這個獲取的是子節點的名稱且不帶/
List<String> forPath = client.getChildren().forPath("/");
System.out.println(forPath);
結果:
[t4]
8.事務
允許作為一個原子操作進行提交。
// inTransaction( )方法開啟一個ZooKeeper事務.可以復合create, setData, check, and/or delete 等操作然后調用commit()作為一個原子操作提交 client.inTransaction(). check().forPath("/t4"). and(). create().withMode(CreateMode.EPHEMERAL).forPath("/t3", "data".getBytes()). and(). setData().forPath("/t3", "data2".getBytes()). and(). commit();
9. 異步接口

響應碼(#getResultCode())
如下:
client.create().inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { System.out.println(curatorEvent.getType() + " " + curatorEvent.getResultCode()); } }, Executors.newFixedThreadPool(2)).forPath("/t2", "測試值".getBytes());
注意: 如果#inBackground()方法不指定executor,那么會默認使用Curator的EventThread去進行異步處理。
====================監聽機制==============
Curator的監聽實現是對zookeeper原生監聽方法的高級封裝,主要體現在兩點:監聽重復注冊,事件發生信息。而且監聽事件返回詳細的信息,如變動的節點信息,節點的value等。
Curator 提供了3個接口:PathChildrenCacheListener、 NodeCache、TreeCache。三個接口都可以對一個不存在的節點進行監聽。
1. PathChildrenCache
對指定的路徑節點的一級子目錄進行監聽,不對該節點的操作進行監聽,對其子目錄的節點進行增、刪、改的操作監聽。
如果監聽的節點不存在會創建節點,如果節點是多級目錄會遞歸創建,節點刪除之后監聽事件會失效。
如下:
/** * * @param client * @throws Exception */ private static void setListenterThreeOne(CuratorFramework client) throws Exception { PathChildrenCache childrenCache = new PathChildrenCache(client, "/t4", true); PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { ChildData data = event.getData(); switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED : " + data.getPath() + " 數據:" + data.getData()); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED : " + data.getPath() + " 數據:" + data.getData()); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED : " + data.getPath() + " 數據:" + data.getData()); break; default: break; } } }; // 在注冊監聽器的時候,如果傳入此參數,當事件觸發時,邏輯由線程池處理。如果不傳會采用默認的線程池 ExecutorService pool = Executors.newFixedThreadPool(2); // childrenCache.getListenable().addListener(childrenCacheListener); childrenCache.getListenable().addListener(childrenCacheListener, pool); // 設置監聽模式 childrenCache.start(StartMode.BUILD_INITIAL_CACHE); }
StartMode:初始化方式
POST_INITIALIZED_EVENT:異步初始化。初始化后會觸發事件。如果節點不存在會創建節點。如果節點下面有子節點會觸發CHILD_ADDED事件。
NORMAL:異步初始化。如果節點不存在會創建節點。如果節點下面有子節點會觸發CHILD_ADDED事件。
BUILD_INITIAL_CACHE:同步初始化。如果節點不存在會創建節點。如果節點下面有子節點會觸發CHILD_ADDED事件。
2.NodeCache
對一個節點進行監聽,監聽事件包括指定的路徑節點的增、刪、改的操作。
可以對一個不存在的節點進行監控,當節點創建之后會觸發對應事件;節點被刪除並且重建之后事件也仍然。
// Node Cache 監控本節點的變化情況 連接 目錄 是否壓縮 // 監聽本節點的變化 節點可以進行修改操作 刪除節點后會再次創建(空節點) private static void setListenterThreeTwo(CuratorFramework client) throws Exception { ExecutorService pool = Executors.newCachedThreadPool(); // 設置節點的cache final NodeCache nodeCache = new NodeCache(client, "/t5", false); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("the test node is change and result is :"); System.out.println("path : " + nodeCache.getCurrentData().getPath()); System.out.println("data : " + new String(nodeCache.getCurrentData().getData())); System.out.println("stat : " + nodeCache.getCurrentData().getStat()); } }); nodeCache.start(); }
3.TreeCache
監控指定節點和節點下的所有的節點的變化--無限監聽 ,也就是可以監聽子孫目錄。
可以對一個不存在的節點進行監控,當節點創建之后會觸發對應事件;節點被刪除並且重建之后事件也仍然。
// 監控 指定節點和節點下的所有的節點的變化--無限監聽 private static void setListenterThreeThree(CuratorFramework client) throws Exception { // 設置節點的cache TreeCache treeCache = new TreeCache(client, "/t5"); // 設置監聽器和處理過程 treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { ChildData data = event.getData(); if (data != null) { switch (event.getType()) { case NODE_ADDED: System.out.println("NODE_ADDED : " + data.getPath() + " 數據:" + new String(data.getData())); break; case NODE_REMOVED: System.out.println("NODE_REMOVED : " + data.getPath() + " 數據:" + new String(data.getData())); break; case NODE_UPDATED: System.out.println("NODE_UPDATED : " + data.getPath() + " 數據:" + new String(data.getData())); break; default: break; } } else { System.out.println("data is null : " + event.getType()); } } }); // 開始監聽 treeCache.start(); }
補充:上面的三個接口在設置監聽器的時候都可以傳入自定義的線程池,也可不傳,如下:
// 在注冊監聽器的時候,如果傳入此參數,當事件觸發時,邏輯由線程池處理。如果不傳會采用默認的線程池 ExecutorService pool = Executors.newFixedThreadPool(2); // childrenCache.getListenable().addListener(childrenCacheListener); childrenCache.getListenable().addListener(childrenCacheListener, pool);