nacos提供com.alibaba.nacos.api.config.ConfigService作為客戶端的API用於發布,訂閱,獲取配置信息;
- ConfigService獲取配置信息流程: 優先使用本地配置 --> 從nacos服務器獲取配置 --> 本地快照文件獲取配置
本地文件獲取路徑: ${user.home}\nacos\config\${serverName}_nacos\data\config-data\DEFAULT_GROUP\dataId
服務器配置獲取地址:/v1/cs/configs
本地快照文件路徑:${user.home}\nacos\config\${serverName}_nacos\snapshot\DEFAULT_GROUP\dataId
public class NacosConfigService implements ConfigService {
/**
* Get config
*
* @param dataId dataId
* @param group group
* @param timeoutMs read timeout
* @return config value
* @throws NacosException NacosException
*/
@Override
public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
return getConfigInner(namespace, dataId, group, timeoutMs);
}
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
group = null2defaultGroup(group);
ParamUtils.checkKeyParam(dataId, group);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setTenant(tenant);
cr.setGroup(group);
// 優先使用本地配置
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
if (content != null) {
LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content));
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
// 從服務器端獲取
try {
String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);
cr.setContent(ct[0]);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
} catch (NacosException ioe) {
LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}", agent.getName(), dataId, group, tenant, ioe.toString());
}
//從服務器獲取失敗后從快照獲取
content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
}
如果成功從服務器端獲取到信息,則更新本地快照
public class ClientWorker {
/**
* Fetches config from the server with an HTTP GET on Constants.CONFIG_CONTROLLER_PATH
* and keeps the local snapshot in step with the response.
*
* @param dataId dataId sent as a request parameter
* @param group group sent as a request parameter; Constants.DEFAULT_GROUP is used when blank
* @param tenant tenant sent as a request parameter
* @param readTimeout timeout passed to agent.httpGet
* @return a two-element array; in the visible branches only index 0 is assigned
*         (the response content on HTTP 200), and it stays null on HTTP 404
* @throws NacosException with code SERVER_ERROR when the HTTP call raises an IOException
*/
public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException {
String[] ct = new String[2];
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP;
}
HttpResult result = null;
try {
// Request parameters are passed as a flat list of alternating names and values.
List<String> params = new ArrayList<String>(Arrays.asList("dataId", dataId, "group", group, "tenant", tenant));
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
} catch (IOException e) {
// NOTE(review): `message` is not declared in this excerpt; presumably built in elided code.
LOGGER.error(message, e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
switch (result.code) {
case HttpURLConnection.HTTP_OK:
// Update the local snapshot with the content just received.
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
ct[0] = result.content;
return ct;
case HttpURLConnection.HTTP_NOT_FOUND:
// Update the local snapshot, passing null as the content.
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
return ct;
...........................
}
}
}
- 配置信息發布
public class NacosConfigService implements ConfigService {
/**
* Publish config by POSTing to Constants.CONFIG_CONTROLLER_PATH.
*
* @param tenant tenant (presumably placed in the elided request parameters; confirm)
* @param dataId dataId
* @param group group
* @param tag tag (usage is in the elided parameter assembly; confirm)
* @param appName appName (usage is in the elided parameter assembly; confirm)
* @param betaIps betaIps (usage is in the elided parameter assembly; confirm)
* @param content content
* @return true when the server answers HTTP 200; false when the POST raises an
*         IOException or the server answers with any other status except 403
* @throws NacosException when the server answers HTTP 403, carrying the response code and content
*/
private boolean publishConfigInner(String tenant, String dataId, String group, String tag, String appName, String betaIps, String content) throws NacosException {
// Assemble the request parameters (elided in this excerpt).
............................
// POST /v1/cs/configs
String url = Constants.CONFIG_CONTROLLER_PATH;
HttpResult result = null;
try {
result = agent.httpPost(url, headers, params, encode, POST_TIMEOUT);
} catch (IOException ioe) {
// An I/O failure is reported to the caller as "not published"; the exception is not logged here.
return false;
}
if (HttpURLConnection.HTTP_OK == result.code) {
return true;
} else if (HttpURLConnection.HTTP_FORBIDDEN == result.code) {
// 403 is the only status surfaced as an exception.
throw new NacosException(result.code, result.content);
} else {
return false;
}
}
}
- 配置信息監聽
注冊監聽器,監聽某個dataId
客戶端通過長輪詢不斷獲取發生變更的dataId列表
遍歷變更的dataId列表依次調用ClientWorker.getServerConfig從服務器端獲取最新信息
回調所有注冊監聽此dataId的所有監聽器
public class NacosConfigService implements ConfigService {
    /**
     * Registers a listener on a configuration. After the server modifies the
     * configuration, the client invokes the supplied listener as a callback.
     *
     * <p>Asynchronous processing is recommended: the application can implement
     * getExecutor in the ManagerListener to provide a thread pool for the
     * callback. A callback running on the main thread may block other
     * configurations or be blocked by them.
     *
     * <p>NOTE(review): the upstream comment reads "If provided, use the main
     * thread callback"; this presumably means "if NOT provided" - confirm.
     *
     * @param dataId   dataId
     * @param group    group
     * @param listener listener; handed to the worker wrapped in a one-element list
     * @throws NacosException NacosException
     */
    @Override
    public void addListener(String dataId, String group, Listener listener) throws NacosException {
        worker.addTenantListeners(dataId, group, Arrays.<Listener>asList(listener));
    }
}
public class ClientWorker {
    /**
     * Maps groupKey -> cacheData.
     */
    private final AtomicReference<Map<String, CacheData>> cacheMap =
            new AtomicReference<Map<String, CacheData>>(new HashMap<String, CacheData>());

    /**
     * Registers the given listeners for a dataId under the agent's tenant.
     *
     * <p>Per the original note: everything associated with a dataId is wrapped
     * in a CacheData held in cacheMap, and registering a listener appends one
     * entry to that CacheData's
     * {@code private final CopyOnWriteArrayList<ManagerListenerWrap> listeners}.
     *
     * @param dataId    dataId
     * @param group     group; normalized through null2defaultGroup before the lookup
     * @param listeners listeners to attach, added in iteration order
     * @throws NacosException NacosException
     */
    public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
        // Arguments are evaluated left to right, so the group is normalized
        // before the tenant is read, exactly as in the step-by-step form.
        CacheData cacheData = addCacheDataIfAbsent(dataId, null2defaultGroup(group), agent.getTenant());
        for (Listener each : listeners) {
            cacheData.addListener(each);
        }
    }
}
客戶端長輪詢獲取最新配置信息:
- Long-Pulling-Timeout設置nacos服務器端掛起請求的時間。不低於10秒 (Math.max(10000, Long.parseLong(str) - delayTime));
- 如果存在首次獲取數據的配置,則設置 Long-Pulling-Timeout-No-Hangup=true,此時nacos服務器即使沒有發現變更信息也不掛起請求,立即返回;
Long-Pulling-Timeout-No-Hangup != true時,nacos服務器若沒有發現變更信息則掛起當前請求,默認最長30000毫秒,或有變更后才返回。
- 參數Listening-Configs=probeUpdateString,表示由遍歷cacheDatas拼接而成的訂閱(監聽)列表
/**
* Long-polling task: asks the server which group keys changed, refreshes the
* cached content for each of them, then re-submits itself to the executor.
*/
class LongPollingRunnable implements Runnable {
@Override
public void run() {
try {
// Ask the server which of the cached configs have changed.
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
// Walk the list of changed group keys.
for (String groupKey : changedGroupKeys) {
// NOTE(review): dataId, group and tenant are not declared in this excerpt;
// presumably they are derived from groupKey in elided code - confirm.
try {
// Fetch the latest content from the server (3000 ms read timeout).
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
// NOTE(review): no null check on the lookup; a missing entry would throw
// here and be handled by the outer catch, not by the NacosException handler.
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);
} catch (NacosException ioe) {
// A failure for one key is logged and the loop moves on to the next key.
// NOTE(review): `message` is not declared in this excerpt.
LOGGER.error(message, ioe);
}
}
// Normal completion: re-submit this task to the executor right away.
executorService.execute(this);
} catch (Throwable e) {
// Any other failure: re-schedule this task after taskPenaltyTime milliseconds.
// The throwable itself is not logged here.
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
}