一.服務端處理
1.1 ConfigController.getConfig()接口獲取配置
@GetMapping
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void getConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, //租戶信息,對應 Nacos 的命名空間ID字段
@RequestParam(value = "tag", required = false) String tag){
// check tenant,檢查名稱等有效性(省略...)
// check params,校驗參數非空和有效性(省略...)
final String clientIp = RequestUtil.getRemoteIp(request);
inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);
}
1.1.1 RequestUtil.getRemoteIp方法盡可能獲取真實IP
獲取IP是盡可能獲取真實的,而不是代理的,如果有NGINX的話,優先使用X_FORWARDED_FOR,取出第一個就是最開始的客戶端真實地址。如果都沒有的話,只能用RemoteAddr。
public static String getRemoteIp(HttpServletRequest request) {
String xForwardedFor = request.getHeader(X_FORWARDED_FOR);
if (!StringUtils.isBlank(xForwardedFor)) {
return xForwardedFor.split(X_FORWARDED_FOR_SPLIT_SYMBOL)[0].trim();
}
String nginxHeader = request.getHeader(X_REAL_IP);
return StringUtils.isBlank(nginxHeader) ? request.getRemoteAddr() : nginxHeader;
}
1.1.2 ConfigServletInner.doGetConfig
根據groupKey更新緩存的屬性,然后根據單例運行是否用mysql,進行mysql查詢或者直接用文件零拷貝傳輸,
因為有個DumpService在初始化的時候會去mysql比對記錄,把文件保存到本地/data\config-data文件夾中,所以可以直接用零拷貝了。
public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,
String tenant, String tag, String clientIp) {
final String groupKey = GroupKey2.getKey(dataId, group, tenant); //拼接
String autoTag = request.getHeader("Vipserver-Tag");
String requestIpApp = RequestUtil.getAppName(request);
int lockResult = tryConfigReadLock(groupKey); //首先獲取讀鎖,先從緩存中讀,最大重試10次,間隔1
final String requestIp = RequestUtil.getRemoteIp(request);
boolean isBeta = false;
if (lockResult > 0) {
FileInputStream fis = null;
String md5 = Constants.NULL;
long lastModified = 0L;
//獲取緩存,包括MD5,lastmodifyedTs,type等..
CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey);
//configType例如yaml
final String configType = (null != cacheItem.getType()) ? cacheItem.getType() : FileTypeEnum.TEXT.getFileType();
File file = null;
ConfigInfoBase configInfoBase = null;
PrintWriter out = null;
md5 = cacheItem.getMd5(); //緩存中的md5值
lastModified = cacheItem.getLastModifiedTs(); //緩存中的最后修改時間
//確定是否直接讀取數據(嵌入式存儲derby+單機),如果使用mysql,降低數據庫讀取壓力; 如果使用raft + derby,降低leader讀取壓力;
// persistService.findConfigInfo方法后面有簡單分析
if (PropertyUtil.isDirectRead()) {
configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
} else {
file = DiskUtil.targetFile(dataId, group, tenant); //讀取本地文件
}
response.setHeader(Constants.CONTENT_MD5, md5);
response.setHeader("Pragma", "no-cache"); // 頭部禁用緩存
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
if (PropertyUtil.isDirectRead()) {
response.setDateHeader("Last-Modified", lastModified);
} else {
fis = new FileInputStream(file);
response.setDateHeader("Last-Modified", file.lastModified());
}
//設置一些頭信息后,進行響應輸出,如果有本地文件就用零拷貝,否則就用字符流。
if (PropertyUtil.isDirectRead()) {
out = response.getWriter(); //使用字符流
out.print(configInfoBase.getContent());
out.flush();
out.close();
} else {
//操作系統可以將字節直接從文件系統緩存傳輸到目標通道
fis.getChannel().transferTo(0L, fis.getChannel().size(), Channels.newChannel(response.getOutputStream()));
}
.....
finally { //釋放鎖 releaseConfigReadLock(groupKey); }
} else if (lockResult == 0) {
//如果獲取不到說明配置文件不存在或者有線程正在寫配置文件,也不能讀,為了保證數據的一致性:
...
}
return HttpServletResponse.SC_OK + "";
}
1.1.3 PersistService.findConfigInfo()查數據庫配置
這個方法會直接查詢數據庫, select **** 這種。
如果其他數據庫用JdbcTemplate操作;
1.2 DumpService將配置文件全部Dump到磁盤
Nacos Config模塊有一個特點,會將數據庫中的配置信息,dump成文件,
通過直接文件讀取的方式,替代直接讀取數據庫,降低數據庫的壓力,是的數據庫可以更好的處理寫操作。
圖片來源: https://blog.csdn.net/wangwei19871103/article/details/105814924
1.2.1 DumpService.init() 初始化
spring啟動加載時,會執行帶有 @PostConstruct 注解的初始化方法;
@PostConstruct
@Override
protected void init() throws Throwable {
if (ApplicationUtils.getStandaloneMode()) {
dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor,dumpAllTagProcessor);
return;
}
//非單機模式, 后面章節再分析
}
1.2.2 dumpOperate() 執行dump操作
protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,
DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {
String dumpFileContext = "CONFIG_DUMP_TO_FILE";
TimerContext.start(dumpFileContext);
// 構建並添加全部配置文件Dump處理器
Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
// 構建並添加全部灰度配置文件Dump處理器
Runnable dumpAllBeta = () -> dumpAllTaskMgr.addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());
// 構建並添加全部Tag配置文件Dump處理器
Runnable dumpAllTag = () -> dumpAllTaskMgr.addTask(DumpAllTagTask.TASK_ID, new DumpAllTagTask());
//清除歷史配置文件信息(xx天之前的歷史配置信息全部刪除)
Runnable clearConfigHistory = () -> {
// 單機模式返回true, derby + raft 模式 leader節點可以執行此任務
if (canExecute()) {
Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), 24 * getRetentionDays());
int totalCount = persistService.findConfigHistoryCountByTime(startTime);
if (totalCount > 0) {
// 分頁刪除歷史記錄 ; 采用分頁的方式,一是為了降低數據庫刪除數據時的壓力,
// 另一方面考慮數據庫集群的主從同步延遲的問題(bin-log)
int pageSize = 1000;
int removeTime = (totalCount + pageSize - 1) / pageSize;
while (removeTime > 0) {
persistService.removeConfigHistory(startTime, pageSize); // 分頁刪除以免批量太大報錯
removeTime--;
}
}
}
};
//全量Dump配置信息
dumpConfigInfo(dumpAllProcessor);
// 更新 Beta緩存,先刪除文件
DiskUtil.clearAllBeta();
if (persistService.isExistTable(BETA_TABLE_NAME)) {
dumpAllBetaProcessor.process(new DumpAllBetaTask());
}
// 更新 Tag 緩存,先刪除文件
DiskUtil.clearAllTag();
if (persistService.isExistTable(TAG_TABLE_NAME)) {
dumpAllTagProcessor.process(new DumpAllTagTask());
}
// add to dump aggr
List<ConfigInfoChanged> configList = persistService.findAllAggrGroup();
if (configList != null && !configList.isEmpty()) {
total = configList.size();
List<List<ConfigInfoChanged>> splitList = splitList(configList, INIT_THREAD_COUNT);
for (List<ConfigInfoChanged> list : splitList) {
MergeAllDataWorker work = new MergeAllDataWorker(list);
work.start();
}
}
// 非單機模式,則Nacos Config存在一個dump文件的心跳記錄,可以減少dump文件的開銷和任務耗時
if (!ApplicationUtils.getStandaloneMode()) {
Runnable heartbeat = () -> {
String heartBeatTime = TimeUtils.getCurrentTime().toString();
DiskUtil.saveHeartBeatToDisk(heartBeatTime); // 時間信息持久化
};
// 周期性執行任務
ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS);
// 隨機的任務延遲時間
long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
// 周期性執行dump全部灰度配置文件的操作 ,6小時
ConfigExecutor.scheduleConfigTask(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
// 周期性執行dump全部tag配置文件的操作 ,6小時
ConfigExecutor.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
}
// 周期性執行清除往期歷史配置信息記錄
ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);
} finally {
TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);
}
}
1.2.3 dumpConfigInfo()
作用: 主要是將數據庫中的所有ConfigInfo查詢出來寫到服務器的磁盤中
參數: dumpAllProcessor, 這個是TaskProcessor 任務處理器;處理器中有個 process()方法; 最終執行任務的時候就是執行這個方法的;
private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException {
int timeStep = 6;
Boolean isAllDump = true;
// initial dump all
FileInputStream fis = null;
Timestamp heartheatLastStamp = null;
//判斷是否快速啟動,即配置文件中的isQuickStart默認false ,
if (isQuickStart()) {
File heartbeatFile = DiskUtil.heartBeatFile();
if (heartbeatFile.exists()) {
fis = new FileInputStream(heartbeatFile);
String heartheatTempLast = IoUtils.toString(fis, Constants.ENCODE);
heartheatLastStamp = Timestamp.valueOf(heartheatTempLast);
//如果上一次服務正常的時間距離現在不超過6個小時; 那么設置 isAllDump = false;表示不需要全量Dump
if (TimeUtils.getCurrentTime().getTime() - heartheatLastStamp.getTime()
< timeStep * 60 * 60 * 1000) {
isAllDump = false;
}
}
if (isAllDump) {
DiskUtil.clearAll(); //先刪除本地file
dumpAllProcessor.process(new DumpAllTask()); //處理全部配置數據
} else {
//非全量dump,下面小節1.2.5介紹.dumpChangeProcessor
Timestamp beforeTimeStamp = getBeforeStamp(heartheatLastStamp, timeStep);
DumpChangeProcessor dumpChangeProcessor = new DumpChangeProcessor(this, beforeTimeStamp, TimeUtils.getCurrentTime());
dumpChangeProcessor.process(new DumpChangeTask());
// 文件的 MD5 檢查任務
Runnable checkMd5Task = () -> {
// 直接根據內存緩存中的配置信息的數據,進行快速檢查每個配置文件信息的變更情況
List<String> diffList = ConfigCacheService.checkMd5();
for (String groupKey : diffList) {
// 將對應格式的數據進行解析
String[] dg = GroupKey.parseKey(groupKey);
String dataId = dg[0];
String group = dg[1];
String tenant = dg[2];
// 直接查找對應的配置文件信息
ConfigInfoWrapper configInfo = persistService.queryConfigInfo(dataId, group, tenant);
// 進行變更判斷並dump出文件
ConfigCacheService.dumpChange(dataId, group, tenant, configInfo.getContent(),
configInfo.getLastModified());
}
};
// 進行周期任務調度執行
ConfigExecutor.scheduleConfigTask(checkMd5Task, 0, 12, TimeUnit.HOURS);
}
}
DiskUtil.heartBeatFile() 獲取心跳文件
心跳文件在 {NACOS_HOME}/status/heartBeat.txt,這是一個心跳文件,每十秒就會把當前時間寫入到這個文件中;
作用: (斷點續傳) 為了能夠快速啟動應用,可以選擇不需要全部Dump所有的配置文件,因為上一次可能已經Dump了文件在磁盤中了,
如果配置很大的話,走IO還是會花費一定的時間的; 所以每十秒來持久化一次當前時間,用於記錄上一次服務正常距離現在有多長時間;
假設服務宕機了,半個小時之后才啟動成功,那么我們只需要將這半小時之內數據庫中的配置變化重新Dump到磁盤中就行了,不需要DumpAll;
1.2.4 DumpAllProcessor.process()-全量dump
DumpAllProcessor.process()方法
@Override
public boolean process(NacosTask task) {
long currentMaxId = persistService.findConfigMaxId(); //查詢數據庫最大id
long lastMaxId = 0;
while (lastMaxId < currentMaxId) {
//分頁獲取數據庫的數據,每次1000
Page<ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);
if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) {
for (ConfigInfoWrapper cf : page.getPageItems()) {
long id = cf.getId();
lastMaxId = id > lastMaxId ? id : lastMaxId;
// AggrWhitelist是Nacos頁面自定義的一個DataId; 如果ConfigInfo的DataId是這個值的話就會被單獨解析,
if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
AggrWhitelist.load(cf.getContent());
}
// ClientIpWhiteList也是Nacos自己定義的一個預留配置DataId,Ip白名單
if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
ClientIpWhiteList.load(cf.getContent());
}
// SwitchService也是Nacos內部預留的一個配置;DataId是 com.alibaba.nacos.meta.switch ;
//開發者可以配置這個里面的屬性,來進行一些設置內部屬性的操作;
if (cf.getDataId().equals(SwitchService.SWITCH_META_DATAID)) {
SwitchService.load(cf.getContent());
}
//dump方法里面會校驗MD5是否改變並更新,真正的磁盤寫入操作
boolean result = ConfigCacheService.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified(), cf.getType());
}
} else {
lastMaxId += PAGE_SIZE;
}
return true;
}
ConfigService.dump()方法真正的磁盤寫入操作;
這個方法首先將配置保存到磁盤文件中,並且緩存配置信息的MD5到內存中;
如果配置信息不一致(MD5不一致),則將會發送一個通知事件 LocalDataChangeEvent告知本地數據有更改;
CacheItem 是配置信息的對象;保存着配置信息的一些信息,但是沒有保存Content,只保存了content的MD5;
/*** Save config file and update md5 value in cache. */
public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
String type) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
//如果內存中沒有當前配置的緩存 CacheItem,則組裝對象保存進去;這時md5是空字符串;
CacheItem ci = makeSure(groupKey);
ci.setType(type);
final int lockResult = tryWriteLock(groupKey); //獲取寫鎖,沒獲取到則報錯或返回false
//計算content的MD5
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
...
} else if (!PropertyUtil.isDirectRead()) {
//上面計算的md5跟內存 CacheItem 中的md5做比較(第一次肯定不相等),如果不相等則將文件保存到磁盤中;
DiskUtil.saveToDisk(dataId, group, tenant, content);
}
//updateMd5方法中,如果MD5不相同,則更新 CacheItem 中的MD5屬性和lastModifiedTs屬性;
// lastModifiedTs是表示最后更新時間; 如果MD5不相同,還要發送通知告知數據有變更;
updateMd5(groupKey, md5, lastModifiedTs);
finally { releaseWriteLock(groupKey); }
}
updateMd5方法:
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (cache.md5 == null || !cache.md5.equals(md5)) {
cache.md5 = md5;
cache.lastModifiedTs = lastModifiedTs;
NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
}
}
LongPollingService監聽LocalDataChangeEvent事件
@Override
public void onEvent(Event event) {
//SwitchService配置中的一個屬性 isFixedPolling; 是否固定長輪詢
if (isFixedPolling()) {
// Ignore.
} else {
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
最終執行DataChangeTask
class DataChangeTask implements Runnable {
@Override
public void run() {
ConfigCacheService.getContentBetaMd5(groupKey);
//1.遍歷所有的長輪詢訂閱者者
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
continue; // 2. 如果是beta發布且不在beta列表直接跳過
}
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
continue; // 3.如果tag發布且不在tag列表直接跳過
}
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // Delete subscribers' relationships.
//4.發送Http請求通知所有未被上面2、3過濾掉的的訂閱者最新的配置數據ConfigInfo
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
}
}
1.2.5 DumpChangeProcessor.process()-非全量dump
即Dump有變化的數據的執行器;
public class DumpChangeProcessor implements NacosTaskProcessor {
@Override
public boolean process(NacosTask task) {
long startUpdateMd5 = System.currentTimeMillis();
//1.查詢數據庫所有的配置文件
List<ConfigInfoWrapper> updateMd5List = persistService.listAllGroupKeyMd5();
//2.將所有的配置文件緩存到內存中,並通知所有訂閱的客戶端
for (ConfigInfoWrapper config : updateMd5List) {
final String groupKey = GroupKey2.getKey(config.getDataId(), config.getGroup());
ConfigCacheService.updateMd5(groupKey, config.getMd5(), config.getLastModified());
}
long endUpdateMd5 = System.currentTimeMillis();
long startDeletedConfigTime = System.currentTimeMillis();
//3. 從 his_config_info 歷史表中找到從上一次心跳時間(heartBeat.txt)到現在的所有被刪除記錄,
// his_config_info 記錄的就是歷史的配置文件;
List<ConfigInfo> configDeleted = persistService.findDeletedConfig(startTime, endTime);
//4. 遍歷拿到的歷史配置數據的dataId,group,Tenant;然后去config_info表中查找能不能查到數據
// 如果能查到,說明配置不是被刪除了,只是修改了content;
// 如果不能查到,說明整個配置文件都被刪除了;則將磁盤對應的配置文件刪除;
並且通知訂閱的客戶端數據變更;
for (ConfigInfo configInfo : configDeleted) {
if (persistService.findConfigInfo(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant())
== null) {
ConfigCacheService.remove(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant());
}
}
long endDeletedConfigTime = System.currentTimeMillis();
final long startChangeConfigTime = System.currentTimeMillis();
//5. config_info表中查找 從上一次心跳時間(heartBeat.txt)到現在的所有有被修改過的配置數據,
// 然后執行 ConfigService.dumpChange 將這個改過的配置Dump的磁盤中,並通知;
List<ConfigInfoWrapper> changeConfigs = persistService.findChangeConfig(startTime, endTime);
for (ConfigInfoWrapper cf : changeConfigs) {
boolean result = ConfigCacheService.dumpChange(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified());
}
//6. load Nacos內置的一些DataId配置:ClientIpWhiteList,AggrWhitelist,SwitchService
ConfigCacheService.reloadConfig();
long endChangeConfigTime = System.currentTimeMillis();
return true;
}
}
每隔12個小時全量Dump一次數據
ConfigExecutor.scheduleConfigTask(checkMd5Task, 0, 12, TimeUnit.HOURS);
public static void scheduleConfigTask(Runnable command, long initialDelay, long delay, TimeUnit unit) {
TIMER_EXECUTOR.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
另外,還有部分邏輯在 1.2.2 dumpOperate()方法中;
二.客戶端讀取配置
2.1 服務啟動相關配置
spring-cloud-alibaba-nacos-config工程中 META-INF\spring.factories文件注入的類;
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.NacosConfigAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosConfigEndpointAutoConfiguration
org.springframework.boot.diagnostics.FailureAnalyzer=\
com.alibaba.cloud.nacos.diagnostics.analyzer.NacosConnectionFailureAnalyzer
2.1.1 NacosConfigBootstrapConfiguration
nacosConfigProperties實例和nacosPropertySourceLocator實例注入。
@Configuration
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigBootstrapConfiguration {
@Bean
@ConditionalOnMissingBean
public NacosConfigProperties nacosConfigProperties() {
return new NacosConfigProperties();
}
@Bean
public NacosPropertySourceLocator nacosPropertySourceLocator(
NacosConfigProperties nacosConfigProperties) {
return new NacosPropertySourceLocator(nacosConfigProperties);
}
}
2.1.2 NacosPropertySourceLocator
實現接口PropertySourceLocator;
locate()
先准備設置一堆屬性,然后進行共享配置和額外配置的加載,主要是loadApplicationConfiguration。
@Override
public PropertySource<?> locate(Environment env) {
//獲取配置服務實例,NacosFactory根據properties反射方式創建
ConfigService configService = nacosConfigProperties.configServiceInstance();
long timeout = nacosConfigProperties.getTimeout();
nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout); //屬性源建造器,timeout為超時30秒
String name = nacosConfigProperties.getName(); //dataid的名字
String dataIdPrefix = nacosConfigProperties.getPrefix(); //前綴
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = name;
}
//前綴為空的話默認就是spring.application.name
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = env.getProperty("spring.application.name");
}
//創建符合屬性源
CompositePropertySource composite = new CompositePropertySource(NACOS_PROPERTY_SOURCE_NAME);
loadSharedConfiguration(composite); //共享配置,主要是默認組里面的
loadExtConfiguration(composite); // 額外配置
loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
return composite;
}
loadApplicationConfiguration()
先獲取配置的擴展名和分組,根據分組配置文件加載和激活的環境加載,也就是我們經常用激活環境的配置文件xxx-dev-yaml這種。
加載后的信息都要放入CompositePropertySource符合屬性里並返回。
private void loadApplicationConfiguration(
CompositePropertySource compositePropertySource, String dataIdPrefix,
NacosConfigProperties properties, Environment environment) {
String fileExtension = properties.getFileExtension(); //擴展名,比如yml
String nacosGroup = properties.getGroup(); //分組,默認DEFAULT_GROUP
//根據分組配置文件加載
loadNacosDataIfPresent(compositePropertySource,
dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension, true);
for (String profile : environment.getActiveProfiles()) { //有環境配置的更高級別,比如dev,prod
String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension;
//加載環境配置
loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup,
fileExtension, true);
}
}
loadNacosDataIfPresent方法
private void loadNacosDataIfPresent(
final CompositePropertySource composite,
final String dataId, final String group, String fileExtension,
boolean isRefreshable) {
if (NacosContextRefresher.getRefreshCount() != 0) { //刷新過了
NacosPropertySource ps;
if (!isRefreshable) { //不刷新,直接緩存取
ps = NacosPropertySourceRepository.getNacosPropertySource(dataId);
}
else {
ps = nacosPropertySourceBuilder.build(dataId, group, fileExtension, true);
}
composite.addFirstPropertySource(ps);
}
else {
NacosPropertySource ps = nacosPropertySourceBuilder.build(dataId, group,
fileExtension, isRefreshable);
composite.addFirstPropertySource(ps);
}
}
NacosPropertySourceBuilder的build方法
先加載數據,然后結果封裝成NacosPropertySource,放進緩存(並發hashmap)。
NacosPropertySource build(String dataId, String group, String fileExtension, boolean isRefreshable) {
Properties p = loadNacosData(dataId, group, fileExtension);
NacosPropertySource nacosPropertySource = new NacosPropertySource(group, dataId,
propertiesToMap(p), new Date(), isRefreshable);
NacosPropertySourceRepository.collectNacosPropertySources(nacosPropertySource);
return nacosPropertySource;
}
loadNacosData()
用NacosConfigService來加載,加載到了就解析成LinkedHashMap返回,否則就是個空的LinkedHashMap。
private Properties loadNacosData(String dataId, String group, String fileExtension) {
String data = null;
//獲取到的是字符串,需要后面根據文件類型解析
data = configService.getConfig(dataId, group, timeout);
if (!StringUtils.isEmpty(data)) {
//properties格式配置文件
if (fileExtension.equalsIgnoreCase("properties")) {
Properties properties = new Properties();
properties.load(new StringReader(data));
return properties;
}
//yml和yaml格式配置文件
else if (fileExtension.equalsIgnoreCase("yaml")
|| fileExtension.equalsIgnoreCase("yml")) {
YamlPropertiesFactoryBean yamlFactory = new YamlPropertiesFactoryBean();
yamlFactory.setResources(new ByteArrayResource(data.getBytes()));
return yamlFactory.getObject();
}
return EMPTY_PROPERTIES; //返回空的數據
}
2.2 NacosConfigService
NacosConfigService結構在下面小節介紹。
2.2.1 getConfig()
@Override
public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
return getConfigInner(namespace, dataId, group, timeoutMs);
}
2.2.2 getConfigInner()方法
首先優先從本地獲取,再次從網絡獲取,否則從本地快照緩存文件獲取;
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
group = null2defaultGroup(group); //默認組
ParamUtils.checkKeyParam(dataId, group); //檢查參數
ConfigResponse cr = new ConfigResponse(); // 創建響應
cr.setDataId(dataId);
cr.setTenant(tenant);
cr.setGroup(group);
/ 優先使用本地配置,比如C:\Users\600336\nacos\config\fixed-localhost_8848_nacos\data\config-data\目錄中獲取相應配置文件
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
if (content != null) {
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
// 如果不本地不存在的話,就從網絡讀,下一節分析
content = worker.getServerConfig(dataId, group, tenant, timeoutMs);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
//如果上面都沒獲取到,則獲取本地快照緩存文件內容,
//比如 xxx\nacos\config\fixed-localhost_8848_nacos\snapshot\mytest\nacos-config-client.yml
dataId, group, tenant, ContentUtils.truncateContent(content));
content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
2.2.3 ClientWorker.getServerConfig()
用代理請求/v1/cs/configs,傳參數dataId,group,tenant獲取配置文件。
public String getServerConfig(String dataId, String group, String tenant, long readTimeout)
throws NacosException {
... //參數group空則默認組
HttpResult result = null;
List<String> params = null;
if (StringUtils.isBlank(tenant)) {
params = Arrays.asList("dataId", dataId, "group", group);
} else {
params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);
}
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
switch (result.code) {
case HttpURLConnection.HTTP_OK:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
return result.content;
}
}
2.3 NacosConfigService大致結構
圖片來源:https://blog.csdn.net/wangwei19871103/article/details/105738140
由於版本問題,可能結構有區別。
2.3.1 創建配置服務ConfigService
客戶端啟動時候, 前面介紹的NacosPropertySourceLocator.locate()方法中創建,
ConfigService configService = nacosConfigProperties.configServiceInstance();
調用:
此時會初始化一些配置到properties中;
configService = NacosFactory.createConfigService(properties);
其實就是反射出NacosConfigService,然后獲取有參構造方法,反射創建實例。
public static ConfigService createConfigService(Properties properties) throws NacosException {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
return vendorImpl;
}
2.3.2 NacosConfigService構造方法
內部組件有:
ServerHttpAgent : http請求的代理,
MetricsHttpAgent : 包裝了ServerHttpAgent,加了計時的功能,
ClientWorker : 做配置文件檢查。
public NacosConfigService(Properties properties) throws NacosException {
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
encode = Constants.ENCODE; //默認設置utf-8
} else {
encode = encodeTmp.trim();
}
initNamespace(properties);
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
agent.start();
worker = new ClientWorker(agent, configFilterChainManager, properties);
}
2.3.3 ServerHttpAgent
public ServerHttpAgent(Properties properties) throws NacosException {
serverListMgr = new ServerListManager(properties);
init(properties); //設置編碼,密碼,最大重試次數
}
2.3.4 ServerListManager.start()
ServerListManager用來管理注冊中心集群列表;
agent.start(); 最終調用到 ServerListManager的start方法;
public synchronized void start() throws NacosException {
if (isStarted || isFixed) {
return;
}
//這里會創建任務,從nameserver獲取serverlist
GetServerListTask getServersTask = new GetServerListTask(addressServerUrl);
//重試5次
for (int i = 0; i < initServerlistRetryTimes && serverUrls.isEmpty(); ++i) {
//如果有改變則發起ServerlistChangeEvent事件
getServersTask.run();
this.wait((i + 1) * 100L);
if (serverUrls.isEmpty()) { ...拋服務器異常... }
}
//無延遲開始調度,每30秒一次
TimerService.scheduleWithFixedDelay(getServersTask, 0L, 30L, TimeUnit.SECONDS);
isStarted = true;
}
2.4 ClientWorker
參數:
agent:http代理,MetricsHttpAgent對象,
ConfigFilterChainManager : 過濾器管理器 ,默認里面沒有過濾器,可以addFilter自己加。
這里也開啟了一個單線程的執行器,執行checkConfigInfo檢查配置任務,每10毫秒一次,去檢查當前的配置數量,
如果超過一個輪詢任務的限制數量,默認3000個,就開啟一個新的任務去做。
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
init(properties); // 初始化超時時間
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
//有cpu核數的線程,用來做長輪詢的,每次檢查配置,如果LongPollingRunnable任務的配置緩存超過一定數量,
// 默認3000個,就要去開啟一個新任務去檢查配置
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
//配置檢查
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
checkConfigInfo();
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
checkConfigInfo()方法后續會分析到;
參考:
https://blog.csdn.net/wangwei19871103/article/details/105814924 ,
https://blog.csdn.net/somenzz/article/details/100518028 ,
https://www.liaochuntao.cn/2019/09/16/java-web-54/ ,
https://www.liaochuntao.cn/categories/nacos/ ,
https://blog.csdn.net/wangwei19871103/article/details/105729211 ,
官方api : https://nacos.io/zh-cn/docs/open-api.html