rpc中的注冊中心


使用模板模式,對注冊中心進行設計,可以方便后續添加注冊中心

模板抽象類,提供注冊中心必要的方法。

public abstract class ServiceRegistry {

    //這是一個模板的抽象類,規定了注冊中心對外提供的方法

    //開始注冊服務,參數map中應該包含注冊中心要啟動所需的參數
    public abstract void start(Map<String, String> param);

    //停止注冊中心
    public abstract void stop();

    //注冊,是從服務端將服務注冊到注冊中心上,keys表示服務端所有的servicekey值,而value表示服務端的地址。
    public abstract boolean registry(Set<String> keys, String value);

    //移除,從注冊中心上將服務移除
    public abstract boolean remove(Set<String> keys, String value);

    //發現,客戶端從注冊中心上發現服務
    public abstract Map<String, TreeSet<String>> discovery(Set<String> keys);

    //發現重載,這里客戶端會發現servicekey下的所有地址。
    public abstract TreeSet<String> discovery(String key);
}
ServiceRegistry

創建一個本地的注冊中心,使用集合來存儲注冊的數據,實現模板的方法

public class LocalServiceRegistry extends ServiceRegistry {

    //本地注冊中心的實現
    //定義一個map集合來存放注冊的數據
    private Map<String, TreeSet<String>> registryData;//



    @Override
    public void start(Map<String, String> param) {
        registryData = new HashMap<String, TreeSet<String>>();
    }

    @Override
    public void stop() {
        registryData.clear();//清理數據
    }

    @Override
    public boolean registry(Set<String> keys, String value) {//這里傳來的是地址和servicekey
        if (keys==null || keys.size()==0 || value==null || value.trim().length()==0) {
            return false;
        }
        for (String key : keys) {//取出每一個服務key值
            TreeSet<String> values = registryData.get(key);//嘗試去取服務的key值
            if (values == null) {
                values = new TreeSet<>();
                registryData.put(key, values);//如果取不到,則將每一個服務key都存成一個key值,而values是新建的tree類
            }
            values.add(value);//每次通過不同的地址傳來相同服務key,就將地址存到values中。
            //這樣保證了注冊中心的服務key對應了來自不同地址的服務器。
        }
        return true;
    }

    @Override
    public boolean remove(Set<String> keys, String value) {
        if (keys==null || keys.size()==0 || value==null || value.trim().length()==0) {
            return false;
        }
        for (String key : keys) {
            TreeSet<String> values = registryData.get(key);
            if (values != null) {
                values.remove(value);
            }
            //移除每一個key值下的地址
        }
        return true;
    }

    @Override
    public Map<String, TreeSet<String>> discovery(Set<String> keys) {
        if (keys==null || keys.size()==0) {
            return null;
        }
        Map<String, TreeSet<String>> registryDataTmp = new HashMap<String, TreeSet<String>>();
        for (String key : keys) {
            TreeSet<String> valueSetTmp = discovery(key);
            if (valueSetTmp != null) {
                registryDataTmp.put(key, valueSetTmp);
            }
            //將每個key值下的地址都取出來,一般不用,只取一個
        }
        return registryDataTmp;
    }

    @Override
    public TreeSet<String> discovery(String key) {
        return registryData.get(key);
        //返回其中的一個key值對應的地址
    }
}
LocalServiceRegistry

 

使用zk作為注冊中心。

為什么要選用注冊中心

假設沒有注冊中心,采用直連的方式,如果服務提供者發生變化,那么消費者也要立即更新,耦合度太高

zk作為服務注冊的一個框架,消費者只需要向注冊中心獲取服務提供者的地址,無需自己做更新。達到了解耦合的作用,而且還能實現服務的自動發現

 

XXL-RPC中每個服務在zookeeper中對應一個節點,如圖"iface name"節點,該服務的每一個provider機器對應"iface name"節點下的一個子節點,如圖中"192.168.0.1:9999"、"192.168.0.2:9999"和"192.168.0.3:9999",子節點類型為zookeeper的EPHMERAL(臨時節點)類型,該類型節點有個特點,當機器和zookeeper集群斷掉連接后節點將會被移除。consumer底層可以從zookeeper獲取到可提供服務的provider集群地址列表,從而可以向其中一個機器發起RPC調用。

public class ZkServiceRegistry extends ServiceRegistry {

    //定義zk的參數
    public static final String ENV = "env";    //zk的環境
    public static final String ZK_ADDRESS = "zkaddress";    //zk的地址
    public static final String ZK_DIGEST = "zkdigest";     //zk的授權方式


    //配置
    private static final String zkBasePath = "/xxl-rpc";  //zk的基礎路徑
    private String zkEnvPath;    //zk的環境地址
    private XxlZkClient xxlZkClient = null;    //zk的操作端  自己定義

    private Thread refreshThread;//定義一個刷新的線程,用來執行當服務端地址改變后,注冊中心需要刷新改變
    private volatile boolean refreshThreadStop = false;//刷新停止的標志位,線程可見的。

    //設置兩個map集合,來存注冊的數據和發現的數據
    private volatile ConcurrentMap<String, TreeSet<String>> registryData = new ConcurrentHashMap<String, TreeSet<String>>();
    private volatile ConcurrentMap<String, TreeSet<String>> discoveryData = new ConcurrentHashMap<String, TreeSet<String>>();


    //提供從key值到路徑的函數
    public String keyToPath(String nodeKey){
        return zkEnvPath + "/" + nodeKey;
    }
    //從路徑到key值
    public String pathToKey(String nodePath){
        if (nodePath==null || nodePath.length() <= zkEnvPath.length() || !nodePath.startsWith(zkEnvPath)) {
            return null;
        }
        return nodePath.substring(zkEnvPath.length()+1, nodePath.length());
    }

    //開啟注冊
    @Override
    public void start(Map<String, String> param) {
        //從服務端傳來的數據中取到zk所需的參數
        String zkaddress = param.get(ZK_ADDRESS);
        String zkdigest = param.get(ZK_DIGEST);
        String env = param.get(ENV);

        //驗證地址和環境是否為空
        // valid
        if (zkaddress == null || zkaddress.trim().length() == 0) {
            throw new XxlRpcException("xxl-rpc zkaddress can not be empty");
        }
        if (env == null || env.trim().length() == 0) {
            throw new XxlRpcException("xxl-rpc env can not be empty");
        }

        //初始化環境地址
        zkEnvPath = zkBasePath.concat("/").concat(env);

        //配置客戶端
        //需要完善zk的客戶端
        xxlZkClient = new XxlZkClient(zkaddress, zkEnvPath, zkdigest, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    // session expire, close old and create new
                    if (watchedEvent.getState() == Event.KeeperState.Expired) {//如果觀察者失效
                        //刪除舊的,創建新的
                        xxlZkClient.destroy();
                        xxlZkClient.getClient();
                        //刷新發現的數據
                        refreshDiscoveryData(null);
                    }
                    //得到key值
                    String path = watchedEvent.getPath();
                    String key = pathToKey(path);
                    if (key != null) {//如果key不為空
                        xxlZkClient.getClient().exists(path, true);//
                        // refresh
                        if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {//如果子節點發生改變,則執行刷新的方法
                            // refreshDiscoveryData (one):one change
                            refreshDiscoveryData(key);
                        } else if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                            //打印日志,同步連接
                        }
                    }
                } catch (Exception e) {
                    //打印日志

                }
            }
        });
        //初始化客戶端
        xxlZkClient.getClient();

        //刷新的線程啟動
        refreshThread=new Thread(new Runnable() {
            @Override
            public void run() {
                //refreshThreadStop一直使false,直到stop,因此,這個線程一直在運行
                while (!refreshThreadStop) {
                    try {
                        TimeUnit.SECONDS.sleep(60);//

                        // refreshDiscoveryData (all):cycle check
                        refreshDiscoveryData(null); //刷新發現的數據
                        refreshRegistryData();  //刷新注冊的數據
                    } catch (Exception e) {
                        if (!refreshThreadStop) {
                            //打印日志
                        }
                    }
                }
            }
        });
        refreshThread.setName("xxl-rpc, ZkServiceRegistry refresh thread.");
        refreshThread.setDaemon(true);//設置為守護線程
        refreshThread.start();

    }

    //注冊停止
    @Override
    public void stop() {
        if (xxlZkClient!=null) {
            xxlZkClient.destroy();
        }
        if (refreshThread != null) {
            refreshThreadStop = true;
            refreshThread.interrupt();
        }
    }


    private void refreshDiscoveryData(String key){

        //新建一個set集合來存放key值
        Set<String> keys = new HashSet<String>();
        if (key!=null && key.trim().length()>0) {
            keys.add(key);//將key值都加到key里面
        } else {
            if (discoveryData.size() > 0) {
                keys.addAll(discoveryData.keySet());
            }
        }
        if (keys.size() > 0) {
            for (String keyItem: keys) {
                // add-values
                String path = keyToPath(keyItem);
                //子路徑中的數據就是請求的地址
                Map<String, String> childPathData = xxlZkClient.getChildPathData(path);

                // exist-values
                TreeSet<String> existValues = discoveryData.get(keyItem);
                if (existValues == null) {
                    existValues = new TreeSet<String>();
                    discoveryData.put(keyItem, existValues);
                }

                if (childPathData.size() > 0) {
                    existValues.clear();
                    existValues.addAll(childPathData.keySet());
                }
            }
        }

    }
    //刷新注冊數據
    private void refreshRegistryData(){
        if (registryData.size() > 0) {
            for (Map.Entry<String, TreeSet<String>> item: registryData.entrySet()) {
                String key = item.getKey();
                for (String value:item.getValue()) {
                    // make path, child path
                    String path = keyToPath(key);
                    xxlZkClient.setChildPathData(path, value, "");
                }
            }
        }
    }

    @Override
    public boolean registry(Set<String> keys, String value) {
        //在客戶端里設置數據
        for (String key : keys) {
            // local cache
            TreeSet<String> values = registryData.get(key);
            if (values == null) {
                values = new TreeSet<>();
                registryData.put(key, values);
            }
            values.add(value);

            // make path, child path
            String path = keyToPath(key);
            xxlZkClient.setChildPathData(path, value, "");
        }
        return true;
    }

    @Override
    public boolean remove(Set<String> keys, String value) {
        for (String key : keys) {
            TreeSet<String> values = discoveryData.get(key);
            if (values != null) {
                values.remove(value);
            }
            String path = keyToPath(key);
            xxlZkClient.deleteChildPath(path, value);
        }

        return true;
    }

    @Override
    public Map<String, TreeSet<String>> discovery(Set<String> keys) {
        if (keys==null || keys.size()==0) {
            return null;
        }
        Map<String, TreeSet<String>> registryDataTmp = new HashMap<String, TreeSet<String>>();
        for (String key : keys) {
            TreeSet<String> valueSetTmp = discovery(key);
            if (valueSetTmp != null) {
                registryDataTmp.put(key, valueSetTmp);
            }
        }
        return registryDataTmp;
    }

    @Override
    public TreeSet<String> discovery(String key) {
        TreeSet<String> values = discoveryData.get(key);
        if (values == null) {

            // refreshDiscoveryData (one):first use
            refreshDiscoveryData(key);

            values = discoveryData.get(key);
        }

        return values;
    }
}
ZkServiceRegistry

 

zk的注冊中心類提供了一系列的方法,並通過定義的zkclient類來操作zk,創建和更新節點,這樣就可以完成注冊的功能

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM