分布式注冊配置中心:
zookeeper由於擁有watcher機制,使得其擁有發布訂閱的功能,而發布與訂閱模型,即所謂的配置中心,
顧名思義就是發布者將數據發布到 ZK節點上,供訂閱者動態獲取數據,實現配置信息的集中式管理和動態更新。
應用在啟動的時候會主動來獲取一次配置,同時,在節點上注冊一個 Watcher,這樣一來,以后每次配置有更新的時候,
都會實時通知到訂閱的客戶端,從來達到獲取最新配置信息的目的。
<!-- 添加springboot對 zookeeper 的支持 -->
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.12</version> </dependency>
由於客戶端需要與zookeeper建立連接,獲取數據,添加監控等等一系列的事情,
所以這里封裝一個Utils工具類。
然后對於zookeeper連接客戶端的地址的后面可以緊跟一個path,作為在根目錄下的工作目錄。
該目錄就是作為所有操作的根目錄,這里使用 /zkRegistry
監聽的是該節點下的 conf 節點,
由於zookeeper采用的是異步調用,所以這里需要使用一把鎖鎖住主線程,在連接成功后自動解鎖,
主線程再往下進行。這里使用CountDownLatch實現鎖,在主線程創建,傳遞到DafaultWatcher的回掉函數中。
package com..zookeeper.zkRegistry; import org.apache.zookeeper.ZooKeeper; import java.util.concurrent.CountDownLatch; public class ZkClientUtil { private static ZooKeeper zooKeeper; private static DefaultWatcher defaultWatcher = new DefaultWatcher(); private static CountDownLatch countDownLatch = new CountDownLatch(1); private static String address = "localhost:2181,localhost:2182,localhost:2183,localhost:2184/zkRegistry"; public static ZooKeeper getZkClient() throws Exception { zooKeeper = new ZooKeeper(address,1000,defaultWatcher); //等待 鏈接 CountDownLatch -1 defaultWatcher.setCountDownLatch(countDownLatch); //繼續執行 countDownLatch.await(); return zooKeeper; } }
同時由於zookeeper基於watch機制實現發布訂閱,所有的watcher都采用自定義的方式實現,
首先是對連接成功的時候的DefaultWatcher。
package com..zookeeper.zkRegistry; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.concurrent.CountDownLatch; public class DefaultWatcher implements Watcher { private CountDownLatch countDownLatch; public void setCountDownLatch(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } @Override public void process(WatchedEvent watchedEvent) { switch (watchedEvent.getState()) { case Unknown: break; case Disconnected: break; case NoSyncConnected: break; case SyncConnected: //初始化完成 -1 countDownLatch.countDown(); break; case AuthFailed: break; case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; } System.out.println(watchedEvent.toString()); } }
創建一個接收數據的實體類對象:
package com..zookeeper.zkRegistry; public class MyConfData { private String data; public String getData(){ return data; } public void setData(String data){ this.data = data; } }
封裝好所有的watcher和回調函數::
package com..zookeeper.zkRegistry; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; public class MyWatcherAndCallBack implements Watcher,AsyncCallback.StatCallback,AsyncCallback.DataCallback { private ZooKeeper zooKeeper; private MyConfData myConfData; private CountDownLatch countDownLatch = new CountDownLatch(1); public void setZooKeeper(ZooKeeper zooKeeper){ this.zooKeeper = zooKeeper; } public void setMyConfData(MyConfData myConfData){ this.myConfData = myConfData; } //Watcher @Override public void process(WatchedEvent watchedEvent) { switch (watchedEvent.getType()) { case None: break; case NodeCreated: zooKeeper.getData("/conf",this,this,"第二次獲取數據"); break; case NodeDeleted: myConfData.setData(""); countDownLatch = new CountDownLatch(1); break; case NodeDataChanged: zooKeeper.getData("/conf",this,this,"第二次獲取數據"); break; case NodeChildrenChanged: break; } } //StatCallback @Override public void processResult(int i, String s, Object o, Stat stat) { zooKeeper.getData("/conf",this,this,"第一次獲取數據"); } //DataCallback @Override public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) { if(stat!=null){ System.out.println("取得數據 : "+new String(bytes)); myConfData.setData(new String(bytes)); countDownLatch.countDown(); } } public void await() throws InterruptedException { zooKeeper.exists("/conf",this,this,"127.0.0.1"); countDownLatch.await(); } }
測試類:
(可直接運行 getConfData() 方法,跑項目就放開注調的代碼和注解,注@Test)
package com..zookeeper.zkRegistry; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; //@Component public class ZkClientConfing { private ZooKeeper zooKeeper; private MyConfData myConfData = new MyConfData(); private MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack(); @Before public void conn() throws Exception { zooKeeper = ZkClientUtil.getZkClient(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test // @PostConstruct public void getConfData() throws InterruptedException { // try { // zooKeeper = ZkClientUtil.getZkClient(); // } catch (Exception e) { // e.printStackTrace(); // } myWatcherAndCallBack.setZooKeeper(zooKeeper); myWatcherAndCallBack.setMyConfData(myConfData); myWatcherAndCallBack.await(); while (true){ if(myConfData.getData().equals("")){ System.out.println("配置中心數據為空!!!!!"); myWatcherAndCallBack.await();//等待數據輸入 } //為了觀察數據的變化,這里循環打印設置的數據。 System.out.println("數據 : "+myConfData.getData()); Thread.sleep(30000); } } }
logback 關閉zookeeper的心跳日志:
<configuration>
<logger name="org.apache.zookeeper.ClientCnxn" level="info" />
</configuration>
name 為包名
level 為日志級別