nacos服務注冊與發現原理解析


前言:nacos 玩過微服務的想必不會陌生,它是阿里對於springcloud孵化出來的產品,用來完成服務之間的注冊發現和配置中心,其核心作用我就不廢話了,提前去github下載好nacos的源碼包和啟動nacos server

大致流程:每個服務都會有一個nacos client,它用來和nacos server打交道 用來具體的服務注冊 查詢等操作,服務提供者在啟動的時候會向nacos server注冊自己,服務消費者在啟動的時候訂閱nacos server上的服務提供者

服務注冊

首先需要引入spring-cloud-starter-alibaba-nacos-discovery包,本文的引入的版本是2.2.1

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>

根據sprin.factories配置來完成相關類的自動注冊

 我們重點來看這幾個類,看名稱可猜到是用來服務注冊的,NacosServiceRegistryAutoConfiguration用來注冊管理這幾個bean

 

NacosServiceRegistry:完成服務注冊,實現ServiceRegistry

NacosRegistration:用來注冊時存儲nacos服務端的相關信息

NacosAutoServiceRegistration 繼承spring中的AbstractAutoServiceRegistration,AbstractAutoServiceRegistration繼承ApplicationListener<WebServerInitializedEvent>,通過事件監聽來發起服務注冊,到時候會調用NacosServiceRegistry.register(registration)

來看具體如何注冊

/******************************************************NacosServiceRegistry******************************************************/
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
    } else {
        String serviceId = registration.getServiceId();
        String group = this.nacosDiscoveryProperties.getGroup();
        Instance instance = this.getNacosInstanceFromRegistration(registration);
        try {
            this.namingService.registerInstance(serviceId, group, instance);
        } 
    }
}

/******************************************************NacosNamingService******************************************************/
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

    if (instance.isEphemeral()) {
        // 添加心跳檢測
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    // 完成服務注冊
    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

/******************************************************NacosNamingService******************************************************/
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {

    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    // 發起一個心跳檢測任務
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

/******************************************************BeatTask******************************************************/
class BeatTask implements Runnable {

    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            // 向nacos服務發起心跳檢測
            JSONObject result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            long interval = result.getIntValue("clientBeatInterval");
            boolean lightBeatEnabled = false;
            if (result.containsKey(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.getBooleanValue(CommonParams.LIGHT_BEAT_ENABLED);
            }
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            if (interval > 0) {
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.containsKey(CommonParams.CODE)) {
                code = result.getIntValue(CommonParams.CODE);
            }
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                // 未注冊 先完成注冊
                try {
                    serverProxy.registerService(beatInfo.getServiceName(),
                        NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } 
            }
        } 
        // 發起下一次心跳檢測
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}

服務提供者向nacos server發起服務注冊前,先向nacos server建立起心跳檢測機制,nacos server那邊也有一個心跳檢測,服務提供者不停的向nacos server發起心跳檢測 告知自己的健康狀態,nacos serve發現該服務心跳檢測時間超時會發布超時事件來告知服務消費者

 

 

服務發現

服務發現由NacosWatch完成,它實現了Spring的Lifecycle接口,容器啟動和銷毀時會調用對應的start()和stop()方法

來看對應源碼

@Override
public void start() {
    // cas設置運行狀態為true
    if (this.running.compareAndSet(false, true)) {
        // 延時執行一個服務發現任務
        this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
                this::nacosServicesWatch, this.properties.getWatchDelay());
    }
}

@Override
public void stop() {
    // 設置運行狀態為false 然后取消正在執行的任務
    if (this.running.compareAndSet(true, false) && this.watchFuture != null) {
        this.watchFuture.cancel(true);
    }
}

public void nacosServicesWatch() {
    try {

        boolean changed = false;
        NamingService namingService = properties.namingServiceInstance();
        // 獲取nacos server上最新的服務提供者們
        ListView<String> listView = properties.namingServiceInstance()
                .getServicesOfServer(1, Integer.MAX_VALUE);

        List<String> serviceList = listView.getData();

        // 有新的訂閱產生 訂閱完后發布事件
        Set<String> currentServices = new HashSet<>(serviceList);
        currentServices.removeAll(cacheServices);
        if (currentServices.size() > 0) {
            changed = true;
        }

        // 取消已經下線的服務訂閱,發起取消訂閱操作並刪除訂閱監聽
        if (cacheServices.removeAll(new HashSet<>(serviceList))
                && cacheServices.size() > 0) {
            changed = true;

            for (String serviceName : cacheServices) {
                namingService.unsubscribe(serviceName,
                        subscribeListeners.get(serviceName));
                subscribeListeners.remove(serviceName);
            }
        }

        cacheServices = new HashSet<>(serviceList);

        // 訂閱服務 並對每個服務都添加一個心跳檢測監聽
        for (String serviceName : cacheServices) {
            if (!subscribeListeners.containsKey(serviceName)) {
                EventListener eventListener = event -> NacosWatch.this.publisher
                        .publishEvent(new HeartbeatEvent(NacosWatch.this,
                                nacosWatchIndex.getAndIncrement()));
                subscribeListeners.put(serviceName, eventListener);
                namingService.subscribe(serviceName, eventListener);

            }
        }
        // 有服務變化 發布事件
        if (changed) {
            this.publisher.publishEvent(
                    new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
        }

    }
    catch (Exception e) {
        log.error("Error watching Nacos Service change", e);
    }
}

大致流程:nacos client這邊在spring容器啟動后執行一個服務訂閱操作的延時任務,這個任務執行時先拉取nacos server那邊最新的服務列表,然后與本地緩存的服務列表進行比較,取消訂閱下線的服務,然后向nacos server發起訂閱操作,訂閱所有服務

那么服務消費者如何實時感知服務提供者的狀態信息呢

 

 

1、服務消費者訂閱后會執行一個輪詢任務(每1s執行一次)用來拉取最新的服務提供者信息並實時更新,實現在HostReactor中的UpdateTask完成,下面來看代碼

public class UpdateTask implements Runnable {
    long lastRefTime = Long.MAX_VALUE;
    private String clusters;
    private String serviceName;

    public UpdateTask(String serviceName, String clusters) {
        this.serviceName = serviceName;
        this.clusters = clusters;
    }

    @Override
    public void run() {
        try {
            // 拿到當前的服務信息
            ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

            // 為空 拉取最新的服務列表隨后更新
            if (serviceObj == null) {
                updateServiceNow(serviceName, clusters);
                // 繼續輪詢
                executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                return;
            }
            
            if (serviceObj.getLastRefTime() <= lastRefTime) {
                // 當前服務未及時更新 進行更新操作
                updateServiceNow(serviceName, clusters);
                serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
            } else {
                // if serviceName already updated by push, we should not override it
                // since the push data may be different from pull through force push
                refreshOnly(serviceName, clusters);
            }
            // 設置服務最新的更新時間
            lastRefTime = serviceObj.getLastRefTime();
            // 訂閱被取消
            if (!eventDispatcher.isSubscribed(serviceName, clusters) &&
                !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                // abort the update task:
                NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                return;
            }
            // 繼續下一次輪詢
            executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);


        } catch (Throwable e) {
            NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
        }

    }
}

 

2、上面服務注冊時我們說過,服務提供者注冊時nacos服務端也有一個相應的心跳檢測,當心跳檢測超時也就是未及時收到服務提供者的心跳包,nacos server判定該服務狀態異常 隨后通過UDP推送服務信息用來告知對應服務消費者,服務消費者通過PushReceiver來處理udp協議,HostReactor.processServiceJson(String json)來更新本地服務列表

/********************************PushReceiver*****************************/
public void run() {
    while (true) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

            udpSocket.receive(packet);

            String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

            PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                // 處理變更信息
                hostReactor.processServiceJSON(pushPacket.data);

                // send ack to server
                ack = "{\"type\": \"push-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\""
                    + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\""
                    + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                    + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            }

            udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}

 

服務注冊和訂閱我只講解了主要流程,nacos server那邊處理源碼太多就不一一貼出來了,根據對應的api接口進去一看便知,nacos源碼比較好理解,沒有什么特別難讀懂的地方,這邊只是提供給大家一個看源碼的思路,具體詳細流程還需要讀者自己去細讀

下面通過代碼來模擬nacos服務注冊和訂閱

先啟動一個nacos server,然后打開控制台,添加一個命名空間

服務注冊:分別注冊兩個服務,其中一個服務有兩個實例

public class ServerRegister {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("serverAddr", "http://localhost:8848");
        properties.setProperty("namespace", "c7981cfd-ccb8-4a9f-8e80-cd1f9633ecec");
        NamingService namingService = NacosFactory.createNamingService(properties);
        // 同一個服務注冊兩個實例
        namingService.registerInstance("serverProvider_1", "127.0.0.1", 8080);
        namingService.registerInstance("serverProvider_1", "127.0.0.1", 8081);
        namingService.registerInstance("serverProvider_2", "127.0.0.1", 7070);
     // 獲取服務名為serverProvider_1的實例信息 List
<Instance> serverProvider = namingService.getAllInstances("serverProvider_1"); System.out.println(JSONArray.toJSONString(serverProvider, SerializerFeature.PrettyFormat, SerializerFeature.WriteMapNullValue, SerializerFeature.WriteDateUseDateFormat)); System.in.read(); } }

服務訂閱:獲取所有的服務提供者,然后進行訂閱 並添加一個事件用來監聽訂閱成功后的實例

public class ServerCustomer {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("serverAddr", "http://localhost:8848");
        properties.setProperty("namespace", "c7981cfd-ccb8-4a9f-8e80-cd1f9633ecec");
        NamingService namingService = NacosFactory.createNamingService(properties);
        List<String> serverList = namingService.getServicesOfServer(1, Integer.MAX_VALUE).getData();
        System.out.println("得到服務提供者列表:" + JSONArray.toJSONString(serverList));
        for (String server : serverList) {
            // 訂閱serverProvider服務 並添加一個監聽器用來監聽服務狀態
            namingService.subscribe(server, event -> {
                NamingEvent namingEvent = (NamingEvent) event;
                System.out.println(JSONObject.toJSONString(namingEvent, SerializerFeature.PrettyFormat, SerializerFeature.WriteMapNullValue,
                        SerializerFeature.WriteDateUseDateFormat));
            });
        }
        System.in.read();
    }
}

 打印信息:

 

 

 

 nacos控制台也能看到相應服務信息

 

 

 隨后把服務提供者下線 再來看服務消費者那邊的輸出,可以看到服務實例已全部下線

 

 nacos控制台

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM