1.客户端使用
compile 'com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-discovery:2.2.3.RELEASE' compile 'com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-config:2.2.3.RELEASE'
bootstrap.ym中
spring: application: name: product cloud: nacos: discovery: server-addr: 127.0.0.1:8848 service: ${spring.application.name} namespace: 038b8be8-54da-44a5-9664-def33bc8cd19 group: DEFAULT_GROUP config: server-addr: 127.0.0.1:8848 namespace: 038b8be8-54da-44a5-9664-def33bc8cd19 group: DEFAULT_GROUP prefix: ${spring.application.name}
@EnableDiscoveryClient @SpringBootApplication public class ProductApplication { public static void main(String[] args) { SpringApplication.run(ProductApplication.class, args); } }
2.服务端代码
git clone https://github.com/alibaba/nacos.git
将源码导入到idea,然后打开console项目中的application.properties配置文件
将db开头的配置放开
找到项目distribution\conf下的SQL文件nacos-mysql.sql,导入数据库
在C:\Users\yue\nacos目录下新建conf文件夹,文件夹下新建一个配置文件cluster.conf
启动Nacos时设置VM参数
-Dnscos.standalone=true -Dnacos.home=C:\Users\yue\nacos -Dserver.port=9000
启动成功后访问地址http://127.0.0.1:9000/nacos/index.html,账号密码默认nacos
3. Spring服务发现的统一规范
Spring将这套规范定义在Spring Cloud Common中
- circuitbreaker包下定义了断路器的规范
- discovery包下面定义了服务发现的规范
- loadbalancer包下面定义了负载均衡的规范
- serviceregistry包下面定义了服务注册的规范
serviceregistry包定义了三个核心接口,用来实现服务注册。
AutoServiceRegistration:用于服务自动注册
Registration:用于存储服务信息
ServiceRegistry:用于注册/移除服务
4.Nacos客户端服务注册实现
Nacos服务注册模块按照Spring Cloud的规范实现这三个接口
4.1NacosAutoServiceRegistration
这个类实现了服务自动注册到Nacos注册中心的功能,它继承AbstractAutoServiceRegistration类。
public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> { ... }
AbstractAutoServiceRegistration实现AutoServiceRegistration接口来达到服务自动注册,实现ApplicationListener<WebServerInitializedEvent>接口来实现事件回调;当容器启动,应用上下文被刷新且程序准备就绪之后会触发WebServerInitializedEvent事件,调用onApplicationEvent()方法。
@Override @SuppressWarnings("deprecation") public void onApplicationEvent(WebServerInitializedEvent event) { bind(event); } @Deprecated public void bind(WebServerInitializedEvent event) { ApplicationContext context = event.getApplicationContext(); if (context instanceof ConfigurableWebServerApplicationContext) { if ("management".equals(((ConfigurableWebServerApplicationContext) context) .getServerNamespace())) { return; } } this.port.compareAndSet(0, event.getWebServer().getPort()); this.start(); } public void start() { if (!isEnabled()) { if (logger.isDebugEnabled()) { logger.debug("Discovery Lifecycle disabled. Not starting"); } return; } if (!this.running.get()) { this.context.publishEvent( new InstancePreRegisteredEvent(this, getRegistration())); register(); if (shouldRegisterManagement()) { registerManagement(); } this.context.publishEvent( new InstanceRegisteredEvent<>(this, getConfiguration())); this.running.compareAndSet(false, true); } } protected void register() { //实际上最终是调用serviceRegistry.register方法 this.serviceRegistry.register(getRegistration()); }
4.2NacosServiceRegistry
@Override public void register(Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); return; } NamingService namingService = namingService(); String serviceId = registration.getServiceId(); String group = nacosDiscoveryProperties.getGroup(); Instance instance = getNacosInstanceFromRegistration(registration); try { // 最终由namingService实现服务注册 namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort()); } catch (Exception e) { log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e); rethrowRuntimeException(e); } }
@Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); if (instance.isEphemeral()) { BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
//添加定时心跳任务 beatReactor.addBeatInfo(groupedServiceName, beatInfo); } serverProxy.registerService(groupedServiceName, groupName, instance); } public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance); final Map<String, String> params = new HashMap<String, String>(16); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); //最终使用http请求注册服务 reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); }
public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); //添加心跳任务 executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); } public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException { if (NAMING_LOGGER.isDebugEnabled()) { NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString()); } Map<String, String> params = new HashMap<String, String>(8); Map<String, String> bodyMap = new HashMap<String, String>(2); if (!lightBeatEnabled) { bodyMap.put("beat", JacksonUtils.toJson(beatInfo)); } params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName()); params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster()); params.put("ip", beatInfo.getIp()); params.put("port", String.valueOf(beatInfo.getPort())); //调用http来监控心跳 String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT); return JacksonUtils.toObj(result); }
总结:
服务的自动注册其实是利用了Spring
的WebServerInitializedEvent
事件,最终由namingService
完成服务注册工作。
当容器启动完成,触发事件,开始执行register服务,发起服务注册请求的同时添加一个心跳检测任务。
5.服务注册配置类加载
Spring Boot中有一种非常解耦的扩展机制:Spring Factories。这种扩展机制实际上是仿照Java中的SPI扩展机制来实现的
这种自定义的SPI机制是Spring Boot Starter实现的基础。
SpringBoot启动的时候会读取所有jar包下面的META-INF/spring.factories
文件; 并且将文件中的 接口/抽象类 对应的实现类都对应起来,并在需要的时候可以实例化对应的实现类

5.1 NacosServiceRegistryAutoConfiguration
这个配置类实例化了NacosServiceRegistry
、
NacosRegistration
、
NacosAutoServiceRegistration
这三个
bean。
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties @ConditionalOnNacosDiscoveryEnabled @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class }) public class NacosServiceRegistryAutoConfiguration { @Bean public NacosServiceRegistry nacosServiceRegistry( NacosDiscoveryProperties nacosDiscoveryProperties) { return new NacosServiceRegistry(nacosDiscoveryProperties); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosRegistration nacosRegistration( ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) { return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosAutoServiceRegistration nacosAutoServiceRegistration( NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) { return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); } }
1. @EnableConfigurationProperties 可以让用了@ConfigurationProperties注解但是没有使用@Component等注解实例化为Bean的类生效,因为没有@Component注解,Spring容器不会实例化这个类,通过@EnableConfigurationProperties注解让NacosdiscoveryProperties被实例化并注入到容器中。
2. @ConditionalOnNacosDiscoveryEnabled 是当spring.cloud.nacos.discovery.enabled=true时当前配置类才会生效。
3. @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",matchIfMissing = true)是当spring.cloud.service-registry.auto-registration.enabled=true时当前配置类才会生效。
4. @AutoConfigureAfter(***) 是当括号类的配置类加载生效后再加载当前配置类。
5.2 NacosDiscoveryEndpointAutoConfiguration
这个配置类实例化NamingService。
@Configuration(proxyBeanMethods = false) @ConditionalOnClass(Endpoint.class) @ConditionalOnNacosDiscoveryEnabled public class NacosDiscoveryEndpointAutoConfiguration { @Bean ... @Bean @ConditionalOnEnabledHealthIndicator("nacos-discovery") public HealthIndicator nacosDiscoveryHealthIndicator( NacosServiceManager nacosServiceManager, NacosDiscoveryProperties nacosDiscoveryProperties) { Properties nacosProperties = nacosDiscoveryProperties.getNacosProperties(); return new NacosDiscoveryHealthIndicator( nacosServiceManager.getNamingService(nacosProperties)); } } NacosServiceManager类下: public NamingService getNamingService(Properties properties) { if (Objects.isNull(this.namingService)) { buildNamingService(properties); } return namingService; } private NamingService buildNamingService(Properties properties) { if (Objects.isNull(namingService)) { synchronized (NacosServiceManager.class) { if (Objects.isNull(namingService)) { namingService = createNewNamingService(properties); } } } return namingService; } private NamingService createNewNamingService(Properties properties) { try { return createNamingService(properties); } ... } NacosFactory类下: public static NamingService createNamingService(Properties properties) throws NacosException { return NamingFactory.createNamingService(properties); } NamingFactory类下: public static NamingService createNamingService(Properties properties) throws NacosException { try { Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService"); // 通过反射机制创建NamingService的对象 Constructor constructor = driverImplClass.getConstructor(Properties.class); NamingService vendorImpl = (NamingService) constructor.newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); } } NacosNamingService类下: public NacosNamingService(Properties properties) throws NacosException { init(properties); } private void init(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); //初始化命名空间,注册中心地址,缓存,日志等 this.namespace = InitUtils.initNamespaceForNaming(properties); InitUtils.initSerialization(); initServerAddr(properties); InitUtils.initWebRootContext(); initCacheDir(); initLogName(properties); //初始化事件分发器,服务代理,心跳机制等 this.eventDispatcher = new EventDispatcher(); this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties); this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties)); this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir, isLoadCacheAtStart(properties), initPollingThreadCount(properties)); }
6.服务端服务注册
流程图参考:https://www.processon.com/view/5e25b762e4b04579e409e81f
服务端简单总结:
1.当客户端发起服务注册请求时,将服务添加进本地缓存,并建立心跳检测
2.保存实例时通过实例名称的前缀来判断是临时实例还是永久实例。
3.如果是临时实例,通过阻塞队列将实例更新到内存注册表,然后又通过阻塞队列将实例信息异步批量同步到集群其他节点
4.如果是永久实例,判断当前节点是不是Leader,如果不是,则将服务注册请求转发到Leader节点,如果是Leader,将实例信息到异步更新内存注册表和同步写入文件,然后将实例信息同步到集群其他节点。
naming项目下的 InstanceController类
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); final Instance instance = parseInstance(request); serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; } public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { // ①创建1个空服务 createEmptyService(namespaceId, serviceName, instance.isEphemeral()); // 从本地缓存serviceMap中获取服务对象 Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } // ②服务注册 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
①: createEmptyService()方法
// 如果服务不存在,创建一个服务 public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { Service service = getService(namespaceId, serviceName); if (service == null) { // 如果服务不存在,创建一个空的服务 Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); if (cluster != null) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } service.validate(); // 将创建的空的服务添加进本地缓存,并初始化 putServiceAndInit(service); if (!local) { addOrReplaceService(service); } } } private void putServiceAndInit(Service service) throws NacosException { // 将服务加入serviceMap缓存 putService(service); // 建立心跳检测任务机制,默认五秒 service.init(); // 实现数据一致性的监听 consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); }
public void putService(Service service) { if (!serviceMap.containsKey(service.getNamespaceId())) { synchronized (putServiceLock) { if (!serviceMap.containsKey(service.getNamespaceId())) { serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>()); } } } serviceMap.get(service.getNamespaceId()).put(service.getName(), service); }
//建立心跳检测任务clientBeatCheckTask,默认五秒 public void init() { HealthCheckReactor.scheduleCheck(clientBeatCheckTask); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { entry.getValue().setService(this); entry.getValue().init(); } }
心跳定时任务
@Override public void run() { try { ...省略 List<Instance> instances = service.allIPs(true); // 设置实例的运行状况 for (Instance instance : instances) { if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { if (!instance.isMarked()) { if (instance.isHealthy()) { instance.setHealthy(false); getPushService().serviceChanged(service); ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance)); } } } } if (!getGlobalConfig().isExpireInstance()) { return; } // 删除过期的实例 for (Instance instance : instances) { if (instance.isMarked()) { continue; } if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { // 删除实例 deleteIp(instance); } } } catch (Exception e) { Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e); } }
②addInstance()方法
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); synchronized (service) { // 比较并获取新的实例列表 List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); // 保存服务实例 consistencyService.put(key, instances); } }
区分是临时实例还是永久实例
public void put(String key, Record value) throws NacosException { mapConsistencyService(key).put(key, value); }
//通过判断key的前缀字符串来判断 private ConsistencyService mapConsistencyService(String key) { return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService; }
临时实例ephemeralConsistencyService.put()
@PostConstruct public void init() { //提交通知任务 GlobalExecutor.submitDistroNotifyTask(notifier); } @Override public void put(String key, Record value) throws NacosException { //①注册实例 onPut(key, value); //②同步实例到其他节点 distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2); }
①注册实例更新到缓存
public void onPut(String key, Record value) { if (KeyBuilder.matchEphemeralInstanceListKey(key)) { Datum<Instances> datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); // 保存命名数据 dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } //添加任务 notifier.addTask(key, DataOperation.CHANGE); }
public void addTask(String datumKey, DataOperation action) { if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } if (action == DataOperation.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } tasks.offer(Pair.with(datumKey, action)); } // task.run调用 private void handle(Pair<String, DataOperation> pair) { try { ...省略 for (RecordListener listener : listeners.get(datumKey)) { count++; try { if (action == DataOperation.CHANGE) { listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } if (action == DataOperation.DELETE) { listener.onDelete(datumKey); continue; } ... } ... } ... }
@Override public void onChange(String key, Instances value) throws Exception { ...
//更新实例,运用CopyAndWrite updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
//重新计算校验 recalculateChecksum(); }
②同步实例到所有远程服务器
异步批量同步
public void sync(DistroKey distroKey, DataOperation action, long delay) { for (Member each : memberManager.allMembersWithoutSelf()) { DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress()); DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay); distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask); if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress()); } } }
永久实例RaftConsistencyServiceImpl
public void put(String key, Record value) throws NacosException { checkIsStopWork(); try { raftCore.signalPublish(key, value); } catch (Exception e) { ... } }
public void signalPublish(String key, Record value) throws Exception { if (stopWork) { throw new IllegalStateException("old raft protocol already stop work"); } //当前节点不是 Leader if (!isLeader()) { ObjectNode params = JacksonUtils.createEmptyJsonNode(); params.put("key", key); params.replace("value", JacksonUtils.transferToJsonNode(value)); Map<String, String> parameters = new HashMap<>(1); parameters.put("key", key); final RaftPeer leader = getLeader(); // 将注册请求转发到集群的 Leader节点 raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters); return; } //当前节点是 Leader OPERATE_LOCK.lock(); try { final long start = System.currentTimeMillis(); final Datum datum = new Datum(); datum.key = key; datum.value = value; if (getDatum(key) == null) { datum.timestamp.set(1L); } else { datum.timestamp.set(getDatum(key).timestamp.incrementAndGet()); } ObjectNode json = JacksonUtils.createEmptyJsonNode(); json.replace("datum", JacksonUtils.transferToJsonNode(datum)); json.replace("source", JacksonUtils.transferToJsonNode(peers.local())); // 开始发布 onPublish(datum, peers.local()); final String content = json.toString(); // 利用 CountDownLatch 实现过半 final CountDownLatch latch = new CountDownLatch(peers.majorityCount()); for (final String server : peers.allServersIncludeMyself()) { if (isLeader(server)) { latch.countDown(); continue; } final String url = buildUrl(server, API_ON_PUB); // 同步实例信息给集群节点 HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() { @Override public void onReceive(RestResult<String> result) { if (!result.ok()) { ... return; } latch.countDown(); } @Override public void onError(Throwable throwable) { Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable); } @Override public void onCancel() { } }); } if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) { ... } long end = System.currentTimeMillis(); Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key); } finally { OPERATE_LOCK.unlock(); } } public void onPublish(Datum datum, RaftPeer source) throws Exception { if (stopWork) { throw new IllegalStateException("old raft protocol already stop work"); } RaftPeer local = peers.local(); if (datum.value == null) { ... } if (!peers.isLeader(source.ip)) { ... } if (source.term.get() < local.term.get()) { ... } local.resetLeaderDue(); // 持久化数据,写入文件 if (KeyBuilder.matchPersistentKey(datum.key)) { raftStore.write(datum); } datums.put(datum.key, datum); if (isLeader()) { local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT); } else { if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) { //set leader term: getLeader().term.set(source.term.get()); local.term.set(getLeader().term.get()); } else { local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT); } } raftStore.updateTerm(local.term.get()); // 发布数据更新事件 NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build()); Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term); }
一致性监听这里采用了委派机制 ,同时执行了 Distro 和 Raft 两个实现类。由此可以说明Nacos同时使用 Raft 协议和 Distro 协议维护数据一致性的。
@Override
public void listen(String key, RecordListener listener) throws NacosException { // this special key is listened by both: if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) { persistentConsistencyService.listen(key, listener); ephemeralConsistencyService.listen(key, listener); return; } mapConsistencyService(key).listen(key, listener); }
7.启动加载
@EnableDiscoveryClient @Import(EnableDiscoveryClientImportSelector.class) public @interface EnableDiscoveryClient { //默认自动将本服务注册到服务注册中心 boolean autoRegister() default true; } @Order(Ordered.LOWEST_PRECEDENCE - 100) public class EnableDiscoveryClientImportSelector extends SpringFactoryImportSelector<EnableDiscoveryClient> { @Override public String[] selectImports(AnnotationMetadata metadata) { String[] imports = super.selectImports(metadata); AnnotationAttributes attributes = AnnotationAttributes.fromMap( metadata.getAnnotationAttributes(getAnnotationClass().getName(), true)); // 获取@EnableDiscoveryClient注解的属性autoRegister的值 boolean autoRegister = attributes.getBoolean("autoRegister"); if (autoRegister) { // 加载AutoServiceRegistrationConfiguration配置类 List<String> importsList = new ArrayList<>(Arrays.asList(imports)); importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration"); imports = importsList.toArray(new String[0]); } else { // 设置spring.cloud.service-registry.auto-registration.enabled=false, 关闭服务自动注册功能 Environment env = getEnvironment(); if (ConfigurableEnvironment.class.isInstance(env)) { ConfigurableEnvironment configEnv = (ConfigurableEnvironment) env; LinkedHashMap<String, Object> map = new LinkedHashMap<>(); map.put("spring.cloud.service-registry.auto-registration.enabled", false); MapPropertySource propertySource = new MapPropertySource( "springCloudDiscoveryClient", map); configEnv.getPropertySources().addLast(propertySource); } } return imports; } ...省略 } @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties(AutoServiceRegistrationProperties.class) @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public class AutoServiceRegistrationConfiguration { } @ConfigurationProperties("spring.cloud.service-registry.auto-registration") public class AutoServiceRegistrationProperties { // 是否启用服务自动注册。默认值为true private boolean enabled = true; // 是否将管理注册为服务。默认值为true private boolean registerManagement = true; // 如果没有自动注册,启动是否失败。默认值为假 private boolean failFast = false; ...省略 }
综上,没有使用@EnableDiscoveryClient注解,服务也会被自动注册,默认自动注册为true。
如果不想自动注册服务,可以通过
@EnableDiscoveryClient(autoRegister = false) 或者 spring.cloud.service-registry.auto-registration.enabled=false