nacos服務注冊源碼解析


1.客戶端使用

compile 'com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-discovery:2.2.3.RELEASE'
compile 'com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-config:2.2.3.RELEASE'

bootstrap.ym中

spring:
  application:
    name: product
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
        service: ${spring.application.name}
        namespace: 038b8be8-54da-44a5-9664-def33bc8cd19
        group: DEFAULT_GROUP
      config:
        server-addr: 127.0.0.1:8848
        namespace: 038b8be8-54da-44a5-9664-def33bc8cd19
        group: DEFAULT_GROUP
        prefix: ${spring.application.name}
@EnableDiscoveryClient
@SpringBootApplication
public class ProductApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProductApplication.class, args);
    }
}

2.服務端代碼

git clone https://github.com/alibaba/nacos.git

將源碼導入到idea,然后打開console項目中的application.properties配置文件

將db開頭的配置放開

找到項目distribution\conf下的SQL文件nacos-mysql.sql,導入數據庫

在C:\Users\yue\nacos目錄下新建conf文件夾,文件夾下新建一個配置文件cluster.conf

啟動Nacos時設置VM參數

-Dnscos.standalone=true -Dnacos.home=C:\Users\yue\nacos -Dserver.port=9000

啟動成功后訪問地址http://127.0.0.1:9000/nacos/index.html,賬號密碼默認nacos

3. Spring服務發現的統一規范

Spring將這套規范定義在Spring Cloud Common中

  • circuitbreaker包下定義了斷路器的規范
  • discovery包下面定義了服務發現的規范
  • loadbalancer包下面定義了負載均衡的規范
  • serviceregistry包下面定義了服務注冊的規范

serviceregistry包定義了三個核心接口,用來實現服務注冊。

AutoServiceRegistration:用於服務自動注冊

Registration:用於存儲服務信息

ServiceRegistry:用於注冊/移除服務

4.Nacos客戶端服務注冊實現

Nacos服務注冊模塊按照Spring Cloud的規范實現這三個接口

 4.1NacosAutoServiceRegistration

這個類實現了服務自動注冊到Nacos注冊中心的功能,它繼承AbstractAutoServiceRegistration類。

public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware,
        ApplicationListener<WebServerInitializedEvent> {
            ...
}

AbstractAutoServiceRegistration實現AutoServiceRegistration接口來達到服務自動注冊,實現ApplicationListener<WebServerInitializedEvent>接口來實現事件回調;當容器啟動,應用上下文被刷新且程序准備就緒之后會觸發WebServerInitializedEvent事件,調用onApplicationEvent()方法。

@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
    bind(event);
}

@Deprecated
public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (context instanceof ConfigurableWebServerApplicationContext) {
        if ("management".equals(((ConfigurableWebServerApplicationContext) context)
                .getServerNamespace())) {
            return;
        }
    }
    this.port.compareAndSet(0, event.getWebServer().getPort());
    this.start();
}

public void start() {
    if (!isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Discovery Lifecycle disabled. Not starting");
        }
        return;
    }
    if (!this.running.get()) {
        this.context.publishEvent(
                new InstancePreRegisteredEvent(this, getRegistration()));
        register(); if (shouldRegisterManagement()) {
            registerManagement();
        }
        this.context.publishEvent(
                new InstanceRegisteredEvent<>(this, getConfiguration()));
        this.running.compareAndSet(false, true);
    }
}

protected void register() {
    //實際上最終是調用serviceRegistry.register方法
    this.serviceRegistry.register(getRegistration());
}

4.2NacosServiceRegistry

@Override
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }
    NamingService namingService = namingService();
    String serviceId = registration.getServiceId();
    String group = nacosDiscoveryProperties.getGroup();

    Instance instance = getNacosInstanceFromRegistration(registration);
    try {
        // 最終由namingService實現服務注冊
 namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        log.error("nacos registry, {} register failed...{},", serviceId,
                registration.toString(), e);
        rethrowRuntimeException(e);
    }
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
     //添加定時心跳任務 beatReactor.addBeatInfo(groupedServiceName, beatInfo); } serverProxy.registerService(groupedServiceName, groupName, instance); }
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance); final Map<String, String> params = new HashMap<String, String>(16); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); //最終使用http請求注冊服務 reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); }
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    //添加心跳任務
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    //調用http來監控心跳
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}

總結:

服務的自動注冊其實是利用了SpringWebServerInitializedEvent事件,最終由namingService完成服務注冊工作。

當容器啟動完成,觸發事件,開始執行register服務,發起服務注冊請求的同時添加一個心跳檢測任務。

5.服務注冊配置類加載

Spring Boot中有一種非常解耦的擴展機制:Spring Factories。這種擴展機制實際上是仿照Java中的SPI擴展機制來實現的

在Spring中也有一種類似與Java SPI的加載機制。它在META-INF/spring.factories文件中配置接口的實現類名稱,然后在程序中讀取這些配置文件並實例化。
這種自定義的SPI機制是Spring Boot Starter實現的基礎。

SpringBoot啟動的時候會讀取所有jar包下面的META-INF/spring.factories文件; 並且將文件中的 接口/抽象類 對應的實現類都對應起來,並在需要的時候可以實例化對應的實現類

 

5.1 NacosServiceRegistryAutoConfiguration

這個配置類實例化了 NacosServiceRegistryNacosRegistrationNacosAutoServiceRegistration這三個 bean。
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
        matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
        AutoServiceRegistrationAutoConfiguration.class,
        NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(registrationCustomizers.getIfAvailable(),
                nacosDiscoveryProperties, context);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry,
                autoServiceRegistrationProperties, registration);
    }
}

1. @EnableConfigurationProperties 可以讓用了@ConfigurationProperties注解但是沒有使用@Component等注解實例化為Bean的類生效,因為沒有@Component注解,Spring容器不會實例化這個類,通過@EnableConfigurationProperties注解讓NacosdiscoveryProperties被實例化並注入到容器中。

2. @ConditionalOnNacosDiscoveryEnabled 是當spring.cloud.nacos.discovery.enabled=true時當前配置類才會生效。

3. @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",matchIfMissing = true)是當spring.cloud.service-registry.auto-registration.enabled=true時當前配置類才會生效。

4. @AutoConfigureAfter(***) 是當括號類的配置類加載生效后再加載當前配置類。

5.2  NacosDiscoveryEndpointAutoConfiguration

這個配置類實例化NamingService。

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Endpoint.class)
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryEndpointAutoConfiguration {

    @Bean
    ...

    @Bean
    @ConditionalOnEnabledHealthIndicator("nacos-discovery")
    public HealthIndicator nacosDiscoveryHealthIndicator(
            NacosServiceManager nacosServiceManager,
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        Properties nacosProperties = nacosDiscoveryProperties.getNacosProperties();
        return new NacosDiscoveryHealthIndicator(
                nacosServiceManager.getNamingService(nacosProperties));
    }
}

NacosServiceManager類下:
public NamingService getNamingService(Properties properties) {
    if (Objects.isNull(this.namingService)) {
        buildNamingService(properties);
    }
    return namingService;
}
private NamingService buildNamingService(Properties properties) {
    if (Objects.isNull(namingService)) {
        synchronized (NacosServiceManager.class) {
            if (Objects.isNull(namingService)) {
                namingService = createNewNamingService(properties);
            }
        }
    }
    return namingService;
}
private NamingService createNewNamingService(Properties properties) {
    try {
        return createNamingService(properties);
    }
    ...
}
NacosFactory類下:
public static NamingService createNamingService(Properties properties) throws NacosException {
    return NamingFactory.createNamingService(properties);
}
NamingFactory類下:
public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        // 通過反射機制創建NamingService的對象
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        NamingService vendorImpl = (NamingService) constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}
NacosNamingService類下:
public NacosNamingService(Properties properties) throws NacosException {
    init(properties);
}
private void init(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    //初始化命名空間,注冊中心地址,緩存,日志等
    this.namespace = InitUtils.initNamespaceForNaming(properties);
    InitUtils.initSerialization();
    initServerAddr(properties);
    InitUtils.initWebRootContext();
    initCacheDir();
    initLogName(properties);
    //初始化事件分發器,服務代理,心跳機制等
    this.eventDispatcher = new EventDispatcher();
    this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
    this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
    this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir,
            isLoadCacheAtStart(properties), initPollingThreadCount(properties));
}

6.服務端服務注冊

流程圖參考:https://www.processon.com/view/5e25b762e4b04579e409e81f

服務端簡單總結:

1.當客戶端發起服務注冊請求時,將服務添加進本地緩存,並建立心跳檢測

2.保存實例時通過實例名稱的前綴來判斷是臨時實例還是永久實例。

3.如果是臨時實例,通過阻塞隊列將實例更新到內存注冊表,然后又通過阻塞隊列將實例信息異步批量同步到集群其他節點

4.如果是永久實例,判斷當前節點是不是Leader,如果不是,則將服務注冊請求轉發到Leader節點,如果是Leader,將實例信息到異步更新內存注冊表和同步寫入文件,然后將實例信息同步到集群其他節點。

naming項目下的 InstanceController類

@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    final Instance instance = parseInstance(request);
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // ①創建1個空服務
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // 從本地緩存serviceMap中獲取服務對象
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // ②服務注冊
 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

①: createEmptyService()方法

// 如果服務不存在,創建一個服務
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        // 如果服務不存在,創建一個空的服務
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        // 將創建的空的服務添加進本地緩存,並初始化
 putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}
private void putServiceAndInit(Service service) throws NacosException {
    // 將服務加入serviceMap緩存
 putService(service); // 建立心跳檢測任務機制,默認五秒
 service.init(); // 實現數據一致性的監聽
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
public void putService(Service service) {
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
            }
        }
    }
    serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
//建立心跳檢測任務clientBeatCheckTask,默認五秒
public void init() { HealthCheckReactor.scheduleCheck(clientBeatCheckTask); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { entry.getValue().setService(this); entry.getValue().init(); } }

心跳定時任務

@Override
public void run() {
    try {
        ...省略
        List<Instance> instances = service.allIPs(true);
        // 設置實例的運行狀況
        for (Instance instance : instances) {
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // 刪除過期的實例
        for (Instance instance : instances) {
            if (instance.isMarked()) {
                continue;
            }
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // 刪除實例
 deleteIp(instance);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}

addInstance()方法

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    Service service = getService(namespaceId, serviceName);
    synchronized (service) {
        // 比較並獲取新的實例列表
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // 保存服務實例
 consistencyService.put(key, instances);
    }
}

區分是臨時實例還是永久實例

public void put(String key, Record value) throws NacosException {
    mapConsistencyService(key).put(key, value);
}
//通過判斷key的前綴字符串來判斷
private ConsistencyService mapConsistencyService(String key) { return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService; }

臨時實例ephemeralConsistencyService.put()

@PostConstruct
public void init() {
    //提交通知任務
    GlobalExecutor.submitDistroNotifyTask(notifier);
}

@Override
public void put(String key, Record value) throws NacosException {
    //①注冊實例
 onPut(key, value);
    //②同步實例到其他節點
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}

①注冊實例更新到緩存

public void onPut(String key, Record value) {
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        // 保存命名數據
        dataStore.put(key, datum);
    }
    if (!listeners.containsKey(key)) {
        return;
    }
    //添加任務
 notifier.addTask(key, DataOperation.CHANGE);
}
public void addTask(String datumKey, DataOperation action) {
    if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
        return;
    }
    if (action == DataOperation.CHANGE) {
        services.put(datumKey, StringUtils.EMPTY);
    }
    tasks.offer(Pair.with(datumKey, action));
}
// task.run調用
private void handle(Pair<String, DataOperation> pair) {
    try {
        ...省略
        for (RecordListener listener : listeners.get(datumKey)) {
            count++;
            try {
                if (action == DataOperation.CHANGE) {
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }
                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            ...
        }
        ...
    } ...
}
@Override
public void onChange(String key, Instances value) throws Exception {
    ...
   //更新實例,運用CopyAndWrite updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
  //重新計算校驗 recalculateChecksum(); }

②同步實例到所有遠程服務器

異步批量同步

public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                each.getAddress());
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
        }
    }
}

永久實例RaftConsistencyServiceImpl

public void put(String key, Record value) throws NacosException {
    checkIsStopWork();
    try {
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        ...
    }
}
public void signalPublish(String key, Record value) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    //當前節點不是 Leader
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        final RaftPeer leader = getLeader();
        // 將注冊請求轉發到集群的 Leader節點
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }
    //當前節點是 Leader
    OPERATE_LOCK.lock();
    try {
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }
        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
        // 開始發布
 onPublish(datum, peers.local());
        final String content = json.toString();
        // 利用 CountDownLatch 實現過半
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        for (final String server : peers.allServersIncludeMyself()) {
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            final String url = buildUrl(server, API_ON_PUB);
            // 同步實例信息給集群節點
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        ...
                        return;
                    }
                    latch.countDown();
                }
                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                }
                @Override
                public void onCancel() {
                }
            });
        }
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            ...
        }

        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}

public void onPublish(Datum datum, RaftPeer source) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    RaftPeer local = peers.local();
    if (datum.value == null) {
        ...
    }
    if (!peers.isLeader(source.ip)) {
        ...
    }
    if (source.term.get() < local.term.get()) {
        ...
    }
    local.resetLeaderDue();
    // 持久化數據,寫入文件
    if (KeyBuilder.matchPersistentKey(datum.key)) {
        raftStore.write(datum);
    }
    datums.put(datum.key, datum);
    if (isLeader()) {
        local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
    } else {
        if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
            //set leader term:
            getLeader().term.set(source.term.get());
            local.term.set(getLeader().term.get());
        } else {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        }
    }
    raftStore.updateTerm(local.term.get());
    // 發布數據更新事件
 NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
    Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}

一致性監聽這里采用了委派機制 ,同時執行了 Distro 和 Raft 兩個實現類。由此可以說明Nacos同時使用 Raft 協議和 Distro 協議維護數據一致性的。

@Override
public void listen(String key, RecordListener listener) throws NacosException { // this special key is listened by both: if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) { persistentConsistencyService.listen(key, listener); ephemeralConsistencyService.listen(key, listener); return; } mapConsistencyService(key).listen(key, listener); }

7.啟動加載

@EnableDiscoveryClient
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
    //默認自動將本服務注冊到服務注冊中心
    boolean autoRegister() default true;
}

@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
        extends SpringFactoryImportSelector<EnableDiscoveryClient> {
    @Override
    public String[] selectImports(AnnotationMetadata metadata) {
        String[] imports = super.selectImports(metadata);
        AnnotationAttributes attributes = AnnotationAttributes.fromMap(
                metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));
        // 獲取@EnableDiscoveryClient注解的屬性autoRegister的值
        boolean autoRegister = attributes.getBoolean("autoRegister");
        if (autoRegister) {
            // 加載AutoServiceRegistrationConfiguration配置類
            List<String> importsList = new ArrayList<>(Arrays.asList(imports));
            importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
            imports = importsList.toArray(new String[0]);
        }
        else {
            // 設置spring.cloud.service-registry.auto-registration.enabled=false, 關閉服務自動注冊功能
            Environment env = getEnvironment();
            if (ConfigurableEnvironment.class.isInstance(env)) {
                ConfigurableEnvironment configEnv = (ConfigurableEnvironment) env;
                LinkedHashMap<String, Object> map = new LinkedHashMap<>();
                map.put("spring.cloud.service-registry.auto-registration.enabled", false);
                MapPropertySource propertySource = new MapPropertySource(
                        "springCloudDiscoveryClient", map);
                configEnv.getPropertySources().addLast(propertySource);
            }
        }
        return imports;
    }
    ...省略
}

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public class AutoServiceRegistrationConfiguration {
}

@ConfigurationProperties("spring.cloud.service-registry.auto-registration")
public class AutoServiceRegistrationProperties {
    // 是否啟用服務自動注冊。默認值為true
    private boolean enabled = true;
    // 是否將管理注冊為服務。默認值為true
    private boolean registerManagement = true;
    // 如果沒有自動注冊,啟動是否失敗。默認值為假
    private boolean failFast = false;
    ...省略
}

綜上,沒有使用@EnableDiscoveryClient注解,服務也會被自動注冊,默認自動注冊為true。

如果不想自動注冊服務,可以通過

@EnableDiscoveryClient(autoRegister = false)
或者
spring.cloud.service-registry.auto-registration.enabled=false

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM