1.客戶端使用
// Nacos service-discovery starter
compile 'com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-discovery:2.2.3.RELEASE'
// Nacos configuration-management starter
compile 'com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-config:2.2.3.RELEASE'
bootstrap.yml中
spring:
  application:
    # Application name; reused below through ${spring.application.name}
    name: product
  cloud:
    nacos:
      # Service discovery (registry) settings
      discovery:
        server-addr: 127.0.0.1:8848
        service: ${spring.application.name}
        namespace: 038b8be8-54da-44a5-9664-def33bc8cd19
        group: DEFAULT_GROUP
      # Configuration-center settings
      config:
        server-addr: 127.0.0.1:8848
        namespace: 038b8be8-54da-44a5-9664-def33bc8cd19
        group: DEFAULT_GROUP
        prefix: ${spring.application.name}
/**
 * Entry point of the product application.
 * Annotated with {@code @EnableDiscoveryClient} and {@code @SpringBootApplication}.
 */
@EnableDiscoveryClient
@SpringBootApplication
public class ProductApplication {

    /**
     * Starts the application through {@code SpringApplication.run}.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(ProductApplication.class, args);
    }
}
2.服務端代碼
git clone https://github.com/alibaba/nacos.git
將源碼導入到idea,然后打開console項目中的application.properties配置文件
將db開頭的配置放開
找到項目distribution\conf下的SQL文件nacos-mysql.sql,導入數據庫
在C:\Users\yue\nacos目錄下新建conf文件夾,文件夾下新建一個配置文件cluster.conf
啟動Nacos時設置VM參數
-Dnacos.standalone=true -Dnacos.home=C:\Users\yue\nacos -Dserver.port=9000
啟動成功后訪問地址http://127.0.0.1:9000/nacos/index.html,賬號密碼默認nacos
3. Spring服務發現的統一規范
Spring將這套規范定義在Spring Cloud Commons中
- circuitbreaker包下定義了斷路器的規范
- discovery包下面定義了服務發現的規范
- loadbalancer包下面定義了負載均衡的規范
- serviceregistry包下面定義了服務注冊的規范
serviceregistry包定義了三個核心接口,用來實現服務注冊。
AutoServiceRegistration:用於服務自動注冊
Registration:用於存儲服務信息
ServiceRegistry:用於注冊/移除服務
4.Nacos客戶端服務注冊實現
Nacos服務注冊模塊按照Spring Cloud的規范實現這三個接口
4.1NacosAutoServiceRegistration
這個類實現了服務自動注冊到Nacos注冊中心的功能,它繼承AbstractAutoServiceRegistration類。
/**
 * Abstract base for automatic service registration.
 * It implements {@code AutoServiceRegistration}, {@code ApplicationContextAware} and
 * {@code ApplicationListener<WebServerInitializedEvent>}; the class body is elided in this excerpt.
 *
 * @param <R> the concrete {@code Registration} type
 */
public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware,
        ApplicationListener<WebServerInitializedEvent> { ... }
AbstractAutoServiceRegistration實現AutoServiceRegistration接口來達到服務自動注冊,實現ApplicationListener<WebServerInitializedEvent>接口來實現事件回調;當容器啟動,應用上下文被刷新且程序准備就緒之后會觸發WebServerInitializedEvent事件,調用onApplicationEvent()方法。
@Override @SuppressWarnings("deprecation") public void onApplicationEvent(WebServerInitializedEvent event) { bind(event); } @Deprecated public void bind(WebServerInitializedEvent event) { ApplicationContext context = event.getApplicationContext(); if (context instanceof ConfigurableWebServerApplicationContext) { if ("management".equals(((ConfigurableWebServerApplicationContext) context) .getServerNamespace())) { return; } } this.port.compareAndSet(0, event.getWebServer().getPort()); this.start(); } public void start() { if (!isEnabled()) { if (logger.isDebugEnabled()) { logger.debug("Discovery Lifecycle disabled. Not starting"); } return; } if (!this.running.get()) { this.context.publishEvent( new InstancePreRegisteredEvent(this, getRegistration())); register(); if (shouldRegisterManagement()) { registerManagement(); } this.context.publishEvent( new InstanceRegisteredEvent<>(this, getConfiguration())); this.running.compareAndSet(false, true); } } protected void register() { //實際上最終是調用serviceRegistry.register方法 this.serviceRegistry.register(getRegistration()); }
4.2NacosServiceRegistry
@Override public void register(Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); return; } NamingService namingService = namingService(); String serviceId = registration.getServiceId(); String group = nacosDiscoveryProperties.getGroup(); Instance instance = getNacosInstanceFromRegistration(registration); try { // 最終由namingService實現服務注冊 namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort()); } catch (Exception e) { log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e); rethrowRuntimeException(e); } }
@Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); if (instance.isEphemeral()) { BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
//添加定時心跳任務 beatReactor.addBeatInfo(groupedServiceName, beatInfo); } serverProxy.registerService(groupedServiceName, groupName, instance); } public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance); final Map<String, String> params = new HashMap<String, String>(16); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); //最終使用http請求注冊服務 reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); }
public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); //添加心跳任務 executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); } public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException { if (NAMING_LOGGER.isDebugEnabled()) { NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString()); } Map<String, String> params = new HashMap<String, String>(8); Map<String, String> bodyMap = new HashMap<String, String>(2); if (!lightBeatEnabled) { bodyMap.put("beat", JacksonUtils.toJson(beatInfo)); } params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName()); params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster()); params.put("ip", beatInfo.getIp()); params.put("port", String.valueOf(beatInfo.getPort())); //調用http來監控心跳 String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT); return JacksonUtils.toObj(result); }
總結:
服務的自動注冊其實是利用了Spring的WebServerInitializedEvent事件,最終由namingService完成服務注冊工作。
當容器啟動完成,觸發事件,開始執行register服務,發起服務注冊請求的同時添加一個心跳檢測任務。
5.服務注冊配置類加載
Spring Boot中有一種非常解耦的擴展機制:Spring Factories。這種擴展機制實際上是仿照Java中的SPI擴展機制來實現的
這種自定義的SPI機制是Spring Boot Starter實現的基礎。
SpringBoot啟動的時候會讀取所有jar包下面的META-INF/spring.factories文件,並且將文件中的接口/抽象類與對應的實現類關聯起來,在需要的時候實例化對應的實現類。

5.1 NacosServiceRegistryAutoConfiguration
這個配置類實例化了NacosServiceRegistry、NacosRegistration、NacosAutoServiceRegistration這三個bean。
/**
 * Configuration class that declares the three registration beans:
 * {@code NacosServiceRegistry}, {@code NacosRegistration} and {@code NacosAutoServiceRegistration}.
 * It is conditional on {@code @ConditionalOnNacosDiscoveryEnabled} and on the property
 * {@code spring.cloud.service-registry.auto-registration.enabled} (which matches when missing),
 * and is ordered after the configuration classes listed in {@code @AutoConfigureAfter}.
 */
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
        matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
        AutoServiceRegistrationAutoConfiguration.class,
        NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

    /** Creates the service registry from the Nacos discovery properties. */
    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    /**
     * Creates the registration object; only declared when an
     * {@code AutoServiceRegistrationProperties} bean exists.
     */
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(registrationCustomizers.getIfAvailable(),
                nacosDiscoveryProperties, context);
    }

    /**
     * Creates the auto-registration bean from the registry, the auto-registration
     * properties and the registration; only declared when an
     * {@code AutoServiceRegistrationProperties} bean exists.
     */
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry,
                autoServiceRegistrationProperties, registration);
    }
}
1. @EnableConfigurationProperties 可以讓用了@ConfigurationProperties注解但是沒有使用@Component等注解實例化為Bean的類生效,因為沒有@Component注解,Spring容器不會實例化這個類,通過@EnableConfigurationProperties注解讓NacosDiscoveryProperties被實例化並注入到容器中。
2. @ConditionalOnNacosDiscoveryEnabled 是當spring.cloud.nacos.discovery.enabled=true時當前配置類才會生效。
3. @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",matchIfMissing = true)是當spring.cloud.service-registry.auto-registration.enabled=true,或者未配置該屬性(matchIfMissing = true)時,當前配置類才會生效。
4. @AutoConfigureAfter(***) 是當括號內的配置類加載生效后再加載當前配置類。
5.2 NacosDiscoveryEndpointAutoConfiguration
這個配置類實例化NamingService。
/**
 * Endpoint configuration class. The health indicator bean below obtains the
 * {@code NamingService} from {@code NacosServiceManager}.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Endpoint.class)
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryEndpointAutoConfiguration {

    @Bean
    ...

    /** Builds the health indicator around the naming service created from the Nacos properties. */
    @Bean
    @ConditionalOnEnabledHealthIndicator("nacos-discovery")
    public HealthIndicator nacosDiscoveryHealthIndicator(
            NacosServiceManager nacosServiceManager,
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        Properties nacosProperties = nacosDiscoveryProperties.getNacosProperties();
        return new NacosDiscoveryHealthIndicator(
                nacosServiceManager.getNamingService(nacosProperties));
    }

}

// ---- In class NacosServiceManager ----

/** Returns the naming service, building it first when the field is still null. */
public NamingService getNamingService(Properties properties) {
    if (Objects.isNull(this.namingService)) {
        buildNamingService(properties);
    }
    return namingService;
}

/** Creates the naming service once; the null check is repeated inside the synchronized block. */
private NamingService buildNamingService(Properties properties) {
    if (Objects.isNull(namingService)) {
        synchronized (NacosServiceManager.class) {
            if (Objects.isNull(namingService)) {
                namingService = createNewNamingService(properties);
            }
        }
    }
    return namingService;
}

/** Delegates to {@code createNamingService}; the exception handling is elided in this excerpt. */
private NamingService createNewNamingService(Properties properties) {
    try {
        return createNamingService(properties);
    }
    ...
}

// ---- In class NacosFactory ----

/** Delegates to {@code NamingFactory.createNamingService}. */
public static NamingService createNamingService(Properties properties) throws NacosException {
    return NamingFactory.createNamingService(properties);
}

// ---- In class NamingFactory ----

/**
 * Loads {@code com.alibaba.nacos.client.naming.NacosNamingService} by name and instantiates it
 * through its {@code Properties} constructor.
 *
 * @throws NacosException with {@code CLIENT_INVALID_PARAM}, wrapping any {@code Throwable} raised
 */
public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        // Create the NamingService object through reflection.
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        NamingService vendorImpl = (NamingService) constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}

// ---- In class NacosNamingService ----

/** Constructs the naming service by running {@link #init}. */
public NacosNamingService(Properties properties) throws NacosException {
    init(properties);
}

/** Validates the properties, then initializes settings and the collaborating components. */
private void init(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    // Initialize namespace, server address, cache directory, log name, etc.
    this.namespace = InitUtils.initNamespaceForNaming(properties);
    InitUtils.initSerialization();
    initServerAddr(properties);
    InitUtils.initWebRootContext();
    initCacheDir();
    initLogName(properties);
    // Initialize the event dispatcher, server proxy, heartbeat reactor and host reactor.
    this.eventDispatcher = new EventDispatcher();
    this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
    this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
    this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir,
            isLoadCacheAtStart(properties), initPollingThreadCount(properties));
}
6.服務端服務注冊
流程圖參考:https://www.processon.com/view/5e25b762e4b04579e409e81f
服務端簡單總結:
1.當客戶端發起服務注冊請求時,將服務添加進本地緩存,並建立心跳檢測
2.保存實例時通過key的前綴來判斷是臨時實例還是永久實例。
3.如果是臨時實例,通過阻塞隊列將實例更新到內存注冊表,然后又通過阻塞隊列將實例信息異步批量同步到集群其他節點
4.如果是永久實例,判斷當前節點是不是Leader:如果不是,則將服務注冊請求轉發到Leader節點;如果是Leader,則將實例信息異步更新到內存注冊表並同步寫入文件,然后將實例信息同步到集群其他節點。
naming項目下的 InstanceController類
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); final Instance instance = parseInstance(request); serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; } public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { // ①創建1個空服務 createEmptyService(namespaceId, serviceName, instance.isEphemeral()); // 從本地緩存serviceMap中獲取服務對象 Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } // ②服務注冊 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
①: createEmptyService()方法
// 如果服務不存在,創建一個服務 public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { Service service = getService(namespaceId, serviceName); if (service == null) { // 如果服務不存在,創建一個空的服務 Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); if (cluster != null) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } service.validate(); // 將創建的空的服務添加進本地緩存,並初始化 putServiceAndInit(service); if (!local) { addOrReplaceService(service); } } } private void putServiceAndInit(Service service) throws NacosException { // 將服務加入serviceMap緩存 putService(service); // 建立心跳檢測任務機制,默認五秒 service.init(); // 實現數據一致性的監聽 consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); }
/**
 * Stores the service in {@code serviceMap}, keyed by namespace id and then by service name.
 * The per-namespace map is created on first use.
 */
public void putService(Service service) {
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        // Re-check under putServiceLock before creating the per-namespace map.
        synchronized (putServiceLock) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
            }
        }
    }
    serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
//建立心跳檢測任務clientBeatCheckTask,默認五秒 public void init() { HealthCheckReactor.scheduleCheck(clientBeatCheckTask); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { entry.getValue().setService(this); entry.getValue().init(); } }
心跳定時任務
/**
 * Heartbeat check: marks instances unhealthy once their last beat is older than the heartbeat
 * timeout, then (when instance expiry is enabled) deletes instances whose last beat is older
 * than the delete timeout. Marked instances are skipped in both passes.
 * Any exception is logged as a warning and not rethrown.
 */
@Override
public void run() {
    try {
        ...省略
        List<Instance> instances = service.allIPs(true);

        // Update the health status of each instance.
        for (Instance instance : instances) {
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }

        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }

        // Remove expired instances.
        for (Instance instance : instances) {

            if (instance.isMarked()) {
                continue;
            }

            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // Delete the instance.
                deleteIp(instance);
            }
        }

    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
②addInstance()方法
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); synchronized (service) { // 比較並獲取新的實例列表 List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); // 保存服務實例 consistencyService.put(key, instances); } }
區分是臨時實例還是永久實例
/**
 * Stores the value through the consistency service selected for the key
 * by {@code mapConsistencyService}.
 */
public void put(String key, Record value) throws NacosException {
    mapConsistencyService(key).put(key, value);
}
//通過判斷key的前綴字符串來判斷 private ConsistencyService mapConsistencyService(String key) { return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService; }
臨時實例ephemeralConsistencyService.put()
@PostConstruct public void init() { //提交通知任務 GlobalExecutor.submitDistroNotifyTask(notifier); } @Override public void put(String key, Record value) throws NacosException { //①注冊實例 onPut(key, value); //②同步實例到其他節點 distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2); }
①注冊實例更新到緩存
public void onPut(String key, Record value) { if (KeyBuilder.matchEphemeralInstanceListKey(key)) { Datum<Instances> datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); // 保存命名數據 dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } //添加任務 notifier.addTask(key, DataOperation.CHANGE); }
public void addTask(String datumKey, DataOperation action) { if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } if (action == DataOperation.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } tasks.offer(Pair.with(datumKey, action)); } // task.run調用 private void handle(Pair<String, DataOperation> pair) { try { ...省略 for (RecordListener listener : listeners.get(datumKey)) { count++; try { if (action == DataOperation.CHANGE) { listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } if (action == DataOperation.DELETE) { listener.onDelete(datumKey); continue; } ... } ... } ... }
/**
 * Listener callback for a changed instance list: updates the instance IPs and
 * recalculates the checksum. The start of the method is elided in this excerpt.
 */
@Override
public void onChange(String key, Instances value) throws Exception {
    ...
    // Update the instances (the article describes this as copy-on-write; not visible in this excerpt).
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    // Recalculate the checksum.
    recalculateChecksum();
}
②同步實例到所有遠程服務器
異步批量同步
/**
 * Queues one delayed sync task per cluster member other than this node.
 *
 * @param distroKey key of the data to sync
 * @param action    the data operation to propagate
 * @param delay     delay passed to each {@code DistroDelayTask}
 */
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        // Same resource key and type, but targeted at this member's address.
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                each.getAddress());
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
        }
    }
}
永久實例RaftConsistencyServiceImpl
/**
 * Persistent-instance write path: after {@code checkIsStopWork()}, publishes the key/value
 * through {@code raftCore.signalPublish}. The exception handling is elided in this excerpt.
 */
public void put(String key, Record value) throws NacosException {
    checkIsStopWork();
    try {
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        ...
    }
}
/**
 * Publishes a key/value pair. A non-leader node forwards the request to the leader and returns.
 * The leader, while holding {@code OPERATE_LOCK}, applies the datum locally via {@link #onPublish},
 * posts it asynchronously to the other servers and waits on a latch sized by
 * {@code peers.majorityCount()} for up to {@code RAFT_PUBLISH_TIMEOUT} milliseconds.
 *
 * @throws IllegalStateException when {@code stopWork} is set
 */
public void signalPublish(String key, Record value) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    // The current node is not the leader.
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);

        final RaftPeer leader = getLeader();
        // Forward the registration request to the cluster's leader node.
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }
    // The current node is the leader.
    OPERATE_LOCK.lock();
    try {
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }

        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
        // Start publishing: apply the datum on this node.
        onPublish(datum, peers.local());

        final String content = json.toString();
        // Use a CountDownLatch to wait for a majority of acknowledgements.
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        for (final String server : peers.allServersIncludeMyself()) {
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            final String url = buildUrl(server, API_ON_PUB);
            // Sync the instance data to the cluster node.
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        ...
                        return;
                    }
                    latch.countDown();
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                }

                @Override
                public void onCancel() {

                }
            });

        }

        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            ...
        }

        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}

/**
 * Applies a published datum on this node: writes it to the raft store when the key is a
 * persistent key, stores it in {@code datums}, advances and stores the local term, and
 * publishes a {@code ValueChangeEvent} with action CHANGE.
 * The bodies of the three precondition checks are elided in this excerpt.
 *
 * @throws IllegalStateException when {@code stopWork} is set
 */
public void onPublish(Datum datum, RaftPeer source) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    RaftPeer local = peers.local();
    if (datum.value == null) {
        ...
    }

    if (!peers.isLeader(source.ip)) {
        ...
    }

    if (source.term.get() < local.term.get()) {
        ...
    }

    local.resetLeaderDue();

    // Persist the data by writing it to file.
    if (KeyBuilder.matchPersistentKey(datum.key)) {
        raftStore.write(datum);
    }

    datums.put(datum.key, datum);

    if (isLeader()) {
        local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
    } else {
        if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
            //set leader term:
            getLeader().term.set(source.term.get());
            local.term.set(getLeader().term.get());
        } else {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        }
    }
    raftStore.updateTerm(local.term.get());
    // Publish the data-changed event.
    NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
    Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
一致性監聽這里采用了委派機制:對於特殊的key,會同時調用Distro和Raft兩個實現類。由此可以說明,Nacos同時使用Raft協議和Distro協議來維護數據一致性。
@Override
public void listen(String key, RecordListener listener) throws NacosException { // this special key is listened by both: if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) { persistentConsistencyService.listen(key, listener); ephemeralConsistencyService.listen(key, listener); return; } mapConsistencyService(key).listen(key, listener); }
7.啟動加載
@EnableDiscoveryClient @Import(EnableDiscoveryClientImportSelector.class) public @interface EnableDiscoveryClient { //默認自動將本服務注冊到服務注冊中心 boolean autoRegister() default true; } @Order(Ordered.LOWEST_PRECEDENCE - 100) public class EnableDiscoveryClientImportSelector extends SpringFactoryImportSelector<EnableDiscoveryClient> { @Override public String[] selectImports(AnnotationMetadata metadata) { String[] imports = super.selectImports(metadata); AnnotationAttributes attributes = AnnotationAttributes.fromMap( metadata.getAnnotationAttributes(getAnnotationClass().getName(), true)); // 獲取@EnableDiscoveryClient注解的屬性autoRegister的值 boolean autoRegister = attributes.getBoolean("autoRegister"); if (autoRegister) { // 加載AutoServiceRegistrationConfiguration配置類 List<String> importsList = new ArrayList<>(Arrays.asList(imports)); importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration"); imports = importsList.toArray(new String[0]); } else { // 設置spring.cloud.service-registry.auto-registration.enabled=false, 關閉服務自動注冊功能 Environment env = getEnvironment(); if (ConfigurableEnvironment.class.isInstance(env)) { ConfigurableEnvironment configEnv = (ConfigurableEnvironment) env; LinkedHashMap<String, Object> map = new LinkedHashMap<>(); map.put("spring.cloud.service-registry.auto-registration.enabled", false); MapPropertySource propertySource = new MapPropertySource( "springCloudDiscoveryClient", map); configEnv.getPropertySources().addLast(propertySource); } } return imports; } ...省略 } @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties(AutoServiceRegistrationProperties.class) @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public class AutoServiceRegistrationConfiguration { } @ConfigurationProperties("spring.cloud.service-registry.auto-registration") public class AutoServiceRegistrationProperties { // 是否啟用服務自動注冊。默認值為true private boolean enabled = true; // 
是否將管理注冊為服務。默認值為true private boolean registerManagement = true; // 如果沒有自動注冊,啟動是否失敗。默認值為假 private boolean failFast = false; ...省略 }
綜上,沒有使用@EnableDiscoveryClient注解,服務也會被自動注冊,默認自動注冊為true。
如果不想自動注冊服務,可以通過 @EnableDiscoveryClient(autoRegister = false) 或者 spring.cloud.service-registry.auto-registration.enabled=false 來關閉。