How Nacos Service Registration Works (Part 13)


 

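I drew the diagram above when I wrote about Eureka and am reusing it here, because learning technologies of the same kind means learning to spot what they share and where they differ. The registration and consumption flows of Eureka and Nacos are essentially the same. There are really only two differences: first, Eureka has no election mechanism when synchronizing data; second, the way clients are notified when data is synchronized is different.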

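What Nacos service registration needs to be able to do

  • A service provider registers its protocol address with the Nacos server
  • A service consumer queries the Nacos server for provider addresses (by service name)
  • The Nacos server must detect when service providers go online or offline
  • Service consumers must dynamically detect changes to the service addresses held by the Nacos server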
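
The four capabilities above can be pictured with a toy in-memory registry. This is only a sketch to make the roles concrete: ToyRegistry and its methods are hypothetical names, offline detection is reduced to an explicit deregister call, and none of it reflects how Nacos actually stores or pushes data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical toy registry for illustration only; not the Nacos implementation.
class ToyRegistry {
    // service name -> provider addresses ("ip:port")
    private final Map<String, List<String>> services = new ConcurrentHashMap<>();
    // service name -> consumers that want to hear about address changes
    private final Map<String, List<Consumer<List<String>>>> listeners = new ConcurrentHashMap<>();

    // Capability 1: a provider registers its address.
    void register(String serviceName, String address) {
        services.computeIfAbsent(serviceName, k -> new CopyOnWriteArrayList<>()).add(address);
        notifyListeners(serviceName);
    }

    // Capability 3 (simplified): the registry learns that a provider went offline.
    void deregister(String serviceName, String address) {
        List<String> current = services.get(serviceName);
        if (current != null && current.remove(address)) {
            notifyListeners(serviceName);
        }
    }

    // Capability 2: a consumer looks up addresses by service name.
    List<String> lookup(String serviceName) {
        List<String> copy = new ArrayList<>();
        List<String> current = services.get(serviceName);
        if (current != null) {
            copy.addAll(current);
        }
        return copy;
    }

    // Capability 4: a consumer subscribes to address changes.
    void subscribe(String serviceName, Consumer<List<String>> listener) {
        listeners.computeIfAbsent(serviceName, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    private void notifyListeners(String serviceName) {
        List<Consumer<List<String>>> subscribers = listeners.get(serviceName);
        if (subscribers == null) {
            return;
        }
        List<String> snapshot = lookup(serviceName);
        for (Consumer<List<String>> subscriber : subscribers) {
            subscriber.accept(snapshot);
        }
    }
}
```

A consumer would call subscribe once and then receive a fresh address snapshot after every register or deregister call for that service.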

Service registration

Combining Spring Boot with Nacos

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>nacos-config-spring-boot-starter</artifactId>
            <version>0.2.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>nacos-discovery-spring-boot-starter</artifactId>
            <version>0.2.7</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

application.properties

# Application name
spring.application.name=springboot-nacos


management.endpoints.jmx.exposure.include=*
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always


 
alibaba.cloud.access-key=****
alibaba.cloud.secret-key=****


nacos.discovery.server-addr=localhost:8848

Code for service registration and lookup

@RestController
public class TestController {

    @NacosValue ( value = "${info:default value}",autoRefreshed = true)
    private String info;

    @NacosInjected
    private NamingService namingService;  // the service that provides registration
    // query the registered instances
    @GetMapping("/get")
    public String get() throws NacosException {

       // return info;
        return namingService.getAllInstances ( "SpringBoot-Nacos" ).toString ();
    }


    // registration code
    @PostMapping("/registry")
    public String registry() throws NacosException {
        // register the service through an Instance
        Instance instance=new Instance ();
        instance.setClusterName("TestCluster");  // cluster name
        instance.setEnabled(true);   // whether the instance is enabled
        instance.setEphemeral(true);  // true: ephemeral instance (AP, Distro); false: persistent instance (CP, Raft)
        instance.setIp("localhost");
        instance.setPort(8848);
        instance.setWeight(10);  //1~100
        namingService.registerInstance("SpringBoot-Nacos",instance);
        return "SUCCESS";
    }

}

 

 

 

 

 

 

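How Nacos is implemented

Nacos source analysis: the service registration flow

Service registration with Dubbo can follow two paths. One is the same path seen when analyzing the Eureka source (see the Eureka source analysis); the other is based on the auto-configuration mechanism that Dubbo itself provides. When a service is published through Dubbo, registration is driven by an event listener in the class DubboServiceRegistrationNonWebApplicationAutoConfiguration. This class listens for ApplicationStartedEvent, an event added in Spring Boot 2.0 that is published once the Spring Boot application has finished starting. When the listener receives this event, it triggers the registration.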
 
    // invoked when Spring Boot publishes ApplicationStartedEvent after startup completes
    @EventListener({ApplicationStartedEvent.class})
    public void onApplicationStarted() {
        this.setServerPort();
        this.register();
    }

    private void register() {
        if (!this.registered) {
            this.serviceRegistry.register(this.registration);
            this.registered = true;
        }
    }

this.serviceRegistry is an injected instance: NacosServiceRegistry

NacosServiceRegistry.register

serviceId: corresponds to the current application's name (application.name)
group: the group configured in Nacos
instance: the service instance information
    public void register(Registration registration) {
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
        } else {
            String serviceId = registration.getServiceId();
            String group = this.nacosDiscoveryProperties.getGroup();
            Instance instance = this.getNacosInstanceFromRegistration(registration);

            try {
                this.namingService.registerInstance(serviceId, group, instance);
                log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
            } catch (Exception var6) {
                log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var6});
                ReflectionUtils.rethrowRuntimeException(var6);
            }

        }
    }

NacosNamingService.registerInstance

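Starts registering the instance. It does two main things:
  • If the instance being registered is ephemeral, build the heartbeat information and hand it to the beat reactor, which sets up the heartbeat task
  • Call registerService to send the registration request
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        if (instance.isEphemeral()) { // ephemeral instance: build the heartbeat information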
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
            // hand the heartbeat information to beatReactor for processing
            this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }

        this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }
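
To make the heartbeat step more concrete, here is a minimal sketch of a reactor that keeps one heartbeat per instance key and reschedules it after every send. ToyBeatReactor, Beat and the key format are assumptions made for illustration; the article does not show the BeatReactor source, and a real client would send a request to the server where this sketch only increments a counter.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical heartbeat scheduler; names and behavior are illustrative assumptions.
class ToyBeatReactor {
    static class Beat {
        final String key;            // identifies one registered instance
        final long periodMillis;     // interval between heartbeats
        volatile boolean stopped;    // set when the beat is replaced or removed
        final AtomicInteger sent = new AtomicInteger();

        Beat(String key, long periodMillis) {
            this.key = key;
            this.periodMillis = periodMillis;
        }
    }

    private final Map<String, Beat> beats = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "toy-beat-sender");
            t.setDaemon(true);   // do not keep the JVM alive just for heartbeats
            return t;
        });

    // Register a heartbeat; an older beat with the same key is stopped.
    void addBeat(Beat beat) {
        Beat previous = beats.put(beat.key, beat);
        if (previous != null) {
            previous.stopped = true;
        }
        executor.schedule(() -> send(beat), beat.periodMillis, TimeUnit.MILLISECONDS);
    }

    // Stop sending heartbeats for an instance, for example on deregistration.
    void removeBeat(String key) {
        Beat beat = beats.remove(key);
        if (beat != null) {
            beat.stopped = true;
        }
    }

    private void send(Beat beat) {
        if (beat.stopped || executor.isShutdown()) {
            return;
        }
        beat.sent.incrementAndGet();   // placeholder for the real heartbeat request
        executor.schedule(() -> send(beat), beat.periodMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Only ephemeral instances need such a task: their presence in the registry depends on the client continuing to report in, while persistent instances stay registered without it.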

registerService

  • The logic is simple: build the request parameters
  • Send the request
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
        Map<String, String> params = new HashMap(9);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        params.put("clusterName", instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));
        this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, "POST");
    }

reqAPI

Sends the service registration request.
  • api: the Nacos server open API path
  • params: request parameters
  • body: request body
  • method: HTTP method
  • servers: Nacos server addresses
    public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {

        params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
            throw new NacosException(NacosException.INVALID_PARAM, "no server available");
        }

        NacosException exception = new NacosException();
        // if the server list is not empty
        if (servers != null && !servers.isEmpty()) {
            // pick a random server node to start from
            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());
            // walk through the server list
            for (int i = 0; i < servers.size(); i++) {
                String server = servers.get(index); // the server node at the current index
                try {
                    // call the selected server
                    return callServer(api, params, body, server, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
                        NAMING_LOGGER.debug("request {} failed.", server, e);
                    }
                }
                // move on to the next server, wrapping around
                index = (index + 1) % servers.size();
            }
        }

        if (StringUtils.isNotBlank(nacosDomain)) {
            for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
                try {
                    return callServer(api, params, body, nacosDomain, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
                        NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                    }
                }
            }
        }

        NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}",
            api, servers, exception.getErrCode(), exception.getErrMsg());

        throw new NacosException(exception.getErrCode(), "failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
            + exception.getMessage());
    }
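
The retry strategy in reqAPI (start at a random server, then walk the list round-robin until one call succeeds) can be isolated into a small helper. The following is a sketch under assumptions: FailoverCaller and callWithFailover are hypothetical names, the remote call is modelled as a plain Function, and any RuntimeException counts as a failed attempt.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Function;

// Hypothetical helper illustrating "random start, then round-robin" failover.
class FailoverCaller {

    // Tries every server once, starting at startIndex and wrapping around.
    static <R> R callWithFailover(List<String> servers, int startIndex, Function<String, R> call) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        int index = Math.floorMod(startIndex, servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);          // first success wins
            } catch (RuntimeException e) {
                last = e;                           // remember the failure and keep going
            }
            index = (index + 1) % servers.size();   // next server, wrapping around
        }
        throw new IllegalStateException("all servers failed: " + servers, last);
    }

    // Same as above, but with a randomly chosen starting server.
    static <R> R callWithFailover(List<String> servers, Function<String, R> call) {
        int start = (servers == null || servers.isEmpty()) ? 0 : new Random().nextInt(servers.size());
        return callWithFailover(servers, start, call);
    }
}
```

Starting at a random index spreads requests from many clients across the server list, while the wrap-around loop guarantees that every server is tried exactly once before giving up.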

callServer

Sends the call to the server
    public String callServer(String api, Map<String, String> params, String body, String curServer, String method)
        throws NacosException {
        long start = System.currentTimeMillis();
        long end = 0;
        // add signature (security) information
        injectSecurityInfo(params);
        // build the request headers
        List<String> headers = builderHeaders();

        String url;
        // build the URL
        if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
            url = curServer + api;
        } else {
            if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
                curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
            }
            url = HttpClient.getPrefix() + curServer + api;
        }
        // send the request through HttpClient
        HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method);
        end = System.currentTimeMillis();

        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
            .observe(end - start);

        if (HttpURLConnection.HTTP_OK == result.code) {
            // return the server's response
            return result.content;
        }

        if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
            return StringUtils.EMPTY;
        }

        throw new NacosException(result.code, result.content);
    }
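
The URL construction inside callServer is easy to try out in isolation. The sketch below assumes that the address separator is ":" and that the default scheme prefix is "http://"; ServerUrlBuilder and buildUrl are hypothetical names, not part of the Nacos client.

```java
// Hypothetical helper mirroring the URL-building branch of callServer.
class ServerUrlBuilder {

    // server may be "host", "host:port" or a full "http(s)://host[:port]" address
    static String buildUrl(String server, String api, int defaultPort) {
        if (server.startsWith("https://") || server.startsWith("http://")) {
            // the address already carries a scheme: use it as is
            return server + api;
        }
        // append the default port when the address has none
        String hostAndPort = server.contains(":") ? server : server + ":" + defaultPort;
        // assumption: plain HTTP unless the address says otherwise
        return "http://" + hostAndPort + api;
    }
}
```

With these assumptions, buildUrl("localhost", "/nacos/v1/ns/instance", 8848) yields http://localhost:8848/nacos/v1/ns/instance, which is the registration endpoint the client posts to.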

request

Sends the request and returns the result
    public static HttpResult request(String url, List<String> headers, Map<String, String> paramValues, String body, String encoding, String method) {
        HttpURLConnection conn = null;
        try {
            String encodedContent = encodingParams(paramValues, encoding);
            url += (StringUtils.isEmpty(encodedContent)) ? "" : ("?" + encodedContent);

            conn = (HttpURLConnection) new URL(url).openConnection();

            setHeaders(conn, headers, encoding);
            conn.setConnectTimeout(CON_TIME_OUT_MILLIS);
            conn.setReadTimeout(TIME_OUT_MILLIS);
            conn.setRequestMethod(method);
            conn.setDoOutput(true);
            if (StringUtils.isNotBlank(body)) {
                byte[] b = body.getBytes();
                conn.setRequestProperty("Content-Length", String.valueOf(b.length));
                conn.getOutputStream().write(b, 0, b.length);
                conn.getOutputStream().flush();
                conn.getOutputStream().close();
            }
            conn.connect();
            if (NAMING_LOGGER.isDebugEnabled()) {
                NAMING_LOGGER.debug("Request from server: " + url);
            }
            return getResult(conn);
        } catch (Exception e) {
            try {
                if (conn != null) {
                    NAMING_LOGGER.warn("failed to request " + conn.getURL() + " from "
                        + InetAddress.getByName(conn.getURL().getHost()).getHostAddress());
                }
            } catch (Exception e1) {
                NAMING_LOGGER.error("[NA] failed to request ", e1);
                //ignore
            }

            NAMING_LOGGER.error("[NA] failed to request ", e);

            return new HttpResult(500, e.toString(), Collections.<String, String>emptyMap());
        } finally {
            IoUtils.closeQuietly(conn);
        }
    }
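
The request method starts by turning the parameter map into a query string (encodingParams) and appending it to the URL. The article does not show that helper, so the following is only a sketch of what such an encoding step can look like; QueryStrings, encode and appendTo are hypothetical names, and skipping empty values is a choice made for this example.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: encodes a parameter map as "k1=v1&k2=v2".
class QueryStrings {

    static String encode(Map<String, String> params, String encoding) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            String value = entry.getValue();
            if (value == null || value.isEmpty()) {
                continue;   // this sketch leaves out parameters without a value
            }
            try {
                joiner.add(entry.getKey() + "=" + URLEncoder.encode(value, encoding));
            } catch (UnsupportedEncodingException e) {
                throw new IllegalArgumentException("unsupported encoding: " + encoding, e);
            }
        }
        return joiner.toString();
    }

    // Appends the query string to the URL only when there is something to append.
    static String appendTo(String url, String query) {
        return query.isEmpty() ? url : url + "?" + query;
    }
}
```

Note that the registration parameters travel in the URL even though the HTTP method is POST, which matches how request builds the URL before opening the connection.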

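How the Nacos server handles the request

The server provides an InstanceController class that exposes the service registration APIs. When a client sends a registration request, the endpoint it calls is: [post]: /nacos/v1/ns/instance
  • serviceName: the name of the client's project
  • namespace: the Nacos namespace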
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {

        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        // parse the Instance from the request
        final Instance instance = parseInstance(request);

        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
}

ServiceManager.registerInstance

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
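        // Create an empty service (the service shown in the Nacos console's service list).
        // This effectively initializes serviceMap, which is a ConcurrentHashMap.
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        // look up the service object in serviceMap by namespaceId and serviceName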
        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        // call addInstance to add the service instance
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

createServiceIfAbsent

    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
        // look up the service object in serviceMap
        Service service = getService(namespaceId, serviceName);
        if (service == null) {

            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) { // if a cluster was passed in, attach it to the service
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();

            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }
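
Both sides of the flow rely on the grouped service name: the client builds it with NamingUtils.getGroupedName, and the server recovers the group with NamingUtils.getGroupName as in the code above. The sketch below shows one way such a convention can work. It assumes a "@@" separator and a DEFAULT_GROUP fallback; GroupedNames is a hypothetical class written for this example, not the NamingUtils source.

```java
// Hypothetical illustration of a "group@@service" naming convention.
class GroupedNames {
    static final String SPLITTER = "@@";              // assumed separator
    static final String DEFAULT_GROUP = "DEFAULT_GROUP";

    // Client side: combine group and service into one name.
    static String getGroupedName(String serviceName, String groupName) {
        return groupName + SPLITTER + serviceName;
    }

    // Server side: recover the group, falling back to the default group.
    static String getGroupName(String groupedName) {
        int pos = groupedName.indexOf(SPLITTER);
        return pos < 0 ? DEFAULT_GROUP : groupedName.substring(0, pos);
    }

    // Server side: recover the plain service name.
    static String getServiceName(String groupedName) {
        int pos = groupedName.indexOf(SPLITTER);
        return pos < 0 ? groupedName : groupedName.substring(pos + SPLITTER.length());
    }
}
```

Carrying the group inside the service name lets the server key its service map by a single string while still being able to set the group on the Service object.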

putServiceAndInit

    private void putServiceAndInit(Service service) throws NacosException {
        putService(service); // save the service in serviceMap
        service = getService(service.getNamespaceId(), service.getName());
        service.init(); // set up the heartbeat check
        // listen for consistency changes: the ephemeral=true key uses the Distro protocol, the false key uses Raft
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }

addInstance

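Back up one level, execution then enters the addInstance method, which adds the service instance to the instance list and then synchronizes the data through the consistency protocol.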
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        Service service = getService(namespaceId, serviceName);

        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

            Instances instances = new Instances();
            instances.setInstanceList(instanceList);

            consistencyService.put(key, instances);
        }
    }
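
The server-side steps so far (create the service if it is absent, look it up, then add the instance under a lock on the service) can be condensed into a small in-memory model. This is a sketch under assumptions: ToyServiceManager and ToyService are hypothetical, instances are plain "ip:port" strings, and the consistency protocol is left out entirely.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory model of the namespace -> service -> instances structure.
class ToyServiceManager {
    static class ToyService {
        final String namespaceId;
        final String name;
        final List<String> instances = new CopyOnWriteArrayList<>();

        ToyService(String namespaceId, String name) {
            this.namespaceId = namespaceId;
            this.name = name;
        }
    }

    // namespaceId -> (serviceName -> service)
    private final Map<String, Map<String, ToyService>> serviceMap = new ConcurrentHashMap<>();

    ToyService getService(String namespaceId, String serviceName) {
        Map<String, ToyService> inNamespace = serviceMap.get(namespaceId);
        return inNamespace == null ? null : inNamespace.get(serviceName);
    }

    // Creates the service on first registration and returns the existing one afterwards.
    ToyService createServiceIfAbsent(String namespaceId, String serviceName) {
        return serviceMap
            .computeIfAbsent(namespaceId, k -> new ConcurrentHashMap<>())
            .computeIfAbsent(serviceName, k -> new ToyService(namespaceId, serviceName));
    }

    void registerInstance(String namespaceId, String serviceName, String address) {
        ToyService service = createServiceIfAbsent(namespaceId, serviceName);
        // lock on the service so concurrent registrations do not add duplicates
        synchronized (service) {
            if (!service.instances.contains(address)) {
                service.instances.add(address);
            }
        }
    }
}
```

In the real flow the updated instance list is not written to the map directly but handed to consistencyService.put, which is where Distro or Raft replication takes over.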

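Service lookup by consumers

Once a service is registered, consumers can obtain the provider's address from the Nacos server and call the service. On the consumer side, the core class NacosDiscoveryClient handles the interaction with Nacos and retrieves the provider address information. The steps leading up to it are not repeated here, since the subscription process was already analyzed in the earlier Dubbo source walkthrough. NacosDiscoveryClient provides a getInstances method that returns the provider URL addresses for a given provider service name.

Fetching the service list at client startup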

NacosDiscoveryClient.getInstances

public class NacosDiscoveryClient implements DiscoveryClient {
    private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryClient.class);
    public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client";
    private NacosServiceDiscovery serviceDiscovery;

    public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {
        this.serviceDiscovery = nacosServiceDiscovery;
    }

    public String description() {
        return "Spring Cloud Nacos Discovery Client";
    }

    public List<ServiceInstance> getInstances(String serviceId) {
        try {
            return this.serviceDiscovery.getInstances(serviceId);
        } catch (Exception var3) {
            throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, var3);
        }
    }
}
It calls NamingService to get the list of service instances by serviceId and group, then converts each Instance into a ServiceInstance object.
    public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
        String group = this.discoveryProperties.getGroup();
        List<Instance> instances = this.discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true);
        return hostToServiceInstanceList(instances, serviceId);
    }

NacosNamingService.selectInstances

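selectInstances first gets the serviceInfo from hostReactor, then removes from serviceInfo.getHosts() every instance that is not healthy, not enabled, or has a weight less than or equal to 0, and returns the rest. If subscribe is true, it calls hostReactor.getServiceInfo to get the serviceInfo; otherwise it calls hostReactor.getServiceInfoDirectlyFromServer.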
 
    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {

        ServiceInfo serviceInfo;
        if (subscribe) { // whether to subscribe to service address changes; defaults to true
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        return selectInstances(serviceInfo, healthy);
    }
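
The filtering rule described for selectInstances (drop instances that are unhealthy, disabled, or have a non-positive weight) is shown here as a standalone sketch. InstanceFilter and ToyInstance are hypothetical stand-ins for the client classes; the health flag is compared against the requested value, which for healthy = true matches the description above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the filtering step of selectInstances.
class InstanceFilter {
    static class ToyInstance {
        final String ip;
        final boolean healthy;
        final boolean enabled;
        final double weight;

        ToyInstance(String ip, boolean healthy, boolean enabled, double weight) {
            this.ip = ip;
            this.healthy = healthy;
            this.enabled = enabled;
            this.weight = weight;
        }
    }

    // Keeps instances whose health matches the request and that are enabled with weight > 0.
    static List<ToyInstance> select(List<ToyInstance> hosts, boolean healthy) {
        List<ToyInstance> selected = new ArrayList<>();
        for (ToyInstance instance : hosts) {
            if (instance.healthy == healthy && instance.enabled && instance.weight > 0) {
                selected.add(instance);
            }
        }
        return selected;
    }
}
```

Filtering on the client keeps disabled or zero-weight providers out of load balancing even when they are still present in the cached service information.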

HostReactor.getServiceInfo

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        // build the key from the service name plus the cluster names (empty by default)
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        // look up the provider list in serviceInfoMap by key; serviceInfoMap is the client's local cache of service addresses
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

        if (null == serviceObj) {
            // null means the local cache has no entry: create a new one, put it into serviceInfoMap
            // and updatingMap, run updateServiceNow, then remove it from updatingMap
            serviceObj = new ServiceInfo(serviceName, clusters);

            serviceInfoMap.put(serviceObj.getKey(), serviceObj);

            updatingMap.put(serviceName, new Object());
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);

        } else if (updatingMap.containsKey(serviceName)) {
            // if the serviceObj found in serviceInfoMap is in updatingMap, wait up to UPDATE_HOLD_INTERVAL
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        // make sure a scheduled update task exists for this service (scheduleUpdateIfAbsent),
        // then read the serviceInfo back from serviceInfoMap

        scheduleUpdateIfAbsent(serviceName, clusters);

        return serviceInfoMap.get(serviceObj.getKey());
    }
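The code above contains two pieces of logic:
  • updateServiceNow: immediately loads the service address information from the Nacos server
  • scheduleUpdateIfAbsent: starts a scheduled task that queries the service addresses once every 1 s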
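
The combination of these two steps is a cache-first lookup: load from the server on a cache miss, then keep the entry fresh with one periodic task per service. Below is a minimal sketch of that pattern. ToyHostReactor is a hypothetical class, the server query is modelled as a Function that must not return null, and the refresh interval is a constructor argument rather than a value taken from Nacos.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical cache-first lookup with one periodic refresh task per service.
class ToyHostReactor {
    private final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>();
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "toy-host-updater");
            t.setDaemon(true);
            return t;
        });
    private final Function<String, List<String>> queryServer; // stands in for the remote call
    private final long refreshMillis;

    ToyHostReactor(Function<String, List<String>> queryServer, long refreshMillis) {
        this.queryServer = queryServer;
        this.refreshMillis = refreshMillis;
    }

    List<String> getServiceInfo(String serviceName) {
        if (!serviceInfoMap.containsKey(serviceName)) {
            updateServiceNow(serviceName);        // cache miss: load immediately
        }
        scheduleUpdateIfAbsent(serviceName);      // at most one refresh task per service
        return serviceInfoMap.get(serviceName);
    }

    void updateServiceNow(String serviceName) {
        serviceInfoMap.put(serviceName, queryServer.apply(serviceName));
    }

    private void scheduleUpdateIfAbsent(String serviceName) {
        futureMap.computeIfAbsent(serviceName, name ->
            executor.scheduleWithFixedDelay(() -> updateServiceNow(name),
                refreshMillis, refreshMillis, TimeUnit.MILLISECONDS));
    }

    boolean isScheduled(String serviceName) {
        return futureMap.containsKey(serviceName);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Repeated lookups are served from the local map, so the server is only contacted on the first request and by the background refresh.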

updateServiceNow

    public void updateServiceNow(String serviceName, String clusters) {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {

            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);

            if (StringUtils.isNotEmpty(result)) {
                processServiceJSON(result);
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }

queryList

Calls /nacos/v1/ns/instance/list to fetch the service address information from the Nacos server.
    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {

        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));

        return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
    }

 

 
 
 
 
 
 
 
 

 

