1. ZkClient API簡介
zkclient是Github上一個開源的ZooKeeper客戶端,在原生ZooKeeper API接口上進行包裝,同時在內部實現了session超時重連,Watcher反復注冊等功能
2. Maven工程方式導入ZkClient API
通過pom.xml方式,指定依賴的zookeeper包以及zkclient包
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the ZkClient sample project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Coordinates of this sample project. -->
    <groupId>com.newforce</groupId>
    <artifactId>testzkclient</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- ZooKeeper library. -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.5</version>
        </dependency>
        <!-- ZkClient library used by the examples. -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.5</version>
        </dependency>
    </dependencies>
</project>
3. ZkClient API使用
3.1 create session 創建和zookeeper集群間的連接
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

/**
 * ZkClient library usage: opens a session to a ZooKeeper server and releases it again.
 */
public class CreateSession {

    /**
     * Connects to the ZooKeeper server, reports success and closes the session.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String args[]) {
        // Arguments: server address, session timeout, connection timeout, serializer.
        ZkClient zc = new ZkClient("192.168.179.101:2181", 5000, 5000, new SerializableSerializer());
        try {
            System.out.println("Connection OK!");
        } finally {
            // Close explicitly instead of leaving the session open when main returns.
            zc.close();
        }
    }
}
核心代碼分析:
1)和zookeeper原生API不同,通過zkclient API創建會話,需要提供session timeout, connection timeout兩個定時器
2)同時要提供1個序列化器實例,原因在於后續創建znode節點時,寫入的數據(java對象)會自動通過序列化器來轉換為byte[]
3)同理,讀取出的byte[]的數據,也會自動通過序列化器直接轉換為Java對象
3.2 創建znode節點
/** * ZkClient library usage */
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer; import org.apache.zookeeper.CreateMode;
public class CreateNode { public static void main(String args[]){ ZkClient zc = new ZkClient("192.168.179.101:2181", 5000, 5000, new SerializableSerializer());
User u = new User(); String actual_path = zc.create("/node_zkclient", u, CreateMode.PERSISTENT); //直接將實例u寫入,自動通過序列化為byte[]
System.out.println("Create path is: " + actual_path); }
private class User{
private Integer id;
private String name;
public Integer getId(){
return this.id;
}
public String getName(){
return this.name;
}
public String getInfo(){
return this.name + this.id;
}
}//User }
3.3 修改節點數據
/** * ZkClient library usage */ import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; import org.apache.zookeeper.CreateMode; public class SetData { public static void main(String args[]){ ZkClient zc = new ZkClient("192.168.179.101:2181", 5000, 5000, new SerializableSerializer()); User u = new User(); u.setId(1); u.setName("shayzhang"); String actual_path = zc.create("/node_zkclient", u, CreateMode.PERSISTENT);
u.setId(2); u.setName("zhangjin"); zc.writeData(actual_path, u); //直接寫入實例,自動通過連接創建時創建的序列化實例,轉換為byte[] } }
3.4 獲取節點數據
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

/**
 * ZkClient library usage: creates a znode and reads its data back together with its Stat.
 */
public class GetData {

    /**
     * Creates {@code /node_zkclient}, reads the stored User back and prints it with the node Stat.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String args[]) {
        ZkClient zc = new ZkClient("192.168.179.101:2181", 5000, 5000, new SerializableSerializer());
        try {
            User u = new User();
            u.setId(1);
            u.setName("shayzhang");
            String actualPath = zc.create("/node_zkclient", u, CreateMode.PERSISTENT);
            System.out.println("Create path is: " + actualPath);

            // The Stat instance is handed to readData alongside the path and printed afterwards.
            Stat stat = new Stat();
            u = zc.readData(actualPath, stat);
            if (u == null) {
                System.out.println("Node doesn't exist!");
            } else {
                System.out.println(u.getInfo());
                System.out.println(stat);
            }
        } finally {
            // Release the session even when create() or readData() fails.
            zc.close();
        }
    }
}
3.5 獲取子節點列表
/** * ZkClient library usage */ import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import java.util.List; public class GetChildren { public static void main(String args[]){ ZkClient zc = new ZkClient("192.168.179.101:2181", 5000, 5000, new SerializableSerializer()); User u = new User(); u.setId(1); u.setName("shayzhang"); String actual_path = zc.create("/node_zkclient", u, CreateMode.PERSISTENT); System.out.println("Create path is: " + actual_path); List<String> children_list = zc.getChildren(actual_path); System.out.println("Children list of /node_zkclient is : " + children_list.toString()); //打印子節點列表 } }
3.6 刪除節點
/** * ZkClient library usage */ import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; public class DeleteNode { public static void main(String args[]){ ZkClient zc = new ZkClient("192.168.179.101:2181", 5000, 5000, new SerializableSerializer()); String delete_node = "/node_zkclient"; //delete node without children boolean e1 = zc.delete(delete_node); System.out.println("Delete node without children: " + e1); // delete node and all children boolean e2 = zc.deleteRecursive(delete_node); System.out.println("Delete node and children: " + e2); } }
3.7 判斷節點是否存在
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

/**
 * ZkClient library usage: checks whether a znode exists.
 */
public class NodeExist {

    /**
     * Prints whether {@code /node_zkclient} currently exists.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String args[]) {
        ZkClient zc = new ZkClient("192.168.179.101:2181", 5000, 5000, new SerializableSerializer());
        try {
            String checkNode = "/node_zkclient";
            boolean exist = zc.exists(checkNode);
            System.out.println("Node exist status is: " + exist);
        } finally {
            // Release the session even when exists() fails.
            zc.close();
        }
    }
}
3.8 訂閱子節點列表發生變化
/** * ZkClient library usage */ import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; import java.util.List; public class SubscribeChildren { public static void main(String args[]){ ZkClient zc = new ZkClient("192.168.179.101:2181", 5000, 5000, new SerializableSerializer()); System.out.println("Connected to zk server!"); // subscribe children change event, multiple subscribe List<String> results = zc.subscribeChildChanges("/node_zkclient", new ZkChildListener()); // sleep until receive event notify try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } }//main private static class ZkChildListener implements IZkChildListener{ public void handleChildChange(String s, List<String> list) throws Exception { //print parent path System.out.println("Parent path: " + s); //print current children System.out.println("Current children: " + list.toString()); } }//ZkChildListener }
3.9 訂閱數據變化
/** * ZkClient library usage */ import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer; public class SubscribeData { public static void main(String args[]){ //使用了新的序列化器, zk命令行寫入的數據才能被檢測 ZkClient zc = new ZkClient("192.168.179.101:2181", 5000, 5000, new BytesPushThroughSerializer()); //寫入什么直接當做byte[]進行存儲 System.out.println("Connected to zk server!"); // subscribe data change event, multiple subscribe zc.subscribeDataChanges("/node_zkclient", new ZkDataListener()); // sleep until receive event notify try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } } private static class ZkDataListener implements IZkDataListener{ public void handleDataChange(String s, Object o) throws Exception { //覆寫方法1 System.out.println("Path Data Changed: " + s); System.out.println("Current Data: " + o.toString()); } public void handleDataDeleted(String s) throws Exception { //覆寫方法2 System.out.println("Data Deleted: " + s); } }//ZkDataListener }