目錄
一、介紹
2.1、導入依賴
2.2、連接zk集群
2.3、操作數據操作
3.1、導入依賴
3.2、使用示例
一、介紹
這里主要記錄通過Java調用API來操作Zookeeper集群的數據,對於zookeeper集群的搭建或者命令,可以參考:
目前接觸到的Java操作Zookeeper,有兩套API,一套是zookeeper官方提供的(zookeeper),另外一套是封裝了官方API的API(zkClient),從描述上來看,就知道,官方的API可能不是那么好用,不然也不會在封裝。
二、zookeeper API
2.1、導入依賴
使用zookeeper官方api的時候,請保證jar包的版本,和zk集群中zk的版本相同
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.7</version> </dependency>
2.2、連接zk集群
下面是代碼示例,兩種形式(分別使用匿名類和Lambda表達式):
package cn.ganlixin.zk; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.junit.Test; import java.io.IOException; public class ZookeeperDemo { @Test public void connectZkCluster() throws IOException, KeeperException, InterruptedException { // 構造方法 // ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) // 匿名對象形式 ZooKeeper zooKeeper = new ZooKeeper( "192.168.1.3:2181,192.168.1.4:2181,192.168.1.5:2181", 20000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { // 發生變更的節點路徑 String path = watchedEvent.getPath(); System.out.println("path:" + path); // 通知狀態 Watcher.Event.KeeperState state = watchedEvent.getState(); System.out.println("KeeperState:" + state); // 事件類型 Watcher.Event.EventType type = watchedEvent.getType(); System.out.println("EventType:" + type); } } ); // 關閉連接 zooKeeper.close(); // Lamdba形式 ZooKeeper zk = new ZooKeeper( "192.168.1.3:2181,192.168.1.4:2181,192.168.1.5:2181", 20000, watchedEvent -> { // 發生變更的節點路徑 String path = watchedEvent.getPath(); System.out.println("path:" + path); // 通知狀態 Watcher.Event.KeeperState state = watchedEvent.getState(); System.out.println("KeeperState:" + state); // 事件類型 Watcher.Event.EventType type = watchedEvent.getType(); System.out.println("EventType:" + type); } ); zk.close(); } }
運行上面的代碼,控制台輸出如下(輸出了兩遍,是因為創建了兩次連接)
path:null KeeperState:SyncConnected EventType:None path:null KeeperState:SyncConnected EventType:None
2.3、操作數據操作
操作Zk中的數據,方式也很簡單,只需要使用創建的zk連接,調用對應的方法即可(方法名與zk命令行中命令相同)
package cn.ganlixin.zk; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.junit.Test; import java.io.IOException; public class ZookeeperDemo { @Test public void manageData() throws KeeperException, InterruptedException, IOException { // 創建zk連接 ZooKeeper zk = new ZooKeeper( "192.168.1.3:2181,192.168.1.4:2181,192.168.1.5:2181", 20000, null ); // 創建節點 zk.create("/abc", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // 獲取節點數據 // getData(String path, boolean watch, Stat stat); Stat stat = new Stat(); byte[] data = zk.getData("/abc", false, stat); System.out.println(new String(data)); // 數據內容 123 System.out.println(stat.getDataLength()); // 節點狀態(數據長度) 3 // 對/abc進行watch zk.getData("/abc", watchedEvent -> { System.out.println("path:" + watchedEvent.getPath()); System.out.println("KeeperState:" + watchedEvent.getState()); System.out.println("EventType:" + watchedEvent.getType()); }, null); // 設置節點數據 // setData(String path, byte[] data, int version) // 指定version為-1,表示不關心版本 zk.setData("/abc", "456".getBytes(), -1); // 設置兩次,第二次不會觸發通知 zk.setData("/abc", "789".getBytes(), -1); // 阻塞,以等待通知 Thread.sleep(1000); zk.close(); } }
上面的程序,運行輸出結果如下:
123 3 path:/abc KeeperState:SyncConnected EventType:NodeDataChanged
可以看到,只顯示了一次通知,和與預期相符。
三、zkClient API
因為Zookeeper API比較復雜,使用並不方便,所以出現了ZkClient,ZkClient對Zookeeper API進行了封裝,利用ZkClient可以更加方便地對Zookeeper進行操作。
3.1、導入依賴
因為zkClient是對zookeeper的再封裝,所以需要注意zkClient中zookeeper的版本與zk集群的版本相同,可以在maven倉庫中查看對應關系
<!-- https://mvnrepository.com/artifact/com.101tec/zkclient --> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
3.2、使用示例
下面是個簡單的示例:
package cn.ganlixin.zk; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.ZkClient; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Watcher; import org.junit.Test; import java.util.List; public class ZkClientDemo { @Test public void testConn() throws InterruptedException { ZkClient zkClient = new ZkClient( "192.168.1.3:2181,192.168.1.4:2181,192.168.1.5:2181", 20000 ); // 創建節點 zkClient.createPersistent("/abc", "hello"); zkClient.createEphemeral("/xyz", "world"); zkClient.create("/opq", "world", CreateMode.EPHEMERAL_SEQUENTIAL); String data = zkClient.readData("/abc"); System.out.println(data); // 監聽狀態變化 zkClient.subscribeStateChanges(new IZkStateListener() { @Override public void handleStateChanged(Watcher.Event.KeeperState keeperState) throws Exception { System.out.println("state:" + keeperState); } @Override public void handleNewSession() throws Exception { System.out.println("new session"); } @Override public void handleSessionEstablishmentError(Throwable throwable) throws Exception { throwable.printStackTrace(); } }); // 監聽子節點發生變化 zkClient.subscribeChildChanges("/", new IZkChildListener() { @Override public void handleChildChange(String path, List<String> list) throws Exception { System.out.println("watch path:" + path); // 輸出所有子節點 list.forEach(str -> { System.out.println(str); }); } }); Thread.sleep(100000); } }