1. 基本使用
org.apache.zookeeper.Zookeeper是客戶端入口主類,負責建立與server的會話。它提供了表1所示幾類主要方法:
功能 | 描述 |
create | 在本地目錄樹中創建一個節點 |
delete | 刪除一個節點 |
exists | 測試本地是否存在目標節點 |
get/set data | 從目標節點上讀取/寫數據 |
get/set ACL | 獲取/設置目標節點訪問控制列表信息 |
get children | 檢索一個子節點上的列表 |
sync | 等待要被傳送的數據 |
表1:ZooKeeper API描述
2.pom.xml配置依賴包
在本篇文章中共用一個pom.xml
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <dependency> <groupId>jline</groupId> <artifactId>jline</artifactId> <version>0.9.94</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty</artifactId> <version>3.9.4.Final</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.1</version> </dependency> </dependencies>
3. 增刪改查demo
zookeeper上的znode節點增刪改查demo
package cn.itcast.zk; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.util.List; /** * @author y15079 * @create 2018-03-16 16:53 * @desc 簡單的增刪改查demo **/ public class SimpleZkClient { private static final String connectString = "mimi1:2181,mimi2:2181,mini3:2181"; private static final int sessionTimeout = 2000; ZooKeeper zkClient = null; @Before public void init() throws Exception { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent watchedEvent) { //收到事件通知后的回調函數(應該是我們自己的事件處理邏輯) System.out.println(watchedEvent.getType() + "---" + watchedEvent.getPath()); try { zkClient.getChildren("/", true); } catch (Exception e) { } } }); } /** * 數據的增刪改查 */ //創建數據節點到zk中 @Test public void testCreate(String[] args) throws Exception{ //參數1,要創建的節點的路徑 參數2:節點數據 參數3:節點的權限 參數4:節點的類型 String node = zkClient.create("/idea", "hellozk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //上傳的數據可以是任何類型,但都要轉成byte[] } //判斷znode是否存在 public void testExist() throws Exception{ //節點元數據 Stat stat = zkClient.exists("/idea", false); System.out.println(stat == null ? "not exist": "exist"); } //獲取子節點 @Test public void getChildren() throws Exception{ List<String> children = zkClient.getChildren("/", true); for (String child: children){ System.out.println(child); } Thread.sleep(Long.MAX_VALUE);//真正運行時可以注釋 } //獲取znode的數據 @Test public void getData() throws Exception{ byte[] data = zkClient.getData("/idea",false, null); System.out.println(new String(data)); } //刪除znode的數據 @Test public void deleteZnode() throws Exception{ //參數2:指定要刪除的版本,-1表示刪除所有版本 zkClient.delete("/idea", -1); } //設置znode @Test public void setData() throws Exception{ zkClient.setData("/app1","hello".getBytes(), -1); byte[] data = zkClient.getData("/app1", false, null); System.out.println(new String(data)); } }
3. Zookeeper 應用案例(分布式共享鎖 ||分布式應用系統服務器上下線動態感知)
3.1 分布式共享鎖的簡單實現
package cn.itcast.zkclock; import org.apache.zookeeper.*; import java.util.Collections; import java.util.List; import java.util.Random; /** * @author y15079 * @create 2018-03-17 18:40 * @desc 分布式共享鎖 單線程的 **/ public class DistributedClientLock { //會話超時 private static final int SESSION_TIMEOUT = 5000; //zookeeper集群地址 private String hosts = "hadoop1:2181,hadoop2:2181,hadoop3:2181"; private String groupNode = "locks"; private String subNode = "sub"; private boolean haveLock = false; private ZooKeeper zk; //記錄client創建的子節點路徑 private volatile String thisPath; /** * 連接zk * @throws Exception */ public void connectZookeeper() throws Exception{ zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() { public void process(WatchedEvent watchedEvent) { try { //判斷事件類型,此處只處理子節點變化事件 if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged && watchedEvent.getPath().equals("/" + groupNode)){ //獲取子節點,並對父節點進行監聽, 開始監聽,監聽鎖是否有變化 List<String> childrenNodes = zk.getChildren("/" + groupNode, true); String thisNode = thisPath.substring(("/" + groupNode + "/").length()); //獲取父目錄下的子節點路徑,不帶父目錄 //去比較是否自己是最小id, 約定id最小最先拿到鎖 Collections.sort(childrenNodes); if (childrenNodes.indexOf(thisNode) == 0){ //indexOf是索引的意思,查看子節點路徑在所有子節點中的索引是否為0,為0則證明子節點id最小,最先獲取鎖 //訪問共享資源處理業務,並且在處理完成之后刪除鎖 doSomething(); //刪除鎖,監聽到,暫時不會調用watcher里面的process方法 //重新注冊一把新的鎖,不懂為什么注冊新的鎖 thisPath = zk.create("/" + groupNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //在上面創建好鎖后才調用watcher里面的process方法 } } } catch (Exception e) { } } }); //程序一進來就先注冊一把鎖到zk上,即創建子節點,還沒有開始監聽,zk watcher里面的process方法不會執行 thisPath = zk.create("/" + groupNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //wait一小會,便於觀察 Thread.sleep(new Random().nextInt(1000)); //從zk的鎖目錄下,獲取所有子節點,並且監聽子節點的變化, 開始監聽,監聽鎖是否有變化 List<String> childrenNodes = zk.getChildren("/" + groupNode, true); //列表中只有一個子節點,那肯定就是thisPath,說明client獲得鎖 if (childrenNodes.size() == 1){ doSomething(); //刪除鎖,監聽到,暫時不會調用watcher里面的process方法 //為什么還創建鎖?為了下一次client獲取鎖做好准備 thisPath = zk.create("/" + groupNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //在上面創建好鎖后才調用watcher里面的process方法 } } /** * 共享資源的訪問邏輯寫在這個方法中 * 處理業務邏輯,並釋放 * @throws Exception */ private void doSomething() throws Exception{ try { System.out.println("gain lock: " + thisPath); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("finished: " + thisPath ); //將thisPath刪除,監聽thisPath的client將獲得通知 //相當於釋放鎖 zk.delete(this.thisPath, -1); } } public static void main(String[] args) throws Exception{ DistributedClientLock dcl = new DistributedClientLock(); dcl.connectZookeeper(); Thread.sleep(Long.MAX_VALUE); } }
3.2 分布式應用系統服務器上下線動態感知
客戶端實現:
package cn.itcast.zkdist; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.util.ArrayList; import java.util.List; /** * @author y15079 * @create 2018-03-17 15:20 * @desc 分布式HA應用 客戶端 **/ public class DistributedClient { private static final String connectString = "hadoop1:2181,hadoop2:2181,hadoop3:2181"; private static final int sessionTimeout = 2000; private static final String parentNode = "/servers"; //注意:加volatile的意義何在?保持一致性 private volatile List<String> serverList; private ZooKeeper zk = null; /** * 創建到zk的客戶端連接 * @throws Exception */ public void getConnect() throws Exception{ zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent watchedEvent) { //收到事件通知后的回調函數(應該是我們自己的事件處理邏輯) try { //重新更新服務器列表,並且注冊了監聽 getServerList(); } catch (Exception e) { } } }); } /** * 獲取服務器信息列表 * @throws Exception */ public void getServerList() throws Exception{ //獲取服務器子節點信息,並且對父節點進行監聽 List<String> children = zk.getChildren(parentNode, true); //先創建一個局部的list來存服務器信息 List<String> servers = new ArrayList<String>(); for (String child: children){ //child只是子節點的節點名 byte[] data = zk.getData(parentNode + "/" + child, false, null); servers.add(new String(data)); } //把servers賦值給成員變量serverList,已提供給各業務線程使用 serverList = servers; //打印服務器列表 System.out.println(serverList); } /** * 業務功能 * @throws Exception */ public void handleBusiness() throws Exception{ System.out.println("client start working......"); Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { //獲取zk連接 DistributedClient client = new DistributedClient(); client.getConnect(); //獲取servers的子節點信息(並監聽),從中獲取服務器信息列表 client.getServerList(); //啟動業務功能 client.handleBusiness(); } }
服務端實現:
package cn.itcast.zkdist; import org.apache.zookeeper.*; /** * @author y15079 * @create 2018-03-17 1:12 * @desc 分布式HA應用 服務端 **/ public class DistributedServer { private static final String connectString = "hadoop1:2181,hadoop2:2181,hadoop3:2181"; private static final int sessionTimeout = 2000; private static final String parentNode = "/servers"; private ZooKeeper zk = null; /** * 創建到zk的客戶端連接 * @throws Exception */ public void getConnect() throws Exception{ zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent event) { //收到事件通知后的回調函數(應該是我們自己的事件處理邏輯) System.out.println(event.getType() + "---" + event.getPath()); try { zk.getChildren("/",true); } catch (Exception e) { } } }); } /** * 向zk集群注冊服務器信息 * @param hostname * @throws Exception */ public void registerServer(String hostname) throws Exception{ String create = zk.create(parentNode+"server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname + " is online..." + create); } /** * 業務功能 * @throws Exception */ public void handleBusiness(String hostname) throws Exception{ System.out.println(hostname + " start working......"); Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { //獲取zk連接 DistributedServer server = new DistributedServer(); server.getConnect(); //利用zk連接注冊服務器信息 server.registerServer(args[0]); //啟動業務功能 server.handleBusiness(args[0]); } }
github地址:
https://github.com/SwordfallYeung/Zookeeper-Java-API-Demo