一:關於Nacos的思考
首先思考一個問題,Nacos作為配置中心,Nacos 客戶端是怎么實時獲取到 Nacos 服務端的最新數據?
其實客戶端和服務端之間的數據交互,無外乎兩種情況:
1.服務端推數據給客戶端
2.客戶端從服務端拉數據
zk作為配置中心,基於zk的watcher機制,配置發生變化通知客戶端,Nacos也是同樣的原理嗎?
二:Nacos的源碼解析
看看Nacos是如何獲取服務端的最新數據
createConfigService的作用:通過反射創建NacosConfigService

public static ConfigService createConfigService(Properties properties) throws NacosException { try { Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); Constructor constructor = driverImplClass.getConstructor(Properties.class); ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); } }
NacosConfigService的構造函數
new MetricsHttpAgent(new ServerHttpAgent(properties))作用:MetricsHttpAgent包裝ServerHttpAgent(裝飾器模式)為了做一些統計等功能。
agent.start()作用 : 監聽服務器列表是否變化

public class MetricsHttpAgent implements HttpAgent { private HttpAgent httpAgent; public MetricsHttpAgent(HttpAgent httpAgent) { this.httpAgent = httpAgent; } @Override public void start() throws NacosException { httpAgent.start(); } @Override public HttpResult httpGet(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException { Histogram.Timer timer = MetricsMonitor.getConfigRequestMonitor("GET", path, "NA"); HttpResult result = null; try { result = httpAgent.httpGet(path, headers, paramValues, encoding, readTimeoutMs); } catch (IOException e) { throw e; } finally { timer.observeDuration(); timer.close(); } return result; } @Override public HttpResult httpPost(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException { Histogram.Timer timer = MetricsMonitor.getConfigRequestMonitor("POST", path, "NA"); HttpResult result = null; try { result = httpAgent.httpPost(path, headers, paramValues, encoding, readTimeoutMs); } catch (IOException e) { throw e; } finally { timer.observeDuration(); timer.close(); } return result; } @Override public HttpResult httpDelete(String path, List<String> headers, List<String> paramValues, String encoding, long readTimeoutMs) throws IOException { Histogram.Timer timer = MetricsMonitor.getConfigRequestMonitor("DELETE", path, "NA"); HttpResult result = null; try { result = httpAgent.httpDelete(path, headers, paramValues, encoding, readTimeoutMs); } catch (IOException e) { throw e; } finally { timer.observeDuration(); timer.close(); } return result; } @Override public String getName() { return httpAgent.getName(); } @Override public String getNamespace() { return httpAgent.getNamespace(); } @Override public String getTenant() { return httpAgent.getTenant(); } @Override public String getEncode() { return httpAgent.getEncode(); } }
new ClientWorker(agent, configFilterChainManager, properties)的作用:線程池輪詢服務器

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; // Initialize the timeout parameter
init(properties); executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }
checkConfigInfo作用:檢查配置信息

public void checkConfigInfo() { // 分任務
int listenerSize = cacheMap.get().size(); // 向上取整為批數
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // 要判斷任務是否在執行 這塊需要好好想想。 任務列表現在是無序的。變化過程可能有問題
executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }
LongPollingRunnable:run方法是一個長輪詢服務端過程,大致分為四塊部分
第一部分:檢查本地配置相關
checkLocalConfig作用:判斷本地配置是否存在,是否有變更,dataId=“example”和group=“DEFAULT_GROUP”在Windows環境下默認配置路徑(C:\Users\Administrator\nacos\config\fixed-localhost_8848_nacos\data\config-data\DEFAULT_GROUP\example)。

private void checkLocalConfig(CacheData cacheData) { final String dataId = cacheData.dataId; final String group = cacheData.group; final String tenant = cacheData.tenant; File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant); // 沒有 -> 有 if (!cacheData.isUseLocalConfigInfo() && path.exists()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); String md5 = MD5.getInstance().getMD5String(content); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); return; } // 有 -> 沒有。不通知業務監聽器,從server拿到配置后通知。 if (cacheData.isUseLocalConfigInfo() && !path.exists()) { cacheData.setUseLocalConfigInfo(false); LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); return; } // 有變更 if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path.lastModified()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); String md5 = MD5.getInstance().getMD5String(content); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); } }
第二部分:檢查server端的變更
第三部分:如果server端有變更,更新CacheData的content
第四部分:檢查md5的值是否改變,即配置是否改變,選擇喚醒listener回調
checkListenerMd5作用:檢查CacheData的md5和wrap(listener的包裝器)的md5是否一致,如果不一致喚醒回調
sageNotifyListener作用 : 觸發listener的回調函數
配置中心的完整流程已經分析完畢了,可以發現,Nacos 並不是通過推的方式將服務端最新的配置信息發送給客戶端的,而是客戶端維護了一個長輪詢的任務,定時去拉取發生變更的配置信息,然后將最新的數據推送給 Listener 的持有者。
三:Nacos配置中心總結歸納
1. 關閉Nacos服務端,刪除本地配置,啟動測試類,依然能獲取配置信息,為什么呢?
跟蹤代碼 String config = configService.getConfig("example", "DEFAULT_GROUP", 10)
當前環境下快照地址為:C:\Users\Administrator\nacos\config\fixed-localhost_8848_nacos\snapshot\DEFAULT_GROUP\example
2.客戶端拉取服務端的數據與服務端推送數據給客戶端相比,優勢在哪呢,為什么 Nacos 不設計成主動推送數據,而是要客戶端去拉取呢?
如果用推的方式,服務端需要維持與客戶端的長連接,這樣的話需要耗費大量的資源,並且還需要考慮連接的有效性,例如需要通過心跳來維持兩者之間的連接。而用拉的方式,客戶端只需要通過一個無狀態的 http 請求即可獲取到服務端的數據。
3. Nacos實現配置中心的原理?
客戶端是通過一個定時任務來檢查自己監聽的配置項的數據,一旦本地配置或者服務端的數據發生變化時,客戶端將會獲取到最新的數據(跟蹤checkUpdateDataIds方法可以明白),優先本地配置,並將最新的數據保存在一個 CacheData 對象中,然后會重新計算 CacheData 的 md5 屬性的值,此時就會對該 CacheData 所綁定的 Listener 觸發 receiveConfigInfo 回調。考慮到服務端故障的問題,客戶端將最新數據獲取后會保存在本地的 snapshot 文件中。
上述為本人閱讀源碼的理解,可能存在誤差。