nacos提供com.alibaba.nacos.api.config.ConfigService作为客户端的API用于发布,订阅,获取配置信息;
- ConfigService获取配置信息流程: 优先使用本地配置 --> 从nacos服务器获取配置 --> 本地快照文件获取配置
本地文件获取路径: ${user.home}\nacos\config${serverName}_nacos\data\config-data\DEFAULT_GROUP\dataId
服务器配置获取地址:/v1/cs/configs
本地快照文件路径:${user.home}\nacos\config${serverName}_nacos\snapshot\DEFAULT_GROUP\dataId
public class NacosConfigService implements ConfigService {
/**
* Get config
*
* @param dataId dataId
* @param group group
* @param timeoutMs read timeout
* @return config value
* @throws NacosException NacosException
*/
@Override
public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
return getConfigInner(namespace, dataId, group, timeoutMs);
}
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
group = null2defaultGroup(group);
ParamUtils.checkKeyParam(dataId, group);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setTenant(tenant);
cr.setGroup(group);
// 优先使用本地配置
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
if (content != null) {
LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content));
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
// 从服务器端获取
try {
String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);
cr.setContent(ct[0]);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
} catch (NacosException ioe) {
LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}", agent.getName(), dataId, group, tenant, ioe.toString());
}
//从服务器获取失败后从快照获取
content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
}
如果成功从服务器段获取到信息后更新本地快照
public class ClientWorker {
// 从服务器端获取配置信息
public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException {
String[] ct = new String[2];
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP;
}
HttpResult result = null;
try {
List<String> params = new ArrayList<String>(Arrays.asList("dataId", dataId, "group", group, "tenant", tenant));
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
} catch (IOException e) {
LOGGER.error(message, e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
switch (result.code) {
case HttpURLConnection.HTTP_OK:
//更新本地快照
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
ct[0] = result.content;
return ct;
case HttpURLConnection.HTTP_NOT_FOUND:
//更新本地快照
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
return ct;
...........................
}
}
}
- 配置信息发布
public class NacosConfigService implements ConfigService {
/**
* Publish config.
*
* @param dataId dataId
* @param group group
* @param content content
* @return Whether publish
* @throws NacosException NacosException
*/
private boolean publishConfigInner(String tenant, String dataId, String group, String tag, String appName, String betaIps, String content) throws NacosException {
//组织参数
............................
// POST /v1/cs/configs
String url = Constants.CONFIG_CONTROLLER_PATH;
HttpResult result = null;
try {
result = agent.httpPost(url, headers, params, encode, POST_TIMEOUT);
} catch (IOException ioe) {
return false;
}
if (HttpURLConnection.HTTP_OK == result.code) {
return true;
} else if (HttpURLConnection.HTTP_FORBIDDEN == result.code) {
throw new NacosException(result.code, result.content);
} else {
return false;
}
}
}
- 配置信息监听
注册监听器,监听某个dataId
客户端通过长轮询不断获取哪些(dataId)变更列表
遍历变更的dataId列表依次调用ClientWorker.getServerConfig从服务器端获取最新信息
回调所有注册监听此dataId的所有监听器
public class NacosConfigService implements ConfigService {
/**
* Add a listener to the configuration, after the server modified the
* configuration, the client will use the incoming listener callback.
* Recommended asynchronous processing, the application can implement the
* getExecutor method in the ManagerListener, provide a thread pool of
* execution. If provided, use the main thread callback, May block other
* configurations or be blocked by other configurations.
*
* @param dataId dataId
* @param group group
* @param listener listener
* @throws NacosException NacosException
*/
@Override
public void addListener(String dataId, String group, Listener listener) throws NacosException {
worker.addTenantListeners(dataId, group, Arrays.asList(listener));
}
}
public class ClientWorker {
/**
* groupKey -> cacheData
*/
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(new HashMap<String, CacheData>());
/**
* dataId对应的所有信息都包装成CacheData缓存到cacheMap中
* 注册监听往CacheData的 private final CopyOnWriteArrayList<ManagerListenerWrap> listeners增加一条记录
*/
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
group = null2defaultGroup(group);
String tenant = agent.getTenant();
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
for (Listener listener : listeners) {
cache.addListener(listener);
}
}
}
客户端长轮训获取最新配置信息:
- Long-Pulling-Timeout设置nacos服务器端挂起请求的时间。不低于10秒 (Math.max(10000, Long.parseLong(str) - delayTime));
- 如果有第一次获取数据的配置则设置 Long-Pulling-Timeout-No-Hangup=true时 nacos服务器发现没有变更的信息也不挂起请求立马返回,
Long-Pulling-Timeout-No-Hangup != true时 nacos服务器发现没有变更信息则挂起当前请求默认最长30000毫秒或有变更后才返回。 - 参数Listening-Configs=probeUpdateString表示监听遍历cacheDatas拼接的订阅
class LongPollingRunnable implements Runnable {
@Override
public void run() {
try {
// check server config
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
//遍历变更的dataId列表
for (String groupKey : changedGroupKeys) {
try {
//从服务器端获取最新信息
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);
} catch (NacosException ioe) {
LOGGER.error(message, ioe);
}
}
executorService.execute(this);
} catch (Throwable e) {
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
}