Nacos-配置中心


一.服務端處理

1.1 ConfigController.getConfig()接口獲取配置

@GetMapping

@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)

public void getConfig(HttpServletRequest request, HttpServletResponse response,

@RequestParam("dataId") String dataId, @RequestParam("group") String group,

@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, //租戶信息,對應 Nacos 的命名空間ID字段

@RequestParam(value = "tag", required = false) String tag){

        // check tenant,檢查名稱等有效性(省略...)

          // check params,校驗參數非空和有效性(省略...)

        final String clientIp = RequestUtil.getRemoteIp(request);

         inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);

}

1.1.1 RequestUtil.getRemoteIp方法盡可能獲取真實IP

獲取IP是盡可能獲取真實的,而不是代理的,如果有NGINX的話,優先使用X_FORWARDED_FOR,取出第一個就是最開始的客戶端真實地址。如果都沒有的話,只能用RemoteAddr。

 public static String getRemoteIp(HttpServletRequest request) {

    String xForwardedFor = request.getHeader(X_FORWARDED_FOR);

  if (!StringUtils.isBlank(xForwardedFor)) {

  return xForwardedFor.split(X_FORWARDED_FOR_SPLIT_SYMBOL)[0].trim();

  }

  String nginxHeader = request.getHeader(X_REAL_IP);

   return StringUtils.isBlank(nginxHeader) ? request.getRemoteAddr() : nginxHeader;

 }

1.1.2 ConfigServletInner.doGetConfig

根據groupKey更新緩存的屬性,然后根據單例運行是否用mysql,進行mysql查詢或者直接用文件零拷貝傳輸,

因為有個DumpService在初始化的時候會去mysql比對記錄,把文件保存到本地/data\config-data文件夾中,所以可以直接用零拷貝了。

public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,

  String tenant, String tag, String clientIp) {

    final String groupKey = GroupKey2.getKey(dataId, group, tenant); //拼接

    String autoTag = request.getHeader("Vipserver-Tag");

    String requestIpApp = RequestUtil.getAppName(request);

    int lockResult = tryConfigReadLock(groupKey); //首先獲取讀鎖,先從緩存中讀,最大重試10次,間隔1

    final String requestIp = RequestUtil.getRemoteIp(request);

    boolean isBeta = false;

    if (lockResult > 0) {

      FileInputStream fis = null;

      String md5 = Constants.NULL;

      long lastModified = 0L;

      //獲取緩存,包括MD5,lastmodifyedTs,type等..

      CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey);

      //configType例如yaml

      final String configType = (null != cacheItem.getType()) ? cacheItem.getType() : FileTypeEnum.TEXT.getFileType();

      File file = null;

      ConfigInfoBase configInfoBase = null;

      PrintWriter out = null;

      md5 = cacheItem.getMd5(); //緩存中的md5值

      lastModified = cacheItem.getLastModifiedTs(); //緩存中的最后修改時間

      //確定是否直接讀取數據(嵌入式存儲derby+單機),如果使用mysql,降低數據庫讀取壓力; 如果使用raft + derby,降低leader讀取壓力;

      // persistService.findConfigInfo方法后面有簡單分析

      if (PropertyUtil.isDirectRead()) {

      configInfoBase = persistService.findConfigInfo(dataId, group, tenant);

      } else {

      file = DiskUtil.targetFile(dataId, group, tenant); //讀取本地文件

      }

      response.setHeader(Constants.CONTENT_MD5, md5);

      response.setHeader("Pragma", "no-cache"); // 頭部禁用緩存

      response.setDateHeader("Expires", 0);

      response.setHeader("Cache-Control", "no-cache,no-store");

      if (PropertyUtil.isDirectRead()) {

        response.setDateHeader("Last-Modified", lastModified);

      } else {

        fis = new FileInputStream(file);

        response.setDateHeader("Last-Modified", file.lastModified());

      }

      //設置一些頭信息后,進行響應輸出,如果有本地文件就用零拷貝,否則就用字符流。

      if (PropertyUtil.isDirectRead()) {

        out = response.getWriter(); //使用字符流

        out.print(configInfoBase.getContent());

        out.flush();

        out.close();

      } else {

      //操作系統可以將字節直接從文件系統緩存傳輸到目標通道

             fis.getChannel().transferTo(0L, fis.getChannel().size(), Channels.newChannel(response.getOutputStream()));

      }

      .....

      finally { //釋放鎖 releaseConfigReadLock(groupKey); }

    } else if (lockResult == 0) {

      //如果獲取不到說明配置文件不存在或者有線程正在寫配置文件,也不能讀,為了保證數據的一致性:

      ...

     }

  return HttpServletResponse.SC_OK + "";

}

1.1.3 PersistService.findConfigInfo()查數據庫配置

這個方法會直接查詢數據庫, select **** 這種。

derby數據庫:

如果其他數據庫用JdbcTemplate操作;

1.2 DumpService將配置文件全部Dump到磁盤

Nacos Config模塊有一個特點,會將數據庫中的配置信息,dump成文件,

通過直接文件讀取的方式,替代直接讀取數據庫,降低數據庫的壓力,是的數據庫可以更好的處理寫操作。

 

 

             

 

 

 

 

圖片來源: https://blog.csdn.net/wangwei19871103/article/details/105814924

1.2.1 DumpService.init() 初始化

spring啟動加載時,會執行帶有 @PostConstruct 注解的初始化方法;

@PostConstruct

@Override

protected void init() throws Throwable {

  if (ApplicationUtils.getStandaloneMode()) {

      dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor,dumpAllTagProcessor);

  return;

  }

//非單機模式, 后面章節再分析

}

1.2.2 dumpOperate() 執行dump操作

protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,

  DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {

    String dumpFileContext = "CONFIG_DUMP_TO_FILE";

    TimerContext.start(dumpFileContext);

    // 構建並添加全部配置文件Dump處理器

    Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());

    // 構建並添加全部灰度配置文件Dump處理器

    Runnable dumpAllBeta = () -> dumpAllTaskMgr.addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());

    // 構建並添加全部Tag配置文件Dump處理器

    Runnable dumpAllTag = () -> dumpAllTaskMgr.addTask(DumpAllTagTask.TASK_ID, new DumpAllTagTask());

    //清除歷史配置文件信息(xx天之前的歷史配置信息全部刪除)

    Runnable clearConfigHistory = () -> {

      // 單機模式返回true, derby + raft 模式 leader節點可以執行此任務

      if (canExecute()) {

              Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), 24 * getRetentionDays());

        int totalCount = persistService.findConfigHistoryCountByTime(startTime);

              if (totalCount > 0) {

          // 分頁刪除歷史記錄 ; 采用分頁的方式,一是為了降低數據庫刪除數據時的壓力,

          // 另一方面考慮數據庫集群的主從同步延遲的問題(bin-log)

          int pageSize = 1000;

          int removeTime = (totalCount + pageSize - 1) / pageSize;

                  while (removeTime > 0) {

              persistService.removeConfigHistory(startTime, pageSize); // 分頁刪除以免批量太大報錯

              removeTime--;

           }  

        }

      }

    };

 

    //全量Dump配置信息

    dumpConfigInfo(dumpAllProcessor);

    // 更新 Beta緩存,先刪除文件

    DiskUtil.clearAllBeta();

    if (persistService.isExistTable(BETA_TABLE_NAME)) {

      dumpAllBetaProcessor.process(new DumpAllBetaTask());

    }

    // 更新 Tag 緩存,先刪除文件

    DiskUtil.clearAllTag();

    if (persistService.isExistTable(TAG_TABLE_NAME)) {

      dumpAllTagProcessor.process(new DumpAllTagTask());

    }

 

    // add to dump aggr

    List<ConfigInfoChanged> configList = persistService.findAllAggrGroup();

     if (configList != null && !configList.isEmpty()) {

       total = configList.size();

      List<List<ConfigInfoChanged>> splitList = splitList(configList, INIT_THREAD_COUNT);

      for (List<ConfigInfoChanged> list : splitList) {

          MergeAllDataWorker work = new MergeAllDataWorker(list);

          work.start();

      }

      

 

    // 非單機模式,則Nacos Config存在一個dump文件的心跳記錄,可以減少dump文件的開銷和任務耗時

    if (!ApplicationUtils.getStandaloneMode()) {

      Runnable heartbeat = () -> {

        String heartBeatTime = TimeUtils.getCurrentTime().toString();

        DiskUtil.saveHeartBeatToDisk(heartBeatTime); // 時間信息持久化

      };

      // 周期性執行任務

      ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS);

      // 隨機的任務延遲時間

      long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;

      ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);

      // 周期性執行dump全部灰度配置文件的操作 ,6小時

      ConfigExecutor.scheduleConfigTask(dumpAllBeta, initialDelay,  DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);

      // 周期性執行dump全部tag配置文件的操作 ,6小時

      ConfigExecutor.scheduleConfigTask(dumpAllTag, initialDelay,  DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);

     }

 

    // 周期性執行清除往期歷史配置信息記錄

    ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);

    } finally {

      TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);

    }

 }

1.2.3 dumpConfigInfo()

作用: 主要是將數據庫中的所有ConfigInfo查詢出來寫到服務器的磁盤中

參數: dumpAllProcessor, 這個是TaskProcessor 任務處理器;處理器中有個 process()方法; 最終執行任務的時候就是執行這個方法的;

private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException {

    int timeStep = 6;

    Boolean isAllDump = true;

    // initial dump all

    FileInputStream fis = null;

    Timestamp heartheatLastStamp = null;

 

    //判斷是否快速啟動,即配置文件中的isQuickStart默認false ,

    if (isQuickStart()) {

    File heartbeatFile = DiskUtil.heartBeatFile();

    if (heartbeatFile.exists()) {

      fis = new FileInputStream(heartbeatFile);

         String heartheatTempLast = IoUtils.toString(fis, Constants.ENCODE);

         heartheatLastStamp = Timestamp.valueOf(heartheatTempLast);

      //如果上一次服務正常的時間距離現在不超過6個小時; 那么設置 isAllDump = false;表示不需要全量Dump

      if (TimeUtils.getCurrentTime().getTime() - heartheatLastStamp.getTime()

          < timeStep * 60 * 60 * 1000) {

            isAllDump = false;

       }

    }

  

    if (isAllDump) {

          DiskUtil.clearAll(); //先刪除本地file

      dumpAllProcessor.process(new DumpAllTask()); //處理全部配置數據

    } else {

      //非全量dump,下面小節1.2.5介紹.dumpChangeProcessor

      Timestamp beforeTimeStamp = getBeforeStamp(heartheatLastStamp, timeStep);

      DumpChangeProcessor dumpChangeProcessor = new DumpChangeProcessor(this, beforeTimeStamp, TimeUtils.getCurrentTime());

      dumpChangeProcessor.process(new DumpChangeTask());

      // 文件的 MD5 檢查任務

      Runnable checkMd5Task = () -> {

        // 直接根據內存緩存中的配置信息的數據,進行快速檢查每個配置文件信息的變更情況

        List<String> diffList = ConfigCacheService.checkMd5();

            for (String groupKey : diffList) {

                // 將對應格式的數據進行解析

          String[] dg = GroupKey.parseKey(groupKey);

          String dataId = dg[0];

          String group = dg[1];

          String tenant = dg[2];

        // 直接查找對應的配置文件信息

          ConfigInfoWrapper configInfo = persistService.queryConfigInfo(dataId, group, tenant);

        // 進行變更判斷並dump出文件

          ConfigCacheService.dumpChange(dataId, group, tenant, configInfo.getContent(),

          configInfo.getLastModified());

         }

           };

    // 進行周期任務調度執行

    ConfigExecutor.scheduleConfigTask(checkMd5Task, 0, 12, TimeUnit.HOURS);

      

}

DiskUtil.heartBeatFile() 獲取心跳文件

心跳文件在 {NACOS_HOME}/status/heartBeat.txt,這是一個心跳文件,每十秒就會把當前時間寫入到這個文件中;

作用: (斷點續傳) 為了能夠快速啟動應用,可以選擇不需要全部Dump所有的配置文件,因為上一次可能已經Dump了文件在磁盤中了,

如果配置很大的話,走IO還是會花費一定的時間的; 所以每十秒來持久化一次當前時間,用於記錄上一次服務正常距離現在有多長時間;

假設服務宕機了,半個小時之后才啟動成功,那么我們只需要將這半小時之內數據庫中的配置變化重新Dump到磁盤中就行了,不需要DumpAll;

1.2.4 DumpAllProcessor.process()-全量dump

DumpAllProcessor.process()方法

@Override

public boolean process(NacosTask task) {

    long currentMaxId = persistService.findConfigMaxId(); //查詢數據庫最大id

    long lastMaxId = 0;

    while (lastMaxId < currentMaxId) {

    //分頁獲取數據庫的數據,每次1000

    Page<ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);

    if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) {

      for (ConfigInfoWrapper cf : page.getPageItems()) {

        long id = cf.getId();

        lastMaxId = id > lastMaxId ? id : lastMaxId;

        // AggrWhitelist是Nacos頁面自定義的一個DataId; 如果ConfigInfo的DataId是這個值的話就會被單獨解析,

        if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {

          AggrWhitelist.load(cf.getContent());

        }

        // ClientIpWhiteList也是Nacos自己定義的一個預留配置DataId,Ip白名單

        if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {

          ClientIpWhiteList.load(cf.getContent());

          }

        // SwitchService也是Nacos內部預留的一個配置;DataId是 com.alibaba.nacos.meta.switch ;

        //開發者可以配置這個里面的屬性,來進行一些設置內部屬性的操作;

         if (cf.getDataId().equals(SwitchService.SWITCH_META_DATAID)) {

          SwitchService.load(cf.getContent());

        }

         //dump方法里面會校驗MD5是否改變並更新,真正的磁盤寫入操作

         boolean result = ConfigCacheService.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified(), cf.getType())

      }

    } else {

      lastMaxId += PAGE_SIZE;

          

   return true;

}

ConfigService.dump()方法真正的磁盤寫入操作;

這個方法首先將配置保存到磁盤文件中,並且緩存配置信息的MD5到內存中;

如果配置信息不一致(MD5不一致),則將會發送一個通知事件 LocalDataChangeEvent告知本地數據有更改;

CacheItem 是配置信息的對象;保存着配置信息的一些信息,但是沒有保存Content,只保存了content的MD5;

/*** Save config file and update md5 value in cache. */

public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,

String type) {

    String groupKey = GroupKey2.getKey(dataId, group, tenant);

    //如果內存中沒有當前配置的緩存 CacheItem,則組裝對象保存進去;這時md5是空字符串;

    CacheItem ci = makeSure(groupKey);

    ci.setType(type);

    final int lockResult = tryWriteLock(groupKey); //獲取寫鎖,沒獲取到則報錯或返回false

    //計算content的MD5

    final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);

    if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {

              ...

    } else if (!PropertyUtil.isDirectRead()) {

      //上面計算的md5跟內存 CacheItem 中的md5做比較(第一次肯定不相等),如果不相等則將文件保存到磁盤中;

      DiskUtil.saveToDisk(dataId, group, tenant, content);

    }

    //updateMd5方法中,如果MD5不相同,則更新 CacheItem 中的MD5屬性和lastModifiedTs屬性;

    // lastModifiedTs是表示最后更新時間; 如果MD5不相同,還要發送通知告知數據有變更;

    updateMd5(groupKey, md5, lastModifiedTs)

    finally { releaseWriteLock(groupKey); }

}

updateMd5方法:

public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {

  CacheItem cache = makeSure(groupKey);

    if (cache.md5 == null || !cache.md5.equals(md5)) {

           cache.md5 = md5;

           cache.lastModifiedTs = lastModifiedTs;

           NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));

    }

}

LongPollingService監聽LocalDataChangeEvent事件

@Override

public void onEvent(Event event) {

  //SwitchService配置中的一個屬性 isFixedPolling; 是否固定長輪詢

     if (isFixedPolling()) {

    // Ignore.

     } else {

       if (event instanceof LocalDataChangeEvent) {

       LocalDataChangeEvent evt = (LocalDataChangeEvent) event;

       ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));

     }

}

最終執行DataChangeTask

class DataChangeTask implements Runnable {

  @Override

  public void run() {

  ConfigCacheService.getContentBetaMd5(groupKey);

    //1.遍歷所有的長輪詢訂閱者者

    for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {

      ClientLongPolling clientSub = iter.next();

      if (clientSub.clientMd5Map.containsKey(groupKey)) {

        if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {

          continue; // 2. 如果是beta發布且不在beta列表直接跳過

        }

        if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {

          continue; // 3.如果tag發布且不在tag列表直接跳過

          }

        getRetainIps().put(clientSub.ip, System.currentTimeMillis());

        iter.remove(); // Delete subscribers' relationships.

        //4.發送Http請求通知所有未被上面2、3過濾掉的的訂閱者最新的配置數據ConfigInfo

        clientSub.sendResponse(Arrays.asList(groupKey));

      }

    }

    } 

}

1.2.5 DumpChangeProcessor.process()-非全量dump

即Dump有變化的數據的執行器;

public class DumpChangeProcessor implements NacosTaskProcessor {

  @Override

public boolean process(NacosTask task) {

long startUpdateMd5 = System.currentTimeMillis();

//1.查詢數據庫所有的配置文件

List<ConfigInfoWrapper> updateMd5List = persistService.listAllGroupKeyMd5();

//2.將所有的配置文件緩存到內存中,並通知所有訂閱的客戶端

for (ConfigInfoWrapper config : updateMd5List) {

  final String groupKey = GroupKey2.getKey(config.getDataId(), config.getGroup());     

  ConfigCacheService.updateMd5(groupKey, config.getMd5(), config.getLastModified());

}

long endUpdateMd5 = System.currentTimeMillis();

long startDeletedConfigTime = System.currentTimeMillis();

//3. 從 his_config_info 歷史表中找到從上一次心跳時間(heartBeat.txt)到現在的所有被刪除記錄,

// his_config_info 記錄的就是歷史的配置文件;

List<ConfigInfo> configDeleted = persistService.findDeletedConfig(startTime, endTime);

//4. 遍歷拿到的歷史配置數據的dataId,group,Tenant;然后去config_info表中查找能不能查到數據

// 如果能查到,說明配置不是被刪除了,只是修改了content;

// 如果不能查到,說明整個配置文件都被刪除了;則將磁盤對應的配置文件刪除;

並且通知訂閱的客戶端數據變更;

for (ConfigInfo configInfo : configDeleted) {

  if (persistService.findConfigInfo(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant())

    == null) {

    ConfigCacheService.remove(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant());

  }

}

long endDeletedConfigTime = System.currentTimeMillis();

final long startChangeConfigTime = System.currentTimeMillis();

//5. config_info表中查找 從上一次心跳時間(heartBeat.txt)到現在的所有有被修改過的配置數據,

// 然后執行 ConfigService.dumpChange 將這個改過的配置Dump的磁盤中,並通知;

List<ConfigInfoWrapper> changeConfigs = persistService.findChangeConfig(startTime, endTime);

for (ConfigInfoWrapper cf : changeConfigs) {

  boolean result = ConfigCacheService.dumpChange(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified());

}

//6. load Nacos內置的一些DataId配置:ClientIpWhiteList,AggrWhitelist,SwitchService

ConfigCacheService.reloadConfig();

long endChangeConfigTime = System.currentTimeMillis();

return true;

}

}

每隔12個小時全量Dump一次數據

ConfigExecutor.scheduleConfigTask(checkMd5Task, 0, 12, TimeUnit.HOURS);

 

public static void scheduleConfigTask(Runnable command, long initialDelay, long delay, TimeUnit unit) {

    TIMER_EXECUTOR.scheduleWithFixedDelay(command, initialDelay, delay, unit);

}

另外,還有部分邏輯在 1.2.2 dumpOperate()方法中;

二.客戶端讀取配置

2.1 服務啟動相關配置

spring-cloud-alibaba-nacos-config工程中 META-INF\spring.factories文件注入的類;

org.springframework.cloud.bootstrap.BootstrapConfiguration=\

com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\

com.alibaba.cloud.nacos.NacosConfigAutoConfiguration,\

com.alibaba.cloud.nacos.endpoint.NacosConfigEndpointAutoConfiguration

org.springframework.boot.diagnostics.FailureAnalyzer=\

com.alibaba.cloud.nacos.diagnostics.analyzer.NacosConnectionFailureAnalyzer

2.1.1 NacosConfigBootstrapConfiguration

nacosConfigProperties實例和nacosPropertySourceLocator實例注入。

@Configuration

@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)

public class NacosConfigBootstrapConfiguration {

@Bean

@ConditionalOnMissingBean

public NacosConfigProperties nacosConfigProperties() {

  return new NacosConfigProperties();

}

@Bean

public NacosPropertySourceLocator nacosPropertySourceLocator(

     NacosConfigProperties nacosConfigProperties) {

      return new NacosPropertySourceLocator(nacosConfigProperties);

}

}

2.1.2 NacosPropertySourceLocator

實現接口PropertySourceLocator;

locate()

先准備設置一堆屬性,然后進行共享配置和額外配置的加載,主要是loadApplicationConfiguration

@Override

public PropertySource<?> locate(Environment env) {

    //獲取配置服務實例,NacosFactory根據properties反射方式創建

    ConfigService configService = nacosConfigProperties.configServiceInstance();

    long timeout = nacosConfigProperties.getTimeout();

    nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout); //屬性源建造器,timeout為超時30秒

    String name = nacosConfigProperties.getName(); //dataid的名字

    String dataIdPrefix = nacosConfigProperties.getPrefix(); //前綴

    if (StringUtils.isEmpty(dataIdPrefix)) {

      dataIdPrefix = name;

    }

    //前綴為空的話默認就是spring.application.name

    if (StringUtils.isEmpty(dataIdPrefix)) {

      dataIdPrefix = env.getProperty("spring.application.name");

    }

    //創建符合屬性源

    CompositePropertySource composite = new CompositePropertySource(NACOS_PROPERTY_SOURCE_NAME);

    loadSharedConfiguration(composite); //共享配置,主要是默認組里面的

    loadExtConfiguration(composite); // 額外配置

    loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);

    return composite;

}

loadApplicationConfiguration()

先獲取配置的擴展名和分組,根據分組配置文件加載和激活的環境加載,也就是我們經常用激活環境的配置文件xxx-dev-yaml這種。

加載后的信息都要放入CompositePropertySource符合屬性里並返回。

private void loadApplicationConfiguration(

    CompositePropertySource compositePropertySource, String dataIdPrefix,

    NacosConfigProperties properties, Environment environment) {

    String fileExtension = properties.getFileExtension(); //擴展名,比如yml

    String nacosGroup = properties.getGroup(); //分組,默認DEFAULT_GROUP

    //根據分組配置文件加載

    loadNacosDataIfPresent(compositePropertySource,

    dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension, true);

    for (String profile : environment.getActiveProfiles()) { //有環境配置的更高級別,比如dev,prod

      String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension;

      //加載環境配置

      loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup,

      fileExtension, true);

    }

}

loadNacosDataIfPresent方法

private void loadNacosDataIfPresent(

    final CompositePropertySource composite,

    final String dataId, final String group, String fileExtension,

    boolean isRefreshable) {

    if (NacosContextRefresher.getRefreshCount() != 0) { //刷新過了

      NacosPropertySource ps;

      if (!isRefreshable) { //不刷新,直接緩存取

        ps = NacosPropertySourceRepository.getNacosPropertySource(dataId);

      }

      else {

        ps = nacosPropertySourceBuilder.build(dataId, group, fileExtension, true);

      }

      composite.addFirstPropertySource(ps);

    }

    else {

      NacosPropertySource ps = nacosPropertySourceBuilder.build(dataId, group,

      fileExtension, isRefreshable);

      composite.addFirstPropertySource(ps);

    }

}

NacosPropertySourceBuilder的build方法

先加載數據,然后結果封裝成NacosPropertySource,放進緩存(並發hashmap)。

NacosPropertySource build(String dataId, String group, String fileExtension, boolean isRefreshable) {

    Properties p = loadNacosData(dataId, group, fileExtension);

    NacosPropertySource nacosPropertySource = new NacosPropertySource(group, dataId,

    propertiesToMap(p), new Date(), isRefreshable);

    NacosPropertySourceRepository.collectNacosPropertySources(nacosPropertySource);

    return nacosPropertySource;

}

loadNacosData()

用NacosConfigService來加載,加載到了就解析成LinkedHashMap返回,否則就是個空的LinkedHashMap。

private Properties loadNacosData(String dataId, String group, String fileExtension) {

    String data = null;

    //獲取到的是字符串,需要后面根據文件類型解析

    data = configService.getConfig(dataId, group, timeout);

    if (!StringUtils.isEmpty(data)) {

      //properties格式配置文件

      if (fileExtension.equalsIgnoreCase("properties")) {

        Properties properties = new Properties();

        properties.load(new StringReader(data));

        return properties;

      }

    //yml和yaml格式配置文件

    else if (fileExtension.equalsIgnoreCase("yaml")

        || fileExtension.equalsIgnoreCase("yml")) {

      YamlPropertiesFactoryBean yamlFactory = new YamlPropertiesFactoryBean();

      yamlFactory.setResources(new ByteArrayResource(data.getBytes()));

      return yamlFactory.getObject();

     

    return EMPTY_PROPERTIES; //返回空的數據

}

2.2 NacosConfigService

NacosConfigService結構在下面小節介紹。

2.2.1 getConfig()

@Override

public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {

    return getConfigInner(namespace, dataId, group, timeoutMs);

}

2.2.2 getConfigInner()方法

首先優先從本地獲取,再次從網絡獲取,否則從本地快照緩存文件獲取;

private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {

    group = null2defaultGroup(group); //默認組

    ParamUtils.checkKeyParam(dataId, group); //檢查參數

    ConfigResponse cr = new ConfigResponse(); // 創建響應

    cr.setDataId(dataId);

    cr.setTenant(tenant);

    cr.setGroup(group);

    / 優先使用本地配置,比如C:\Users\600336\nacos\config\fixed-localhost_8848_nacos\data\config-data\目錄中獲取相應配置文件

    String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);

    if (content != null) {

      cr.setContent(content);

      configFilterChainManager.doFilter(null, cr);

      content = cr.getContent();

      return content;

    }

 

    // 如果不本地不存在的話,就從網絡讀,下一節分析

    content = worker.getServerConfig(dataId, group, tenant, timeoutMs);

    cr.setContent(content);

    configFilterChainManager.doFilter(null, cr);

    content = cr.getContent();

    return content;

 

    //如果上面都沒獲取到,則獲取本地快照緩存文件內容,

    //比如 xxx\nacos\config\fixed-localhost_8848_nacos\snapshot\mytest\nacos-config-client.yml

    dataId, group, tenant, ContentUtils.truncateContent(content));

    content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);

    cr.setContent(content);

    configFilterChainManager.doFilter(null, cr);

    content = cr.getContent();

    return content;

}

2.2.3 ClientWorker.getServerConfig()

用代理請求/v1/cs/configs,傳參數dataId,group,tenant獲取配置文件。

public String getServerConfig(String dataId, String group, String tenant, long readTimeout)

throws NacosException {

    ... //參數group空則默認組

    HttpResult result = null;

    List<String> params = null;

    if (StringUtils.isBlank(tenant)) {

      params = Arrays.asList("dataId", dataId, "group", group);

    } else {

      params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);

    }

    result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);

    

    switch (result.code) {

      case HttpURLConnection.HTTP_OK:

        LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);

      return result.content;

    }

}

2.3 NacosConfigService大致結構

 

 

 

 

 

 

 

 

 

 

圖片來源:https://blog.csdn.net/wangwei19871103/article/details/105738140

由於版本問題,可能結構有區別。

2.3.1 創建配置服務ConfigService

客戶端啟動時候, 前面介紹的NacosPropertySourceLocator.locate()方法中創建,

  ConfigService configService = nacosConfigProperties.configServiceInstance();

調用:

此時會初始化一些配置到properties中;

  configService = NacosFactory.createConfigService(properties);

其實就是反射出NacosConfigService,然后獲取有參構造方法,反射創建實例。

  public static ConfigService createConfigService(Properties properties) throws NacosException {

    Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");

    Constructor constructor = driverImplClass.getConstructor(Properties.class);

    ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);

    return vendorImpl;

  }

2.3.2 NacosConfigService構造方法

內部組件有:

ServerHttpAgent : http請求的代理,

MetricsHttpAgent : 包裝了ServerHttpAgent,加了計時的功能,

ClientWorker : 做配置文件檢查。

public NacosConfigService(Properties properties) throws NacosException {

    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);

    if (StringUtils.isBlank(encodeTmp)) {

      encode = Constants.ENCODE; //默認設置utf-8

    } else {

      encode = encodeTmp.trim();

    }

    initNamespace(properties);

    agent = new MetricsHttpAgent(new ServerHttpAgent(properties));

    agent.start();

    worker = new ClientWorker(agent, configFilterChainManager, properties);

}

2.3.3 ServerHttpAgent

public ServerHttpAgent(Properties properties) throws NacosException {

    serverListMgr = new ServerListManager(properties);

    init(properties); //設置編碼,密碼,最大重試次數

}

2.3.4 ServerListManager.start()

ServerListManager用來管理注冊中心集群列表;

agent.start(); 最終調用到 ServerListManager的start方法;

public synchronized void start() throws NacosException {

    if (isStarted || isFixed) {

      return;

    }

    //這里會創建任務,從nameserver獲取serverlist

    GetServerListTask getServersTask = new GetServerListTask(addressServerUrl);

    //重試5次

    for (int i = 0; i < initServerlistRetryTimes && serverUrls.isEmpty(); ++i) {

      //如果有改變則發起ServerlistChangeEvent事件

      getServersTask.run();

      this.wait((i + 1) * 100L);

           if (serverUrls.isEmpty()) { ...拋服務器異常... }

    }

    //無延遲開始調度,每30秒一次

    TimerService.scheduleWithFixedDelay(getServersTask, 0L, 30L, TimeUnit.SECONDS);

    isStarted = true;

}

2.4 ClientWorker

參數:

agent:http代理,MetricsHttpAgent對象,

ConfigFilterChainManager : 過濾器管理器 ,默認里面沒有過濾器,可以addFilter自己加。

這里也開啟了一個單線程的執行器,執行checkConfigInfo檢查配置任務,每10毫秒一次,去檢查當前的配置數量,

如果超過一個輪詢任務的限制數量,默認3000個,就開啟一個新的任務去做。

@SuppressWarnings("PMD.ThreadPoolCreationRule")

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {

    this.agent = agent;

    this.configFilterChainManager = configFilterChainManager;

    init(properties); // 初始化超時時間

 

    executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {

      @Override

      public Thread newThread(Runnable r) {

        Thread t = new Thread(r);

        t.setName("com.alibaba.nacos.client.Worker." + agent.getName());

        t.setDaemon(true);

        return t;

      }

    });

    //有cpu核數的線程,用來做長輪詢的,每次檢查配置,如果LongPollingRunnable任務的配置緩存超過一定數量,

    // 默認3000個,就要去開啟一個新任務去檢查配置

    executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {

      @Override

      public Thread newThread(Runnable r) {

        Thread t = new Thread(r);

        t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());

        t.setDaemon(true);

        return t;

      }

    });

    //配置檢查

    executor.scheduleWithFixedDelay(new Runnable() {

      @Override

      public void run() {

        checkConfigInfo();

      }

    }, 1L, 10L, TimeUnit.MILLISECONDS);

}

 

checkConfigInfo()方法后續會分析到;

 

 

 

參考:

https://blog.csdn.net/wangwei19871103/article/details/105814924 ,

https://blog.csdn.net/somenzz/article/details/100518028 ,

https://www.liaochuntao.cn/2019/09/16/java-web-54/ ,

https://www.liaochuntao.cn/categories/nacos/ ,

https://blog.csdn.net/wangwei19871103/article/details/105729211 ,

官方api : https://nacos.io/zh-cn/docs/open-api.html

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM