首先什么是觀察者模式,可以看看我之前的設計模式的文章
https://www.cnblogs.com/cutter-point/p/5249780.html
確定一下,要有觀察者,要有被觀察者,然后要被觀察者觸發事件,事件發生之后,觀察者觸發相應的事件發生
了解了基本概念,我們來看看zookeeper是什么情況
zookeeper也是類似觀察者一樣,我們先把本機信息注冊進入服務器,然后設置一個watch方法,這個在zookeeper節點發生變化的時候通知對應的客戶端,觸發對應的方法
這里先注冊服務,如何向zookeeper進行注冊呢
package cn.cutter.demo.hadoop.zookeeper; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * @ProjectName: cutter-point * @Package: cn.cutter.demo.hadoop.zookeeper * @ClassName: TimeQueryServer * @Author: xiaof * @Description: 利用zookeeper來進行分布式時間查詢 * @Date: 2019/4/2 19:37 * @Version: 1.0 */ public class TimeQueryServer { private ZooKeeper zooKeeper; // 構造zk客戶端連接 public void connectZK() throws Exception{ zooKeeper = new ZooKeeper("192.168.1.4:2181,192.168.1.4:2182,192.168.1.4:2183", 2000, null); } // 注冊服務器信息 public void registerServerInfo(String hostname,String port) throws Exception{ /** * 先判斷注冊節點的父節點是否存在,如果不存在,則創建 */ Stat stat = zooKeeper.exists("/servers", false); if(stat==null){ zooKeeper.create("/servers", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 注冊服務器數據到zk的約定注冊節點下 String create = zooKeeper.create("/servers/server", (hostname+":"+port).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname+" 服務器向zk注冊信息成功,注冊的節點為:" + create); } }
如果注入了服務,那么我們為了監控這個服務的存在,那么是不是應該也模擬一個服務?
好,這里我們就做一個時鍾同步的服務,用消費線程不斷請求服務,並獲取當前時間
package cn.cutter.demo.hadoop.zookeeper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Date; import java.util.Iterator; /** * @ProjectName: cutter-point * @Package: cn.cutter.demo.hadoop.zookeeper * @ClassName: TimeQueryService * @Author: xiaof * @Description: ${description} * @Date: 2019/4/2 19:43 * @Version: 1.0 */ public class TimeQueryService extends Thread { private static final Log log = LogFactory.getLog(TimeQueryService.class); int port = 0; public TimeQueryService(int port) { this.port = port; } @Override public void run() { //1.創建信道選擇器 Selector selector = null; //不斷讀取字符,只有讀到換行我們才進行輸出 // StringBuffer stringBuffer = new StringBuffer(); try { selector = Selector.open(); //2.創建對應端口的監聽 //2.1 創建通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //2.2 socket 對象綁定端口 socket() 獲取與此通道關聯的服務器套接字 serverSocketChannel.socket().bind(new InetSocketAddress(port)); //2.3 設置為非阻塞 serverSocketChannel.configureBlocking(false); //注冊到對應的選擇器,讀取信息 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } //3.輪詢獲取信息 while (true) { //獲取socket對象 //獲取准備好的信道總數 if (!selector.isOpen()) { System.out.println("is close over"); break; } try { if (selector.select(3000) == 0) { continue; //下一次循環 } } catch (IOException e) { e.printStackTrace(); } //獲取信道 Iterator<SelectionKey> keyIterable = selector.selectedKeys().iterator(); while (keyIterable.hasNext()) { //6.遍歷鍵集,判斷鍵類型,執行相應的操作 SelectionKey selectionKey = keyIterable.next(); //判斷鍵類型,執行相應操作 if (selectionKey.isAcceptable()) { try { //從key中獲取對應信道 //接受數據 SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept(); //並設置成非阻塞 socketChannel.configureBlocking(false); //從新注冊,修改狀態 socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(1024)); } catch (IOException e) { e.printStackTrace(); } } if (selectionKey.isReadable()) { //讀取數據 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //獲取當前的附加對象。 ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment(); //判斷是是否斷開連接 int count = 0; while (true) { try { if (!((count = socketChannel.read(byteBuffer)) != 0 && count != -1 && selectionKey.isValid())) { if(count == -1) { //關閉通道 socketChannel.close(); } break; } } catch (IOException e) { // e.printStackTrace(); try { //如果讀取數據會拋出異常,那么就斷定通道已經被客戶端關閉 socketChannel.close(); } catch (IOException e1) { e1.printStackTrace(); } System.out.println("無法讀取數據!"); break; } //判斷是否有換行 // byteBuffer.flip(); byte msg[] = byteBuffer.array(); boolean isOver = false; int i = byteBuffer.position() - count; for (; i < byteBuffer.position(); ++i) { //判斷是否有換行 if (byteBuffer.get(i) == '\r' || byteBuffer.get(i) == '\n') { //輸出 //先壓縮數據 byteBuffer.flip(); byte out[] = new byte[byteBuffer.limit()]; byteBuffer.get(out, 0, out.length); log.info(new String(out)); //設置成可以讀和可寫狀態 byteBuffer.compact(); byteBuffer.clear(); isOver = true; } } if (isOver == true) { // interestOps(SelectionKey.OP_READ);的意思其實就是用同一個KEY重新注冊 selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); break; } } if (count == -1) { //如果是-1 ,那么就關閉客戶端 try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } else { // selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } //告知此鍵是否有效。 if (selectionKey.isValid() && selectionKey.isWritable()) { //獲取當前的附加對象。 // ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment(); // 清空,並寫入數據 // byteBuffer.clear(); byte smsBytes[] = (new Date().toString() + "\n").getBytes(); ByteBuffer byteBuffer = ByteBuffer.wrap(smsBytes); // byteBuffer.put(smsBytes); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //寫入數據 // System.out.println(new String(byteBuffer.array())); while (byteBuffer.hasRemaining()) { //輸出數據 try { socketChannel.write(byteBuffer); } catch (IOException e) { e.printStackTrace(); } } //判斷是否判斷是否有待處理數據 if (!byteBuffer.hasRemaining()) { //數據清理干凈 selectionKey.interestOps(SelectionKey.OP_READ); } //壓縮此緩沖區將緩沖區的當前位置和界限之間的字節(如果有)復制到緩沖區的開始處。 // 即將索引 p = position() 處的字節復制到索引 0 處,將索引 p + 1 處的字節復制到索引 1 處,依此類推,直到將索引 limit() - 1 處的字節復制到索引 // n = limit() - 1 - p 處。然后將緩沖區的位置設置為 n+1,並將其界限設置為其容量。如果已定義了標記,則丟棄它。 //將緩沖區的位置設置為復制的字節數,而不是零,以便調用此方法后可以緊接着調用另一個相對 put 方法。 //從緩沖區寫入數據之后調用此方法,以防寫入不完整。例如,以下循環語句通過 buf 緩沖區將字節從一個信道復制到另一個信道: byteBuffer.compact(); } //執行操作的時候,移除避免下一次循環干擾 // 原因是Selector不會自己從已選擇鍵集中移除SelectionKey實例。必須在處理完通道時自己移除。下次該通道變成就緒時,Selector會再次將其放入已選擇鍵集中 keyIterable.remove(); } } } }
說實話,這個服務端當時花了好大的力氣寫完的,mmp,因為客戶端進行可以不關閉通道直接kill,導致服務端並不知道對端已經離線,到時候服務端不斷再進行空輪訓,一旦進行read就拋出io異常!!
好了,服務和注冊寫完了,那么我們注冊一把唄
@Test public void test1() throws Exception { TimeQueryServer timeQueryServer = new TimeQueryServer(); // 構造zk客戶端連接 timeQueryServer.connectZK(); // 注冊服務器信息 timeQueryServer.registerServerInfo("192.168.1.7", "8888"); // 啟動業務線程開始處理業務 new TimeQueryService(Integer.parseInt("8888")).start(); while(true) { Thread.sleep(200000); // System.out.println(".."); } }
不要在意那個null異常,那是因為我爸watch設置為null的原因
服務端寫完了,我們再考慮寫一波客戶端
package cn.cutter.demo.hadoop.zookeeper; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.net.InetSocketAddress; import java.net.SocketException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; import java.util.Random; /** * @ProjectName: cutter-point * @Package: cn.cutter.demo.hadoop.zookeeper * @ClassName: Consumer * @Author: xiaof * @Description: ${description} * @Date: 2019/4/3 14:11 * @Version: 1.0 */ public class Consumer { // 定義一個list用於存放最新的在線服務器列表 private volatile ArrayList<String> onlineServers = new ArrayList<>(); // 構造zk連接對象 ZooKeeper zk = null; // 構造zk客戶端連接 public void connectZK() throws Exception { zk = new ZooKeeper("192.168.1.4:2181,192.168.1.4:2182,192.168.1.4:2183", 2000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected && event.getType() == Event.EventType.NodeChildrenChanged) { try { // 事件回調邏輯中,再次查詢zk上的在線服務器節點即可,查詢邏輯中又再次注冊了子節點變化事件監聽 getOnlineServers(); } catch (Exception e) { e.printStackTrace(); } } } }); } // 查詢在線服務器列表 public void getOnlineServers() throws Exception { List<String> children = zk.getChildren("/servers", true); ArrayList<String> servers = new ArrayList<>(); for (String child : children) { byte[] data = zk.getData("/servers/" + child, false, null); String serverInfo = new String(data); servers.add(serverInfo); } onlineServers = servers; System.out.println("查詢了一次zk,當前在線的服務器有:" + servers); } public void sendRequest() throws Exception { Random random = new Random(); while (true) { try { // 挑選一台當前在線的服務器 int nextInt = random.nextInt(onlineServers.size()); String server = onlineServers.get(nextInt); String hostname = server.split(":")[0]; int port = Integer.parseInt(server.split(":")[1]); System.out.println("本次請求挑選的服務器為:" + server); //2.打開socket信道,設置成非阻塞模式 SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); //3.嘗試建立連接,然后輪詢,判定,連接是否完全建立 int times = 0; if(!socketChannel.connect(new InetSocketAddress(hostname, port))) { while(!socketChannel.finishConnect()) { // System.out.println(times++ + ". "); } } //4.創建相應的buffer緩沖 ByteBuffer writeBuffer = ByteBuffer.wrap("test\n".getBytes()); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int totalBytesRcvd = 0; int bytesRcvd; //5.向socket信道發送數據,然后嘗試讀取數據 socketChannel.write(writeBuffer); //讀取數據 if((bytesRcvd = socketChannel.read(readBuffer)) == -1) { //這種非阻塞模式,如果讀取不到數據是會返回0的,如果是-1該通道已到達流的末尾 throw new SocketException("連接關閉??"); } //不停嘗試獲取數據,這是因為服務端數據反饋太慢了??? while (bytesRcvd == 0) { bytesRcvd = socketChannel.read(readBuffer); } //6.輸出 readBuffer.flip(); byte reads[] = new byte[readBuffer.limit()]; readBuffer.get(reads, 0, reads.length); System.out.println("收到信息:" + new String(reads)); //7.關閉信道 socketChannel.close(); Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { Consumer consumer = new Consumer(); // 構造zk連接對象 consumer.connectZK(); // 查詢在線服務器列表 consumer.getOnlineServers(); // 處理業務(向一台服務器發送時間查詢請求) consumer.sendRequest(); } }
啟動客戶端:
為了體現zookeeper監控服務是否在線的操作,我們多起幾個服務端,然后監控客戶端的信息展示
我們再起一個
接下來我們kill掉8888端口的進程
我們當前在線可以看到只有2個節點了,我們上zk看看,確實只有2個了
到這里zookeeper的上下線的判斷已經完成,我最近再自學大數據的東西,想向大數據進軍一波,歡迎大家一起探討大數據的學習。