nacos服务注册源码解析


1.客户端使用

compile 'com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-discovery:2.2.3.RELEASE'
compile 'com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-config:2.2.3.RELEASE'

bootstrap.ym中

spring:
  application:
    name: product
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
        service: ${spring.application.name}
        namespace: 038b8be8-54da-44a5-9664-def33bc8cd19
        group: DEFAULT_GROUP
      config:
        server-addr: 127.0.0.1:8848
        namespace: 038b8be8-54da-44a5-9664-def33bc8cd19
        group: DEFAULT_GROUP
        prefix: ${spring.application.name}
@EnableDiscoveryClient
@SpringBootApplication
public class ProductApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProductApplication.class, args);
    }
}

2.服务端代码

git clone https://github.com/alibaba/nacos.git

将源码导入到idea,然后打开console项目中的application.properties配置文件

将db开头的配置放开

找到项目distribution\conf下的SQL文件nacos-mysql.sql,导入数据库

在C:\Users\yue\nacos目录下新建conf文件夹,文件夹下新建一个配置文件cluster.conf

启动Nacos时设置VM参数

-Dnscos.standalone=true -Dnacos.home=C:\Users\yue\nacos -Dserver.port=9000

启动成功后访问地址http://127.0.0.1:9000/nacos/index.html,账号密码默认nacos

3. Spring服务发现的统一规范

Spring将这套规范定义在Spring Cloud Common中

  • circuitbreaker包下定义了断路器的规范
  • discovery包下面定义了服务发现的规范
  • loadbalancer包下面定义了负载均衡的规范
  • serviceregistry包下面定义了服务注册的规范

serviceregistry包定义了三个核心接口,用来实现服务注册。

AutoServiceRegistration:用于服务自动注册

Registration:用于存储服务信息

ServiceRegistry:用于注册/移除服务

4.Nacos客户端服务注册实现

Nacos服务注册模块按照Spring Cloud的规范实现这三个接口

 4.1NacosAutoServiceRegistration

这个类实现了服务自动注册到Nacos注册中心的功能,它继承AbstractAutoServiceRegistration类。

public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware,
        ApplicationListener<WebServerInitializedEvent> {
            ...
}

AbstractAutoServiceRegistration实现AutoServiceRegistration接口来达到服务自动注册,实现ApplicationListener<WebServerInitializedEvent>接口来实现事件回调;当容器启动,应用上下文被刷新且程序准备就绪之后会触发WebServerInitializedEvent事件,调用onApplicationEvent()方法。

@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
    bind(event);
}

@Deprecated
public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (context instanceof ConfigurableWebServerApplicationContext) {
        if ("management".equals(((ConfigurableWebServerApplicationContext) context)
                .getServerNamespace())) {
            return;
        }
    }
    this.port.compareAndSet(0, event.getWebServer().getPort());
    this.start();
}

public void start() {
    if (!isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Discovery Lifecycle disabled. Not starting");
        }
        return;
    }
    if (!this.running.get()) {
        this.context.publishEvent(
                new InstancePreRegisteredEvent(this, getRegistration()));
        register(); if (shouldRegisterManagement()) {
            registerManagement();
        }
        this.context.publishEvent(
                new InstanceRegisteredEvent<>(this, getConfiguration()));
        this.running.compareAndSet(false, true);
    }
}

protected void register() {
    //实际上最终是调用serviceRegistry.register方法
    this.serviceRegistry.register(getRegistration());
}

4.2NacosServiceRegistry

@Override
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }
    NamingService namingService = namingService();
    String serviceId = registration.getServiceId();
    String group = nacosDiscoveryProperties.getGroup();

    Instance instance = getNacosInstanceFromRegistration(registration);
    try {
        // 最终由namingService实现服务注册
 namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        log.error("nacos registry, {} register failed...{},", serviceId,
                registration.toString(), e);
        rethrowRuntimeException(e);
    }
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
     //添加定时心跳任务 beatReactor.addBeatInfo(groupedServiceName, beatInfo); } serverProxy.registerService(groupedServiceName, groupName, instance); }
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance); final Map<String, String> params = new HashMap<String, String>(16); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); //最终使用http请求注册服务 reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); }
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    //添加心跳任务
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    //调用http来监控心跳
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}

总结:

服务的自动注册其实是利用了SpringWebServerInitializedEvent事件,最终由namingService完成服务注册工作。

当容器启动完成,触发事件,开始执行register服务,发起服务注册请求的同时添加一个心跳检测任务。

5.服务注册配置类加载

Spring Boot中有一种非常解耦的扩展机制:Spring Factories。这种扩展机制实际上是仿照Java中的SPI扩展机制来实现的

在Spring中也有一种类似与Java SPI的加载机制。它在META-INF/spring.factories文件中配置接口的实现类名称,然后在程序中读取这些配置文件并实例化。
这种自定义的SPI机制是Spring Boot Starter实现的基础。

SpringBoot启动的时候会读取所有jar包下面的META-INF/spring.factories文件; 并且将文件中的 接口/抽象类 对应的实现类都对应起来,并在需要的时候可以实例化对应的实现类

 

5.1 NacosServiceRegistryAutoConfiguration

这个配置类实例化了 NacosServiceRegistryNacosRegistrationNacosAutoServiceRegistration这三个 bean。
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
        matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
        AutoServiceRegistrationAutoConfiguration.class,
        NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(registrationCustomizers.getIfAvailable(),
                nacosDiscoveryProperties, context);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry,
                autoServiceRegistrationProperties, registration);
    }
}

1. @EnableConfigurationProperties 可以让用了@ConfigurationProperties注解但是没有使用@Component等注解实例化为Bean的类生效,因为没有@Component注解,Spring容器不会实例化这个类,通过@EnableConfigurationProperties注解让NacosdiscoveryProperties被实例化并注入到容器中。

2. @ConditionalOnNacosDiscoveryEnabled 是当spring.cloud.nacos.discovery.enabled=true时当前配置类才会生效。

3. @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",matchIfMissing = true)是当spring.cloud.service-registry.auto-registration.enabled=true时当前配置类才会生效。

4. @AutoConfigureAfter(***) 是当括号类的配置类加载生效后再加载当前配置类。

5.2  NacosDiscoveryEndpointAutoConfiguration

这个配置类实例化NamingService。

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Endpoint.class)
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryEndpointAutoConfiguration {

    @Bean
    ...

    @Bean
    @ConditionalOnEnabledHealthIndicator("nacos-discovery")
    public HealthIndicator nacosDiscoveryHealthIndicator(
            NacosServiceManager nacosServiceManager,
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        Properties nacosProperties = nacosDiscoveryProperties.getNacosProperties();
        return new NacosDiscoveryHealthIndicator(
                nacosServiceManager.getNamingService(nacosProperties));
    }
}

NacosServiceManager类下:
public NamingService getNamingService(Properties properties) {
    if (Objects.isNull(this.namingService)) {
        buildNamingService(properties);
    }
    return namingService;
}
private NamingService buildNamingService(Properties properties) {
    if (Objects.isNull(namingService)) {
        synchronized (NacosServiceManager.class) {
            if (Objects.isNull(namingService)) {
                namingService = createNewNamingService(properties);
            }
        }
    }
    return namingService;
}
private NamingService createNewNamingService(Properties properties) {
    try {
        return createNamingService(properties);
    }
    ...
}
NacosFactory类下:
public static NamingService createNamingService(Properties properties) throws NacosException {
    return NamingFactory.createNamingService(properties);
}
NamingFactory类下:
public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        // 通过反射机制创建NamingService的对象
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        NamingService vendorImpl = (NamingService) constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}
NacosNamingService类下:
public NacosNamingService(Properties properties) throws NacosException {
    init(properties);
}
private void init(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    //初始化命名空间,注册中心地址,缓存,日志等
    this.namespace = InitUtils.initNamespaceForNaming(properties);
    InitUtils.initSerialization();
    initServerAddr(properties);
    InitUtils.initWebRootContext();
    initCacheDir();
    initLogName(properties);
    //初始化事件分发器,服务代理,心跳机制等
    this.eventDispatcher = new EventDispatcher();
    this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
    this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
    this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir,
            isLoadCacheAtStart(properties), initPollingThreadCount(properties));
}

6.服务端服务注册

流程图参考:https://www.processon.com/view/5e25b762e4b04579e409e81f

服务端简单总结:

1.当客户端发起服务注册请求时,将服务添加进本地缓存,并建立心跳检测

2.保存实例时通过实例名称的前缀来判断是临时实例还是永久实例。

3.如果是临时实例,通过阻塞队列将实例更新到内存注册表,然后又通过阻塞队列将实例信息异步批量同步到集群其他节点

4.如果是永久实例,判断当前节点是不是Leader,如果不是,则将服务注册请求转发到Leader节点,如果是Leader,将实例信息到异步更新内存注册表和同步写入文件,然后将实例信息同步到集群其他节点。

naming项目下的 InstanceController类

@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    final Instance instance = parseInstance(request);
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // ①创建1个空服务
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // 从本地缓存serviceMap中获取服务对象
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // ②服务注册
 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

①: createEmptyService()方法

// 如果服务不存在,创建一个服务
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        // 如果服务不存在,创建一个空的服务
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        // 将创建的空的服务添加进本地缓存,并初始化
 putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}
private void putServiceAndInit(Service service) throws NacosException {
    // 将服务加入serviceMap缓存
 putService(service); // 建立心跳检测任务机制,默认五秒
 service.init(); // 实现数据一致性的监听
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
public void putService(Service service) {
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
            }
        }
    }
    serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
//建立心跳检测任务clientBeatCheckTask,默认五秒
public void init() { HealthCheckReactor.scheduleCheck(clientBeatCheckTask); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { entry.getValue().setService(this); entry.getValue().init(); } }

心跳定时任务

@Override
public void run() {
    try {
        ...省略
        List<Instance> instances = service.allIPs(true);
        // 设置实例的运行状况
        for (Instance instance : instances) {
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // 删除过期的实例
        for (Instance instance : instances) {
            if (instance.isMarked()) {
                continue;
            }
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // 删除实例
 deleteIp(instance);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}

addInstance()方法

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    Service service = getService(namespaceId, serviceName);
    synchronized (service) {
        // 比较并获取新的实例列表
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // 保存服务实例
 consistencyService.put(key, instances);
    }
}

区分是临时实例还是永久实例

public void put(String key, Record value) throws NacosException {
    mapConsistencyService(key).put(key, value);
}
//通过判断key的前缀字符串来判断
private ConsistencyService mapConsistencyService(String key) { return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService; }

临时实例ephemeralConsistencyService.put()

@PostConstruct
public void init() {
    //提交通知任务
    GlobalExecutor.submitDistroNotifyTask(notifier);
}

@Override
public void put(String key, Record value) throws NacosException {
    //①注册实例
 onPut(key, value);
    //②同步实例到其他节点
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}

①注册实例更新到缓存

public void onPut(String key, Record value) {
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        // 保存命名数据
        dataStore.put(key, datum);
    }
    if (!listeners.containsKey(key)) {
        return;
    }
    //添加任务
 notifier.addTask(key, DataOperation.CHANGE);
}
public void addTask(String datumKey, DataOperation action) {
    if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
        return;
    }
    if (action == DataOperation.CHANGE) {
        services.put(datumKey, StringUtils.EMPTY);
    }
    tasks.offer(Pair.with(datumKey, action));
}
// task.run调用
private void handle(Pair<String, DataOperation> pair) {
    try {
        ...省略
        for (RecordListener listener : listeners.get(datumKey)) {
            count++;
            try {
                if (action == DataOperation.CHANGE) {
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }
                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            ...
        }
        ...
    } ...
}
@Override
public void onChange(String key, Instances value) throws Exception {
    ...
   //更新实例,运用CopyAndWrite updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
  //重新计算校验 recalculateChecksum(); }

②同步实例到所有远程服务器

异步批量同步

public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                each.getAddress());
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
        }
    }
}

永久实例RaftConsistencyServiceImpl

public void put(String key, Record value) throws NacosException {
    checkIsStopWork();
    try {
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        ...
    }
}
public void signalPublish(String key, Record value) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    //当前节点不是 Leader
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        final RaftPeer leader = getLeader();
        // 将注册请求转发到集群的 Leader节点
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }
    //当前节点是 Leader
    OPERATE_LOCK.lock();
    try {
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }
        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
        // 开始发布
 onPublish(datum, peers.local());
        final String content = json.toString();
        // 利用 CountDownLatch 实现过半
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        for (final String server : peers.allServersIncludeMyself()) {
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            final String url = buildUrl(server, API_ON_PUB);
            // 同步实例信息给集群节点
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        ...
                        return;
                    }
                    latch.countDown();
                }
                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                }
                @Override
                public void onCancel() {
                }
            });
        }
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            ...
        }

        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}

public void onPublish(Datum datum, RaftPeer source) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    RaftPeer local = peers.local();
    if (datum.value == null) {
        ...
    }
    if (!peers.isLeader(source.ip)) {
        ...
    }
    if (source.term.get() < local.term.get()) {
        ...
    }
    local.resetLeaderDue();
    // 持久化数据,写入文件
    if (KeyBuilder.matchPersistentKey(datum.key)) {
        raftStore.write(datum);
    }
    datums.put(datum.key, datum);
    if (isLeader()) {
        local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
    } else {
        if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
            //set leader term:
            getLeader().term.set(source.term.get());
            local.term.set(getLeader().term.get());
        } else {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        }
    }
    raftStore.updateTerm(local.term.get());
    // 发布数据更新事件
 NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
    Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}

一致性监听这里采用了委派机制 ,同时执行了 Distro 和 Raft 两个实现类。由此可以说明Nacos同时使用 Raft 协议和 Distro 协议维护数据一致性的。

@Override
public void listen(String key, RecordListener listener) throws NacosException { // this special key is listened by both: if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) { persistentConsistencyService.listen(key, listener); ephemeralConsistencyService.listen(key, listener); return; } mapConsistencyService(key).listen(key, listener); }

7.启动加载

@EnableDiscoveryClient
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
    //默认自动将本服务注册到服务注册中心
    boolean autoRegister() default true;
}

@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
        extends SpringFactoryImportSelector<EnableDiscoveryClient> {
    @Override
    public String[] selectImports(AnnotationMetadata metadata) {
        String[] imports = super.selectImports(metadata);
        AnnotationAttributes attributes = AnnotationAttributes.fromMap(
                metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));
        // 获取@EnableDiscoveryClient注解的属性autoRegister的值
        boolean autoRegister = attributes.getBoolean("autoRegister");
        if (autoRegister) {
            // 加载AutoServiceRegistrationConfiguration配置类
            List<String> importsList = new ArrayList<>(Arrays.asList(imports));
            importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
            imports = importsList.toArray(new String[0]);
        }
        else {
            // 设置spring.cloud.service-registry.auto-registration.enabled=false, 关闭服务自动注册功能
            Environment env = getEnvironment();
            if (ConfigurableEnvironment.class.isInstance(env)) {
                ConfigurableEnvironment configEnv = (ConfigurableEnvironment) env;
                LinkedHashMap<String, Object> map = new LinkedHashMap<>();
                map.put("spring.cloud.service-registry.auto-registration.enabled", false);
                MapPropertySource propertySource = new MapPropertySource(
                        "springCloudDiscoveryClient", map);
                configEnv.getPropertySources().addLast(propertySource);
            }
        }
        return imports;
    }
    ...省略
}

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public class AutoServiceRegistrationConfiguration {
}

@ConfigurationProperties("spring.cloud.service-registry.auto-registration")
public class AutoServiceRegistrationProperties {
    // 是否启用服务自动注册。默认值为true
    private boolean enabled = true;
    // 是否将管理注册为服务。默认值为true
    private boolean registerManagement = true;
    // 如果没有自动注册,启动是否失败。默认值为假
    private boolean failFast = false;
    ...省略
}

综上,没有使用@EnableDiscoveryClient注解,服务也会被自动注册,默认自动注册为true。

如果不想自动注册服务,可以通过

@EnableDiscoveryClient(autoRegister = false)
或者
spring.cloud.service-registry.auto-registration.enabled=false

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM