1.使用
compile 'com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-config:2.2.3.RELEASE'
spring: application: name: product cloud: nacos: config: server-addr: 127.0.0.1:9000 namespace: 038b8be8-54da-44a5-9664-def33bc8cd19 group: DEFAULT_GROUP prefix: ${spring.application.name}
file-extension: yaml
Nacos配置文件命名规则
${prefix}-${spring.profile.active}.${file-extension}
prefix:服务名称,对应spring:application:name
spring.profile.active:配置环境
file-extension:配置文件类型,默认是properties
2.配置类加载
NacosConfigBootstrapConfiguration配置文件的加载
@Configuration(proxyBeanMethods = false) @ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true) public class NacosConfigBootstrapConfiguration { @Bean @ConditionalOnMissingBean public NacosConfigProperties nacosConfigProperties() { return new NacosConfigProperties(); } @Bean @ConditionalOnMissingBean public NacosConfigManager nacosConfigManager( NacosConfigProperties nacosConfigProperties) { return new NacosConfigManager(nacosConfigProperties); } @Bean public NacosPropertySourceLocator nacosPropertySourceLocator( NacosConfigManager nacosConfigManager) { return new NacosPropertySourceLocator(nacosConfigManager); } }
NacosConfigManager
public NacosConfigManager(NacosConfigProperties nacosConfigProperties) { this.nacosConfigProperties = nacosConfigProperties; createConfigService(nacosConfigProperties); } // 创建ConfigService 配置服务 static ConfigService createConfigService( NacosConfigProperties nacosConfigProperties) { if (Objects.isNull(service)) { // 加锁防止多线程多次创建 synchronized (NacosConfigManager.class) { try { if (Objects.isNull(service)) { service = NacosFactory.createConfigService( // 完成一些配置的初始化 nacosConfigProperties.assembleConfigServiceProperties()); } } ... } } return service; } public static ConfigService createConfigService(Properties properties) throws NacosException { return ConfigFactory.createConfigService(properties); } // 通过反射创建实例 :NacosConfigService public static ConfigService createConfigService(Properties properties) throws NacosException { try { Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); Constructor constructor = driverImplClass.getConstructor(Properties.class); ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); } }
NacosConfigService
NacosConfigService有参构造方法 public NacosConfigService(Properties properties) throws NacosException { // 检查上下文路径 ValidatorUtils.checkInitParam(properties); String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); if (StringUtils.isBlank(encodeTmp)) { this.encode = Constants.ENCODE; } else { this.encode = encodeTmp.trim(); } // 设置命名空间配置 initNamespace(properties); // 安全验证 this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); this.agent.start(); this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); } // SecurityProxy用来安全验证的,默认没有用户名就没启用,登录验证直接返回true public ServerHttpAgent(Properties properties) throws NacosException { this.serverListMgr = new ServerListManager(properties); this.securityProxy = new SecurityProxy(properties, NACOS_RESTTEMPLATE); this.namespaceId = properties.getProperty(PropertyKeyConst.NAMESPACE); init(properties); this.securityProxy.login(this.serverListMgr.getServerUrls()); // 初始化执行服务 this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.config.security.updater"); t.setDaemon(true); return t; } }); // 固定每5秒执行登录验证任务 this.executorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { securityProxy.login(serverListMgr.getServerUrls()); } }, 0, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS); }
ClientWorker
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; // 初始化超时时间和重试时间 init(properties); // 传进来的安全验证 this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); // 长轮询 this.executorService = Executors .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); // 检查配置任务,每10毫秒一次 this.executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); } // 检查配置信息 public void checkConfigInfo() { // 分派任务 int listenerSize = cacheMap.get().size(); // 把长任务数汇总起来。 int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // The task list is no order.So it maybe has issues when changing. executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }
LongPollingRunnable
@Override public void run() { List<CacheData> cacheDatas = new ArrayList<CacheData>(); List<String> inInitializingCacheList = new ArrayList<String>(); try { // 检查故障转移配置 for (CacheData cacheData : cacheMap.get().values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { // 检查本地配置 checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) { cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } } // 检查服务配置,检查是否没有使用本地配置信息,如果没有使用本地配置信息则请求获取服务端配置信息 List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); if (!CollectionUtils.isEmpty(changedGroupKeys)) { LOGGER.info("get changedGroupKeys:" + changedGroupKeys); } for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { // 获取服务端配置信息 String[] ct = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
// 更新缓存配置信息,此时缓存的MD5已经改变 cache.setContent(ct[0]); if (null != ct[1]) { cache.setType(ct[1]); } LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]); } catch (NacosException ioe) { String message = String .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) { // 如果轮训任务异常,将对该任务下一次执行时间进行处罚 LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } }
CacheData
void checkListenerMd5() { for (ManagerListenerWrap wrap : listeners) { // 如果监听器中的缓存CacheMap的Md5与当前CacheMap的Md5不一致,则通知监听器配置信息已经变更 if (!md5.equals(wrap.lastCallMd5)) { safeNotifyListener(dataId, group, content, type, md5, wrap); } } } private void safeNotifyListener(final String dataId, final String group, final String content, final String type, final String md5, final ManagerListenerWrap listenerWrap) { final Listener listener = listenerWrap.listener; Runnable job = new Runnable() { @Override public void run() { ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader(); ClassLoader appClassLoader = listener.getClass().getClassLoader(); try { if (listener instanceof AbstractSharedListener) { AbstractSharedListener adapter = (AbstractSharedListener) listener; adapter.fillContext(dataId, group); LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5); } // 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。 Thread.currentThread().setContextClassLoader(appClassLoader); ConfigResponse cr = new ConfigResponse(); cr.setDataId(dataId); cr.setGroup(group); cr.setContent(content); configFilterChainManager.doFilter(null, cr); String contentTmp = cr.getContent();
// 接收配置信息 listener.receiveConfigInfo(contentTmp); // 比较缓存中的内容,并更新Md5值 if (listener instanceof AbstractConfigChangeListener) { Map data = ConfigChangeHandler.getInstance() .parseChangeData(listenerWrap.lastContent, content, type); ConfigChangeEvent event = new ConfigChangeEvent(data); ((AbstractConfigChangeListener) listener).receiveConfigChange(event); listenerWrap.lastContent = content; } listenerWrap.lastCallMd5 = md5; LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5, listener); } catch (NacosException ex) { ... } finally { Thread.currentThread().setContextClassLoader(myClassLoader); } } }; final long startNotify = System.currentTimeMillis(); try { if (null != listener.getExecutor()) { listener.getExecutor().execute(job); } else { job.run(); } } catch (Throwable t) { LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group, md5, listener, t.getCause()); } final long finishNotify = System.currentTimeMillis(); LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ", name, (finishNotify - startNotify), dataId, group, md5, listener); }
总结:
客户端通过一个定时任务来检查自己监听的配置项的数据的,一旦服务端的数据发生变化时,客户端将会获取到最新的数据,并将最新的数据保存在一个 CacheData 对象中,然后会重新计算 CacheData 的 md5 属性的值,此时就会对该 CacheData 所绑定的 Listener 触发 receiveConfigInfo 回调。
考虑到服务端故障的问题,客户端将最新数据获取后会保存在本地的 snapshot 文件中,以后会优先从文件中获取配置信息的值。