nacos--配置中心之客户端


nacos提供com.alibaba.nacos.api.config.ConfigService作为客户端的API用于发布,订阅,获取配置信息;

  • ConfigService获取配置信息流程: 优先使用本地配置 --> 从nacos服务器获取配置 --> 本地快照文件获取配置
    本地文件获取路径: ${user.home}\nacos\config${serverName}_nacos\data\config-data\DEFAULT_GROUP\dataId
    服务器配置获取地址:/v1/cs/configs
    本地快照文件路径:${user.home}\nacos\config${serverName}_nacos\snapshot\DEFAULT_GROUP\dataId
public class NacosConfigService implements ConfigService {

    /**
     * Get config
     *
     * @param dataId    dataId
     * @param group     group
     * @param timeoutMs read timeout
     * @return config value
     * @throws NacosException NacosException
     */
      @Override
      public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
        return getConfigInner(namespace, dataId, group, timeoutMs);
      }

      private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
        group = null2defaultGroup(group);
        ParamUtils.checkKeyParam(dataId, group);
        ConfigResponse cr = new ConfigResponse();
        cr.setDataId(dataId);
        cr.setTenant(tenant);
        cr.setGroup(group);

        // 优先使用本地配置
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        if (content != null) {
            LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content));
            cr.setContent(content);
            configFilterChainManager.doFilter(null, cr);
            content = cr.getContent();
            return content;
        }
        
        // 从服务器端获取
        try {
            String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);
            cr.setContent(ct[0]);
            configFilterChainManager.doFilter(null, cr);
            content = cr.getContent();

            return content;
        } catch (NacosException ioe) {
            LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}", agent.getName(), dataId, group, tenant, ioe.toString());
        }

        //从服务器获取失败后从快照获取
        content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
        cr.setContent(content);
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();
        return content;
    }
}

如果成功从服务器段获取到信息后更新本地快照

public class ClientWorker {
      // 从服务器端获取配置信息
      public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException {
        String[] ct = new String[2];
        if (StringUtils.isBlank(group)) {
            group = Constants.DEFAULT_GROUP;
        }

        HttpResult result = null;
        try {
            List<String> params = new ArrayList<String>(Arrays.asList("dataId", dataId, "group", group, "tenant", tenant));
            result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
        } catch (IOException e) {
            LOGGER.error(message, e);
            throw new NacosException(NacosException.SERVER_ERROR, e);
        }

        switch (result.code) {
            case HttpURLConnection.HTTP_OK:
                //更新本地快照
                LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
                ct[0] = result.content;
                return ct;
            case HttpURLConnection.HTTP_NOT_FOUND:
                //更新本地快照
                LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
                return ct;
           ...........................
        }
    }
}
  • 配置信息发布
public class NacosConfigService implements ConfigService {

    /**
     * Publish config.
     *
     * @param dataId  dataId
     * @param group   group
     * @param content content
     * @return Whether publish
     * @throws NacosException NacosException
     */
     private boolean publishConfigInner(String tenant, String dataId, String group, String tag, String appName, String betaIps, String content) throws NacosException {
        //组织参数
        ............................
        // POST /v1/cs/configs
        String url = Constants.CONFIG_CONTROLLER_PATH;
        HttpResult result = null;
        try {
            result = agent.httpPost(url, headers, params, encode, POST_TIMEOUT);
        } catch (IOException ioe) {
            return false;
        }

        if (HttpURLConnection.HTTP_OK == result.code) {
            return true;
        } else if (HttpURLConnection.HTTP_FORBIDDEN == result.code) {
            throw new NacosException(result.code, result.content);
        } else {
            return false;
        }
    }
}
  • 配置信息监听
    注册监听器,监听某个dataId
    客户端通过长轮询不断获取哪些(dataId)变更列表
    遍历变更的dataId列表依次调用ClientWorker.getServerConfig从服务器端获取最新信息
    回调所有注册监听此dataId的所有监听器
public class NacosConfigService implements ConfigService {
   /**
     * Add a listener to the configuration, after the server modified the
     * configuration, the client will use the incoming listener callback.
     * Recommended asynchronous processing, the application can implement the
     * getExecutor method in the ManagerListener, provide a thread pool of
     * execution. If provided, use the main thread callback, May block other
     * configurations or be blocked by other configurations.
     *
     * @param dataId   dataId
     * @param group    group
     * @param listener listener
     * @throws NacosException NacosException
     */
    @Override
    public void addListener(String dataId, String group, Listener listener) throws NacosException {
        worker.addTenantListeners(dataId, group, Arrays.asList(listener));
    }
}
public class ClientWorker {
    /**
     * groupKey -> cacheData
     */
    private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(new HashMap<String, CacheData>());

    /**
     * dataId对应的所有信息都包装成CacheData缓存到cacheMap中
     * 注册监听往CacheData的 private final CopyOnWriteArrayList<ManagerListenerWrap> listeners增加一条记录
     */
    public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
        group = null2defaultGroup(group);
        String tenant = agent.getTenant();
        CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
        for (Listener listener : listeners) {
            cache.addListener(listener);
        }
    }
}

客户端长轮训获取最新配置信息:

  1. Long-Pulling-Timeout设置nacos服务器端挂起请求的时间。不低于10秒 (Math.max(10000, Long.parseLong(str) - delayTime));
  2. 如果有第一次获取数据的配置则设置 Long-Pulling-Timeout-No-Hangup=true时 nacos服务器发现没有变更的信息也不挂起请求立马返回,
    Long-Pulling-Timeout-No-Hangup != true时 nacos服务器发现没有变更信息则挂起当前请求默认最长30000毫秒或有变更后才返回。
  3. 参数Listening-Configs=probeUpdateString表示监听遍历cacheDatas拼接的订阅
 class LongPollingRunnable implements Runnable {

        @Override
        public void run() {
            try {
                // check server config
                List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
                //遍历变更的dataId列表
                for (String groupKey : changedGroupKeys) {
                    try {
                        //从服务器端获取最新信息
                        String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                        CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                        cache.setContent(ct[0]);
                    } catch (NacosException ioe) {
                        LOGGER.error(message, ioe);
                    }
                }
                executorService.execute(this);

            } catch (Throwable e) {
                executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
            }
        }
    }


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM