1.使用
compile 'com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-config:2.2.3.RELEASE'
spring: application: name: product cloud: nacos: config: server-addr: 127.0.0.1:9000 namespace: 038b8be8-54da-44a5-9664-def33bc8cd19 group: DEFAULT_GROUP prefix: ${spring.application.name}
file-extension: yaml
Nacos配置文件命名規則
${prefix}-${spring.profile.active}.${file-extension}
prefix:服務名稱,對應spring:application:name
spring.profile.active:配置環境
file-extension:配置文件類型,默認是properties
2.配置類加載
NacosConfigBootstrapConfiguration配置文件的加載
@Configuration(proxyBeanMethods = false) @ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true) public class NacosConfigBootstrapConfiguration { @Bean @ConditionalOnMissingBean public NacosConfigProperties nacosConfigProperties() { return new NacosConfigProperties(); } @Bean @ConditionalOnMissingBean public NacosConfigManager nacosConfigManager( NacosConfigProperties nacosConfigProperties) { return new NacosConfigManager(nacosConfigProperties); } @Bean public NacosPropertySourceLocator nacosPropertySourceLocator( NacosConfigManager nacosConfigManager) { return new NacosPropertySourceLocator(nacosConfigManager); } }
NacosConfigManager
public NacosConfigManager(NacosConfigProperties nacosConfigProperties) { this.nacosConfigProperties = nacosConfigProperties; createConfigService(nacosConfigProperties); } // 創建ConfigService 配置服務 static ConfigService createConfigService( NacosConfigProperties nacosConfigProperties) { if (Objects.isNull(service)) { // 加鎖防止多線程多次創建 synchronized (NacosConfigManager.class) { try { if (Objects.isNull(service)) { service = NacosFactory.createConfigService( // 完成一些配置的初始化 nacosConfigProperties.assembleConfigServiceProperties()); } } ... } } return service; } public static ConfigService createConfigService(Properties properties) throws NacosException { return ConfigFactory.createConfigService(properties); } // 通過反射創建實例 :NacosConfigService public static ConfigService createConfigService(Properties properties) throws NacosException { try { Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); Constructor constructor = driverImplClass.getConstructor(Properties.class); ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); } }
NacosConfigService
NacosConfigService有參構造方法 public NacosConfigService(Properties properties) throws NacosException { // 檢查上下文路徑 ValidatorUtils.checkInitParam(properties); String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); if (StringUtils.isBlank(encodeTmp)) { this.encode = Constants.ENCODE; } else { this.encode = encodeTmp.trim(); } // 設置命名空間配置 initNamespace(properties); // 安全驗證 this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); this.agent.start(); this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); } // SecurityProxy用來安全驗證的,默認沒有用戶名就沒啟用,登錄驗證直接返回true public ServerHttpAgent(Properties properties) throws NacosException { this.serverListMgr = new ServerListManager(properties); this.securityProxy = new SecurityProxy(properties, NACOS_RESTTEMPLATE); this.namespaceId = properties.getProperty(PropertyKeyConst.NAMESPACE); init(properties); this.securityProxy.login(this.serverListMgr.getServerUrls()); // 初始化執行服務 this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.config.security.updater"); t.setDaemon(true); return t; } }); // 固定每5秒執行登錄驗證任務 this.executorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { securityProxy.login(serverListMgr.getServerUrls()); } }, 0, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS); }
ClientWorker
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; // 初始化超時時間和重試時間 init(properties); // 傳進來的安全驗證 this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); // 長輪詢 this.executorService = Executors .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); // 檢查配置任務,每10毫秒一次 this.executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); } // 檢查配置信息 public void checkConfigInfo() { // 分派任務 int listenerSize = cacheMap.get().size(); // 把長任務數匯總起來。 int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // The task list is no order.So it maybe has issues when changing. executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }
LongPollingRunnable
@Override public void run() { List<CacheData> cacheDatas = new ArrayList<CacheData>(); List<String> inInitializingCacheList = new ArrayList<String>(); try { // 檢查故障轉移配置 for (CacheData cacheData : cacheMap.get().values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { // 檢查本地配置 checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) { cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } } // 檢查服務配置,檢查是否沒有使用本地配置信息,如果沒有使用本地配置信息則請求獲取服務端配置信息 List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); if (!CollectionUtils.isEmpty(changedGroupKeys)) { LOGGER.info("get changedGroupKeys:" + changedGroupKeys); } for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { // 獲取服務端配置信息 String[] ct = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
// 更新緩存配置信息,此時緩存的MD5已經改變 cache.setContent(ct[0]); if (null != ct[1]) { cache.setType(ct[1]); } LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]); } catch (NacosException ioe) { String message = String .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) { // 如果輪訓任務異常,將對該任務下一次執行時間進行處罰 LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } }
CacheData
void checkListenerMd5() { for (ManagerListenerWrap wrap : listeners) { // 如果監聽器中的緩存CacheMap的Md5與當前CacheMap的Md5不一致,則通知監聽器配置信息已經變更 if (!md5.equals(wrap.lastCallMd5)) { safeNotifyListener(dataId, group, content, type, md5, wrap); } } } private void safeNotifyListener(final String dataId, final String group, final String content, final String type, final String md5, final ManagerListenerWrap listenerWrap) { final Listener listener = listenerWrap.listener; Runnable job = new Runnable() { @Override public void run() { ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader(); ClassLoader appClassLoader = listener.getClass().getClassLoader(); try { if (listener instanceof AbstractSharedListener) { AbstractSharedListener adapter = (AbstractSharedListener) listener; adapter.fillContext(dataId, group); LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5); } // 執行回調之前先將線程classloader設置為具體webapp的classloader,以免回調方法中調用spi接口是出現異常或錯用(多應用部署才會有該問題)。 Thread.currentThread().setContextClassLoader(appClassLoader); ConfigResponse cr = new ConfigResponse(); cr.setDataId(dataId); cr.setGroup(group); cr.setContent(content); configFilterChainManager.doFilter(null, cr); String contentTmp = cr.getContent();
// 接收配置信息 listener.receiveConfigInfo(contentTmp); // 比較緩存中的內容,並更新Md5值 if (listener instanceof AbstractConfigChangeListener) { Map data = ConfigChangeHandler.getInstance() .parseChangeData(listenerWrap.lastContent, content, type); ConfigChangeEvent event = new ConfigChangeEvent(data); ((AbstractConfigChangeListener) listener).receiveConfigChange(event); listenerWrap.lastContent = content; } listenerWrap.lastCallMd5 = md5; LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5, listener); } catch (NacosException ex) { ... } finally { Thread.currentThread().setContextClassLoader(myClassLoader); } } }; final long startNotify = System.currentTimeMillis(); try { if (null != listener.getExecutor()) { listener.getExecutor().execute(job); } else { job.run(); } } catch (Throwable t) { LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group, md5, listener, t.getCause()); } final long finishNotify = System.currentTimeMillis(); LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ", name, (finishNotify - startNotify), dataId, group, md5, listener); }
總結:
客戶端通過一個定時任務來檢查自己監聽的配置項的數據的,一旦服務端的數據發生變化時,客戶端將會獲取到最新的數據,並將最新的數據保存在一個 CacheData 對象中,然后會重新計算 CacheData 的 md5 屬性的值,此時就會對該 CacheData 所綁定的 Listener 觸發 receiveConfigInfo 回調。
考慮到服務端故障的問題,客戶端將最新數據獲取后會保存在本地的 snapshot 文件中,以后會優先從文件中獲取配置信息的值。