zookeeper 實現注冊中心 (關閉心跳日志)


 

分布式注冊配置中心:

zookeeper由於擁有watcher機制,使得其擁有發布訂閱的功能,而發布與訂閱模型,即所謂的配置中心,

顧名思義就是發布者將數據發布到 ZK節點上,供訂閱者動態獲取數據,實現配置信息的集中式管理和動態更新。

應用在啟動的時候會主動來獲取一次配置,同時,在節點上注冊一個 Watcher,這樣一來,以后每次配置有更新的時候,

都會實時通知到訂閱的客戶端,從來達到獲取最新配置信息的目的。

 

<!-- 添加springboot對 zookeeper 的支持 -->

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.12</version>
</dependency>

 

由於客戶端需要與zookeeper建立連接,獲取數據,添加監控等等一系列的事情,

所以這里封裝一個Utils工具類。

然后對於zookeeper連接客戶端的地址的后面可以緊跟一個path,作為在根目錄下的工作目錄。

該目錄就是作為所有操作的根目錄,這里使用 /zkRegistry

監聽的是該節點下的  conf  節點,

 

 

 

由於zookeeper采用的是異步調用,所以這里需要使用一把鎖鎖住主線程,在連接成功后自動解鎖,

主線程再往下進行。這里使用CountDownLatch實現鎖,在主線程創建,傳遞到DafaultWatcher的回掉函數中。

 

package com..zookeeper.zkRegistry;

import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;

public class ZkClientUtil {

    private static ZooKeeper zooKeeper;

    private static DefaultWatcher defaultWatcher = new DefaultWatcher();

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    private static String address = "localhost:2181,localhost:2182,localhost:2183,localhost:2184/zkRegistry";


    public static ZooKeeper getZkClient() throws Exception {

        zooKeeper = new ZooKeeper(address,1000,defaultWatcher);
        
        //等待 鏈接 CountDownLatch -1
        defaultWatcher.setCountDownLatch(countDownLatch);
        //繼續執行
        countDownLatch.await(); 

        return zooKeeper;

    }
}

 

同時由於zookeeper基於watch機制實現發布訂閱,所有的watcher都采用自定義的方式實現,

首先是對連接成功的時候的DefaultWatcher。

 

package com..zookeeper.zkRegistry;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.concurrent.CountDownLatch;

public class DefaultWatcher implements Watcher {

    private CountDownLatch countDownLatch;

    public void setCountDownLatch(CountDownLatch countDownLatch){

        this.countDownLatch = countDownLatch;
    }



    @Override
    public void process(WatchedEvent watchedEvent) {

        switch (watchedEvent.getState()) {
            case Unknown:
                break;
            case Disconnected:
                break;
            case NoSyncConnected:
                break;
            case SyncConnected:
                //初始化完成 -1
                countDownLatch.countDown();
                break;
            case AuthFailed:
                break;
            case ConnectedReadOnly:
                break;
            case SaslAuthenticated:
                break;
            case Expired:
                break;
        }
        System.out.println(watchedEvent.toString());

    }

}

 

創建一個接收數據的實體類對象:

package com..zookeeper.zkRegistry;

public class MyConfData {

    private String data;

    public String getData(){

        return data;
    }

    public void setData(String data){

        this.data = data;
    }
}

 

封裝好所有的watcher和回調函數::

package com..zookeeper.zkRegistry;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

public class MyWatcherAndCallBack implements Watcher,AsyncCallback.StatCallback,AsyncCallback.DataCallback {

    private ZooKeeper zooKeeper;

    private MyConfData myConfData;

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public void setZooKeeper(ZooKeeper zooKeeper){

        this.zooKeeper = zooKeeper;
    }

    public void setMyConfData(MyConfData myConfData){

        this.myConfData = myConfData;
    }



    //Watcher
    @Override
    public void process(WatchedEvent watchedEvent) {

        switch (watchedEvent.getType()) {
            case None:
                break;
            case NodeCreated:
                zooKeeper.getData("/conf",this,this,"第二次獲取數據");
                break;
            case NodeDeleted:
                myConfData.setData("");
                countDownLatch = new CountDownLatch(1);
                break;
            case NodeDataChanged:
                zooKeeper.getData("/conf",this,this,"第二次獲取數據");
                break;
            case NodeChildrenChanged:
                break;
        }

    }


    //StatCallback
    @Override
    public void processResult(int i, String s, Object o, Stat stat) {

        zooKeeper.getData("/conf",this,this,"第一次獲取數據");


    }

    //DataCallback
    @Override
    public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {

        if(stat!=null){
            System.out.println("取得數據 : "+new String(bytes));
            myConfData.setData(new String(bytes));

            countDownLatch.countDown();
        }
    }

    public void await() throws InterruptedException {

        zooKeeper.exists("/conf",this,this,"127.0.0.1");

        countDownLatch.await();
    }
}

 

測試類:

(可直接運行 getConfData() 方法,跑項目就放開注調的代碼和注解,注@Test)

package com..zookeeper.zkRegistry;

import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

//@Component
public class ZkClientConfing {

    private ZooKeeper zooKeeper;

    private MyConfData myConfData = new MyConfData();

    private MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack();


    @Before
    public void conn() throws Exception {

        zooKeeper = ZkClientUtil.getZkClient();
    }


    @After
    public void close() throws InterruptedException {

        zooKeeper.close();
    }


    @Test
//    @PostConstruct
    public void getConfData() throws InterruptedException {

//        try {
//            zooKeeper = ZkClientUtil.getZkClient();
//        } catch (Exception e) {
//            e.printStackTrace();
//        }

        myWatcherAndCallBack.setZooKeeper(zooKeeper);

        myWatcherAndCallBack.setMyConfData(myConfData);

        myWatcherAndCallBack.await();

        while (true){

            if(myConfData.getData().equals("")){
                System.out.println("配置中心數據為空!!!!!");
                myWatcherAndCallBack.await();//等待數據輸入
            }
            //為了觀察數據的變化,這里循環打印設置的數據。
            System.out.println("數據 : "+myConfData.getData());

            Thread.sleep(30000);
        }

    }
}

 

 

logback 關閉zookeeper的心跳日志:

<configuration> 

  <logger name="org.apache.zookeeper.ClientCnxn" level="info" />  

</configuration> 

name 為包名

level  為日志級別

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM