【zookeeper】4、利用zookeeper,借助觀察模式,判斷服務器的上下線


 首先什么是觀察者模式,可以看看我之前的設計模式的文章

https://www.cnblogs.com/cutter-point/p/5249780.html

 

確定一下,要有觀察者,要有被觀察者,然后要被觀察者觸發事件,事件發生之后,觀察者觸發相應的事件發生

了解了基本概念,我們來看看zookeeper是什么情況

zookeeper也是類似觀察者一樣,我們先把本機信息注冊進入服務器,然后設置一個watch方法,這個在zookeeper節點發生變化的時候通知對應的客戶端,觸發對應的方法

這里先注冊服務,如何向zookeeper進行注冊呢

 

package cn.cutter.demo.hadoop.zookeeper;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * @ProjectName: cutter-point
 * @Package: cn.cutter.demo.hadoop.zookeeper
 * @ClassName: TimeQueryServer
 * @Author: xiaof
 * @Description: 利用zookeeper來進行分布式時間查詢
 * @Date: 2019/4/2 19:37
 * @Version: 1.0
 */
public class TimeQueryServer {

    private ZooKeeper zooKeeper;

    // 構造zk客戶端連接
    public void connectZK() throws Exception{
        zooKeeper = new ZooKeeper("192.168.1.4:2181,192.168.1.4:2182,192.168.1.4:2183", 2000, null);
    }


    // 注冊服務器信息
    public void registerServerInfo(String hostname,String port) throws Exception{

        /**
         * 先判斷注冊節點的父節點是否存在,如果不存在,則創建
         */
        Stat stat = zooKeeper.exists("/servers", false);
        if(stat==null){
            zooKeeper.create("/servers", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // 注冊服務器數據到zk的約定注冊節點下
        String create = zooKeeper.create("/servers/server", (hostname+":"+port).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println(hostname+" 服務器向zk注冊信息成功,注冊的節點為:" + create);

    }

}

 

如果注入了服務,那么我們為了監控這個服務的存在,那么是不是應該也模擬一個服務?

好,這里我們就做一個時鍾同步的服務,用消費線程不斷請求服務,並獲取當前時間

package cn.cutter.demo.hadoop.zookeeper;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Date;
import java.util.Iterator;

/**
 * @ProjectName: cutter-point
 * @Package: cn.cutter.demo.hadoop.zookeeper
 * @ClassName: TimeQueryService
 * @Author: xiaof
 * @Description: ${description}
 * @Date: 2019/4/2 19:43
 * @Version: 1.0
 */
public class TimeQueryService extends Thread {

    private static final Log log = LogFactory.getLog(TimeQueryService.class);

    int port = 0;

    public TimeQueryService(int port) {
        this.port = port;
    }

    @Override
    public void run() {

        //1.創建信道選擇器
        Selector selector = null;
        //不斷讀取字符,只有讀到換行我們才進行輸出
//        StringBuffer stringBuffer = new StringBuffer();
        try {
            selector = Selector.open();
            //2.創建對應端口的監聽
            //2.1 創建通道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //2.2 socket 對象綁定端口  socket() 獲取與此通道關聯的服務器套接字
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            //2.3 設置為非阻塞
            serverSocketChannel.configureBlocking(false);
            //注冊到對應的選擇器,讀取信息
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        //3.輪詢獲取信息
        while (true) {
            //獲取socket對象
            //獲取准備好的信道總數
            if (!selector.isOpen()) {
                System.out.println("is close over");
                break;
            }

            try {
                if (selector.select(3000) == 0) {
                    continue; //下一次循環
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

            //獲取信道
            Iterator<SelectionKey> keyIterable = selector.selectedKeys().iterator();
            while (keyIterable.hasNext()) {
                //6.遍歷鍵集,判斷鍵類型,執行相應的操作
                SelectionKey selectionKey = keyIterable.next();
                //判斷鍵類型,執行相應操作
                if (selectionKey.isAcceptable()) {
                    try {
                        //從key中獲取對應信道
                        //接受數據
                        SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
                        //並設置成非阻塞
                        socketChannel.configureBlocking(false);
                        //從新注冊,修改狀態
                        socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

                if (selectionKey.isReadable()) {
                    //讀取數據
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    //獲取當前的附加對象。
                    ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();

                    //判斷是是否斷開連接
                    int count = 0;
                    while (true) {
                        try {
                            if (!((count = socketChannel.read(byteBuffer)) != 0 && count != -1 && selectionKey.isValid())) {
                                if(count == -1) {
                                    //關閉通道
                                    socketChannel.close();
                                }
                                break;
                            }
                        } catch (IOException e) {
//                                e.printStackTrace();
                            try {
                                //如果讀取數據會拋出異常,那么就斷定通道已經被客戶端關閉
                                socketChannel.close();
                            } catch (IOException e1) {
                                e1.printStackTrace();
                            }
                            System.out.println("無法讀取數據!");
                            break;
                        }
                        //判斷是否有換行
//                            byteBuffer.flip();
                        byte msg[] = byteBuffer.array();
                        boolean isOver = false;
                        int i = byteBuffer.position() - count;
                        for (; i < byteBuffer.position(); ++i) {
                            //判斷是否有換行
                            if (byteBuffer.get(i) == '\r' || byteBuffer.get(i) == '\n') {
                                //輸出
                                //先壓縮數據
                                byteBuffer.flip();
                                byte out[] = new byte[byteBuffer.limit()];
                                byteBuffer.get(out, 0, out.length);
                                log.info(new String(out));
                                //設置成可以讀和可寫狀態
                                byteBuffer.compact();
                                byteBuffer.clear();
                                isOver = true;
                            }
                        }
                        if (isOver == true) {
//                            interestOps(SelectionKey.OP_READ);的意思其實就是用同一個KEY重新注冊
                            selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                            break;
                        }
                    }

                    if (count == -1) {
                        //如果是-1 ,那么就關閉客戶端
                        try {
                            socketChannel.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    } else {
//                            selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    }
                }

                //告知此鍵是否有效。
                if (selectionKey.isValid() && selectionKey.isWritable()) {
                    //獲取當前的附加對象。
//                        ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
                    // 清空,並寫入數據
//                        byteBuffer.clear();
                    byte smsBytes[] = (new Date().toString() + "\n").getBytes();
                    ByteBuffer byteBuffer = ByteBuffer.wrap(smsBytes);
//                        byteBuffer.put(smsBytes);
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    //寫入數據
//                        System.out.println(new String(byteBuffer.array()));
                    while (byteBuffer.hasRemaining()) {
                        //輸出數據
                        try {
                            socketChannel.write(byteBuffer);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    //判斷是否判斷是否有待處理數據
                    if (!byteBuffer.hasRemaining()) {
                        //數據清理干凈
                        selectionKey.interestOps(SelectionKey.OP_READ);
                    }
                    //壓縮此緩沖區將緩沖區的當前位置和界限之間的字節(如果有)復制到緩沖區的開始處。
                    // 即將索引 p = position() 處的字節復制到索引 0 處,將索引 p + 1 處的字節復制到索引 1 處,依此類推,直到將索引 limit() - 1 處的字節復制到索引
                    // n = limit() - 1 - p 處。然后將緩沖區的位置設置為 n+1,並將其界限設置為其容量。如果已定義了標記,則丟棄它。
                    //將緩沖區的位置設置為復制的字節數,而不是零,以便調用此方法后可以緊接着調用另一個相對 put 方法。
                    //從緩沖區寫入數據之后調用此方法,以防寫入不完整。例如,以下循環語句通過 buf 緩沖區將字節從一個信道復制到另一個信道:
                    byteBuffer.compact();
                }

                //執行操作的時候,移除避免下一次循環干擾
//                    原因是Selector不會自己從已選擇鍵集中移除SelectionKey實例。必須在處理完通道時自己移除。下次該通道變成就緒時,Selector會再次將其放入已選擇鍵集中
                keyIterable.remove();

            }

        }

    }
}

 

說實話,這個服務端當時花了好大的力氣寫完的,mmp,因為客戶端進行可以不關閉通道直接kill,導致服務端並不知道對端已經離線,到時候服務端不斷再進行空輪訓,一旦進行read就拋出io異常!!

好了,服務和注冊寫完了,那么我們注冊一把唄

 

@Test
    public void test1() throws Exception {
        TimeQueryServer timeQueryServer = new TimeQueryServer();

        // 構造zk客戶端連接
        timeQueryServer.connectZK();

        // 注冊服務器信息
        timeQueryServer.registerServerInfo("192.168.1.7", "8888");

        // 啟動業務線程開始處理業務
        new TimeQueryService(Integer.parseInt("8888")).start();

        while(true) {
            Thread.sleep(200000);
//            System.out.println("..");
        }
    }

 

 不要在意那個null異常,那是因為我爸watch設置為null的原因

 

服務端寫完了,我們再考慮寫一波客戶端

package cn.cutter.demo.hadoop.zookeeper;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * @ProjectName: cutter-point
 * @Package: cn.cutter.demo.hadoop.zookeeper
 * @ClassName: Consumer
 * @Author: xiaof
 * @Description: ${description}
 * @Date: 2019/4/3 14:11
 * @Version: 1.0
 */
public class Consumer {

    // 定義一個list用於存放最新的在線服務器列表
    private volatile ArrayList<String> onlineServers = new ArrayList<>();

    // 構造zk連接對象
    ZooKeeper zk = null;

    // 構造zk客戶端連接
    public void connectZK() throws Exception {

        zk = new ZooKeeper("192.168.1.4:2181,192.168.1.4:2182,192.168.1.4:2183", 2000, new Watcher() {

            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected && event.getType() == Event.EventType.NodeChildrenChanged) {

                    try {
                        // 事件回調邏輯中,再次查詢zk上的在線服務器節點即可,查詢邏輯中又再次注冊了子節點變化事件監聽
                        getOnlineServers();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }

            }
        });

    }

    // 查詢在線服務器列表
    public void getOnlineServers() throws Exception {

        List<String> children = zk.getChildren("/servers", true);
        ArrayList<String> servers = new ArrayList<>();

        for (String child : children) {
            byte[] data = zk.getData("/servers/" + child, false, null);

            String serverInfo = new String(data);

            servers.add(serverInfo);
        }

        onlineServers = servers;
        System.out.println("查詢了一次zk,當前在線的服務器有:" + servers);

    }

    public void sendRequest() throws Exception {
        Random random = new Random();
        while (true) {
            try {
                // 挑選一台當前在線的服務器
                int nextInt = random.nextInt(onlineServers.size());
                String server = onlineServers.get(nextInt);
                String hostname = server.split(":")[0];
                int port = Integer.parseInt(server.split(":")[1]);

                System.out.println("本次請求挑選的服務器為:" + server);

                //2.打開socket信道,設置成非阻塞模式
                SocketChannel socketChannel = SocketChannel.open();
                socketChannel.configureBlocking(false);

                //3.嘗試建立連接,然后輪詢,判定,連接是否完全建立
                int times = 0;
                if(!socketChannel.connect(new InetSocketAddress(hostname, port))) {
                    while(!socketChannel.finishConnect()) {
//                        System.out.println(times++ + ". ");
                    }
                }

                //4.創建相應的buffer緩沖
                ByteBuffer writeBuffer = ByteBuffer.wrap("test\n".getBytes());
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int totalBytesRcvd = 0;
                int bytesRcvd;

                //5.向socket信道發送數據,然后嘗試讀取數據
                socketChannel.write(writeBuffer);
                //讀取數據
                if((bytesRcvd = socketChannel.read(readBuffer)) == -1) {
                    //這種非阻塞模式,如果讀取不到數據是會返回0的,如果是-1該通道已到達流的末尾
                    throw new SocketException("連接關閉??");
                }

                //不停嘗試獲取數據,這是因為服務端數據反饋太慢了???
                while (bytesRcvd == 0) {
                    bytesRcvd = socketChannel.read(readBuffer);
                }

                //6.輸出
                readBuffer.flip();
                byte reads[] = new byte[readBuffer.limit()];
                readBuffer.get(reads, 0, reads.length);
                System.out.println("收到信息:" + new String(reads));

                //7.關閉信道
                socketChannel.close();

                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
            }

        }

    }

    public static void main(String[] args) throws Exception {

        Consumer consumer = new Consumer();
        // 構造zk連接對象
        consumer.connectZK();

        // 查詢在線服務器列表
        consumer.getOnlineServers();

        // 處理業務(向一台服務器發送時間查詢請求)
        consumer.sendRequest();

    }



}

啟動客戶端:

 

 

 

 為了體現zookeeper監控服務是否在線的操作,我們多起幾個服務端,然后監控客戶端的信息展示

 

 我們再起一個

 

 

 

 

 接下來我們kill掉8888端口的進程

 

 我們當前在線可以看到只有2個節點了,我們上zk看看,確實只有2個了

 

 

到這里zookeeper的上下線的判斷已經完成,我最近再自學大數據的東西,想向大數據進軍一波,歡迎大家一起探討大數據的學習。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM