正文
一,JavaAPI普通操作
上篇文章已經對zookeeper的使用有了簡單的介紹,api的使用也相對簡單,在使用前需要導入zookeeper的jar包,其他就如下代碼。
package zookeeperTest; import java.util.List; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; public class ZookeeperDemo { ZooKeeper zk=null; @Before public void init() throws Exception { // 構造一個連接zookeeper的客戶端對象 zk = new ZooKeeper("hd1:2181,hd2:2181,hd3:2181", 200, null); } @Test public void testCreate() throws Exception { // 參數1:要創建的節點路徑 參數2:數據 參數3:訪問權限 參數4:節點類型 String path = zk.create("/java", "hello word".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(path); zk.close(); } @Test public void testUpdate() throws Exception { // 參數1:節點路徑 參數2:數據 參數3:所要修改的版本,-1代表任何版本 Stat status = zk.setData("/java", "set data test".getBytes(), -1); zk.close(); } @Test public void testGet() throws Exception { // 參數1:節點路徑 參數2:是否要監聽 參數3:所要獲取的數據的版本,null表示最新版本 byte[] data = zk.getData("/java", false, null); System.out.println(new String(data, "UTF-8")); zk.close(); } @Test public void testGetChild() throws Exception { // 參數1:節點路徑 參數2:是否要監聽 // 注意:返回的結果中只有子節點名字,不帶全路徑 List<String> childs = zk.getChildren("/", false); for (String string : childs) { System.out.println(string); } zk.close(); } @Test public void testRemove() throws Exception { zk.delete("/java", -1); List<String> childs = zk.getChildren("/", false); for (String string : childs) { System.out.println(string); } zk.close(); } }
二,JavaAPI監控操作
上面一節是無監控的使用,下面是有監控的使用:
package zookeeperTest; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import org.junit.Before; import org.junit.Test; public class ZookeeperWatch { static ZooKeeper zk = null; WatchDemo wd = new WatchDemo(); // 創建watcher類實現Watch接口 public static class WatchDemo implements Watcher{ @Override public void process(WatchedEvent event) { // event返回的事件對象 if(event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeDataChanged) { System.out.println("有數據改動了"); try { zk.getData("/server", true, null); // 循環監聽 } catch (Exception e) { } }else if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged) { System.out.println("子節點變化了"); } } } @Before public void init() throws Exception{ // 構造一個連接zookeeper的客戶端對象 zk = new ZooKeeper("hd1:2181,hd2:2181,hd3:2181", 2000, wd); } // 節點數據監聽 @Test public void getUpdateWatch() throws Exception{ zk.getData("/server", true, null); Thread.sleep(Long.MAX_VALUE); } // 節點節點監聽 @Test public void getChildWatch() throws Exception{ zk.getChildren("/server", true); Thread.sleep(Long.MAX_VALUE); } }
三, 利用zookeeper實現服務器上下線動態感知
3.1 Consumer類(client)
package cn.edu360.zk.distributesystem; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.Random; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; public class Consumer { // 定義一個list用於存放最新的在線服務器列表 private volatile ArrayList<String> onlineServers = new ArrayList<>(); // 構造zk連接對象 ZooKeeper zk = null; // 構造zk客戶端連接 public void connectZK() throws Exception { zk = new ZooKeeper("hd1:2181,hd2:2181,hd3:2181", 2000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == KeeperState.SyncConnected && event.getType() == EventType.NodeChildrenChanged) { try { // 事件回調邏輯中,再次查詢zk上的在線服務器節點即可,查詢邏輯中又再次注冊了子節點變化事件監聽 getOnlineServers(); } catch (Exception e) { e.printStackTrace(); } } } }); } // 查詢在線服務器列表 public void getOnlineServers() throws Exception { List<String> children = zk.getChildren("/servers", true); ArrayList<String> servers = new ArrayList<>(); for (String child : children) { byte[] data = zk.getData("/servers/" + child, false, null); String serverInfo = new String(data); servers.add(serverInfo); } onlineServers = servers; System.out.println("查詢了一次zk,當前在線的服務器有:" + servers); } public void sendRequest() throws Exception { Random random = new Random(); while (true) { try { // 挑選一台當前在線的服務器 int nextInt = random.nextInt(onlineServers.size()); String server = onlineServers.get(nextInt); String hostname = server.split(":")[0]; int port = Integer.parseInt(server.split(":")[1]); System.out.println("本次請求挑選的服務器為:" + server); Socket socket = new Socket(hostname, port); OutputStream out = socket.getOutputStream(); InputStream in = socket.getInputStream(); out.write("haha".getBytes()); out.flush(); byte[] buf = new byte[256]; int read = in.read(buf); System.out.println("服務器響應的時間為:" + new String(buf, 0, read)); out.close(); in.close(); socket.close(); Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { Consumer consumer = new Consumer(); // 構造zk連接對象 consumer.connectZK(); // 查詢在線服務器列表 consumer.getOnlineServers(); // 處理業務(向一台服務器發送時間查詢請求) consumer.sendRequest(); } }
3.2 TimeQueryServer(zookeeper注冊服務器)
package cn.edu360.zk.distributesystem; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class TimeQueryServer { ZooKeeper zk = null; // 構造zk客戶端連接 public void connectZK() throws Exception{ zk = new ZooKeeper("hd1:2181,hd2:2181,hd3:2181", 2000, null); } // 注冊服務器信息 public void registerServerInfo(String hostname,String port) throws Exception{ /** * 先判斷注冊節點的父節點是否存在,如果不存在,則創建 */ Stat stat = zk.exists("/servers", false); if(stat==null){ zk.create("/servers", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 注冊服務器數據到zk的約定注冊節點下 String create = zk.create("/servers/server", (hostname+":"+port).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname+" 服務器向zk注冊信息成功,注冊的節點為:" + create); } public static void main(String[] args) throws Exception { TimeQueryServer timeQueryServer = new TimeQueryServer(); // 構造zk客戶端連接 timeQueryServer.connectZK(); // 注冊服務器信息 timeQueryServer.registerServerInfo(args[0], args[1]); // 啟動業務線程開始處理業務 new TimeQueryService(Integer.parseInt(args[1])).start(); } }
3.3 TimeQueryService(server提供服務)
package cn.edu360.zk.distributesystem; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.Date; public class TimeQueryService extends Thread{ int port = 0; public TimeQueryService(int port){ this.port = port; } @Override public void run() { try { ServerSocket ss = new ServerSocket(port); System.out.println("業務線程已綁定端口"+port+"准備接受消費端請求了....."); while(true){ Socket sc = ss.accept(); InputStream inputStream = sc.getInputStream(); OutputStream outputStream = sc.getOutputStream(); outputStream.write(new Date().toString().getBytes()); } } catch (IOException e) { e.printStackTrace(); } } }