How Nacos Service Registration Works (Part 13)


 

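I drew the diagram above when I wrote about Eureka and am reusing it here, because learning technologies of the same kind is largely about finding what they share and where they differ. The registration and consumption flows of Eureka and Nacos are essentially the same. They differ in two places: first, Eureka has no leader election when it synchronizes data between servers; second, the two notify clients of changes in different ways.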

Capabilities that Nacos service registration needs

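  • A service provider registers its protocol address with the Nacos server
  • A service consumer queries the Nacos server for the provider's addresses (by service name)
  • The Nacos server needs to detect when providers come online or go offline
  • A service consumer needs to learn dynamically about address changes on the Nacos server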
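
The four capabilities above can be sketched with a small in-memory registry. This is only an illustration and not Nacos code: the class `TinyRegistry` and its methods `register`, `deregister`, `lookup` and `subscribe` are hypothetical names, and a real server would also need health checks and cluster synchronization.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory registry; it only illustrates the four capabilities.
class TinyRegistry {
    // service name -> provider addresses ("ip:port")
    private final Map<String, List<String>> providers = new ConcurrentHashMap<>();
    // service name -> consumer callbacks to notify when the address list changes
    private final Map<String, List<Consumer<List<String>>>> listeners = new ConcurrentHashMap<>();

    // Capability 1: a provider registers its address under a service name.
    void register(String service, String address) {
        providers.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(address);
        notifyListeners(service);
    }

    // Capability 3: the registry learns that a provider went offline.
    void deregister(String service, String address) {
        List<String> list = providers.get(service);
        if (list != null && list.remove(address)) {
            notifyListeners(service);
        }
    }

    // Capability 2: a consumer looks up provider addresses by service name.
    List<String> lookup(String service) {
        return new ArrayList<>(providers.getOrDefault(service, Collections.emptyList()));
    }

    // Capability 4: a consumer subscribes to address changes.
    void subscribe(String service, Consumer<List<String>> listener) {
        listeners.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    private void notifyListeners(String service) {
        List<String> snapshot = lookup(service);
        for (Consumer<List<String>> listener : listeners.getOrDefault(service, Collections.emptyList())) {
            listener.accept(snapshot);
        }
    }
}
```

A consumer would call `subscribe` once and then receive a fresh snapshot of the address list after every `register` or `deregister` for that service.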

Service registration

 

Integrating Spring Boot with Nacos

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>nacos-config-spring-boot-starter</artifactId>
            <version>0.2.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>nacos-discovery-spring-boot-starter</artifactId>
            <version>0.2.7</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

application.properties

# Application name
spring.application.name=springboot-nacos


management.endpoints.jmx.exposure.include=*
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always


 
alibaba.cloud.access-key=****
alibaba.cloud.secret-key=****


nacos.discovery.server-addr=localhost:8848

Code for registering and querying a service

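import com.alibaba.nacos.api.annotation.NacosInjected;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController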
public class TestController {

    @NacosValue ( value = "${info:默认值}",autoRefreshed = true)
    private String info;

    @NacosInjected
    private NamingService namingService;  // the service used for registration and lookup
    // query the registered instances
    @GetMapping("/get")
    public String get() throws NacosException {

       // return info;
        return namingService.getAllInstances ( "SpringBoot-Nacos" ).toString ();
    }


    // registration code
    @PostMapping("/registry")
    public String registry() throws NacosException {
        // register the service through an Instance
        Instance instance=new Instance ();
        instance.setClusterName("TestCluster");  // cluster name
        instance.setEnabled(true);   // whether the instance is enabled
        instance.setEphemeral(true);  // true = ephemeral instance (AP, Distro); false = persistent instance (CP, Raft)
        instance.setIp("localhost");
        instance.setPort(8848);
        instance.setWeight(10);  //1~100
        namingService.registerInstance("SpringBoot-Nacos",instance);
        return "SUCCESS";
    }

}


How Nacos is implemented

Nacos source code analysis: the service registration flow

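Service registration with Dubbo can be triggered through two paths. One is the same path that was covered in the earlier Eureka source analysis (see that article). The other is based on the auto-configuration that Dubbo itself provides. When services are published through Dubbo, registration is driven by an event listener: the class DubboServiceRegistrationNonWebApplicationAutoConfiguration listens for ApplicationStartedEvent. This event was added in Spring Boot 2.0 and is published once the Spring Boot application has finished starting. Receiving the event triggers the registration.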
 
    // Runs once Spring Boot has finished starting and published ApplicationStartedEvent
    @EventListener({ApplicationStartedEvent.class})
    public void onApplicationStarted() {
        this.setServerPort();
        this.register();
    }

    private void register() {
        // register only once
        if (!this.registered) {
            this.serviceRegistry.register(this.registration);
            this.registered = true;
        }
    }

this.serviceRegistry is an injected instance of NacosServiceRegistry.

NacosServiceRegistry.register

  • serviceId: the current application's spring.application.name
  • group: the group configured in Nacos
  • instance: the service instance information
    public void register(Registration registration) {
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
        } else {
            String serviceId = registration.getServiceId();
            String group = this.nacosDiscoveryProperties.getGroup();
            Instance instance = this.getNacosInstanceFromRegistration(registration);

            try {
                this.namingService.registerInstance(serviceId, group, instance);
                log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
            } catch (Exception var6) {
                log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var6});
                ReflectionUtils.rethrowRuntimeException(var6);
            }

        }
    }

NacosNamingService.registerInstance

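Registration of the instance starts here. The method does two things:
  • If the instance being registered is ephemeral, it builds the heartbeat information and hands it to BeatReactor, which sets up the heartbeat task
  • It calls registerService to send the registration request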
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        if (instance.isEphemeral()) { // ephemeral instance: build the heartbeat information
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
            // hand the heartbeat information to beatReactor for processing
            this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }

        this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }
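
The heartbeat step can be pictured as one periodic task per ephemeral instance. The sketch below is a simplified stand-in and not the client's BeatReactor: the class `HeartbeatSketch`, its methods, and the key format are hypothetical, and the real client may schedule and send its beats differently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a beat reactor: one periodic task per instance key.
class HeartbeatSketch {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, r -> {
        Thread t = new Thread(r, "heartbeat-sketch");
        t.setDaemon(true); // do not keep the JVM alive just for heartbeats
        return t;
    });
    private final Map<String, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();

    // Start sending beats for an ephemeral instance; replaces any existing task for the key.
    void addBeat(String key, long periodMillis, Runnable sendBeat) {
        ScheduledFuture<?> previous = tasks.put(key,
                executor.scheduleAtFixedRate(sendBeat, periodMillis, periodMillis, TimeUnit.MILLISECONDS));
        if (previous != null) {
            previous.cancel(false);
        }
    }

    // Stop sending beats, for example when the instance is deregistered.
    void removeBeat(String key) {
        ScheduledFuture<?> task = tasks.remove(key);
        if (task != null) {
            task.cancel(false);
        }
    }

    boolean isBeating(String key) {
        return tasks.containsKey(key);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

In this sketch `sendBeat` would wrap the HTTP call that reports the instance as alive; if the server stops receiving beats for an ephemeral instance, it can treat the instance as offline.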

registerService

  • The logic is simple: build the request parameters
  • Send the request
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
        Map<String, String> params = new HashMap(9);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        params.put("clusterName", instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));
        this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, "POST");
    }

reqAPI

Sends the service registration request.
  • api: the Nacos server open API path
  • params: request parameters
  • body: request body
  • method: HTTP method
  • servers: Nacos server addresses
    public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {

        params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
            throw new NacosException(NacosException.INVALID_PARAM, "no server available");
        }

        NacosException exception = new NacosException();
        // if the server list is not empty
        if (servers != null && !servers.isEmpty()) {
            // pick a random node to start from
            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());
            // iterate over the server list
            for (int i = 0; i < servers.size(); i++) {
                String server = servers.get(index); // the server at the current index
                try {
                    // call the selected server
                    return callServer(api, params, body, server, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
                        NAMING_LOGGER.debug("request {} failed.", server, e);
                    }
                }
                // move on to the next server, wrapping around
                index = (index + 1) % servers.size();
            }
        }

        if (StringUtils.isNotBlank(nacosDomain)) {
            for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
                try {
                    return callServer(api, params, body, nacosDomain, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
                        NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                    }
                }
            }
        }

        NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), exception.getErrMsg());

        throw new NacosException(exception.getErrCode(), "failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
    }
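
The retry strategy in reqAPI (start at a random server, then walk the list round-robin until one call succeeds) can be isolated into a few lines. The following is a minimal sketch under assumptions: `FailoverSketch` and `callWithFailover` are hypothetical names, and the `call` function stands in for callServer.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Function;

// Sketch of the retry strategy: random starting index, then round-robin over every server.
class FailoverSketch {
    static <T> T callWithFailover(List<String> servers, Random random, Function<String, T> call) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        int index = random.nextInt(servers.size()); // a random start spreads load across nodes
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server); // the first success wins
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next node
            }
            index = (index + 1) % servers.size(); // wrap around the list
        }
        throw new IllegalStateException("all servers failed: " + servers, last);
    }
}
```

Each server is tried at most once per request, so a single unreachable node costs one failed call before the next node is used.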

callServer

Makes the call to the server.
    public String callServer(String api, Map<String, String> params, String body, String curServer, String method)
        throws NacosException {
        long start = System.currentTimeMillis();
        long end = 0;
        // add the signature information
        injectSecurityInfo(params);
        // add the headers
        List<String> headers = builderHeaders();

        String url;
        // build the URL
        if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
            url = curServer + api;
        } else {
            if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
                curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
            }
            url = HttpClient.getPrefix() + curServer + api;
        }
        // send the request through HttpClient
        HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method);
        end = System.currentTimeMillis();

        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
            .observe(end - start);

        if (HttpURLConnection.HTTP_OK == result.code) {
            // return the server's response
            return result.content;
        }

        if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
            return StringUtils.EMPTY;
        }

        throw new NacosException(result.code, result.content);
    }
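
The URL assembly in callServer follows three cases: the server entry already has a scheme, it has a host and port, or it has only a host. A small sketch of that branching, with literal values standing in for the UtilAndComs constants (the class `UrlSketch`, the `tls` flag and the literal separators are assumptions made for illustration):

```java
// Sketch of how a server entry becomes a request URL; the literals are illustrative.
class UrlSketch {
    static String buildUrl(String curServer, String api, int defaultPort, boolean tls) {
        if (curServer.startsWith("https://") || curServer.startsWith("http://")) {
            return curServer + api; // the entry already carries a scheme, use it as-is
        }
        if (!curServer.contains(":")) {
            curServer = curServer + ":" + defaultPort; // no port given, append the default
        }
        return (tls ? "https://" : "http://") + curServer + api;
    }
}
```

For example, `buildUrl("localhost", "/nacos/v1/ns/instance", 8848, false)` yields `http://localhost:8848/nacos/v1/ns/instance`.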

request

Sends the request and gets the result.
    public static HttpResult request(String url, List<String> headers, Map<String, String> paramValues, String body, String encoding, String method) {
        HttpURLConnection conn = null;
        try {
            String encodedContent = encodingParams(paramValues, encoding);
            url += (StringUtils.isEmpty(encodedContent)) ? "" : ("?" + encodedContent);

            conn = (HttpURLConnection) new URL(url).openConnection();

            setHeaders(conn, headers, encoding);
            conn.setConnectTimeout(CON_TIME_OUT_MILLIS);
            conn.setReadTimeout(TIME_OUT_MILLIS);
            conn.setRequestMethod(method);
            conn.setDoOutput(true);
            if (StringUtils.isNotBlank(body)) {
                byte[] b = body.getBytes();
                conn.setRequestProperty("Content-Length", String.valueOf(b.length));
                conn.getOutputStream().write(b, 0, b.length);
                conn.getOutputStream().flush();
                conn.getOutputStream().close();
            }
            conn.connect();
            if (NAMING_LOGGER.isDebugEnabled()) {
                NAMING_LOGGER.debug("Request from server: " + url);
            }
            return getResult(conn);
        } catch (Exception e) {
            try {
                if (conn != null) {
                    NAMING_LOGGER.warn("failed to request " + conn.getURL() + " from "
                        + InetAddress.getByName(conn.getURL().getHost()).getHostAddress());
                }
            } catch (Exception e1) {
                NAMING_LOGGER.error("[NA] failed to request ", e1);
                //ignore
            }

            NAMING_LOGGER.error("[NA] failed to request ", e);

            return new HttpResult(500, e.toString(), Collections.<String, String>emptyMap());
        } finally {
            IoUtils.closeQuietly(conn);
        }
    }
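
The request method appends the encoded parameters to the URL as a query string. The sketch below shows one plausible way to do that encoding; `QueryStringSketch` is a hypothetical class, and skipping empty values is an assumption of this sketch rather than a documented behavior of encodingParams.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;

// Sketch: turn request parameters into a URL-encoded query string.
class QueryStringSketch {
    static String encode(Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : params.entrySet()) {
            if (e.getValue() == null || e.getValue().isEmpty()) {
                continue; // assumption: parameters without a value are not sent
            }
            if (sb.length() > 0) {
                sb.append('&');
            }
            try {
                sb.append(e.getKey()).append('=').append(URLEncoder.encode(e.getValue(), "UTF-8"));
            } catch (UnsupportedEncodingException ex) {
                throw new IllegalStateException(ex); // UTF-8 is always available
            }
        }
        return sb.toString();
    }

    static String appendTo(String url, Map<String, String> params) {
        String query = encode(params);
        return query.isEmpty() ? url : url + "?" + query;
    }
}
```

Encoding matters here because a grouped service name such as `DEFAULT_GROUP@@demo` contains characters that must be escaped in a URL.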

Handling on the Nacos server

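The server provides an InstanceController class that exposes the APIs related to service registration. When a client registers, the endpoint it calls is: [post]: /nacos/v1/ns/instance
  • serviceName: the name of the client's project
  • namespace: the Nacos namespace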
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {

        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        // parse the Instance out of the request
        final Instance instance = parseInstance(request);

        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
}

ServiceManager.registerInstance

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
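        // Create an empty service (the entry shown in the service list of the Nacos console). In practice this initializes an entry in serviceMap, which is a ConcurrentHashMap
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        // Look up the service object in serviceMap by namespaceId and serviceName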
        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        // call addInstance to add the service instance
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

createServiceIfAbsent

createEmptyService, called from registerInstance, delegates to createServiceIfAbsent, passing its local flag through and a null cluster.

    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
        // get the service object from serviceMap
        Service service = getService(namespaceId, serviceName);
        if (service == null) {

            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) { // a cluster was passed in: bind it to the new service
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();

            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }

putServiceAndInit

    private void putServiceAndInit(Service service) throws NacosException {
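        putService(service); // store the service in serviceMap
        service = getService(service.getNamespaceId(), service.getName());
        service.init(); // set up the heartbeat check mechanism
        // register data-consistency listeners for both keys: ephemeral=true data is handled by Distro (AP), ephemeral=false data by Raft (CP)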
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }
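
The create-if-absent step and the serviceMap it writes to can be sketched with nested concurrent maps. This is a simplified illustration: `ServiceMapSketch` and `ServiceEntry` are hypothetical, and the nesting of namespace to service name is an assumption made for the example, not a statement about the server's exact data layout.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a namespace-scoped service table with create-if-absent semantics.
class ServiceMapSketch {
    static final class ServiceEntry {
        final String namespaceId;
        final String name;

        ServiceEntry(String namespaceId, String name) {
            this.namespaceId = namespaceId;
            this.name = name;
        }
    }

    // namespaceId -> (serviceName -> service); the nesting is assumed for illustration
    private final Map<String, Map<String, ServiceEntry>> serviceMap = new ConcurrentHashMap<>();

    ServiceEntry getService(String namespaceId, String serviceName) {
        Map<String, ServiceEntry> byName = serviceMap.get(namespaceId);
        return byName == null ? null : byName.get(serviceName);
    }

    // Returns the existing entry, or atomically creates and stores a new empty one.
    ServiceEntry createServiceIfAbsent(String namespaceId, String serviceName) {
        return serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, name -> new ServiceEntry(namespaceId, name));
    }
}
```

Using `computeIfAbsent` means two concurrent registrations of the same service end up sharing one entry instead of overwriting each other.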

addInstance

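Back up one level: the flow then enters addInstance, which adds the service instance to the instance list and then synchronizes the data through the consistency protocol.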
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {

        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        Service service = getService(namespaceId, serviceName);

        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

            Instances instances = new Instances();
            instances.setInstanceList(instanceList);

            consistencyService.put(key, instances);
        }
    }

Service lookup by the consumer

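Once registration succeeds, a consumer can fetch the provider's addresses from the Nacos server and then call the service. On the consumer side the core class is NacosDiscoveryClient, which talks to Nacos to obtain the provider address information. The steps leading up to it are not repeated here, because the subscription process was already analyzed in the earlier Dubbo source walkthrough. NacosDiscoveryClient provides a getInstances method that returns the provider addresses for a given service name.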

Fetching the service list at client startup

NacosDiscoveryClient.getInstances

public class NacosDiscoveryClient implements DiscoveryClient {
    private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryClient.class);
    public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client";
    private NacosServiceDiscovery serviceDiscovery;

    public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {
        this.serviceDiscovery = nacosServiceDiscovery;
    }

    public String description() {
        return "Spring Cloud Nacos Discovery Client";
    }

    public List<ServiceInstance> getInstances(String serviceId) {
        try {
            return this.serviceDiscovery.getInstances(serviceId);
        } catch (Exception var3) {
            throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, var3);
        }
    }
}
NacosServiceDiscovery.getInstances calls NamingService to get the list of instances for the serviceId and group, then converts each Instance into a ServiceInstance object.
    public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
        String group = this.discoveryProperties.getGroup();
        List<Instance> instances = this.discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true);
        return hostToServiceInstanceList(instances, serviceId);
    }

NacosNamingService.selectInstances

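selectInstances first obtains a serviceInfo from hostReactor, then removes from serviceInfo.getHosts() every instance that is not healthy, not enabled, or has a weight less than or equal to 0, and returns the rest. If subscribe is true it calls hostReactor.getServiceInfo to get the serviceInfo; otherwise it calls hostReactor.getServiceInfoDirectlyFromServer.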
 
    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {

        ServiceInfo serviceInfo;
        if (subscribe) { // whether to subscribe to address changes; defaults to true
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        return selectInstances(serviceInfo, healthy);
    }
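
The filtering rule described for selectInstances (keep only instances whose health matches the requested flag, that are enabled, and whose weight is positive) is easy to check in isolation. A minimal sketch, where `InstanceFilterSketch` and `Node` are hypothetical stand-ins for the client classes:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the instance filter applied before addresses are returned to the caller.
class InstanceFilterSketch {
    static final class Node {
        final String address;
        final boolean healthy;
        final boolean enabled;
        final double weight;

        Node(String address, boolean healthy, boolean enabled, double weight) {
            this.address = address;
            this.healthy = healthy;
            this.enabled = enabled;
            this.weight = weight;
        }
    }

    // Keep a node only if its health matches the request, it is enabled, and its weight is positive.
    static List<Node> select(List<Node> hosts, boolean healthy) {
        List<Node> result = new ArrayList<>();
        for (Node node : hosts) {
            if (node.healthy == healthy && node.enabled && node.weight > 0) {
                result.add(node);
            }
        }
        return result;
    }
}
```

Setting an instance's weight to 0 is therefore one way to take it out of rotation without deregistering it.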

HostReactor.getServiceInfo

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        // build the key from the service name and the cluster names (empty by default)
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        // look up the provider list by key in serviceInfoMap, the client's local cache of service addresses
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

        if (null == serviceObj) {
            // not in the local cache: create a new ServiceInfo, put it into serviceInfoMap and updatingMap,
            // run updateServiceNow, then remove it from updatingMap
            serviceObj = new ServiceInfo(serviceName, clusters);

            serviceInfoMap.put(serviceObj.getKey(), serviceObj);

            updatingMap.put(serviceName, new Object());
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);

        } else if (updatingMap.containsKey(serviceName)) {
            // the serviceObj found in serviceInfoMap is currently being updated: wait up to UPDATE_HOLD_INTERVAL
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        // make sure a scheduled refresh is running via scheduleUpdateIfAbsent, then return the serviceInfo from serviceInfoMap

        scheduleUpdateIfAbsent(serviceName, clusters);

        return serviceInfoMap.get(serviceObj.getKey());
    }
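The code above contains two pieces of logic:
  • updateServiceNow loads the service address information from the Nacos server immediately
  • scheduleUpdateIfAbsent starts a scheduled task that queries the service addresses once every 1 s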
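
The two pieces of logic above amount to "load on first use, then refresh in the background". A minimal sketch of that pattern follows. `AddressCacheSketch` is a hypothetical class, `fetchFromServer` stands in for the HTTP query to the Nacos server, and the refresh interval is a constructor parameter rather than a value taken from the Nacos client.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical client-side cache: load on first use, then refresh in the background.
class AddressCacheSketch {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Map<String, ScheduledFuture<?>> refreshTasks = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "address-cache-refresh");
        t.setDaemon(true);
        return t;
    });
    private final Function<String, List<String>> fetchFromServer; // stands in for the server query
    private final long refreshMillis;

    AddressCacheSketch(Function<String, List<String>> fetchFromServer, long refreshMillis) {
        this.fetchFromServer = fetchFromServer;
        this.refreshMillis = refreshMillis;
    }

    List<String> getAddresses(String service) {
        // Cache miss: query the server right away (the role of updateServiceNow).
        cache.computeIfAbsent(service, fetchFromServer);
        // Start at most one periodic refresh per service (the role of scheduleUpdateIfAbsent).
        refreshTasks.computeIfAbsent(service, s -> scheduler.scheduleWithFixedDelay(
                () -> refresh(s), refreshMillis, refreshMillis, TimeUnit.MILLISECONDS));
        return cache.get(service);
    }

    private void refresh(String service) {
        try {
            List<String> latest = fetchFromServer.apply(service);
            if (latest != null) {
                cache.put(service, latest);
            }
        } catch (RuntimeException e) {
            // Keep the previous addresses; an uncaught exception would cancel the periodic task.
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Repeated calls to `getAddresses` for the same service are answered from the local cache, so the server is only contacted on the first call and on each background refresh.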

updateServiceNow

    public void updateServiceNow(String serviceName, String clusters) {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {

            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);

            if (StringUtils.isNotEmpty(result)) {
                processServiceJSON(result);
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }

queryList

Calls /nacos/v1/ns/instance/list to get the service address information from the Nacos server.
    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {

        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));

        return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
    }
