Apache Curator是用於Apache ZooKeeper的一個Java客戶端庫;它包括一個高級API框架和實用程序,使使用Apache ZooKeeper更加容易和可靠。Curator之於ZooKeeper就像Guava之於Java。
本文件主要介紹使用Curator操作ZooKeeper,文中所使用到的軟件版本:Java 1.8.0_191、ZooKeeper 3.6.0、Curator 4.3.0、JUnit 4.13。
1、引入依賴
<!-- ZooKeeper client; declared explicitly so this version is the one resolved -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.0</version>
</dependency>

<!-- Curator recipes (pulls in curator-framework and curator-client transitively) -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
</dependency>

<!-- JUnit 4, used to drive the examples below -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13</version>
</dependency>
2、基本操作
package com.inspur.demo.general.zookeeper; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; /** * 使用Curator操作Zookeeper */ public class CuratorCase { //Zookeeper地址,集群多個地址間用逗號分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181 private static String connectString = "10.49.196.10:2181"; private static int sessionTimeout = 20 * 1000; private static int connectionTimeout = 10 * 1000; private CuratorFramework cf; @Before public void before() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); cf = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeout) .connectionTimeoutMs(connectionTimeout) .retryPolicy(retryPolicy) .build(); cf.start(); } @After public void after() throws Exception { cf.close(); } /** * 創建節點 */ @Test public void create() throws Exception { /* * 同步創建節點 * 1.除非指明創建節點的類型,默認是持久節點 * 2.臨時節點沒有子節點;所以遞歸創建出來的節點,只有最后的數據節點才是指定類型的節點,其父節點是持久節點 */ //創建一個內容為空的節點 cf.create().forPath("/curator/node1"); //創建一個內容為aaa的節點 cf.create().forPath("/curator/node2", "aaa".getBytes()); //創建一個臨時節點 cf.create().withMode(CreateMode.EPHEMERAL).forPath("/curator/node3"); //遞歸創建,最后的節點類型為臨時節點 cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/curator/node4/a/b"); //創建一個節點,ACL為digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa 
cf.create().withACL(Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "jack:tgi9UCnyPo5FJjVylKr05nAlWeg=")))).forPath("/curator/node5"); /* * 異步創建節點 * 可以指定線程池,不指定則使用Zookeeper的EventThread線程對事件進行串行處理 */ CountDownLatch counter = new CountDownLatch(2); cf.create().inBackground(new BackgroundCallback(){ @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println(event); counter.countDown(); } }, Executors.newFixedThreadPool(1)).forPath("/curator/node6"); cf.create().inBackground(new BackgroundCallback(){ @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println(event); counter.countDown(); } }).forPath("/curator/node7"); counter.await(); } /** * 獲取節點內容 * @throws Exception */ @Test public void getData() throws Exception { Stat stat = new Stat(); byte[] bytes = cf.getData() .storingStatIn(stat)//狀態,可選 .forPath("/curator/node2"); System.out.println("狀態信息:" + stat); System.out.println("內容:" + new String(bytes)); //異步獲取數據 CountDownLatch counter = new CountDownLatch(1); cf.getData().inBackground(new BackgroundCallback(){ @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("event:" + event); System.out.println("內容:"+ new String(event.getData())); counter.countDown(); } }).forPath("/curator/node2"); counter.await(); } /** * 設置節點的值 * @throws Exception */ @Test public void setData() throws Exception { cf.setData() .withVersion(0) //指定版本,可選 .forPath("/curator/node2", "測試修改".getBytes()); } /** * 刪除節點 * @throws Exception */ @Test public void delete() throws Exception { cf.delete() .guaranteed() //如果刪除失敗,只要會話有效就會不斷的重試,直到刪除成功為止 .deletingChildrenIfNeeded()//刪除子節點,可選 .withVersion(0) //指定版本,可選 .forPath("/curator/node4"); } /** * 獲取子節點 * @throws Exception */ @Test public void getChildren() throws Exception { List<String> list = cf.getChildren().forPath("/curator"); 
System.out.println("子節點:" + list); } }
3、監控數據變化
package com.inspur.demo.general.zookeeper; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.*; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.concurrent.*; public class CuratorWatchCase { //Zookeeper地址,集群多個地址間用逗號分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181 private static String connectString = "10.49.196.10:2181"; private static int sessionTimeout = 20 * 1000; private static int connectionTimeout = 10 * 1000; private CuratorFramework cf; @Before public void before() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); cf = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeout) .connectionTimeoutMs(connectionTimeout) .retryPolicy(retryPolicy) .build(); cf.start(); } @After public void after() throws Exception { cf.close(); } /** * 監控節點變化 * @throws Exception */ @Test public void watchNode() throws Exception { CountDownLatch counter = new CountDownLatch(1); NodeCache cache = new NodeCache(cf, "/curator/node2", false); cache.start(); cache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("路徑為:" + cache.getCurrentData().getPath()); System.out.println("數據為:" + new String(cache.getCurrentData().getData())); System.out.println("狀態為:" + cache.getCurrentData().getStat()); //某種情況下退出監控 //if (...) 
{ // counter.countDown(); //} } }); counter.await(); } /** * 監控子節點變化 * @throws Exception */ @Test public void watchChildren() throws Exception { //使用自定義的線程池 ExecutorService threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(32), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); CountDownLatch counter = new CountDownLatch(1); PathChildrenCache cache = new PathChildrenCache(cf, "/curator/node2", true); cache.start(); cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED"); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED"); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED"); break; default: System.out.println(event.getType()); } System.out.println("子節點信息:" + event.getData()); //某種情況下退出監控 //if (...) { // counter.countDown(); //} } }, threadPool); counter.await(); threadPool.shutdownNow(); } }
可以看到不管是基本的增刪改查還是監控數據變化,Curator都比原生的API好用很多。