Java操作zookeeper總共有三種方式:
1.原生的Java API
2.zkclient
3.curator
第一種實現代碼:
pom.xml
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.8</version> </dependency>
示例的java代碼如下:
package zook; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs; public class App { public static void main(String[] args) throws IOException, InterruptedException, KeeperException { String connStr = "192.168.126.128:2181"; CountDownLatch countDown = new CountDownLatch(1); Watcher watcher=new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == KeeperState.SyncConnected) { System.err.println("eventType:"+event.getType()); if(event.getType()==Event.EventType.None){ countDown.countDown(); }else if(event.getType()==Event.EventType.NodeCreated){ System.out.println("listen:節點創建"); }else if(event.getType()==Event.EventType.NodeChildrenChanged){ System.out.println("listen:子節點修改"); } } } }; ZooKeeper zookeeper = new ZooKeeper(connStr, 5000,watcher ); countDown.await(); //注冊監聽,每次都要重新注冊,否則監聽不到 zookeeper.exists("/top/jinyong", watcher); // 創建節點 String result = zookeeper.create("/top/jinyong", "一生一世".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(result); Thread.sleep(10); // 獲取節點 byte[] bs = zookeeper.getData("/top/jinyong", true, null); result = new String(bs); System.out.println("創建節點后的數據是:" + result); // 修改節點 zookeeper.setData("/top/jinyong", "I love you".getBytes(), -1); Thread.sleep(10); bs = zookeeper.getData("/top/jinyong", true, null); result = new String(bs); System.out.println("修改節點后的數據是:" + result); // 刪除節點 zookeeper.delete("/top/jinyong", -1); System.out.println("節點刪除成功"); } }
說明:
1.會話連接是異步的,需要自己去處理。此處用的CountDownLatch
2.Watch需要重復注冊,不然就不能生效,比如開始的zookeeper.exists("/top/jinyong", watcher);就是為了注冊監聽
3.開發的復雜性還是比較高的
4.不支持多節點刪除和創建。需要自己去遞歸。后面有一個關於遞歸的示例。
第二種實現:
pom.xml
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
示例的Java代碼如下:
package zook; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.ZkClient; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Watcher.Event.KeeperState; public class Client { public static void main(String[] args) throws InterruptedException { String connStr = "192.168.126.128:2181"; ZkClient zk = new ZkClient(connStr); // 注冊【數據】事件 zk.subscribeDataChanges("/top/zhuzhu", new IZkDataListener() { @Override public void handleDataDeleted(String arg0) throws Exception { System.err.println("數據刪除:" + arg0); } @Override public void handleDataChange(String arg0, Object arg1) throws Exception { System.err.println("數據修改:" + arg0 + "------" + arg1); } }); zk.subscribeChildChanges("/top", new IZkChildListener() { @Override public void handleChildChange(String arg0, List<String> arg1) throws Exception { System.err.println("子節點發生變化:" + arg0); arg1.forEach(f -> { System.out.println("content:" + f); }); } }); List<String> list = zk.getChildren("/"); list.forEach(e -> { System.out.println(e); }); String res = zk.create("/top/zhuzhu", "I love you", CreateMode.PERSISTENT); System.out.println("創建節點/top/zhuzhu成功:" + res); zk.writeData("/top/zhuzhu", "forerver"); System.out.println("修改節點/top/zhuzhu數據成功"); res = zk.readData("/top/zhuzhu"); System.out.println("節點數據:" + res); Thread.sleep(1000); zk.delete("/top/zhuzhu"); System.out.println("刪除節點/top/zhuzhu成功"); Thread.sleep(1000); System.out.println("------------------------------------------------"); for (int i = 0; i < 10; i++) { zk.create("/top/zhuzhu", "I love you", CreateMode.PERSISTENT); Thread.sleep(1000); zk.delete("/top/zhuzhu"); Thread.sleep(1000); } } }
說明:
1.subscribe開頭的為注冊監聽的一些方法
2.addAuthInfo和setAcl為權限相關控制
3.普通使用這種方式還是值得推薦的
第三種實現:
pom.xml
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.11.0</version> </dependency>
示例的Java代碼如下:
package zook; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; public class Curator { public static void main(String[] args) throws Exception { String connStr = "192.168.23.24:2181"; CuratorFramework cur=CuratorFrameworkFactory.builder() .connectString(connStr) .connectionTimeoutMs(5000) .retryPolicy(new ExponentialBackoffRetry(1000,3)) .build(); cur.start();//連接 //創建監聽 PathChildrenCache cache=new PathChildrenCache(cur,"/top",true); cache.start(); cache.rebuild(); cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework framwork, PathChildrenCacheEvent event) throws Exception { System.err.println("節點發生變化:"+event.getType()); } }); Stat stat=cur.checkExists().forPath("/top/zhuzhu"); if(stat!=null){ System.out.println("【/top/zhuzhu】節點存在,直接刪除"); cur.delete().forPath("/top/zhuzhu"); } cur.delete().forPath("/top/zhuzhu"); System.in.read(); System.out.println("准備創建【/top/zhuzhu】"); cur.create().withMode(CreateMode.PERSISTENT) .forPath("/top/zhuzhu", "love forever".getBytes()); System.out.println("節點【/top/zhuzhu】創建成功"); Thread.sleep(1000); byte[] bs=cur.getData().forPath("/top/zhuzhu"); System.out.println("數據:"+new String(bs)); Thread.sleep(1000); cur.delete().forPath("/top/zhuzhu"); Thread.sleep(1000); } /** * 三種watcher來做節點的監聽 * pathcache 監視一個路徑下子節點的創建、刪除、節點數據更新 * NodeCache 監視一個節點的創建、更新、刪除 * TreeCache pathcaceh+nodecache 的合體(監視路徑下的創建、更新、刪除事件), * 緩存路徑下的所有子節點的數據 */ public static void main1(String[] args) throws Exception { String connStr = "192.168.23.24:2181"; CuratorFramework 
curatorFramework=CuratorFrameworkFactory.builder() .connectString(connStr) .connectionTimeoutMs(5000) .retryPolicy(new ExponentialBackoffRetry(1000,3)) .build(); curatorFramework.start(); /** * 節點變化NodeCache */ /* NodeCache cache=new NodeCache(curatorFramework,"/curator",false); cache.start(true); cache.getListenable().addListener(()-> System.out.println("節點數據發生變化,變化后的結果" + ":"+new String(cache.getCurrentData().getData()))); curatorFramework.setData().forPath("/curator","菲菲".getBytes());*/ /** * PatchChildrenCache */ PathChildrenCache cache=new PathChildrenCache(curatorFramework,"/event",true); cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); cache.rebuild(); // Normal / BUILD_INITIAL_CACHE /POST_INITIALIZED_EVENT cache.getListenable().addListener((curatorFramework1,pathChildrenCacheEvent)->{ switch (pathChildrenCacheEvent.getType()){ case CHILD_ADDED: System.out.println("增加子節點"); break; case CHILD_REMOVED: System.out.println("刪除子節點"); break; case CHILD_UPDATED: System.out.println("更新子節點"); break; default:break; } }); // curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event","event".getBytes()); // TimeUnit.SECONDS.sleep(1); // System.out.println("1"); // curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event/event1","1".getBytes()); // TimeUnit.SECONDS.sleep(1); // System.out.println("2"); // // curatorFramework.setData().forPath("/event/event1","222".getBytes()); // TimeUnit.SECONDS.sleep(1); // System.out.println("3"); curatorFramework.delete().forPath("/event/event1"); System.out.println("4"); System.in.read(); } }
說明:
1.支持事務
2.支持Fluent(流式)寫法
3.開始測試時,多次啟動程序都是先執行刪除節點,而監聽到的結果卻是新增,后來加了cache.rebuild();代碼就沒問題了。跟源碼,cache.start()有一個重載方法也是調用了rebuild方法的。
4.功能還是比較強大的。高級功能都會用到這種方式
最后貼一個原生API的遞歸操作方式:
package zook; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs; public class ZookManager { ZooKeeper zookeeper = null; public ZookManager(String connStr) throws IOException, InterruptedException { CountDownLatch latch = new CountDownLatch(1); zookeeper = new ZooKeeper(connStr, 5000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == EventType.None) { if (event.getState() == KeeperState.SyncConnected) { latch.countDown(); } else { System.out.println("連接失敗."); latch.countDown(); } } } }); latch.await(); } /** 創建節點,不存在父節點將新增,如果節點已經存在將拋出異常 **/ public String create(String path, String val) throws KeeperException, InterruptedException { if (!checkPath(path)) { return ""; } String p = getParentPath(path); cycleCreate(p); String url = zookeeper.create(path, val.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); return url; } /** 設置節點的數據,如果節點不存在將新增該節點 **/ public Stat setData(String path, String val) throws KeeperException, InterruptedException { if (!checkPath(path)) { return null; } cycleCreate(path); return zookeeper.setData(path, val.getBytes(), -1); } /** 刪除節點,如果存在子節點將遞歸刪除 * @throws InterruptedException * @throws KeeperException **/ public void delete(String path) throws KeeperException, InterruptedException { if (!checkPath(path)) { return; } List<String> chidren = zookeeper.getChildren(path, false); for (String p : chidren) { delete(path + "/" + p); } zookeeper.delete(path, -1); } private void cycleCreate(String path) throws KeeperException, InterruptedException { Stat stat = 
zookeeper.exists(path, null); if (stat == null) { String p = getParentPath(path); cycleCreate(p);// 遞歸 // 創建 zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } /** * 檢查目錄是否正確 * @param path * @return */ private boolean checkPath(String path) { if (!path.startsWith("/")) { System.err.println("路徑必須以/開頭:" + path); return false; } if (path.endsWith("/")) { System.err.println("路徑不能以/結尾:" + path); return false; } if (path.contains("//")) { System.err.println("路徑格式不對,存在連續的/:" + path); return false; } if (path.equals("/")) { System.err.println("路徑格式不對,只有一個/:" + path); return false; } return true; } /** * 獲得父級目錄 * @param path /root/abc * @return */ private String getParentPath(String path) { int index = path.lastIndexOf("/"); return path.substring(0, index); } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZookManager zoo = new ZookManager("192.168.23.24:2181"); zoo.setData("/top/enjoy/abc", "abc"); zoo.setData("/top/enjoy/bbb", "bbb"); zoo.setData("/top/enjoy/ccc", "ccc"); System.out.println("成功新增"); zoo.delete("/top/enjoy"); System.out.println("成功刪除"); } }
