基於Zookeeper實現客戶端動態監聽服務器上下線


一、在具體實現之前,先來了解一下Zookeeper的監聽器的原理:

   

  圖中Main()線程作為客戶端,當在主線程中創建Zookeeper客戶端時,會默認創建兩個子線程:Listenerconnectconnect線程負責將某一操作對應的的監聽事件發送給Zookeeper服務集群。Zookeeper收到監聽事件后會在該操作對應的監聽器列表中注冊該事件。

  比如圖中的獲取節點/”的子節點getChildren這一事件,並設置了true,表示監聽此事件,那么Zookeeper就會在監聽器列表中注冊該事件。一旦“/”節點的子節點發生變化,getChildren的結果就隨之發生變化,Zookeeper就會通知客戶端的Listener線程,Listener就會去調用process方法對“/”的變化做出應對處理。“/”的變化可能是客戶端不能控制的,但是為了適應這種變化,客戶端在收到服務器的通知后可根據自身情況做出應對。

二、這樣說可能比較抽象,我們用一個案例來說明:

public class ZkDemo {
    private String connect = "hadoop101:2181";
    private int timeout = 2000;
    private ZooKeeper zooKeeper = null;

    //1、獲取Zookeeper客戶端,用於連接Zookeeper集群,其功能類似於Linux中啟動./zkCli.sh
    @Before
    public void getClient() throws Exception{
        zooKeeper = new ZooKeeper(connect, timeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println(event.getPath() + "已被修改,是否確定?");
                try {
                    zooKeeper.getChildren("/lsj",true);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * 查看子節點
     */
    @Test
    public void testList() throws Exception {
        List<String> children = zooKeeper.getChildren("/lsj", true);
        for (String str: children) {
            System.out.println(str);
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}

附:Thread.sleep()的設置是為了讓testList方法從某種意義上更像是客戶端那樣持續保持連接,所以必須讓testList方法處在執行的過程中。

  默認的監聽次數是一次(List<String> children = zooKeeper.getChildren("/lsj", true);執行后,Zookeeper會在/lsj節點的監聽列表中注冊一個監聽事件,如果該節點發生變化,就通知給申請監聽的客戶端的listener,並將該監聽事件從節點的監聽列表中刪除),服務端並不會持續為某客戶端監聽某一節點的變化。如果被監聽的節點發生變化,會調用監聽器的process方法,可以在process方法中再次調用getChildre方法並申請對目標節點的監聽,通過這個小技巧使得監聽的次數變得無限多了。只需記住:這段代碼啟動后對Zookeeper來說也是一個客戶端,設為客戶端1。zooKeeper.getChildren("/lsj", true)設置了監聽此事件,當/lsj節點下的子節點發生變化時getChildren方法結果會發生變化,會觸發監聽器Watcher(就是那個內部類)執行process方法,在process方法中執行了應對變化的處理后再次調用getChildren方法,這才使得監聽好像變的無限多了。

  下圖所示:在Linux中啟動的客戶端(設為客戶端2每一次對/lsj節點增加節點都會視為getChildren結果的變化,故而Zookeeper會通知客戶端1這種變化,進而觸發監聽器執行process方法。

   

 

 

   

 

 

 

三、下面進入Zookeeper的正題:如何利用Zookeeper監聽服務器集群的變化,並在服務器集群變化時通知客戶端呢?

  原理和上面一樣一樣的,只需將客戶端1想象成下文的客戶端群體,把客戶端2想想成服務器群體。如果沒有Zookeeper這個角色,讓客戶端直接和服務器接觸,當客戶端請求的一台服務器正好宕機時,客戶端將無法獲取資源,但又不知道這是服務器宕機所造成的問題,也無法改變請求到另一台正常運行的服務器,那么這個問題如何解決呢?

  思路是這樣的,每一台服務器上線時都會在Zookeeper上的/ServerCluster的節點下創建一個標識本機的子節點(臨時的 -e )。當,某一台服務器宕機時,那么其在/ServerCluster下創建的節點就會隨之消失。我們讓客戶端獲取服務器信息時,監聽/ServerCluster的子節點變化,那么當某一台服務器宕機時(臨時節點隨之消失)Zookeeper會通知客戶端/ServerCluster的變化,客戶端也就知道了具體的哪一台服務器當機,哪一台服務器正常運行可以訪問了。

  在這個過程中,最重要的一點:服務器和客戶端對於Zookeeper集群來說,都是客戶端。

  

 

 

具體實現:

public class Server {

    private static String connect = "hadoop101:2181";
    private static int timeout = 2000;
    private static ZooKeeper zooKeeper = null;
    private static String parentPath = "/ServerCluster";

    public static void main(String[] args) throws Exception {

        //1、獲取一個Zookeeper的客戶端對象,用於服務器向Zookeeper集群注冊自己。
        getClient();

        //2、把本服務器的主機名注冊到Zookeeper中的特定節點中
        registServer(args[0]);

        //3、服務器本身的業務邏輯
        getBusiness(args[0]);
    }

    private static void getBusiness(String hostname) throws InterruptedException {
        System.out.println(hostname + " is working...");
        Thread.sleep(Long.MAX_VALUE);
    }

    private static void registServer(String hostname) throws KeeperException, InterruptedException {

        //創建臨時節點
        String path = zooKeeper.create(parentPath + "/server",
                hostname.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname + " is online...");
    }

    private static void getClient() throws Exception {
        zooKeeper = new ZooKeeper(connect, timeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println(event.getType() + "---" + event.getPath());
            }
        });
    }
}
public class Client {
    private static String connect = "hadoop101:2181";
    private static int timeout = 2000;
    private static ZooKeeper zooKeeper = null;
    private static String parentPath = "/ServerCluster";
public static void main(String[] args) throws Exception { //1、獲取一個Zookeeper的客戶端對象,用於服務器向Zookeeper集群注冊自己。 getClient(); //2、獲取服務器列表(主機名),並監聽 getServers(); //3、客戶端的業務邏輯 getBusiness(); } private static void getBusiness() throws InterruptedException { System.out.println("Client is working..."); Thread.sleep(Long.MAX_VALUE); } private static void getServers() throws KeeperException, InterruptedException { //向Zookeeper給getChildren方法注冊監聽,一旦parentPath節點發生變化,就會通知監聽器觸發process方法 List<String> children = zooKeeper.getChildren(parentPath, true); //此集合用於保存服務器主機名 ArrayList<String> hosts = new ArrayList(); for (String child: children) { byte[] data = zooKeeper.getData(parentPath + "/" + child, false, null); hosts.add(new String(data)); } System.out.println(hosts); } private static void getClient() throws Exception { zooKeeper = new ZooKeeper(connect, timeout, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getType() + "---" + event.getPath()); //執行到本方法就說明parentPath已經被修改了,即服務器列表發生變化,需要重新獲取。 try { getServers(); } catch (Exception e) { e.printStackTrace(); } } }); } }

  

通過配置參數開啟三個服務器hadoop101,開啟一個Client

   

關閉hadoop101

  

 

Client觀察到這種變化,打印新的服務器列表:

   

 

   

當然,如果在Linux中加入一個“服務器”,Client也可以監聽到:

   


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM