Nacos Client配置機制
spring boot加載遠程配置
在了解NACOS客戶端配置之前,我們先看看spring boot怎么樣加載遠程配置的。spring boot提供了加載遠程配置的擴展接口 PropertySourceLocator。下面看個簡單的例子:
實現PropertySourceLocator
public class GreizPropertySourceLocator implements PropertySourceLocator {
@Override
public PropertySource<?> locate(Environment environment) {
// 自定義配置,來源可以從任何地方
Map<String, Object> source = new HashMap<>();
source.put("userName", "Greiz");
source.put("userAge", 18);
return new MapPropertySource(GreizPropertySource.PROPERTY_NAME, source);
}
}
PropertySourceLocator 只有一個接口,我們可以在該接口實現自定義配置的加載,比如從數據庫中獲取配置,或者文件中獲取配置等。
springboot啟動配置類
@Configuration
public class GreizConfigBootstrapConfiguration {
@Bean
public GreizPropertySourceLocator greizPropertySourceLocator() {
return new GreizPropertySourceLocator();
}
}
在META-INF/spring.factories添加啟動指定加載類
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.greiz.demo.config.GreizConfigBootstrapConfiguration
使用
@Component
public class Greiz {
@Value("${userName}")
private String name;
@Value("${userAge}")
private Integer age;
// 省getter/setter
}
跟本地配置一樣使用。
spring啟動加載遠程配置流程
在spring啟動prepareContext階段會執行PropertySourceLocator所有實現類加載自定義的配置,最終添加到Environment中管理。
nacos-client
拉取遠程配置
nacos客戶端啟動時加載遠程配置就是用了上面的方式。下面我們根據源碼看一下具體過程。NacosPropertySourceLocator 實現了 PropertySourceLocator,所以spring啟動時會調用locate方法。
public PropertySource<?> locate(Environment env) {
// 1. 創建一個跟遠程打交道的對象NacosConfigService
ConfigService configService = nacosConfigProperties.configServiceInstance();
... 省略代碼
// 2. 操作NacosPropertySource對象,下面三個方法最終都會調用該對象build
nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout);
// 3.
String name = nacosConfigProperties.getName();
String dataIdPrefix = nacosConfigProperties.getPrefix();
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = name;
}
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = env.getProperty("spring.application.name");
}
// 從遠程獲取的properties會存放到該類,最終放到Environment中
CompositePropertySource composite = new CompositePropertySource(NACOS_PROPERTY_SOURCE_NAME);
// 加載公共模塊配置
loadSharedConfiguration(composite);
// 加載擴展配置
loadExtConfiguration(composite);
// 加載獨有配置
loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
return composite;
}
1處 - 創建 ConfigService 對象,是通過反射創建出 NacosConfigService 實例。該類是Nacos Client 跟 Nacos Server 重要的對接者。后面會圍繞該類細講。
2處 - 創建 NacosPropertySourceBuilder 實例,用於構建和緩存 NacosPropertySource,刷新時會用到此處緩存。
3處 - 加載配置的順序,公共配置 -> 擴展配置 -> 私有配置,如果有相同key的后面的覆蓋前面的。默認的 Data ID 生成規則 ${spring.application.name}.properties。
加載三種配置最終都會調用 NacosPropertySourceBuilder.build() 方法。
NacosPropertySource build(String dataId, String group, String fileExtension, boolean isRefreshable) {
// 加載配置
Properties p = loadNacosData(dataId, group, fileExtension);
NacosPropertySource nacosPropertySource = new NacosPropertySource(group, dataId, propertiesToMap(p), new Date(), isRefreshable);
// 緩存nacosPropertySource
NacosPropertySourceRepository.collectNacosPropertySources(nacosPropertySource);
return nacosPropertySource;
}
加載配置后封裝nacosPropertySource,並緩存。
主要邏輯在 NacosPropertySourceBuilder.loadNacosData() 中。
private Properties loadNacosData(String dataId, String group, String fileExtension) {
// 獲取配置
String data = configService.getConfig(dataId, group, timeout);
... 省略代碼
// .properties擴展名
if (fileExtension.equalsIgnoreCase("properties")) {
Properties properties = new Properties();
properties.load(new StringReader(data));
return properties;
} else if (fileExtension.equalsIgnoreCase("yaml") || fileExtension.equalsIgnoreCase("yml")) {// .yaml或者.yml擴展名
YamlPropertiesFactoryBean yamlFactory = new YamlPropertiesFactoryBean();
yamlFactory.setResources(new ByteArrayResource(data.getBytes()));
return yamlFactory.getObject();
}
return EMPTY_PROPERTIES;
}
把遠程獲取到的數據根據擴展名解析成統一的properties。nacos控制台配置支持properties和yaml兩個擴展名。
真正獲取遠程配置的是 NacosConfigService.getConfig(), 調用getConfigInner()。
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
group = null2defaultGroup(group);
ParamUtils.checkKeyParam(dataId, group);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setTenant(tenant);
cr.setGroup(group);
// 1. 優先使用failvoer配置
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
if (content != null) {
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
try {
// 2. 服務器獲取配置
content = worker.getServerConfig(dataId, group, tenant, timeoutMs);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
} catch (NacosException ioe) {
if (NacosException.NO_RIGHT == ioe.getErrCode()) {
throw ioe;
}
}
// 3. 當服務器掛了就拿本地快照
content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
1處 - 優先從failvoer獲取配置,該文件是怎么樣產生的,我暫時還不是很清楚,后面搞懂補充。
2處 - 從nacos服務中獲取配置。
3處 - 如果2失敗了就從本地快照文件獲取。該文件由首次讀取遠程配置文件生成,並且之后輪詢配置更新時如果有更新也會對應更新該文件。
訪問服務接口的臟活當然需要一個客戶端工作者ClientWorker,下面是 NacosConfigService.getConfig() 中調用 ClientWorker.getServerConfig()。
public String getServerConfig(String dataId, String group, String tenant, long readTimeout)
throws NacosException {
// 就是這么簡單http請求獲取的配置
HttpResult result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
... 省略代碼
// 寫本地文件快照
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
...省略代碼
return result.content;
}
看了上面獲取遠程配置的代碼是不是想喊出f**k,怎么這么簡單!!!是的,用http請求 http://ip:port/v1/cs/configs 接口,跟nacos控制台頁面訪問是一樣的。
到此Nacos Client啟動讀取遠程配置並封裝到Environment結束了。
長輪詢獲取更新
前一小節是對項目啟動時Nacos Client加載遠程配置過程分析,本節將對項目運行中配置改變了Nacos Client是怎么樣悉知的分析。
前面提到 NacosConfigService 是 Nacos Client 對接 Nacos Server 的橋梁,下面看一下該類在配置更新過程怎么樣運作的。先看一下 NacosConfigService 的構造方法。
public NacosConfigService(Properties properties) throws NacosException {
... 省略代碼
// 初始化 namespace
initNamespace(properties);
// 查詢服務列表變化情況
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
agent.start();
// 配置更新解決方案在這里面
worker = new ClientWorker(agent, configFilterChainManager, properties);
}
在構造函數中初始化 encode、namespace、HttpAgent 和 ClientWorker。
HttpAgent 是通過http獲取服務地址列表代理類,維護這服務地址列表和客戶端本地一致。
ClientWorker 是維護服務端配置和客戶端配置一致的工作者。前面初始化獲取遠程配置時也是該對象。
ClientWorker 內部是怎么樣維護客戶端屬性更新呢?看一下 ClientWorker 構造函數干了啥。
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
...省略代碼
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
...省略代碼
});
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
...省略代碼
});
// 每10毫秒檢查一遍配置
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
ClientWorker 構造函數創建了兩個線程池。executor 創建了一個定時任務,每10毫秒執行一次 checkConfigInfo(); executorService 作用是什么我們接着往下看。
public void checkConfigInfo() {
// 分任務 向上取整為批數
int listenerSize = cacheMap.get().size();
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
以分段方式把任務拆分交給 executorService 執行,默認3000個配置在一個任務中。executor 和 executorService 是不是很像 Netty 中的 boos 和 worker? Reactor 模式,分工明確。
LongPollingRunnable 是 ClientWorker 一個成員類,實現 Runnable 接口。看一下 run() 方法。
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// 1. 只處理該任務中的配置並且檢查failover配置
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
// 2. 把客戶端的MD5值跟服務端的MD5比較,把不一樣的配置以 "example.properties+DEFAULT_GROUP"方式返回
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
// 3. 把有更新的配置重新從服務端拉取配置內容
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
String content = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
// 修改客戶端本地值並且重新計算該對象的md5值
cache.setContent(content);
} catch (NacosException ioe) {
...省略代碼
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
// 4. 根據md5值檢查是否更新,如果更新通知listener
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
// 5. 又把this放進線程池中,形成一個長輪詢檢查客戶端和服務端配置一致性
executorService.execute(this);
} catch (Throwable e) {
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
1處 - 篩選屬於該任務的配置,並檢查 failover 配置。
2處 - 把配置以"dataId group MD5 tenant\r\n"拼接后當做參數請求服務器 http://ip:port/v1/cs/configs/listener 接口。服務器返回有更新的配置,以 "example.properties+DEFAULT_GROUP"方式返回
3處 - 根據2處返回的列表遍歷請求服務器 http://ip:port/v1/cs/configs 接口,獲取最新配置。然后更新CacheData content值並更新md5值。
4處 - 把 CacheData 新的md5值跟之前的做比較,如果不一樣就通知監聽者更新值。下一節會跟進去詳解。
5處 - 把該 Runnable 對象重新放入線程池,形成一個長輪詢。
本節分析了 Nacos Client 配置是怎么樣保持跟服務器接近實時同步的。通過長輪詢+http短連接方式。
刷新值
在開始本節之前,我們先看一下上面多次出現的一個類 CacheData 結構。
public class CacheData {
private final String name;
private final ConfigFilterChainManager configFilterChainManager;
public final String dataId;
public final String group;
public final String tenant;
// 監聽列表
private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
// 內容md5值
private volatile String md5;
// 是否使用本地配置
private volatile boolean isUseLocalConfig = false;
// 本地版本號
private volatile long localConfigLastModified;
private volatile String content;
// 長輪詢中分段任務ID
private int taskId;
private volatile boolean isInitializing = true;
...省略代碼
}
根據名字可以得知, CacheData 是配置數據緩存中的對象。listeners 屬性比較有意思,在 BO 中擁有一個監聽列表,當該對象md5改變時會通過遍歷 listeners 通知監聽者們。
前一節從服務端獲取到有更新的配置之后會檢查md5,調用 CacheData.checkListenerMd5()方法:
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
safeNotifyListener(dataId, group, content, md5, wrap);
}
}
}
class ManagerListenerWrap {
final Listener listener;
String lastCallMd5 = CacheData.getMd5String(null);
... 省略代碼
}
ManagerListenerWrap 的 lastCallMd5 是舊配置的md5值,如果 CacheData 的md5和 ManagerListenerWrap 的lastCallMd5 值不一樣,說明配置有更新。需要通知未更新的監聽者。
private void safeNotifyListener(final String dataId, final String group, final String content, final String md5, final ManagerListenerWrap listenerWrap) {
final Listener listener = listenerWrap.listener;
Runnable job = new Runnable() {
@Override
public void run() {
... 省略代碼
// 調用監聽者的方法
listener.receiveConfigInfo(contentTmp);
listenerWrap.lastCallMd5 = md5;
... 省略代碼
}
};
try {
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
job.run();
}
} catch (Throwable t) {
}
}
調用了監聽者的 receiveConfigInfo() 方法,然后修改 ManagerListenerWrap 的lastCallMd5 值。
本節到這里分析了從服務端獲取更新配置后通知配置監聽者。但是監聽者是什么時候注冊的呢?接下來我們繼續分析監聽者注冊到 CacheData 過程。
NacosContextRefresher 實現了ApplicationListener
private void registerNacosListener(final String group, final String dataId) {
Listener listener = listenerMap.computeIfAbsent(dataId, i -> new Listener() {
// 通知監聽者調用的就是這個方法啦
@Override
public void receiveConfigInfo(String configInfo) {
refreshCountIncrement();
String md5 = "";
if (!StringUtils.isEmpty(configInfo)) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md5 = new BigInteger(1, md.digest(configInfo.getBytes("UTF-8"))).toString(16);
}
catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {
log.warn("[Nacos] unable to get md5 for dataId: " + dataId, e);
}
}
refreshHistory.add(dataId, md5);
// spring的刷新事件通知,刷新監聽者會被執行
applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh Nacos config"));
}
@Override
public Executor getExecutor() {
return null;
}
});
// 注冊本監聽者
configService.addListener(dataId, group, listener);
...省略代碼
}
通過 NacosConfigService.addListener()注冊監聽者。
NacosConfigService.addListener():
public void addListener(String dataId, String group, Listener listener) throws NacosException {
worker.addTenantListeners(dataId, group, Arrays.asList(listener));
}
還是交給了 ClientWorker
ClientWorker.addTenantListeners()
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
group = null2defaultGroup(group);
String tenant = agent.getTenant();
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
for (Listener listener : listeners) {
cache.addListener(listener);
}
}
ClientWorker 把監聽者交給了 CacheData 完成了注冊。
匯總系統運行中更新配置的流程:
- 啟動時把本地更新 Listener 注冊到 CacheData。
- ClientWorker 長輪詢同步服務端的更新配置。
- 2中獲取到更新后的配置,重置 CacheData 內容。
- CacheData 回調1中注冊上來的 Listener.receiveConfigInfo()
- Listener 最終通知spring刷新事件,完成Context刷新屬性值。
總結
Nacos Config Client 和 Nacos Config Server 采用定時長輪詢http請求訪問配置更新,這樣設計 Nacos Config Server 和 Config Client 結構簡單。Server 也沒有長連接模式Client過多的壓力。
我的博客即將同步至騰訊雲+社區,邀請大家一同入駐:https://cloud.tencent.com/developer/support-plan?invite_code=16l9glm94a1q9