在spring cloud netflflix的文章中,分析了Environment,這個是非常重要的類,他負責管理spring的運行相關的配置信息,其中就包含application.properties。而在Spring Cloud中,如果集成Nacos作為配置中心的話,那么意味着這部分配置是屬於遠程配置,也會作為配置源保存到Environment中,這樣才能通過@value注解來注入配置中的屬性。從之前的Confifig的源碼中,可以知道Environment中所有外部化配置,針對不同類型的配置都會有與之對應的PropertySource,比如(SystemEnvironmentPropertySource、CommandLinePropertySource)。以及PropertySourcesPropertyResolver來進行解析。那NacosClient在啟動的時候,必然也會需要從遠程服務器上獲取配置加載到Environment中,這樣才能使得應用程序通過@value進行屬性的注入,而且我們一定可以猜測到的是,這塊的工作一定又和spring中某個機制有關系。
SpringApplication.run
在spring boot項目啟動時,有一個prepareContext的方法,它會回調所有實現了ApplicationContextInitializer 的實例,來做一些初始化工作。
public ConfigurableApplicationContext run(String... args) { StopWatch stopWatch = new StopWatch(); stopWatch.start(); ConfigurableApplicationContext context = null; Collection<SpringBootExceptionReporter> exceptionReporters = new ArrayList(); this.configureHeadlessProperty(); SpringApplicationRunListeners listeners = this.getRunListeners(args); listeners.starting(); Collection exceptionReporters; try { ApplicationArguments applicationArguments = new DefaultApplicationArguments(args); ConfigurableEnvironment environment = this.prepareEnvironment(listeners, applicationArguments); this.configureIgnoreBeanInfo(environment); Banner printedBanner = this.printBanner(environment); context = this.createApplicationContext(); exceptionReporters = this.getSpringFactoriesInstances(SpringBootExceptionReporter.class, new Class[]{ConfigurableApplicationContext.class}, context); this.prepareContext(context, environment, listeners, applicationArguments, printedBanner); this.refreshContext(context); this.afterRefresh(context, applicationArguments); stopWatch.stop(); if (this.logStartupInfo) { (new StartupInfoLogger(this.mainApplicationClass)).logStarted(this.getApplicationLog(), stopWatch); } listeners.started(context); this.callRunners(context, applicationArguments); } catch (Throwable var10) { this.handleRunFailure(context, var10, exceptionReporters, listeners); throw new IllegalStateException(var10); } try { listeners.running(context); return context; } catch (Throwable var9) { this.handleRunFailure(context, var9, exceptionReporters, (SpringApplicationRunListeners)null); throw new IllegalStateException(var9); } }
PropertySourceBootstrapConfifiguration.initialize
PropertySourceBootstrapConfifiguration 實現了 ApplicationContextInitializer 接口,其目的就是在應用程序上下文初始化的時候做一些額外的操作。根據默認的 AnnotationAwareOrderComparator 排序規則對propertySourceLocators數組進行排序;獲取運行的環境上下文ConfifigurableEnvironment
遍歷propertySourceLocators時
- 調用 locate 方法,傳入獲取的上下文environment
- 將source添加到PropertySource的鏈表中
- 設置source是否為空的標識標量empty
source不為空的情況,才會設置到environment中
- 返回Environment的可變形式,可進行的操作如addFirst、addLast
- 移除propertySources中的bootstrapProperties
- 根據confifig server覆寫的規則,設置propertySources
- 處理多個active profifiles的配置信息
@Override public void initialize(ConfigurableApplicationContext applicationContext) { List<PropertySource<?>> composite = new ArrayList<>(); //對propertySourceLocators數組進行排序,根據默認的AnnotationAwareOrderComparator AnnotationAwareOrderComparator.sort(this.propertySourceLocators); boolean empty = true; //獲取運行的環境上下文 ConfigurableEnvironment environment = applicationContext.getEnvironment(); for (PropertySourceLocator locator : this.propertySourceLocators) { //回調所有實現PropertySourceLocator接口實例的locate方法, Collection<PropertySource<?>> source = locator.locateCollection(environment); if (source == null || source.size() == 0) { continue; } List<PropertySource<?>> sourceList = new ArrayList<>(); for (PropertySource<?> p : source) { sourceList.add(new BootstrapPropertySource<>(p)); } logger.info("Located property source: " + sourceList); composite.addAll(sourceList);//將source添加到數組 empty = false; //表示propertysource不為空 } //只有propertysource不為空的情況,才會設置到environment中 if (!empty) { MutablePropertySources propertySources = environment.getPropertySources(); String logConfig = environment.resolvePlaceholders("${logging.config:}"); LogFile logFile = LogFile.get(environment); for (PropertySource<?> p : environment.getPropertySources()) { if (p.getName().startsWith(BOOTSTRAP_PROPERTY_SOURCE_NAME)) { propertySources.remove(p.getName()); } } insertPropertySources(propertySources, composite); reinitializeLoggingSystem(environment, logConfig, logFile); setLogLevels(applicationContext, environment); handleIncludedProfiles(environment); } }
PropertySourceLoader.locateCollection
這個方法會調用子類的locate方法,來獲得一個PropertySource,然后將PropertySource集合返回。接着它會調用 ConfigServicePropertySourceLocator 的locate方法。
static Collection<PropertySource<?>> locateCollection(PropertySourceLocator locator, Environment environment) { PropertySource<?> propertySource = locator.locate(environment); if (propertySource == null) { return Collections.emptyList(); } if (CompositePropertySource.class.isInstance(propertySource)) { Collection<PropertySource<?>> sources = ((CompositePropertySource) propertySource) .getPropertySources(); List<PropertySource<?>> filteredSources = new ArrayList<>(); for (PropertySource<?> p : sources) { if (p != null) { filteredSources.add(p); } } return filteredSources; } else { return Arrays.asList(propertySource); } }
NacosPropertySourceLocator.locate
這個就是Nacos 配置中心加載的的關鍵實現了,分別調用三個方法來加載配置(NAOCOS配置加載順序:共享配置 --> 擴展配置 --> 自身配置(后面優先級高) , 這三個配置在前面的內容中已經說過了)
public PropertySource<?> locate(Environment env) { this.nacosConfigProperties.setEnvironment(env); ConfigService configService = this.nacosConfigManager.getConfigService(); if (null == configService) { log.warn("no instance of config service found, can't load config from nacos"); return null; } else { long timeout = (long)this.nacosConfigProperties.getTimeout(); this.nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout); String name = this.nacosConfigProperties.getName(); String dataIdPrefix = this.nacosConfigProperties.getPrefix(); if (StringUtils.isEmpty(dataIdPrefix)) { dataIdPrefix = name; } if (StringUtils.isEmpty(dataIdPrefix)) { dataIdPrefix = env.getProperty("spring.application.name"); } CompositePropertySource composite = new CompositePropertySource("NACOS"); //加載共享配置 this.loadSharedConfiguration(composite); //加載擴展配置 this.loadExtConfiguration(composite); //加載自身配置 this.loadApplicationConfiguration(composite, dataIdPrefix, this.nacosConfigProperties, env); return composite; } }
loadApplicationConfifiguration
可以先不管加載共享配置、擴展配置的方法,最終本質上都是去遠程服務上讀取配置,只是傳入的參數不一樣。
- fifileExtension,表示配置文件的擴展名
- nacosGroup表示分組
- 加載 dataid=項目名稱 的配置
- 加載 dataid=項目名稱+擴展名 的配置
- 遍歷當前配置的激活點(profifile),分別循環加載帶有profifile的dataid配置
private void loadApplicationConfiguration(CompositePropertySource compositePropertySource, String dataIdPrefix, NacosConfigProperties properties, Environment environment) { String fileExtension = properties.getFileExtension(); String nacosGroup = properties.getGroup(); //加載默認的配置,nacos-dubbo-provider group:DEFAULT_GROUP this.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup, fileExtension, true); //加載默認的配置,nacos-dubbo-provider group:DEFAULT_GROUP this.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix + "." + fileExtension, nacosGroup, fileExtension, true); String[] var7 = environment.getActiveProfiles(); int var8 = var7.length; for(int var9 = 0; var9 < var8; ++var9) { String profile = var7[var9]; String dataId = dataIdPrefix + "-" + profile + "." + fileExtension; this.loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup, fileExtension, true); } }
loadNacosDataIfPresent
調用 loadNacosPropertySource 加載存在的配置信息。把加載之后的配置屬性保存到CompositePropertySource中。
private void loadNacosDataIfPresent(final CompositePropertySource composite, final String dataId, final String group, String fileExtension, boolean isRefreshable) { if (null != dataId && dataId.trim().length() >= 1) { if (null != group && group.trim().length() >= 1) { NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group, fileExtension, isRefreshable); this.addFirstPropertySource(composite, propertySource, false); } } }
loadNacosPropertySource
private NacosPropertySource loadNacosPropertySource(final String dataId, final String group, String fileExtension, boolean isRefreshable) { //類上面是否加了RefreshCount注解 if (NacosContextRefresher.getRefreshCount() != 0) { if (!isRefreshable) {//是否支持自動刷新, // 如果不支持自動刷新配置則自動從緩存獲 取返回 return NacosPropertySourceRepository.getNacosPropertySource(dataId, group); } } //構造器從配置中心獲取數據 return nacosPropertySourceBuilder.build(dataId, group, fileExtension, isRefreshable); }
加載
- 先從本地加載
- 如果本地沒有,則從遠程加載,並緩存到本地
- 在某個地方開啟一個定時任務,去不斷的更新本地緩存中的配置,基於MD5的比較
NacosPropertySource build(String dataId, String group, String fileExtension, boolean isRefreshable) { //調用loadNacosData加載遠程數據 Map<String, Object> p = loadNacosData(dataId, group, fileExtension); NacosPropertySource nacosPropertySource = new NacosPropertySource(group, dataId, p, new Date(), isRefreshable); //返回Nacos屬性源 NacosPropertySourceRepository.collectNacosPropertySource(nacosPropertySource); return nacosPropertySource; }
loadNacosData
加載Nacos的數據。
private Map<String, Object> loadNacosData(String dataId, String group, String fileExtension) { String data = null; try { // http遠程訪問配置中心,獲取配置數據 data = configService.getConfig(dataId, group, timeout); if (StringUtils.isEmpty(data)) { //如果為空,則提示日志 log.warn( "Ignore the empty nacos configuration and get it based on dataId[{}] & group[{}]", dataId, group); return EMPTY_MAP; } if (log.isDebugEnabled()) { log.debug(String.format( "Loading nacos data, dataId: '%s', group: '%s', data: %s", dataId, group, data)); } //根據擴展名進行數據的解析 Map<String, Object> dataMap = NacosDataParserHandler.getInstance() .parseNacosData(data, fileExtension); return dataMap == null ? EMPTY_MAP : dataMap; } catch (NacosException e) { log.error("get data from Nacos error,dataId:{}, ", dataId, e); } catch (Exception e) { log.error("parse data from Nacos error,dataId:{},data:{},", dataId, data, e); } return EMPTY_MAP; }
getConfig
@Override public String getConfig(String dataId, String group, long timeoutMs) throws NacosException { return getConfigInner(namespace, dataId, group, timeoutMs); }
getConfifigInner
繼續往下跟蹤,最終進入到getConfifigInner方法,主要有幾個邏輯
- 先從本地磁盤中加載配置,因為應用在啟動時,會加載遠程配置緩存到本地,如果本地文件的內容不為空,直接返回。
- 如果本地文件的內容為空,則調用worker.getServerConfifig加載遠程配置
- 如果出現異常,則調用本地快照文件加載配置
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException { group = null2defaultGroup(group); ParamUtils.checkKeyParam(dataId, group); ConfigResponse cr = new ConfigResponse(); cr.setDataId(dataId); cr.setTenant(tenant); cr.setGroup(group); // 優先使用本地配置,從本地內存或者磁盤中拿 String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); if (content != null) { LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content)); cr.setContent(content); configFilterChainManager.doFilter(null, cr); content = cr.getContent(); return content; } try { //遠程配置加載 String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs); cr.setContent(ct[0]); //過濾鏈 configFilterChainManager.doFilter(null, cr); content = cr.getContent(); return content; } catch (NacosException ioe) { //如果遠程報錯 if (NacosException.NO_RIGHT == ioe.getErrCode()) { throw ioe; } LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}", agent.getName(), dataId, group, tenant, ioe.toString()); } LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content)); //如果遠程和本地都沒有就找到快照的內容進行加載,相當 於熔錯機制 content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant); cr.setContent(content); configFilterChainManager.doFilter(null, cr); content = cr.getContent(); return content; }
clientWorker.getServerConfifig
通過agent.httpGet發起http請求,獲取遠程服務的配置。
//通過agent.httpGet發起http請求,獲取遠程服務的配置。 public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException { String[] ct = new String[2]; if (StringUtils.isBlank(group)) { group = Constants.DEFAULT_GROUP; } HttpRestResult<String> result = null; try { Map<String, String> params = new HashMap<String, String>(3); if (StringUtils.isBlank(tenant)) { params.put("dataId", dataId); params.put("group", group); } else { params.put("dataId", dataId); params.put("group", group); params.put("tenant", tenant); } result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout); } catch (Exception ex) { String message = String .format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ex); throw new NacosException(NacosException.SERVER_ERROR, ex); } switch (result.getCode()) { case HttpURLConnection.HTTP_OK: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.getData()); ct[0] = result.getData(); if (result.getHeader().getValue(CONFIG_TYPE) != null) { ct[1] = result.getHeader().getValue(CONFIG_TYPE); } else { ct[1] = ConfigType.TEXT.getType(); } return ct; case HttpURLConnection.HTTP_NOT_FOUND: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null); return ct; case HttpURLConnection.HTTP_CONFLICT: { LOGGER.error( "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, " + "tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(NacosException.CONFLICT, "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } case HttpURLConnection.HTTP_FORBIDDEN: { LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(result.getCode(), result.getMessage()); } default: { LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId, group, tenant, result.getCode()); throw new NacosException(result.getCode(), "http error, code=" + result.getCode() + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } } }
上面客戶端發起請求過程搞定了,接下來就是服務端接收到請求進行處理從數據庫中查詢了;請求入口
ConfigController.getConfig
@GetMapping @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class) public void getConfig(HttpServletRequest request, HttpServletResponse response, @RequestParam("dataId") String dataId, @RequestParam("group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam(value = "tag", required = false) String tag) throws IOException, ServletException, NacosException { // check tenant ParamUtils.checkTenant(tenant); tenant = processTenant(tenant); // check params ParamUtils.checkParam(dataId, group, "datumId", "content"); ParamUtils.checkParam(tag); final String clientIp = RequestUtil.getRemoteIp(request); inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp); }
doGetConfig
public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group, String tenant, String tag, String clientIp) throws IOException, ServletException { final String groupKey = GroupKey2.getKey(dataId, group, tenant); String autoTag = request.getHeader("Vipserver-Tag"); String requestIpApp = RequestUtil.getAppName(request); int lockResult = tryConfigReadLock(groupKey); final String requestIp = RequestUtil.getRemoteIp(request); boolean isBeta = false; if (lockResult > 0) { FileInputStream fis = null; try { String md5 = Constants.NULL; long lastModified = 0L; //先從緩存中拿數據 CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey); if (cacheItem != null) { //判斷緩存是否是測試數據 if (cacheItem.isBeta()) { if (cacheItem.getIps4Beta().contains(clientIp)) { isBeta = true; } } final String configType = (null != cacheItem.getType()) ? cacheItem.getType() : FileTypeEnum.TEXT.getFileType(); response.setHeader("Config-Type", configType); String contentTypeHeader; try { contentTypeHeader = FileTypeEnum.valueOf(configType.toUpperCase()).getContentType(); } catch (IllegalArgumentException ex) { contentTypeHeader = FileTypeEnum.TEXT.getContentType(); } response.setHeader(HttpHeaderConsts.CONTENT_TYPE, contentTypeHeader); } File file = null; ConfigInfoBase configInfoBase = null; PrintWriter out = null; if (isBeta) { md5 = cacheItem.getMd54Beta(); lastModified = cacheItem.getLastModifiedTs4Beta(); if (PropertyUtil.isDirectRead()) { configInfoBase = persistService.findConfigInfo4Beta(dataId, group, tenant); } else { file = DiskUtil.targetBetaFile(dataId, group, tenant); } response.setHeader("isBeta", "true"); } else { if (StringUtils.isBlank(tag)) { //是否使用標簽方式 if (isUseTag(cacheItem, autoTag)) { if (cacheItem != null) { if (cacheItem.tagMd5 != null) { md5 = cacheItem.tagMd5.get(autoTag); } if (cacheItem.tagLastModifiedTs != null) { lastModified = cacheItem.tagLastModifiedTs.get(autoTag); } } if (PropertyUtil.isDirectRead()) { configInfoBase = persistService.findConfigInfo4Tag(dataId, group, tenant, autoTag); } else { file = DiskUtil.targetTagFile(dataId, group, tenant, autoTag); } response.setHeader("Vipserver-Tag", URLEncoder.encode(autoTag, StandardCharsets.UTF_8.displayName())); } else { md5 = cacheItem.getMd5(); lastModified = cacheItem.getLastModifiedTs(); if (PropertyUtil.isDirectRead()) { //查詢數據庫操作 configInfoBase = persistService.findConfigInfo(dataId, group, tenant); } else { file = DiskUtil.targetFile(dataId, group, tenant); } if (configInfoBase == null && fileNotExist(file)) { // FIXME CacheItem // No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1. ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1, ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp); // pullLog.info("[client-get] clientIp={}, {}, // no data", // new Object[]{clientIp, groupKey}); response.setStatus(HttpServletResponse.SC_NOT_FOUND); response.getWriter().println("config data not exist"); return HttpServletResponse.SC_NOT_FOUND + ""; } } } else { if (cacheItem != null) { if (cacheItem.tagMd5 != null) { md5 = cacheItem.tagMd5.get(tag); } if (cacheItem.tagLastModifiedTs != null) { Long lm = cacheItem.tagLastModifiedTs.get(tag); if (lm != null) { lastModified = lm; } } } if (PropertyUtil.isDirectRead()) { configInfoBase = persistService.findConfigInfo4Tag(dataId, group, tenant, tag); } else { file = DiskUtil.targetTagFile(dataId, group, tenant, tag); } if (configInfoBase == null && fileNotExist(file)) { // FIXME CacheItem // No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1. ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1, ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp); // pullLog.info("[client-get] clientIp={}, {}, // no data", // new Object[]{clientIp, groupKey}); response.setStatus(HttpServletResponse.SC_NOT_FOUND); response.getWriter().println("config data not exist"); return HttpServletResponse.SC_NOT_FOUND + ""; } } } response.setHeader(Constants.CONTENT_MD5, md5); // Disable cache. response.setHeader("Pragma", "no-cache"); response.setDateHeader("Expires", 0); response.setHeader("Cache-Control", "no-cache,no-store"); if (PropertyUtil.isDirectRead()) { response.setDateHeader("Last-Modified", lastModified); } else { fis = new FileInputStream(file); response.setDateHeader("Last-Modified", file.lastModified()); } if (PropertyUtil.isDirectRead()) { out = response.getWriter(); out.print(configInfoBase.getContent()); out.flush(); out.close(); } else { fis.getChannel() .transferTo(0L, fis.getChannel().size(), Channels.newChannel(response.getOutputStream())); } LogUtil.PULL_CHECK_LOG.warn("{}|{}|{}|{}", groupKey, requestIp, md5, TimeUtils.getCurrentTimeStr()); final long delayed = System.currentTimeMillis() - lastModified; // TODO distinguish pull-get && push-get /* Otherwise, delayed cannot be used as the basis of push delay directly, because the delayed value of active get requests is very large. */ ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified, ConfigTraceService.PULL_EVENT_OK, delayed, requestIp); } finally { releaseConfigReadLock(groupKey); if (null != fis) { fis.close(); } } } else if (lockResult == 0) { // FIXME CacheItem No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1. ConfigTraceService .logPullEvent(dataId, group, tenant, requestIpApp, -1, ConfigTraceService.PULL_EVENT_NOTFOUND, -1, requestIp); response.setStatus(HttpServletResponse.SC_NOT_FOUND); response.getWriter().println("config data not exist"); return HttpServletResponse.SC_NOT_FOUND + ""; } else { PULL_LOG.info("[client-get] clientIp={}, {}, get data during dump", clientIp, groupKey); response.setStatus(HttpServletResponse.SC_CONFLICT); response.getWriter().println("requested file is being modified, please try later."); return HttpServletResponse.SC_CONFLICT + ""; } return HttpServletResponse.SC_OK + ""; }
findConfigInfo
@Override public ConfigInfo findConfigInfo(final String dataId, final String group, final String tenant) { final String tenantTmp = StringUtils.isBlank(tenant) ? StringUtils.EMPTY : tenant; try { return this.jt.queryForObject( "SELECT ID,data_id,group_id,tenant_id,app_name,content,md5,type FROM config_info WHERE data_id=? AND group_id=? AND tenant_id=?", new Object[] {dataId, group, tenantTmp}, CONFIG_INFO_ROW_MAPPER); } catch (EmptyResultDataAccessException e) { // Indicates that the data does not exist, returns null. return null; } catch (CannotGetJdbcConnectionException e) { LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e); throw e; } }
查詢完后,將查詢結果返回給客戶端;到這里整個調用流程就完成了;
客戶端配置的動態更新
在NacosConfifigService的構造方法中,當這個類被實例化以后,有做一些事情
- 初始化一個HttpAgent,這里又用到了裝飾起模式,實際工作的類是ServerHttpAgent,MetricsHttpAgent內部也是調用了ServerHttpAgent的方法,增加了監控統計的信息
- ClientWorker, 客戶端的一個工作類,agent作為參數傳入到clientworker,可以基本猜測到里面會用到agent做一些遠程相關的事情
public NacosConfigService(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); if (StringUtils.isBlank(encodeTmp)) { this.encode = Constants.ENCODE; } else { this.encode = encodeTmp.trim(); } initNamespace(properties); //agent是遠程通信的代理,如果要遠程通信就用這agent做;MetricsHttpAgent是記錄請求信息,他會上報監控信息 this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); //開啟定時上報 this.agent.start(); //ClientWorker是任務 this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); }
clientWorker
可以看到 ClientWorker 除了將 HttpAgent 維持在自己內部,還創建了兩個線程池:第一個線程池是只擁有一個線程用來執行定時任務的 executor,executor 每隔 10ms 就會執行一次checkConfifigInfo() 方法,從方法名上可以知道是每 10 ms 檢查一次配置信息。第二個線程池是一個普通的線程池,從 ThreadFactory 的名稱可以看到這個線程池是做長輪詢的。
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; // Initialize the timeout parameter init(properties); //初始化一個定時調度的線程池,重寫了threadfactory方法 this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); //初始化一個定時調度的線程池,從里面的name名字來看,似乎和長輪訓有關系。而這個長輪訓應該是和 nacos服務端的長輪訓 this.executorService = Executors .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); //設置定時任務的執行頻率,並且調用checkConfigInfo這個方法,猜測是定時去檢測配置是否發生了變 化 // 首次執行延遲時間為1毫秒、延遲時間為10毫秒 this.executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }
checkConfifigInfo
這個方法主要的目的是用來檢查服務端的配置信息是否發生了變化。如果有變化,則觸發listener通知cacheMap: AtomicReference<Map<String, CacheData>> cacheMap 用來存儲監聽變更的緩存集合。key是根據dataID/group/tenant(租戶) 拼接的值。Value是對應存儲在nacos服務器上的配置文件的內容。
默認情況下,每個長輪訓LongPullingRunnable任務默認處理3000個監聽配置集。如果超過3000, 則需要啟動多個LongPollingRunnable去執行。currentLongingTaskCount保存已啟動的LongPullingRunnable任務數
默認情況下,每個長輪訓LongPullingRunnable任務默認處理3000個監聽配置集。如果超過3000, 則需要啟動多個LongPollingRunnable去執行。currentLongingTaskCount保存已啟動的LongPullingRunnable任務數
public void checkConfigInfo() { // Dispatch taskes. // 分任務 int listenerSize = cacheMap.get().size(); // Round up the longingTaskCount. // 向上取整為批數,監聽的配置數量除以3000,得到一個整數,代表長輪訓任務的數量 int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); //currentLongingTaskCount表示當前的長輪訓任務數量,如果小於計算的結果,則可以繼續創建 if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // The task list is no order.So it maybe has issues when changing. // 要判斷任務是否在執行 這塊需要好好想想。 任務列表現在是無序的。變化過程可能有問 executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount;//更新當前長輪訓人數數量 } }
LongPollingRunnable.run
初始化new LongPollingRunnable()丟給 executorService線程池來處理,所以我們可以找到LongPollingRunnable里面的run方法這個方法傳遞了一個taskid, tasked用來區分cacheMap中的任務批次, 保存到cacheDatas這個集合中cacheData.isUseLocalConfigInfo 這個值的變化來自於checkLocalConfifig這個方法
class LongPollingRunnable implements Runnable { private final int taskId; public LongPollingRunnable(int taskId) { this.taskId = taskId; } @Override public void run() { List<CacheData> cacheDatas = new ArrayList<CacheData>(); List<String> inInitializingCacheList = new ArrayList<String>(); try { // check failover config // tasked用來區分cacheMap中的任務批次, 保存到cacheDatas這個集合中 for (CacheData cacheData : cacheMap.get().values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { checkLocalConfig(cacheData);//通過本地文件中緩存的數據和 cacheData集合中的數據進行比對,判斷是否出現數據變化 if (cacheData.isUseLocalConfigInfo()) {//這里表示數據有變 化,需要通知監聽器 cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } } // check server config List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); if (!CollectionUtils.isEmpty(changedGroupKeys)) { LOGGER.info("get changedGroupKeys:" + changedGroupKeys); } for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { String[] ct = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(ct[0]); if (null != ct[1]) { cache.setType(ct[1]); } LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]); } catch (NacosException ioe) { String message = String .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) { // If the rotation training task is abnormal, the next execution time of the task will be punished LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } } }
checkLocalConfifig
檢查本地配置,這里面有三種情況
- 如果isUseLocalConfifigInfo為false,但是本地緩存路徑的文件是存在的,那么把isUseLocalConfifigInfo設置為true,並且更新cacheData的內容以及文件的更新時間
- 如果isUseLocalCOnfifigInfo為true,但是本地緩存文件不存在,則設置為false,不通知監聽器
- isUseLocalConfifigInfo為true,並且本地緩存文件也存在,但是緩存的的時間和文件的更新時間不一致,則更新cacheData中的內容,並且isUseLocalConfifigInfo設置為true
private void checkLocalConfig(CacheData cacheData) { final String dataId = cacheData.dataId; final String group = cacheData.group; final String tenant = cacheData.tenant; File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant); // 如果本地緩存文件存在,並且cacheData.isUseLocalConfigInfo()=flase;如果條件成立相當於可以直接用本地緩存數據 //而不用遠程加載 if (!cacheData.isUseLocalConfigInfo() && path.exists()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn( "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); return; } // 有 -> 沒有。不通知業務監聽器,從server拿到配置后通知。 // If use local config info, then it doesn't notify business listener and notify after getting from server. if (cacheData.isUseLocalConfigInfo() && !path.exists()) { cacheData.setUseLocalConfigInfo(false); LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); return; } // 有變更 // When it changed. if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path .lastModified()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn( "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); } }
checkListenerMd5
遍歷用戶自己添加的監聽器,如果發現數據的md5值不同,則發送通知
void checkListenerMd5() { for (ManagerListenerWrap wrap : listeners) { if (!md5.equals(wrap.lastCallMd5)) { safeNotifyListener(dataId, group, content, type, md5, wrap); } } }
檢查服務端配置
在LongPollingRunnable.run中,先通過本地配置的讀取和檢查來判斷數據是否發生變化從而實現變化的通知接着,當前的線程還需要去遠程服務器上獲得最新的數據,檢查哪些數據發生了變化
- 通過checkUpdateDataIds獲取遠程服務器上數據變更的dataid
- 遍歷這些變化的集合,然后調用getServerConfifig從遠程服務器獲得對應的內容
- 更新本地的cache,設置為服務器端返回的內容
- 最后遍歷cacheDatas,找到變化的數據進行通知
//從服務端獲取發生變化的數據的DataID列表,保存在List<String>集合中 // check server config List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); if (!CollectionUtils.isEmpty(changedGroupKeys)) { LOGGER.info("get changedGroupKeys:" + changedGroupKeys); } for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { //遍歷有變換的groupkey,發起遠程請求獲得指定groupkey的內容 String[] ct = getServerConfig(dataId, group, tenant, 3000L); //把獲取到的內容設置到CacheData中 CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(ct[0]); if (null != ct[1]) { cache.setType(ct[1]); } LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]); } catch (NacosException ioe) { String message = String .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } //再遍歷CacheData這個集合,找到發生變化的數據進行通知 for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this);//繼續傳遞當前線程進行輪詢 } catch (Throwable e) { // If the rotation training task is abnormal, the next execution time of the task will be punished LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } } }
checkUpdateDataIds
- 首先從cacheDatas集合中找到isUseLocalConfifigInfo為false的緩存
- 調用checkUpdateConfifigStr
/** * 從Server獲取值變化了的DataID列表。返回的對象里只有dataId和group是有效的。 保證不返回 NULL。 */ List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception { StringBuilder sb = new StringBuilder(); for (CacheData cacheData : cacheDatas) { //如果不使用本地緩存 if (!cacheData.isUseLocalConfigInfo()) { sb.append(cacheData.dataId).append(WORD_SEPARATOR); sb.append(cacheData.group).append(WORD_SEPARATOR); if (StringUtils.isBlank(cacheData.tenant)) { sb.append(cacheData.getMd5()).append(LINE_SEPARATOR); } else { sb.append(cacheData.getMd5()).append(WORD_SEPARATOR); sb.append(cacheData.getTenant()).append(LINE_SEPARATOR); } if (cacheData.isInitializing()) { // It updates when cacheData occours in cacheMap by first time. inInitializingCacheList .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant)); } } } boolean isInitializingCacheList = !inInitializingCacheList.isEmpty(); return checkUpdateConfigStr(sb.toString(), isInitializingCacheList); }
通過長輪訓的方式,從遠程服務器獲得變化的數據進行返回
/** * 從Server獲取值變化了的DataID列表。返回的對象里只有dataId和group是有效的。 保證不返 回NULL。 */ List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception { Map<String, String> params = new HashMap<String, String>(2); params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); Map<String, String> headers = new HashMap<String, String>(2); headers.put("Long-Pulling-Timeout", "" + timeout); // told server do not hang me up if new initializing cacheData added in if (isInitializingCacheList) { headers.put("Long-Pulling-Timeout-No-Hangup", "true"); } if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); } try { // In order to prevent the server from handling the delay of the client's long task, // increase the client's read timeout to avoid this problem. long readTimeoutMs = timeout + (long) Math.round(timeout >> 1); HttpRestResult<String> result = agent .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), readTimeoutMs); if (result.ok()) { setHealthServer(true); return parseUpdateDataIdResponse(result.getData()); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.getCode()); } } catch (Exception e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }
代碼回退
getServerConfifig
根據dataId、group、tenant等信息,使用http請求從遠程服務器上獲得配置信息,讀取到數據之后緩存到本地文件中
//通過agent.httpGet發起http請求,獲取遠程服務的配置。 public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException { String[] ct = new String[2]; if (StringUtils.isBlank(group)) { group = Constants.DEFAULT_GROUP; } HttpRestResult<String> result = null; try { Map<String, String> params = new HashMap<String, String>(3); if (StringUtils.isBlank(tenant)) { params.put("dataId", dataId); params.put("group", group); } else { params.put("dataId", dataId); params.put("group", group); params.put("tenant", tenant); } result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout); } catch (Exception ex) { String message = String .format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ex); throw new NacosException(NacosException.SERVER_ERROR, ex); } switch (result.getCode()) { case HttpURLConnection.HTTP_OK: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.getData()); ct[0] = result.getData(); if (result.getHeader().getValue(CONFIG_TYPE) != null) { ct[1] = result.getHeader().getValue(CONFIG_TYPE); } else { ct[1] = ConfigType.TEXT.getType(); } return ct; case HttpURLConnection.HTTP_NOT_FOUND: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null); return ct; case HttpURLConnection.HTTP_CONFLICT: { LOGGER.error( "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, " + "tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(NacosException.CONFLICT, "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } case HttpURLConnection.HTTP_FORBIDDEN: { LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(result.getCode(), result.getMessage()); } default: { LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId, group, tenant, result.getCode()); throw new NacosException(result.getCode(), "http error, code=" + result.getCode() + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } } }
ConfifigController
上面流程說完配置的動態更新就說完了,下面說下客戶端發起了長輪詢請求客戶端怎么處理;nacos是使用spring mvc提供的rest api。這里面會調用inner.doPollingConfifig進行處理
@PostMapping("/listener") @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class) public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true); String probeModify = request.getParameter("Listening-Configs"); if (StringUtils.isBlank(probeModify)) { throw new IllegalArgumentException("invalid probeModify"); } probeModify = URLDecoder.decode(probeModify, Constants.ENCODE); Map<String, String> clientMd5Map; try { clientMd5Map = MD5Util.getClientMd5Map(probeModify); } catch (Throwable e) { throw new IllegalArgumentException("invalid probeModify"); } // do long-polling inner.doPollingConfig(request, response, clientMd5Map, probeModify.length()); }
doPollingConfifig
這個方法中,兼容了長輪訓和短輪詢的邏輯,我們只需要關注長輪訓的部分。再次進入到longPollingService.addLongPollingClient
/** * 輪詢接口. */ public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException { // 長輪詢 // Long polling. if (LongPollingService.isSupportLongPolling(request)) { //阻塞當前的返回,直到數據發生變化或者超時 longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); return HttpServletResponse.SC_OK + ""; } // 兼容短輪詢邏輯,遍歷客戶端傳遞過來的探測數據,返回發生了變化的數據的key // Compatible with short polling logic. List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map); // 兼容短輪詢result // Compatible with short polling result. String oldResult = MD5Util.compareMd5OldResult(changedGroups); String newResult = MD5Util.compareMd5ResultString(changedGroups); String version = request.getHeader(Constants.CLIENT_VERSION_HEADER); if (version == null) { version = "2.0.0"; } int versionNum = Protocol.getVersionNumber(version); // Befor 2.0.4 version, return value is put into header. if (versionNum < START_LONG_POLLING_VERSION_NUM) { response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult); response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult); } else { request.setAttribute("content", newResult); } Loggers.AUTH.info("new content:" + newResult); // 禁用緩存 // Disable cache. response.setHeader("Pragma", "no-cache"); response.setDateHeader("Expires", 0); response.setHeader("Cache-Control", "no-cache,no-store"); response.setStatus(HttpServletResponse.SC_OK); return HttpServletResponse.SC_OK + ""; }
longPollingService.addLongPollingClient
這個方法應該是把客戶端的長輪訓請求添加到某個任務中去。
- 獲得客戶端傳遞過來的超時時間,並且進行本地計算,提前500ms返回響應,這就能解釋為什么客戶端響應超時時間是29.5+了。當然如果 isFixedPolling=true 的情況下,不會提前返回響應
- 根據客戶端請求過來的md5和服務器端對應的group下對應內容的md5進行比較,如果不一致,則通過 generateResponse 將結果返回
- 如果配置文件沒有發生變化,則通過 scheduler.execute 啟動了一個定時任務,將客戶端的長輪詢請求封裝成一個叫 ClientLongPolling 的任務,交給 scheduler 去執行
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) { //str表示超時時間,也就是timeout String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String tag = req.getHeader("Vipserver-Tag"); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); /** * 提前500ms返回響應,為避免客戶端超時 */ // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout. long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) {//是否是固定的超時時間 timeout = Math.max(10000, getFixedPollingInterval()); // Do nothing but set fix polling timeout. } else { //先去比較變化,如果發現有變化的key直接返回 long start = System.currentTimeMillis(); //根據客戶端請求過來的md5和服務器端對應的group下對應內容的md5進行比較,如果不一致, // 則通過`generateResponse`將結果返回 List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map); if (changedGroups.size() > 0) { generateResponse(req, rsp, changedGroups); LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return;//返回客戶端 } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } } String ip = RequestUtil.getRemoteIp(req); // 一定要由HTTP線程調用,否則離開后容器會立即發送響應 // Must be called by http thread, or send response. final AsyncContext asyncContext = req.startAsync(); // AsyncContext是Servlet3.0中提供的對象,調用startAsync獲得AsyncContext對象之后, // 這 個請求的響應會被延后,並釋放容器分配的線程。 // 這樣就可以實現長輪詢的機制. // AsyncContext.setTimeout()的超時時間不准,所以只能自己控制 asyncContext.setTimeout(0L); //執行長輪詢任務 ConfigExecutor.executeLongPolling( new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); }
ClientLongPolling
clientLongPolling到底做了什么操作。可以先猜測一下應該會做什么事情
- 這個任務要阻塞29.5s才能執行,因為立馬執行沒有任何意義,畢竟前面已經執行過一次了
- 如果在29.5s+之內,數據發生變化,需要提前通知。需要有一種監控機制
基於這些猜想,可以看看它的實現過程從代碼粗粒度來看,它的實現似乎和猜想一致,在run方法中,通過scheduler.schedule實現了一個定時任務,它的delay時間正好是前面計算的29.5s。在這個任務中,會通過MD5Util.compareMd5來進行計算那另外一個,當數據發生變化以后,肯定不能等到29.5s之后才通知呀,那怎么辦呢?發現有一個allSubs 的東西,它似乎和發布訂閱有關系。那是不是有可能當前的clientLongPolling訂閱了數據變化的事件呢?
class ClientLongPolling implements Runnable { @Override public void run() { asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() { @Override public void run() { try { getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis()); /*** 刪除訂閱關系 */ // Delete subsciber's relations. allSubs.remove(ClientLongPolling.this); if (isFixedPolling()) { LogUtil.CLIENT_LOG .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix", RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); List<String> changedGroups = MD5Util .compareMd5((HttpServletRequest) asyncContext.getRequest(), (HttpServletResponse) asyncContext.getResponse(), clientMd5Map); if (changedGroups.size() > 0) { sendResponse(changedGroups); } else { sendResponse(null); } } else { LogUtil.CLIENT_LOG .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout", RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); sendResponse(null); } } catch (Throwable t) { LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause()); } } }, timeoutTime, TimeUnit.MILLISECONDS); allSubs.add(this); }
allSubs
allSubs是一個隊列,隊列里面放了ClientLongPolling這個對象。這個隊列似乎和配置變更有某種關聯關系。那么這里必須要實現的是,當用戶在nacos 控制台修改了配置之后,必須要從這個訂閱關系中取出關注的客戶端長連接,然后把變更的結果返回。於是我們去看LongPollingService的構造方法查找訂閱關系
/*** 長輪詢訂閱關系 */
final Queue<ClientLongPolling> allSubs;
allSubs.add(this);
LongPollingService
public LongPollingService() { allSubs = new ConcurrentLinkedQueue<ClientLongPolling>(); ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS); // Register LocalDataChangeEvent to NotifyCenter. NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize); // Register A Subscriber to subscribe LocalDataChangeEvent. NotifyCenter.registerSubscriber(new Subscriber() { @Override public void onEvent(Event event) { if (isFixedPolling()) { // Ignore. } else { if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent evt = (LocalDataChangeEvent) event; ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); } } } @Override public Class<? extends Event> subscribeType() { return LocalDataChangeEvent.class; } }); }
DataChangeTask.run
這個是數據變化的任務,最讓人興奮的應該是,它里面有一個循環迭代器,從allSubs里面獲得ClientLongPolling最后通過clientSub.sendResponse把數據返回到客戶端。所以,這也就能夠理解為何數據變化能夠實時觸發更新了。
class DataChangeTask implements Runnable { @Override public void run() { try { ConfigCacheService.getContentBetaMd5(groupKey); for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) { ClientLongPolling clientSub = iter.next(); if (clientSub.clientMd5Map.containsKey(groupKey)) { // If published tag is not in the beta list, then it skipped. if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) { continue; } // If published tag is not in the tag list, then it skipped. if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) { continue; } getRetainIps().put(clientSub.ip, System.currentTimeMillis()); iter.remove(); // Delete subscribers' relationships. LogUtil.CLIENT_LOG .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance", RequestUtil .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()), "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey); clientSub.sendResponse(Arrays.asList(groupKey)); } } } catch (Throwable t) { LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t)); } }