使用模板模式,對注冊中心進行設計,可以方便后續添加注冊中心
模板抽象類,提供注冊中心必要的方法。

public abstract class ServiceRegistry { //這是一個模板的抽象類,規定了注冊中心對外提供的方法 //開始注冊服務,參數map中應該包含注冊中心要啟動所需的參數 public abstract void start(Map<String, String> param); //停止注冊中心 public abstract void stop(); //注冊,是從服務端將服務注冊到注冊中心上,keys表示服務端所有的servicekey值,而value表示服務端的地址。 public abstract boolean registry(Set<String> keys, String value); //移除,從注冊中心上將服務移除 public abstract boolean remove(Set<String> keys, String value); //發現,客戶端從注冊中心上發現服務 public abstract Map<String, TreeSet<String>> discovery(Set<String> keys); //發現重載,這里客戶端會發現servicekey下的所有地址。 public abstract TreeSet<String> discovery(String key); }
創建一個本地的注冊中心,使用集合來存儲注冊的數據,實現模板的方法

public class LocalServiceRegistry extends ServiceRegistry { //本地注冊中心的實現 //定義一個map集合來存放注冊的數據 private Map<String, TreeSet<String>> registryData;// @Override public void start(Map<String, String> param) { registryData = new HashMap<String, TreeSet<String>>(); } @Override public void stop() { registryData.clear();//清理數據 } @Override public boolean registry(Set<String> keys, String value) {//這里傳來的是地址和servicekey if (keys==null || keys.size()==0 || value==null || value.trim().length()==0) { return false; } for (String key : keys) {//取出每一個服務key值 TreeSet<String> values = registryData.get(key);//嘗試去取服務的key值 if (values == null) { values = new TreeSet<>(); registryData.put(key, values);//如果取不到,則將每一個服務key都存成一個key值,而values是新建的tree類 } values.add(value);//每次通過不同的地址傳來相同服務key,就將地址存到values中。 //這樣保證了注冊中心的服務key對應了來自不同地址的服務器。 } return true; } @Override public boolean remove(Set<String> keys, String value) { if (keys==null || keys.size()==0 || value==null || value.trim().length()==0) { return false; } for (String key : keys) { TreeSet<String> values = registryData.get(key); if (values != null) { values.remove(value); } //移除每一個key值下的地址 } return true; } @Override public Map<String, TreeSet<String>> discovery(Set<String> keys) { if (keys==null || keys.size()==0) { return null; } Map<String, TreeSet<String>> registryDataTmp = new HashMap<String, TreeSet<String>>(); for (String key : keys) { TreeSet<String> valueSetTmp = discovery(key); if (valueSetTmp != null) { registryDataTmp.put(key, valueSetTmp); } //將每個key值下的地址都取出來,一般不用,只取一個 } return registryDataTmp; } @Override public TreeSet<String> discovery(String key) { return registryData.get(key); //返回其中的一個key值對應的地址 } }
使用zk作為注冊中心。
為什么要選用注冊中心
假設沒有注冊中心,采用直連的方式,如果服務提供者發生變化,那么消費者也要立即更新,耦合度太高
zk作為服務注冊的一個框架,消費者只需要向注冊中心獲取服務提供者的地址,無需自己做更新。達到了解耦合的作用,而且還能實現服務的自動發現。

public class ZkServiceRegistry extends ServiceRegistry { //定義zk的參數 public static final String ENV = "env"; //zk的環境 public static final String ZK_ADDRESS = "zkaddress"; //zk的地址 public static final String ZK_DIGEST = "zkdigest"; //zk的授權方式 //配置 private static final String zkBasePath = "/xxl-rpc"; //zk的基礎路徑 private String zkEnvPath; //zk的環境地址 private XxlZkClient xxlZkClient = null; //zk的操作端 自己定義 private Thread refreshThread;//定義一個刷新的線程,用來執行當服務端地址改變后,注冊中心需要刷新改變 private volatile boolean refreshThreadStop = false;//刷新停止的標志位,線程可見的。 //設置兩個map集合,來存注冊的數據和發現的數據 private volatile ConcurrentMap<String, TreeSet<String>> registryData = new ConcurrentHashMap<String, TreeSet<String>>(); private volatile ConcurrentMap<String, TreeSet<String>> discoveryData = new ConcurrentHashMap<String, TreeSet<String>>(); //提供從key值到路徑的函數 public String keyToPath(String nodeKey){ return zkEnvPath + "/" + nodeKey; } //從路徑到key值 public String pathToKey(String nodePath){ if (nodePath==null || nodePath.length() <= zkEnvPath.length() || !nodePath.startsWith(zkEnvPath)) { return null; } return nodePath.substring(zkEnvPath.length()+1, nodePath.length()); } //開啟注冊 @Override public void start(Map<String, String> param) { //從服務端傳來的數據中取到zk所需的參數 String zkaddress = param.get(ZK_ADDRESS); String zkdigest = param.get(ZK_DIGEST); String env = param.get(ENV); //驗證地址和環境是否為空 // valid if (zkaddress == null || zkaddress.trim().length() == 0) { throw new XxlRpcException("xxl-rpc zkaddress can not be empty"); } if (env == null || env.trim().length() == 0) { throw new XxlRpcException("xxl-rpc env can not be empty"); } //初始化環境地址 zkEnvPath = zkBasePath.concat("/").concat(env); //配置客戶端 //需要完善zk的客戶端 xxlZkClient = new XxlZkClient(zkaddress, zkEnvPath, zkdigest, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { try { // session expire, close old and create new if (watchedEvent.getState() == Event.KeeperState.Expired) {//如果觀察者失效 //刪除舊的,創建新的 xxlZkClient.destroy(); xxlZkClient.getClient(); //刷新發現的數據 refreshDiscoveryData(null); } //得到key值 String path = watchedEvent.getPath(); String key = pathToKey(path); if (key != null) {//如果key不為空 xxlZkClient.getClient().exists(path, true);// // refresh if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {//如果子節點發生改變,則執行刷新的方法 // refreshDiscoveryData (one):one change refreshDiscoveryData(key); } else if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { //打印日志,同步連接 } } } catch (Exception e) { //打印日志 } } }); //初始化客戶端 xxlZkClient.getClient(); //刷新的線程啟動 refreshThread=new Thread(new Runnable() { @Override public void run() { //refreshThreadStop一直使false,直到stop,因此,這個線程一直在運行 while (!refreshThreadStop) { try { TimeUnit.SECONDS.sleep(60);// // refreshDiscoveryData (all):cycle check refreshDiscoveryData(null); //刷新發現的數據 refreshRegistryData(); //刷新注冊的數據 } catch (Exception e) { if (!refreshThreadStop) { //打印日志 } } } } }); refreshThread.setName("xxl-rpc, ZkServiceRegistry refresh thread."); refreshThread.setDaemon(true);//設置為守護線程 refreshThread.start(); } //注冊停止 @Override public void stop() { if (xxlZkClient!=null) { xxlZkClient.destroy(); } if (refreshThread != null) { refreshThreadStop = true; refreshThread.interrupt(); } } private void refreshDiscoveryData(String key){ //新建一個set集合來存放key值 Set<String> keys = new HashSet<String>(); if (key!=null && key.trim().length()>0) { keys.add(key);//將key值都加到key里面 } else { if (discoveryData.size() > 0) { keys.addAll(discoveryData.keySet()); } } if (keys.size() > 0) { for (String keyItem: keys) { // add-values String path = keyToPath(keyItem); //子路徑中的數據就是請求的地址 Map<String, String> childPathData = xxlZkClient.getChildPathData(path); // exist-values TreeSet<String> existValues = discoveryData.get(keyItem); if (existValues == null) { existValues = new TreeSet<String>(); discoveryData.put(keyItem, existValues); } if (childPathData.size() > 0) { existValues.clear(); existValues.addAll(childPathData.keySet()); } } } } //刷新注冊數據 private void refreshRegistryData(){ if (registryData.size() > 0) { for (Map.Entry<String, TreeSet<String>> item: registryData.entrySet()) { String key = item.getKey(); for (String value:item.getValue()) { // make path, child path String path = keyToPath(key); xxlZkClient.setChildPathData(path, value, ""); } } } } @Override public boolean registry(Set<String> keys, String value) { //在客戶端里設置數據 for (String key : keys) { // local cache TreeSet<String> values = registryData.get(key); if (values == null) { values = new TreeSet<>(); registryData.put(key, values); } values.add(value); // make path, child path String path = keyToPath(key); xxlZkClient.setChildPathData(path, value, ""); } return true; } @Override public boolean remove(Set<String> keys, String value) { for (String key : keys) { TreeSet<String> values = discoveryData.get(key); if (values != null) { values.remove(value); } String path = keyToPath(key); xxlZkClient.deleteChildPath(path, value); } return true; } @Override public Map<String, TreeSet<String>> discovery(Set<String> keys) { if (keys==null || keys.size()==0) { return null; } Map<String, TreeSet<String>> registryDataTmp = new HashMap<String, TreeSet<String>>(); for (String key : keys) { TreeSet<String> valueSetTmp = discovery(key); if (valueSetTmp != null) { registryDataTmp.put(key, valueSetTmp); } } return registryDataTmp; } @Override public TreeSet<String> discovery(String key) { TreeSet<String> values = discoveryData.get(key); if (values == null) { // refreshDiscoveryData (one):first use refreshDiscoveryData(key); values = discoveryData.get(key); } return values; } }