nacos源碼解析(二)-客戶端如何訪問注冊中心


概述

  客戶端與注冊中心服務端的交互,主要集中在服務注冊,服務下線,服務發現以及訂閱某個服務,其實使用最多的就是服務注冊和服務發現,下面我會從源碼的角度分析一下這四個功能,客戶端是如何處理的,本文不會介紹注冊中心服務端如何處理的,這個之后會寫文章分析。

 

客戶端代碼

public class NamingExample {

    public static void main(String[] args) throws NacosException {

        Properties properties = new Properties();
        properties.setProperty("serverAddr","127.0.0.1:8848");
        properties.setProperty("namespace", "namespace");

        NamingService naming = NamingFactory.createNamingService(properties);
        //服務注冊
        naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");

        naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
        //服務發現
        System.out.println(naming.getAllInstances("nacos.test.3"));

        System.out.println("----------------------------------------------------");
        //服務下線
        naming.deregisterInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");

        System.out.println(naming.getAllInstances("nacos.test.3"));

        System.out.println("----------------------------------------------------");
        //服務訂閱
        naming.subscribe("nacos.test.3", new EventListener() {
            @Override
            public void onEvent(Event event) {
                System.out.println(((NamingEvent)event).getServiceName());

                System.out.println("======================================================");
                System.out.println(((NamingEvent)event).getInstances());
            }
        });

        try{
          Thread.sleep(5000000);
        } catch (Exception e){

        }

    }
}
View Code

 

服務注冊分析

@Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
            //為注冊服務設置一個定時任務獲取心跳信息,默認為5s匯報一次
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
        //注冊到服務端
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

注冊主要做了兩件事,第一件事:為注冊的服務設置一個定時任務,定時拉去服務信息。 第二件事:將服務注冊到服務端。下面詳細介紹一下這兩個事,不會貼出源碼,貼出源碼太長了,大家自己看

第一件事詳解

  1.啟動一個定時任務,定時拉取服務信息,時間間隔為5s

  2.如果拉下來服務正常,不做處理,如果不正常,重新注冊

第二件事詳解

  發送http請求給注冊中心服務端,調用服務注冊接口,注冊服務

 

服務發現分析

@Override
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {

        ServiceInfo serviceInfo;
        if (subscribe) {
            //從本地緩存中獲取,如果本地緩存不存在從服務端拉取
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }
        return list;
    }

服務發現流程如下:

 

服務下線分析

 @Override
    public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        if (instance.isEphemeral()) {
            //移除心跳信息監測的定時任務
            beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), instance.getPort());
        }
        //請求服務端下線接口
        serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
    }

這個和服務注冊干的事情正好相反,也做了兩件事,第一件事:不在進行心跳檢測。  第二件事:請求服務端服務下線接口。下面詳細分析一下

第一件事詳解

  停止進行心跳檢測,並不是把第一步的定時任務給停止了,而是設置了一個參數,當定時任務運行的時候會判斷這個參數,如果為true,直接返回了,不會拉服務端的信息

第二件事詳解

  請求服務端服務下線接口

 

服務訂閱詳解

@Override
    public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
        eventDispatcher.addListener(hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
            StringUtils.join(clusters, ",")), StringUtils.join(clusters, ","), listener);
    }

public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {

NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
observers.add(listener);

observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
if (observers != null) {
observers.add(listener);
}
  
serviceChanged(serviceInfo);
}
 

服務訂閱功能稍微有些復雜,不過也還好,看下面的圖

 

 這里稍微提一下nacos事件處理機制,nacos的事件處理也是采用觀察者模式實現的,具體流程如下

 

   nacos的監聽器和springboot的監聽器還不太一樣,springboot是每次變更一次,主動的觸發通知,然后自己處理這個事件,但是nacos不同,nacos是啟動的時候就會啟動一個死循環,然后這個死循環去消費隊列,如果有某個服務發生了變更,就會把變更的服務信息放入到這個隊列中,注意,這個隊列是LinkedBlockingQueue,是一個阻塞隊列,之后死循環會調用take方法獲取隊列中的信息,如果隊列為空,會阻塞。

  上面的解釋中,大家可能比較感興趣的是,在什么地方捕獲到服務發生變更了,其實在第一次服務發現的時候就會有服務變更事件,因為從服務器上拉去的信息和本地的緩存信息不一致,就會把這個變更的服務信息放入隊列中。

 

在最開始的客戶端代碼中,服務訂閱那一塊的輸出如下:

DEFAULT_GROUP@@nacos.test.3
======================================================
[{"clusterName":"DEFAULT","enabled":true,"ephemeral":true,"healthy":true,"instanceHeartBeatInterval":5000,"instanceHeartBeatTimeOut":15000,"instanceId":"2.2.2.2#9999#DEFAULT#DEFAULT_GROUP@@nacos.test.3","instanceIdGenerator":"simple","ip":"2.2.2.2","ipDeleteTimeout":30000,"metadata":{},"port":9999,"serviceName":"DEFAULT_GROUP@@nacos.test.3","weight":1.0}, {"clusterName":"TEST1","enabled":true,"ephemeral":true,"healthy":true,"instanceHeartBeatInterval":5000,"instanceHeartBeatTimeOut":15000,"instanceId":"11.11.11.11#8888#TEST1#DEFAULT_GROUP@@nacos.test.3","instanceIdGenerator":"simple","ip":"11.11.11.11","ipDeleteTimeout":30000,"metadata":{},"port":8888,"serviceName":"DEFAULT_GROUP@@nacos.test.3","weight":1.0}]
DEFAULT_GROUP@@nacos.test.3
======================================================
[{"clusterName":"TEST1","enabled":true,"ephemeral":true,"healthy":true,"instanceHeartBeatInterval":5000,"instanceHeartBeatTimeOut":15000,"instanceId":"11.11.11.11#8888#TEST1#DEFAULT_GROUP@@nacos.test.3","instanceIdGenerator":"simple","ip":"11.11.11.11","ipDeleteTimeout":30000,"metadata":{},"port":8888,"serviceName":"DEFAULT_GROUP@@nacos.test.3","weight":1.0}]

這里比較神奇的是執行了兩次事件,根據上面的解釋,在訂閱時,無論這個服務有沒有發生變更,都會把服務信息放入到變更隊列中,之后會執行事件,所以第一次打印是正常的,大家可以發現第一次打印的結果中,服務名稱為DEFAULT_GROUP@@nacos.test.3的服務有兩個實例,我們確實注冊了兩個實例,分別為11.11.11.11:88882.2.2.2:9999,但是其實我們已經把2.2.2.2:9999下線了,為什么這里打印的還是兩個呢?

這個就是服務發現那個地方有一個定時更新本地服務信息搗的鬼,那個定時任務是每隔1s執行一次,也就是說我們在執行這段訂閱代碼的時候,本地緩存還沒有刷新,還是兩個實例,所以打印為兩個實例。

有了上面的理解,第二次執行事件就很好理解了,因為定時任務跟新了本地緩存,所以會有一個新的服務變更的事件放入到了隊列中,所以又執行了一次event,為了驗證我們理解的正確性,我們改一下客戶端的代碼,驗證一下我們理解的正確性。

public class NamingExample {

    public static void main(String[] args) throws NacosException {

        Properties properties = new Properties();
        properties.setProperty("serverAddr","127.0.0.1:8848");
        properties.setProperty("namespace", "namespace");

        NamingService naming = NamingFactory.createNamingService(properties);
        //服務注冊
        naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");

        naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");
        //服務發現
        System.out.println(naming.getAllInstances("nacos.test.3"));

        System.out.println("----------------------------------------------------");
        //服務下線
        naming.deregisterInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");

        System.out.println(naming.getAllInstances("nacos.test.3"));
       
        //加入1s的延遲
        try{
            Thread.sleep(1000);
        } catch (Exception e){

        }
        System.out.println("----------------------------------------------------");
        //服務訂閱
        naming.subscribe("nacos.test.3", new EventListener() {
            @Override
            public void onEvent(Event event) {
                System.out.println(((NamingEvent)event).getServiceName());

                System.out.println("======================================================");
                System.out.println(((NamingEvent)event).getInstances());
            }
        });

        try{
          Thread.sleep(5000000);
        } catch (Exception e){

        }

    }
}
View Code

 

打印結果如下

DEFAULT_GROUP@@nacos.test.3
======================================================
[{"clusterName":"TEST1","enabled":true,"ephemeral":true,"healthy":true,"instanceHeartBeatInterval":5000,"instanceHeartBeatTimeOut":15000,"instanceId":"11.11.11.11#8888#TEST1#DEFAULT_GROUP@@nacos.test.3","instanceIdGenerator":"simple","ip":"11.11.11.11","ipDeleteTimeout":30000,"metadata":{},"port":8888,"serviceName":"DEFAULT_GROUP@@nacos.test.3","weight":1.0}]

我們在執行訂閱之前加了1s的等待,我們發現就執行了一次事件,這就驗證了我們猜想的正確性。

總結

  總的來說,覺得nacos的代碼寫的比較清晰,看着很舒服,由於本人水平很菜,有什么問題,望大家指正。

 

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM