Nacos配置中心源碼分析


1.使用

compile 'com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-config:2.2.3.RELEASE'
# Application name; reused below as the Nacos config prefix.
spring:
  application:
    name: product
  cloud:
    nacos:
      # Nacos config-center client settings.
      config:
        server-addr: 127.0.0.1:9000
        namespace: 038b8be8-54da-44a5-9664-def33bc8cd19
        group: DEFAULT_GROUP
        prefix: ${spring.application.name}
        # Must sit at the same level as the other keys under "config".
        file-extension: yaml

Nacos配置文件命名規則
${prefix}-${spring.profiles.active}.${file-extension}

prefix:配置前綴,本例取 spring.application.name 的值(即服務名稱 product)

spring.profiles.active:配置環境(當前激活的 profile)

file-extension:配置文件類型,默認是properties

2.配置類加載

NacosConfigBootstrapConfiguration配置文件的加載

/**
 * Bootstrap configuration that declares the Nacos config beans shown below.
 * The class is conditional on {@code spring.cloud.nacos.config.enabled};
 * {@code matchIfMissing = true} means a missing property also satisfies the condition.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigBootstrapConfiguration {

    /** Registers a new {@code NacosConfigProperties} bean unless one is already defined. */
    @Bean
    @ConditionalOnMissingBean
    public NacosConfigProperties nacosConfigProperties() {
        return new NacosConfigProperties();
    }

    /**
     * Registers a {@code NacosConfigManager} built from the config properties,
     * unless one is already defined.
     */
    @Bean
    @ConditionalOnMissingBean
    public NacosConfigManager nacosConfigManager(
            NacosConfigProperties nacosConfigProperties) {
        return new NacosConfigManager(nacosConfigProperties);
    }

    /** Registers the {@code NacosPropertySourceLocator}, which is handed the config manager. */
    @Bean
    public NacosPropertySourceLocator nacosPropertySourceLocator(
            NacosConfigManager nacosConfigManager) {
        return new NacosPropertySourceLocator(nacosConfigManager);
    }
}

NacosConfigManager

/**
 * Keeps a reference to the given properties and calls
 * {@code createConfigService} with them.
 *
 * @param nacosConfigProperties the Nacos config properties to store and pass on
 */
public NacosConfigManager(NacosConfigProperties nacosConfigProperties) {
    this.nacosConfigProperties = nacosConfigProperties;
    createConfigService(nacosConfigProperties);
}

/**
 * Creates the ConfigService if the static {@code service} reference is still null,
 * then returns {@code service}.
 * NOTE: the catch clause of the try block is elided ("...") in this excerpt.
 *
 * @param nacosConfigProperties source of the properties passed to NacosFactory
 * @return the value of {@code service} after the creation attempt
 */
static ConfigService createConfigService(
        NacosConfigProperties nacosConfigProperties) {
    if (Objects.isNull(service)) {
        // Lock on the class so that concurrent callers do not create the service more than once.
        synchronized (NacosConfigManager.class) {
            try {
                // Check again under the lock before creating.
                if (Objects.isNull(service)) {
                    service = NacosFactory.createConfigService(
                            // Assemble the client properties from the Nacos config properties.
                            nacosConfigProperties.assembleConfigServiceProperties());
                }
            }
            ...
        }
    }
    return service;
}
/**
 * Delegates to {@code ConfigFactory.createConfigService(properties)}.
 * NOTE(review): presumably this is NacosFactory's method, since the code above
 * calls NacosFactory.createConfigService - the excerpt does not show the class.
 *
 * @param properties client properties passed through unchanged
 * @return the ConfigService returned by ConfigFactory
 * @throws NacosException declared by the signature; propagated from the delegate
 */
public static ConfigService createConfigService(Properties properties) throws NacosException {
    return ConfigFactory.createConfigService(properties);
}
/**
 * Creates a ConfigService by reflection: loads
 * {@code com.alibaba.nacos.client.config.NacosConfigService}, looks up its
 * public constructor taking a {@link Properties}, and invokes it.
 * NOTE(review): presumably this is ConfigFactory's method, since the delegating
 * wrapper calls ConfigFactory.createConfigService - the excerpt does not show the class.
 *
 * @param properties passed to the NacosConfigService constructor
 * @return the newly created instance, cast to ConfigService
 * @throws NacosException with code CLIENT_INVALID_PARAM wrapping any Throwable
 *         raised while loading the class or invoking the constructor
 */
public static ConfigService createConfigService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
        // Wildcard-typed constructor instead of a raw type; the cast below is still required.
        Constructor<?> constructor = driverImplClass.getConstructor(Properties.class);
        return (ConfigService) constructor.newInstance(properties);
    } catch (Throwable e) {
        // Deliberately broad: every failure is reported as an invalid-parameter client error.
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}

NacosConfigService

/**
 * Parameterized constructor of NacosConfigService.
 * Validates the properties, resolves the encoding, initializes the namespace,
 * creates and starts the HTTP agent, and creates the ClientWorker.
 *
 * @param properties client configuration
 * @throws NacosException declared by the signature; raised by the steps below
 */
public NacosConfigService(Properties properties) throws NacosException {
    // Validate the init parameters (the article describes this as checking the context path).
    ValidatorUtils.checkInitParam(properties);
    // Use the configured encoding when present, otherwise the Constants.ENCODE default.
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        this.encode = Constants.ENCODE;
    } else {
        this.encode = encodeTmp.trim();
    }
    // Set up the namespace configuration.
    initNamespace(properties);
    // Build the HTTP agent; the ServerHttpAgent constructor performs the security login.
    this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    this.agent.start();
    this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}

/**
 * Builds the server list manager and the security proxy, logs in once, and then
 * schedules a repeating login on a single daemon thread.
 * NOTE(review): the article states that SecurityProxy is inactive when no username
 * is configured and that login then simply returns true - not visible in this excerpt.
 *
 * @param properties client configuration
 * @throws NacosException declared by the signature
 */
public ServerHttpAgent(Properties properties) throws NacosException {
    this.serverListMgr = new ServerListManager(properties);
    this.securityProxy = new SecurityProxy(properties, NACOS_RESTTEMPLATE);
    this.namespaceId = properties.getProperty(PropertyKeyConst.NAMESPACE);
    init(properties);
    this.securityProxy.login(this.serverListMgr.getServerUrls());

    // One daemon thread, with a fixed name, runs the periodic login.
    final ThreadFactory updaterThreads = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            Thread updater = new Thread(task);
            updater.setName("com.alibaba.nacos.client.config.security.updater");
            updater.setDaemon(true);
            return updater;
        }
    };
    this.executorService = new ScheduledThreadPoolExecutor(1, updaterThreads);

    // Log in again with no initial delay, then every securityInfoRefreshIntervalMills ms
    // after the previous run completes.
    final Runnable relogin = new Runnable() {
        @Override
        public void run() {
            securityProxy.login(serverListMgr.getServerUrls());
        }
    };
    this.executorService.scheduleWithFixedDelay(relogin, 0, this.securityInfoRefreshIntervalMills,
            TimeUnit.MILLISECONDS);
}

ClientWorker

/**
 * Stores the collaborators, runs {@code init(properties)}, creates the two
 * schedulers, and starts the recurring {@code checkConfigInfo()} task.
 *
 * @param agent                    HTTP agent; its name is used in thread names and log messages
 * @param configFilterChainManager filter chain manager stored on this worker
 * @param properties               client configuration passed to {@code init}
 */
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;
    // Per the article: initializes the timeout and retry settings.
    init(properties);

    // Single-thread scheduler that drives the periodic check below.
    final ThreadFactory workerThreads = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            Thread worker = new Thread(task);
            worker.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            worker.setDaemon(true);
            return worker;
        }
    };
    this.executor = Executors.newScheduledThreadPool(1, workerThreads);

    // Long-polling scheduler, sized to the number of available processors.
    final ThreadFactory longPollingThreads = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            Thread poller = new Thread(task);
            poller.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
            poller.setDaemon(true);
            return poller;
        }
    };
    this.executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), longPollingThreads);

    // Run checkConfigInfo() after 1 ms, then again 10 ms after each run finishes.
    // Failures are logged and do not cancel the schedule.
    final Runnable configCheck = new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    };
    this.executor.scheduleWithFixedDelay(configCheck, 1L, 10L, TimeUnit.MILLISECONDS);
}
/**
 * Starts additional long-polling tasks when the number of cached entries
 * requires more tasks than are currently running.
 */
public void checkConfigInfo() {
    final int listenerCount = cacheMap.get().size();
    // Tasks needed = entries / entries-per-task, rounded up.
    // NOTE(review): Math.ceil only rounds up if getPerTaskConfigSize() returns a
    // floating-point type - confirm against ParamUtil.
    final int requiredTasks = (int) Math.ceil(listenerCount / ParamUtil.getPerTaskConfigSize());
    if (requiredTasks <= currentLongingTaskCount) {
        return;
    }
    // Submit one runnable per missing task id.
    // The task list is no order.So it maybe has issues when changing.
    int taskIndex = (int) currentLongingTaskCount;
    while (taskIndex < requiredTasks) {
        executorService.execute(new LongPollingRunnable(taskIndex));
        taskIndex++;
    }
    currentLongingTaskCount = requiredTasks;
}

LongPollingRunnable

@Override
public void run() {
    List<CacheData> cacheDatas = new ArrayList<CacheData>();
    List<String> inInitializingCacheList = new ArrayList<String>();
    try {
        // 檢查故障轉移配置
        for (CacheData cacheData : cacheMap.get().values()) {
            if (cacheData.getTaskId() == taskId) {
                cacheDatas.add(cacheData);
                try {
                    // 檢查本地配置
                    checkLocalConfig(cacheData);
                    if (cacheData.isUseLocalConfigInfo()) {
                        cacheData.checkListenerMd5();
                    }
                } catch (Exception e) {
                    LOGGER.error("get local config info error", e);
                }
            }
        }
        // 檢查服務配置,檢查是否沒有使用本地配置信息,如果沒有使用本地配置信息則請求獲取服務端配置信息
        List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
        if (!CollectionUtils.isEmpty(changedGroupKeys)) {
            LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
        }
        for (String groupKey : changedGroupKeys) {
            String[] key = GroupKey.parseKey(groupKey);
            String dataId = key[0];
            String group = key[1];
            String tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                // 獲取服務端配置信息
                String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
          // 更新緩存配置信息,此時緩存的MD5已經改變 cache.setContent(ct[
0]); if (null != ct[1]) { cache.setType(ct[1]); } LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]); } catch (NacosException ioe) { String message = String .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) { // 如果輪訓任務異常,將對該任務下一次執行時間進行處罰 LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } }

 CacheData

/**
 * Notifies every listener whose last-notified MD5 differs from the current
 * {@code md5} of this cache entry.
 */
void checkListenerMd5() {
    for (ManagerListenerWrap listenerWrap : listeners) {
        // Listener already saw this MD5: nothing to do.
        if (md5.equals(listenerWrap.lastCallMd5)) {
            continue;
        }
        safeNotifyListener(dataId, group, content, type, md5, listenerWrap);
    }
}
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
        final String md5, final ManagerListenerWrap listenerWrap) {
    final Listener listener = listenerWrap.listener;
    
    Runnable job = new Runnable() {
        @Override
        public void run() {
            ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
            ClassLoader appClassLoader = listener.getClass().getClassLoader();
            try {
                if (listener instanceof AbstractSharedListener) {
                    AbstractSharedListener adapter = (AbstractSharedListener) listener;
                    adapter.fillContext(dataId, group);
                    LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                }
                // 執行回調之前先將線程classloader設置為具體webapp的classloader,以免回調方法中調用spi接口是出現異常或錯用(多應用部署才會有該問題)。
                Thread.currentThread().setContextClassLoader(appClassLoader);
                
                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                configFilterChainManager.doFilter(null, cr);
                String contentTmp = cr.getContent();
          // 接收配置信息 listener.receiveConfigInfo(contentTmp);
// 比較緩存中的內容,並更新Md5值 if (listener instanceof AbstractConfigChangeListener) { Map data = ConfigChangeHandler.getInstance() .parseChangeData(listenerWrap.lastContent, content, type); ConfigChangeEvent event = new ConfigChangeEvent(data); ((AbstractConfigChangeListener) listener).receiveConfigChange(event); listenerWrap.lastContent = content; } listenerWrap.lastCallMd5 = md5; LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5, listener); } catch (NacosException ex) { ... } finally { Thread.currentThread().setContextClassLoader(myClassLoader); } } }; final long startNotify = System.currentTimeMillis(); try { if (null != listener.getExecutor()) { listener.getExecutor().execute(job); } else { job.run(); } } catch (Throwable t) { LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group, md5, listener, t.getCause()); } final long finishNotify = System.currentTimeMillis(); LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ", name, (finishNotify - startNotify), dataId, group, md5, listener); }

總結:

客戶端通過一個定時任務來檢查自己監聽的配置項的數據,一旦服務端的數據發生變化,客戶端將會獲取到最新的數據,並將最新的數據保存在一個 CacheData 對象中,然後會重新計算 CacheData 的 md5 屬性的值,此時就會對該 CacheData 所綁定的 Listener 觸發 receiveConfigInfo 回調。

考慮到服務端故障的問題,客戶端將最新數據獲取後會保存在本地的 snapshot 文件中,以後會優先從文件中獲取配置信息的值。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM