Nacos: the config center client


Nacos provides com.alibaba.nacos.api.config.ConfigService as the client API for publishing, subscribing to, and fetching configuration.

  • How ConfigService fetches configuration: use the local (failover) config first --> fetch from the Nacos server --> fall back to the local snapshot file
    Local failover file path: ${user.home}\nacos\config\${serverName}_nacos\data\config-data\DEFAULT_GROUP\dataId
    Server endpoint: /v1/cs/configs
    Local snapshot file path: ${user.home}\nacos\config\${serverName}_nacos\snapshot\DEFAULT_GROUP\dataId
public class NacosConfigService implements ConfigService {

    /**
     * Get config
     *
     * @param dataId    dataId
     * @param group     group
     * @param timeoutMs read timeout
     * @return config value
     * @throws NacosException NacosException
     */
      @Override
      public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
        return getConfigInner(namespace, dataId, group, timeoutMs);
      }

      private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
        group = null2defaultGroup(group);
        ParamUtils.checkKeyParam(dataId, group);
        ConfigResponse cr = new ConfigResponse();
        cr.setDataId(dataId);
        cr.setTenant(tenant);
        cr.setGroup(group);

        // Prefer the local failover config when it exists
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        if (content != null) {
            LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content));
            cr.setContent(content);
            configFilterChainManager.doFilter(null, cr);
            content = cr.getContent();
            return content;
        }
        
        // Otherwise fetch from the server
        try {
            String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);
            cr.setContent(ct[0]);
            configFilterChainManager.doFilter(null, cr);
            content = cr.getContent();

            return content;
        } catch (NacosException ioe) {
            LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}", agent.getName(), dataId, group, tenant, ioe.toString());
        }

        // The server fetch failed, so fall back to the local snapshot
        content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
        cr.setContent(content);
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();
        return content;
    }
}
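
The lookup order above (failover file, then server, then snapshot) can be sketched without any Nacos dependency. This is a minimal illustration only: FallbackConfigReader and its three sources are hypothetical names invented for this example, not Nacos classes, and the filter chain is left out.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

/** Illustrative three-level lookup; every name here is hypothetical. */
final class FallbackConfigReader {
    private final Supplier<String> failover;  // local override, null when absent
    private final Callable<String> server;    // remote fetch, may throw
    private final Supplier<String> snapshot;  // last value saved from the server, null when absent

    FallbackConfigReader(Supplier<String> failover, Callable<String> server, Supplier<String> snapshot) {
        this.failover = failover;
        this.server = server;
        this.snapshot = snapshot;
    }

    String read() {
        // 1. A local failover value always wins.
        String content = failover.get();
        if (content != null) {
            return content;
        }
        // 2. Otherwise ask the server; its answer is returned as-is, even if null.
        try {
            return server.call();
        } catch (Exception e) {
            // 3. Only a failed server call falls through to the snapshot.
        }
        return snapshot.get();
    }
}
```

Note that, as in the excerpt, the snapshot is consulted only when the server call throws, not when the server answers with empty content.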

When the configuration is fetched successfully from the server, the local snapshot is updated:

public class ClientWorker {
      // Fetch the configuration from the server
      public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException {
        String[] ct = new String[2];
        if (StringUtils.isBlank(group)) {
            group = Constants.DEFAULT_GROUP;
        }

        HttpResult result = null;
        try {
            List<String> params = new ArrayList<String>(Arrays.asList("dataId", dataId, "group", group, "tenant", tenant));
            result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
        } catch (IOException e) {
            LOGGER.error("get server config failed, dataId=" + dataId + ", group=" + group + ", tenant=" + tenant, e);
            throw new NacosException(NacosException.SERVER_ERROR, e);
        }

        switch (result.code) {
            case HttpURLConnection.HTTP_OK:
                // Update the local snapshot with the new content
                LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
                ct[0] = result.content;
                return ct;
            case HttpURLConnection.HTTP_NOT_FOUND:
                // Config not found on the server: update the local snapshot with null
                LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
                return ct;
           ...........................
        }
    }
}
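
The snapshot handling for the two status codes shown above can be sketched with plain files. SnapshotStore is a hypothetical class written for this example, and treating a null save as "delete the stale snapshot file" is an assumption of the sketch rather than something the excerpt states.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical file-backed snapshot store; not the Nacos LocalConfigInfoProcessor. */
final class SnapshotStore {
    private final Path root; // stands in for the ${serverName}_nacos directory

    SnapshotStore(Path root) {
        this.root = root;
    }

    static SnapshotStore inTempDir() {
        try {
            return new SnapshotStore(Files.createTempDirectory("nacos-snapshot-sketch"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    Path snapshotFile(String group, String dataId) {
        return root.resolve("snapshot").resolve(group).resolve(dataId);
    }

    /** Writes the content, or removes the file when content is null (assumption). */
    void save(String group, String dataId, String content) {
        Path file = snapshotFile(group, dataId);
        try {
            if (content == null) {
                Files.deleteIfExists(file);
                return;
            }
            Files.createDirectories(file.getParent());
            Files.write(file, content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the stored content, or null when no snapshot exists. */
    String load(String group, String dataId) {
        Path file = snapshotFile(group, dataId);
        try {
            return Files.exists(file) ? new String(Files.readAllBytes(file), StandardCharsets.UTF_8) : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Mirrors the two response codes from the excerpt; other codes are out of scope here. */
    String onResponse(int code, String group, String dataId, String body) {
        switch (code) {
            case HttpURLConnection.HTTP_OK:
                save(group, dataId, body);
                return body;
            case HttpURLConnection.HTTP_NOT_FOUND:
                save(group, dataId, null);
                return null;
            default:
                throw new IllegalStateException("status not covered by this sketch: " + code);
        }
    }
}
```

Keeping the snapshot in step with every server answer is what makes it usable as the last-resort source when the server later becomes unreachable.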
  • Publishing configuration
public class NacosConfigService implements ConfigService {

    /**
     * Publish config.
     *
     * @param dataId  dataId
     * @param group   group
     * @param content content
     * @return Whether publish
     * @throws NacosException NacosException
     */
     private boolean publishConfigInner(String tenant, String dataId, String group, String tag, String appName, String betaIps, String content) throws NacosException {
        // Assemble the request parameters
        ............................
        // POST /v1/cs/configs
        String url = Constants.CONFIG_CONTROLLER_PATH;
        HttpResult result = null;
        try {
            result = agent.httpPost(url, headers, params, encode, POST_TIMEOUT);
        } catch (IOException ioe) {
            return false;
        }

        if (HttpURLConnection.HTTP_OK == result.code) {
            return true;
        } else if (HttpURLConnection.HTTP_FORBIDDEN == result.code) {
            throw new NacosException(result.code, result.content);
        } else {
            return false;
        }
    }
}
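
The result handling in publishConfigInner gives the caller three outcomes: true for 200, an exception for 403, and false for everything else (including an I/O failure). A minimal sketch of that mapping follows, with a simple retry helper to show how a caller might treat false as retryable. PublishOutcome, ForbiddenException, and publishWithRetry are hypothetical names for this example and are not part of Nacos.

```java
import java.net.HttpURLConnection;
import java.util.function.BooleanSupplier;

/** Hypothetical helper mirroring the status handling shown above. */
final class PublishOutcome {

    /** Stand-in for NacosException in this sketch; carries the HTTP status. */
    static final class ForbiddenException extends RuntimeException {
        final int code;

        ForbiddenException(int code, String message) {
            super(message);
            this.code = code;
        }
    }

    /** 200 -> true, 403 -> exception, anything else -> false. */
    static boolean fromStatus(int code, String body) {
        if (code == HttpURLConnection.HTTP_OK) {
            return true;
        }
        if (code == HttpURLConnection.HTTP_FORBIDDEN) {
            throw new ForbiddenException(code, body);
        }
        return false;
    }

    /** Retries an attempt that returned false; exceptions propagate immediately. */
    static boolean publishWithRetry(BooleanSupplier attempt, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return true;
            }
        }
        return false;
    }
}
```

A 403 is surfaced as an exception because retrying cannot fix a permission problem, whereas a false result may simply reflect a transient failure.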
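  • Listening for configuration changes
    Register a listener on a given dataId.
    The client long-polls the server to obtain the list of dataIds that have changed.
    It iterates over the changed dataIds and calls ClientWorker.getServerConfig for each one to fetch the latest content from the server.
    It then invokes every listener registered for that dataId.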
public class NacosConfigService implements ConfigService {
   /**
     * Add a listener to the configuration, after the server modified the
     * configuration, the client will use the incoming listener callback.
     * Asynchronous processing is recommended: the application can implement the
     * getExecutor method in the ManagerListener to provide a thread pool for
     * execution. If none is provided, the callback runs on the main thread and
     * may block other configurations or be blocked by other configurations.
     *
     * @param dataId   dataId
     * @param group    group
     * @param listener listener
     * @throws NacosException NacosException
     */
    @Override
    public void addListener(String dataId, String group, Listener listener) throws NacosException {
        worker.addTenantListeners(dataId, group, Arrays.asList(listener));
    }
}
public class ClientWorker {
    /**
     * groupKey -> cacheData
     */
    private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(new HashMap<String, CacheData>());

    /**
     * All information for a dataId is wrapped in a CacheData and cached in cacheMap.
     * Registering a listener adds an entry to CacheData's private final CopyOnWriteArrayList<ManagerListenerWrap> listeners.
     */
    public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
        group = null2defaultGroup(group);
        String tenant = agent.getTenant();
        CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
        for (Listener listener : listeners) {
            cache.addListener(listener);
        }
    }
}
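
The registration structure above (one cache entry per group key, each holding a copy-on-write listener list) can be sketched as follows. ListenerRegistry and Entry are hypothetical simplifications written for this example; in particular, the sketch detects a change by comparing the content strings directly and omits the tenant, which are choices made for brevity here.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical, simplified counterpart of cacheMap plus CacheData. */
final class ListenerRegistry {

    /** One entry per group key: current content plus its listeners. */
    static final class Entry {
        final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
        volatile String content;
    }

    private final Map<String, Entry> cache = new ConcurrentHashMap<>();

    /** Key format is an assumption of this sketch. */
    static String groupKey(String dataId, String group) {
        return dataId + "+" + group;
    }

    /** Creates the entry on first use, then appends the listener. */
    void addListener(String dataId, String group, Consumer<String> listener) {
        cache.computeIfAbsent(groupKey(dataId, group), k -> new Entry()).listeners.add(listener);
    }

    /** Stores new content and calls back listeners only when it changed; returns how many were called. */
    int update(String dataId, String group, String content) {
        Entry entry = cache.get(groupKey(dataId, group));
        if (entry == null || Objects.equals(entry.content, content)) {
            return 0;
        }
        entry.content = content;
        int notified = 0;
        for (Consumer<String> listener : entry.listeners) {
            listener.accept(content);
            notified++;
        }
        return notified;
    }
}
```

A copy-on-write list suits this access pattern: listeners are added rarely but iterated on every change, and iteration never fails if another thread registers a listener at the same time.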

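How the client long-polls for the latest configuration:

  1. The Long-Pulling-Timeout header sets how long the Nacos server holds the request; the effective value is never less than 10 seconds (Math.max(10000, Long.parseLong(str) - delayTime)).
  2. If any config is being fetched for the first time, Long-Pulling-Timeout-No-Hangup=true is set, and the Nacos server returns immediately even when nothing has changed.
    When Long-Pulling-Timeout-No-Hangup is not true and nothing has changed, the Nacos server holds the request until a change occurs or for at most 30000 ms by default.
  3. The parameter Listening-Configs=probeUpdateString carries the subscription string built by iterating over cacheDatas.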
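
The header rules in items 1 and 2 can be sketched as a small helper. The header names and the Math.max formula come from the list above; LongPollingHeaders, its method names, and the idea of passing delayTime as a parameter are assumptions made for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that applies the long-polling header rules listed above. */
final class LongPollingHeaders {

    /** Item 1: never hold for less than 10 seconds. */
    static long effectiveTimeoutMs(String configured, long delayTimeMs) {
        return Math.max(10000, Long.parseLong(configured) - delayTimeMs);
    }

    /** Item 2: add the no-hangup flag only when some config is fetched for the first time. */
    static Map<String, String> build(String configuredTimeout, long delayTimeMs, boolean hasInitializingConfig) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Long-Pulling-Timeout", String.valueOf(effectiveTimeoutMs(configuredTimeout, delayTimeMs)));
        if (hasInitializingConfig) {
            headers.put("Long-Pulling-Timeout-No-Hangup", "true");
        }
        return headers;
    }
}
```

For example, a configured value of 30000 with a delayTime of 500 gives 29500 ms, while a configured value of 5000 is raised to the 10000 ms floor.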
 class LongPollingRunnable implements Runnable {

        @Override
        public void run() {
            try {
                // check server config
                List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
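                // Iterate over the changed group keys
                for (String groupKey : changedGroupKeys) {
                    // A group key encodes dataId, group and, optionally, tenant
                    String[] key = GroupKey.parseKey(groupKey);
                    String dataId = key[0];
                    String group = key[1];
                    String tenant = key.length == 3 ? key[2] : null;
                    try {
                        // Fetch the latest content from the server
                        String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                        CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                        cache.setContent(ct[0]);
                    } catch (NacosException ioe) {
                        LOGGER.error("get changed config failed, dataId=" + dataId + ", group=" + group + ", tenant=" + tenant, ioe);
                    }
                }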
                executorService.execute(this);

            } catch (Throwable e) {
                executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
            }
        }
    }
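
The control flow of one polling round can be sketched as a function that returns the delay before the next round: zero when the round completed, a penalty delay when the check itself failed. PollStep and its collaborators are hypothetical and exist only to make the loop testable without a server or an executor.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical single round of the long-polling loop. */
final class PollStep {
    final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Supplier<List<String>> changedKeys;  // long-poll call returning changed group keys
    private final Function<String, String> fetch;      // fetches the latest content for one key
    private final long penaltyMs;                      // delay applied after a failed round

    PollStep(Supplier<List<String>> changedKeys, Function<String, String> fetch, long penaltyMs) {
        this.changedKeys = changedKeys;
        this.fetch = fetch;
        this.penaltyMs = penaltyMs;
    }

    /** Runs one round and returns how long to wait before the next one. */
    long runOnce() {
        try {
            for (String key : changedKeys.get()) {
                try {
                    String content = fetch.apply(key);
                    if (content == null) {
                        cache.remove(key);
                    } else {
                        cache.put(key, content);
                    }
                } catch (RuntimeException perKey) {
                    // One failing key must not stop the remaining keys.
                }
            }
            return 0L; // poll again immediately
        } catch (RuntimeException e) {
            return penaltyMs; // back off before retrying
        }
    }
}
```

Re-running immediately after a successful round is reasonable because the server-side hold already paces the loop; the penalty delay only matters when the server cannot be reached at all.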

