一、Nacos簡介
1、Nacos是什么?
Nacos是阿里巴巴開源的一個為微服務提供服務發現、服務配置和服務管理的微服務基礎設施,簡單說就是Nacos為微服務架構提供了分布式配置和服務注冊中心的工作。
2、Nacos有什么功能?
Nacos主要有兩大功能:注冊中心和配置中心
2.1、注冊中心
a.服務發布:服務提供者發布服務到nacos,nacos存儲服務和提供者關系;
b.服務訂閱:服務消費者從nacos訂閱服務,拉去服務提供者信息列表;
c.變更推送:當服務提供者信息變更時,實時通知服務消費者;
d.路由策略:根據不同路由規則,推送不同服務提供者信息給消費者;
e.健康檢測:和服務提供者和服務消費者保持心跳,檢測服務的健康狀態;
2.2、配置中心
a.管理配置:配置的增刪改查管理;
b.監聽配置:客戶端實時監聽配置的更新情況;
c.灰度更新:允許針對部分客戶端進行配置更新;
d.配置快照:客戶端需要緩存配置快照,當nacos服務器不可用時可以使用本地配置,提高整體容災能力。
3、Nacos有哪些概念?
3.1、命名空間(namespace)
命名空間是用於配置和服務的空間隔離,不同命名空間下的數據相互獨立,不同命名空間下可以存在相同配置和相同服務,通常命名空間可用於不同環境。如開發環境、測試環境和生產環境可以通過命名空間來進行區分隔離。
nacos默認有一個保留的命名空間為public,每一個命名空間都有一個唯一的ID,如果沒有手動配置則會自動生產一個。服務管理和配置管理都是在命名空間區域內進行管理,每一個服務和配置都會綁定一個命名空間。
3.2、配置分組(Group)
同一個命名空間下可以有多個應用的配置,每個應用都可能有相同的配置,所以需要有一個分組來將屬於同一個應用的配置進行區分。配置分組不需要單獨管理,在管理配置集時添加配置分組即可。
3.3、配置集(Data)
配置集是一組配置的集合,通常一個配置文件就是一個配置集,每一個配置集都有一個配置集ID叫做Data ID,如和緩存相關配置都可以放在配置集cache.properties中,數據庫配置放在db.properties中。
配置集ID可以重復,但是同一個命名空間下同一個配置分組下的配置集ID不可重復,也就是說命名空間+配置分組+配置集ID可以唯一定位一個配置文件。
3.4、服務
通過預定義接口網絡訪問的提供給客戶端的軟件功能。每個服務都有一個服務名是服務提供的標識,通過該標識可以唯一確定其指代的服務。
3.5、服務注冊
服務提供者將自己提供的服務注冊到nacos,nacos存儲服務和服務提供者關系。
3.6、服務訂閱
服務消費者從nacos上獲取對應服務的服務提供者信息列表
3.7、元數據
Nacos數據(如配置和服務)描述信息,如服務版本、權重、容災策略、負載均衡策略、鑒權配置、各種自定義標簽 (label),從作用范圍來看,分為服務級別的元信息、集群的元信息及實例的元信息。
3.8、權重
實例級別的配置。權重為浮點數。權重越大,分配給該實例的流量越大。
3.9、健康檢查
以指定方式檢查服務下掛載的實例 (Instance) 的健康度,從而確認該實例 (Instance) 是否能提供服務。根據檢查結果,實例 (Instance) 會被判斷為健康或不健康。對服務發起解析請求時,不健康的實例 (Instance) 不會返回給客戶端。
3.10、健康保護閾值
為了防止因過多實例 (Instance) 不健康導致流量全部流向健康實例 (Instance) ,繼而造成流量壓力把健康實例 (Instance) 壓垮並形成雪崩效應,應將健康保護閾值定義為一個 0 到 1 之間的浮點數。當域名健康實例數 (Instance) 占總服務實例數 (Instance) 的比例小於
該值時,無論實例 (Instance) 是否健康,都會將這個實例 (Instance) 返回給客戶端。這樣做雖然損失了一部分流量,但是保證了集群中剩余健康實例 (Instance) 能正常工作。
二、Nacos使用
2.1、Nacos的Open API
Nacos提供了大量的HTTP API,其中包括配置管理、服務管理和命名空間管理等,核心API如下
配置管理 | 獲取配置 | GET | /nacos/v1/cs/configs |
監聽配置 | POST | /nacos/v1/cs/configs/listener | |
發布配置 | POST | /nacos/v1/cs/configs | |
刪除配置 | DELETE | /nacos/v1/cs/configs | |
查詢歷史版本配置 | GET | /nacos/v1/cs/history?search=accurate | |
查詢上一個版本配置 | GET | /nacos/v1/cs/history/previous | |
服務發現 | 注冊實例 | POST | /nacos/v1/ns/instance |
注銷實例 | DELETE | /nacos/v1/ns/instance | |
修改實例 | PUT | /nacos/v1/ns/instance | |
查詢實例列表 | GET | /nacos/v1/ns/instance/list | |
查詢實例詳情 | GET | /nacos/v1/ns/instance | |
發送實例心跳 | PUT | /nacos/v1/ns/instance/beat | |
創建服務 | POST | /nacos/v1/ns/service | |
刪除服務 | DELETE | /nacos/v1/ns/service | |
修改服務 | PUT | /nacos/v1/ns/service | |
查詢服務詳情 | GET | /nacos/v1/ns/service | |
查詢服務列表 | GET | /nacos/v1/ns/service/list | |
查詢系統數據指標 | GET | /nacos/v1/ns/operator/metrics | |
查詢集群服務器列表 | GET | /nacos/v1/ns/operator/servers | |
查詢集群當前Leader | GET | /nacos/v1/ns/raft/leader | |
更新實例健康狀態 | PUT | /nacos/v1/ns/health/instance | |
批量更新實例元數據 | PUT | /nacos/v1/ns/instance/metadata/batch | |
命名空間 | 查詢命名空間列表 | GET | /nacos/v1/console/namespaces |
創建命名空間 | POST | /nacos/v1/console/namespaces | |
修改命名空間 | PUT | /nacos/v1/console/namespaces | |
刪除命名空間 | DELETE | /nacos/v1/console/namespaces |
2.2、JAVA集成Nacos的SDK
Maven依賴
<dependency> <groupId>com.alibaba.nacos</groupId> <artifactId>nacos-client</artifactId> <version>${version}</version> </dependency>
2.2.1、配置管理
和配置相關功能都定義在ConfigService接口中,根據NacosFactory可以創建ConfigService對象,調用ConfigService相關方法就可對配置文件進行增刪改查或監聽配置更新,ConfigService相關方法定義如下:
public interface ConfigService { /** * 獲取配置 */ String getConfig(String dataId, String group, long timeoutMs) throws NacosException; /** * 獲取配置並添加監聽器監聽配置變更 */ String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener) throws NacosException; /** * 添加監聽器監聽配置變更 */ void addListener(String dataId, String group, Listener listener) throws NacosException; /** * 發布配置 */ boolean publishConfig(String dataId, String group, String content) throws NacosException; /** * 發布指定類型的配置,如yml、xml、properties、json等 */ boolean publishConfig(String dataId, String group, String content, String type) throws NacosException; /** * 刪除配置 */ boolean removeConfig(String dataId, String group) throws NacosException; /** * 刪除監聽器 */ void removeListener(String dataId, String group, Listener listener); /** * 獲取服務器狀態 */ String getServerStatus(); /** * 關閉服務 */ void shutDown() throws NacosException; }
ConfigService測試案例代碼如下:
public static void main(String[] args) throws NacosException { /** 配置管理服務*/ String nacosServer = "localhost:8848"; ConfigService configService = NacosFactory.createConfigService(nacosServer); String dataId = "db.config"; String group = "lucky"; /** 1.發布配置*/ String configContent = ""; configService.publishConfig(dataId, group, configContent); /** 2.獲取配置*/ String config = configService.getConfig(dataId, group, 5000); /** 3.添加配置更新監聽器*/ configService.addListener(dataId, group, new Listener() { @Override public Executor getExecutor() { return null; } @Override public void receiveConfigInfo(String configInfo) { System.out.println("監聽配置更新:" + configInfo); //TODO 處理配置更新 } }); while (true){ } }
2.2.2、服務管理
服務管理相關功能都由NamingService接口定義,根據NacosFactory可以獲取NamingService實例,NamingService包含服務注冊、訂閱等相關方法,定義如下:
public interface NamingService { /** * 注冊服務實例 */ void registerInstance(String serviceName, String ip, int port) throws NacosException; void registerInstance(String serviceName, String groupName, String ip, int port) throws NacosException; void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException; void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException; void registerInstance(String serviceName, Instance instance) throws NacosException; void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException; /** * 注銷服務實例 */ void deregisterInstance(String serviceName, String ip, int port) throws NacosException; void deregisterInstance(String serviceName, String groupName, String ip, int port) throws NacosException; void deregisterInstance(String serviceName, String ip, int port, String clusterName) throws NacosException; void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException; void deregisterInstance(String serviceName, Instance instance) throws NacosException; void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException; /** * 根據條件獲取服務實例列表 */ List<Instance> getAllInstances(String serviceName) throws NacosException; List<Instance> getAllInstances(String serviceName, String groupName) throws NacosException; List<Instance> getAllInstances(String serviceName, boolean subscribe) throws NacosException; List<Instance> getAllInstances(String serviceName, String groupName, boolean subscribe) throws NacosException; List<Instance> getAllInstances(String serviceName, List<String> clusters) throws NacosException; List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters) throws NacosException; List<Instance> getAllInstances(String serviceName, List<String> clusters, boolean subscribe) throws NacosException; List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException; /** * 根據條件選擇服務實例列表 */ List<Instance> selectInstances(String serviceName, boolean healthy) throws NacosException; List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException; List<Instance> selectInstances(String serviceName, boolean healthy, boolean subscribe) throws NacosException; List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException; List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy) throws NacosException; List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy) throws NacosException; List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException; List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException; /** * 根據條件以及負載均衡策略選擇一個健康的服務實例 */ Instance selectOneHealthyInstance(String serviceName) throws NacosException; Instance selectOneHealthyInstance(String serviceName, String groupName) throws NacosException; Instance selectOneHealthyInstance(String serviceName, boolean subscribe) throws NacosException; Instance selectOneHealthyInstance(String serviceName, String groupName, boolean subscribe) throws NacosException; Instance selectOneHealthyInstance(String serviceName, List<String> clusters) throws NacosException; Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters) throws NacosException; Instance selectOneHealthyInstance(String serviceName, List<String> clusters, boolean subscribe) throws NacosException; Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException; /** * 訂閱服務,並開啟Listener監聽服務變更事件 */ void subscribe(String serviceName, EventListener listener) throws NacosException; void subscribe(String serviceName, String groupName, EventListener listener) throws NacosException; void subscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException; void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException; /** * 取消訂閱服務,並關閉Listener監聽服務變更事件 */ void unsubscribe(String serviceName, EventListener listener) throws NacosException; void unsubscribe(String serviceName, String groupName, EventListener listener) throws NacosException; void unsubscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException; void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException; /** * 根據條件獲取所有服務名稱列表 */ ListView<String> getServicesOfServer(int pageNo, int pageSize) throws NacosException; ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName) throws NacosException; ListView<String> getServicesOfServer(int pageNo, int pageSize, AbstractSelector selector) throws NacosException; ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException; /** * 獲取當前客戶端訂閱的服務列表 */ List<ServiceInfo> getSubscribeServices() throws NacosException; /** * 獲取服務器狀態 */ String getServerStatus(); /** * 關閉服務器 */ void shutDown() throws NacosException; }
NamingService測試案例代碼如下:
public static void main(String[] args) throws NacosException { String serverAddr = "42.192.94.208:8858"; /** 1.創建NamingService實例 */ NamingService namingService = NacosFactory.createNamingService(serverAddr); /** 2.注冊實例*/ namingService.registerInstance("testService", "localhost", 8080); /** 3.注銷實例*/ namingService.deregisterInstance("testService", "localhost", 8080); /** 4.獲取所有健康實例*/ List<Instance> instances = namingService.selectInstances("testService", true); /** 5.監聽服務變化*/ namingService.subscribe("testService", new EventListener() { @Override public void onEvent(Event event) { System.out.println("處理服務變更事件"); if(event instanceof NamingEvent){ //TODO } } }); while (true){ } }
2.3、dubbo集成Nacos注冊中心
dubbo采用Nacos作為注冊中心,只需要在配置注冊中心時將地址改成nacos地址即可,如下:
XML配置
<!-- nacos地址 --> <dubbo:registry address="nacos://127.0.0.1:8848" />
外部配置
## dubbo注冊中心地址 dubbo.registry.address = zookeeper://10.20.153.10:2181
2.4、SpringBoot集成Nacos配置中心
添加nacos依賴
<dependency> <groupId>com.alibaba.boot</groupId> <artifactId>nacos-config-spring-boot-starter</artifactId> <version>0.2.1</version> </dependency>
版本號0.2.x.RELEASE對應的是 Spring Boot 2.x 版本,版本0.1.x.RELEASE對應的是 Spring Boot 1.x 版本
在application.properties配置文件中添加nacos地址配置
nacos.config.server-addr=127.0.0.1:8848
在SpringBoot啟動類添加@NacosProperySource注解添加Nacos配置來源,autoRefreshed表示是否自動更新
@NacosPropertySource(dataId = "db.config", autoRefreshed = true)
通過nacos的@NacosValue注解給變量賦值配置的值,autoRefreshed表示是否自動更新,如:
@NacosValue(value = "${db.username:tempUser}", autoRefreshed = true) private String dbUser; @NacosValue(value = "${db.password:tempPassword}") private String dbPassword;
三、Nacos實現原理
3.1、配置中心實現原理
Nacos提供了大量的配置管理相關API供客戶端調用,客戶端可以很方便的調用API來進行配置管理。所以Nacos Client啟動的時候只需要調用Nacos server的接口就可以獲取到所有的配置。
所以客戶端獲取配置的重點是如何進行熱更新,也就是當服務端配置更新后,客戶端是如何根據監聽器進行實時更新的,監聽器又是如何實現的呢?首先就需要從ConfigService的addListener方法入手。
ConfigService接口的實現類是NacosConfigService,addListener方法源碼如下:
1 private final ClientWorker worker; 2 3 /** NacosConfigService類 添加配置更新監聽器方法 4 * @param dataId : 配置集 5 * @param group : 配置分組 6 * @param listener : 配置更新監聽器 7 * */ 8 public void addListener(String dataId, String group, Listener listener) throws NacosException { 9 //調用ClientWorker對象方法 10 worker.addTenantListeners(dataId, group, Arrays.asList(listener)); 11 } 12 13 //Http客戶端 14 private final HttpAgent agent; 15 16 /** ClientWorker類 添加監聽器方法 */ 17 public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException { 18 group = null2defaultGroup(group); 19 String tenant = agent.getTenant(); 20 CacheData cache = addCacheDataIfAbsent(dataId, group, tenant); 21 for (Listener listener : listeners) { 22 /** 調用CacheData對象的addListener方法*/ 23 cache.addListener(listener); 24 } 25 }
/** CacheData類 監聽器列表*/ private final CopyOnWriteArrayList<ManagerListenerWrap> listeners; /** * CacheData類 添加監聽器 * */ public void addListener(Listener listener) { if (null == listener) { throw new IllegalArgumentException("listener is null"); } /** 包裝Listener*/ ManagerListenerWrap wrap = (listener instanceof AbstractConfigChangeListener) ? new ManagerListenerWrap(listener, md5, content) : new ManagerListenerWrap(listener, md5); /** 將監聽器添加到列表中*/ if (listeners.addIfAbsent(wrap)) { LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group, listeners.size()); } }
邏輯並不復雜,最終是將Listener對象進行封裝並添加到了CacheData對象的listeners列表中存儲起來。既然有地方存了,那么就需要有地方去讀,而開啟監聽是通過ClientWorker實例來實現。
NacosConfigService初始化時,會初始化ClientWorker對象,ClientWorker構造函數如下:
/** ClientWorker構造函數 */ public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; /** 1.初始化配置*/ init(properties); /** 2.創建定時任務線程池*/ this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); /** 3.創建定時任務線程池*/ this.executorService = Executors .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); /** 4.開啟定時任務,10毫秒執行一次*/ this.executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { /** 5.檢測配置信息*/ checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }
ClientWorker初始化時會創建兩個定時任務線程池,一個只有一個線程每10毫秒執行一次checkConfigInfo方法,而另一個線程池就是專門用來處理checkConfigInfo方法內部的檢查配置的邏輯,源碼如下:
/** ClientWorker檢查配置信息方法*/ public void checkConfigInfo() { /** 1.獲取CacheData對象,key是dataId*/ int listenerSize = cacheMap.size(); int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { /** 2.線程池執行LongPollingRunnable任務*/ executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }
checkConfigInfo方法實際就是向定時任務線程池中提交一個長輪訓任務LongPollingRunnable,該任務執行邏輯如下:
/** LongPollingRunnable線程執行邏輯 */ public void run() { List<CacheData> cacheDatas = new ArrayList<CacheData>(); List<String> inInitializingCacheList = new ArrayList<String>(); try { //遍歷所有CacheData for (CacheData cacheData : cacheMap.values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { /** 檢查CacheData的本地配置*/ checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) { cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } } // 校驗服務器配置,檢查需要更新的DataId List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); if (!CollectionUtils.isEmpty(changedGroupKeys)) { LOGGER.info("get changedGroupKeys:" + changedGroupKeys); } /** 遍歷所有更新的配置分組key*/ for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { /** 獲取服務器配置 */ String[] ct = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant)); /** 更新服務器配置*/ cache.setContent(ct[0]); if (null != ct[1]) { cache.setType(ct[1]); } LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]); } catch (NacosException ioe) { String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { /** 校驗配置的MD5*/ cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) { LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } } }
首先是檢查本地配置,所以及時服務器崩潰了,nacos客戶端也可以保證可以使用本地配置,本地配置存儲在~nacos/config/目錄下,檢查完本地配置之后,再查詢服務器配置,然后和本地配置進行比較的到需要更新的配置,將最新的配置寫入本地。
最后執行CacheData的checkListenerMd5()方法,該方法作用是比較配置文件的MD5加密數據是否一致,如果不一致則表示更新過,那么就需要觸發監聽器的回調,源碼如下:
1 /** CacheData類*/ 2 void checkListenerMd5() { 3 for (ManagerListenerWrap wrap : listeners) { 4 //比較MD5加密數據是否一致 5 if (!md5.equals(wrap.lastCallMd5)) { 6 /** 回調Listener*/ 7 safeNotifyListener(dataId, group, content, type, md5, wrap); 8 } 9 } 10 } 11 12 private void safeNotifyListener(final String dataId, final String group, final String content, final String type, 13 final String md5, final ManagerListenerWrap listenerWrap) { 14 final Listener listener = listenerWrap.listener; 15 16 Runnable job = new Runnable() { 17 @Override 18 public void run() { 19 ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader(); 20 ClassLoader appClassLoader = listener.getClass().getClassLoader(); 21 try { 22 if (listener instanceof AbstractSharedListener) { 23 AbstractSharedListener adapter = (AbstractSharedListener) listener; 24 adapter.fillContext(dataId, group); 25 LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5); 26 } 27 // 執行回調之前先將線程classloader設置為具體webapp的classloader,以免回調方法中調用spi接口是出現異常或錯用(多應用部署才會有該問題)。 28 Thread.currentThread().setContextClassLoader(appClassLoader); 29 30 ConfigResponse cr = new ConfigResponse(); 31 cr.setDataId(dataId); 32 cr.setGroup(group); 33 cr.setContent(content); 34 configFilterChainManager.doFilter(null, cr); 35 String contentTmp = cr.getContent(); 36 /** 回調執行Listener的receiveConfigInfo方法 */ 37 listener.receiveConfigInfo(contentTmp); 38 39 // compare lastContent and content 40 if (listener instanceof AbstractConfigChangeListener) { 41 Map data = ConfigChangeHandler.getInstance() 42 .parseChangeData(listenerWrap.lastContent, content, type); 43 ConfigChangeEvent event = new ConfigChangeEvent(data); 44 ((AbstractConfigChangeListener) listener).receiveConfigChange(event); 45 listenerWrap.lastContent = content; 46 } 47 48 listenerWrap.lastCallMd5 = md5; 49 LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5, 50 listener); 51 } catch (NacosException ex) { 52 LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", 53 name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg()); 54 } catch (Throwable t) { 55 LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, 56 group, md5, listener, t.getCause()); 57 } finally { 58 Thread.currentThread().setContextClassLoader(myClassLoader); 59 } 60 } 61 }; 62 63 final long startNotify = System.currentTimeMillis(); 64 try { 65 if (null != listener.getExecutor()) { 66 listener.getExecutor().execute(job); 67 } else { 68 job.run(); 69 } 70 } catch (Throwable t) { 71 LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, 72 group, md5, listener, t.getCause()); 73 } 74 final long finishNotify = System.currentTimeMillis(); 75 LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ", 76 name, (finishNotify - startNotify), dataId, group, md5, listener); 77 }
當比較更新完的配置和之前的配置不一樣時,就會觸發監聽器Listener的回調,執行Listener的receiveConfigInfo方法
總結:
Nacos配置中心采用的是客戶端pull的方式從nacos服務器獲取配置數據,並且沒有和nacos服務器保持長連接,而是以定時任務執行HTTP請求的方式從Nacos服務器獲取最新配置,然后再刷新到本地存儲,最后再觸發監聽器Listener的回調方法。
所以Nacos客戶端的監聽器的通知並不是nacos服務器主動推送過來的,而是nacos客戶端本地輪訓查詢發現了配置變更之后才觸發的回調。另外nacos客戶端本地采用了線程池方式拉取配置,所以不會影響核心業務線程。
3.2、服務管理實現原理
nacos提供了大量關於服務發布和訂閱的API,作為Nacos客戶端,無論是服務提供者還是服務消費者,只需要在啟動時調用nacos的API即可完成服務發布和服務訂閱功能。但是作為注冊中心,還需要有服務實例健康檢查功能,服務消費者實時監聽服務提供者變化的
通知功能。而服務訂閱的監聽邏輯和nacos配置的變更監聽流程基本上相同,訂閱功能主要由subscribe方法實現,NamingService實現類是NacosNamingService,初始化時會執行init方法,初始化服務器代理serverProxy,心跳處理器beatReactor,host處理器
hostReactor等對象,服務訂閱方法subscribe方法邏輯如下:
private HostReactor hostReactor; private BeatReactor beatReactor; private NamingProxy serverProxy; /** NacosNamingService初始化方法 */ private void init(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); this.namespace = InitUtils.initNamespaceForNaming(properties); InitUtils.initSerialization(); initServerAddr(properties); InitUtils.initWebRootContext(properties); initCacheDir(); initLogName(properties); this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties); this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties)); this.hostReactor = new HostReactor(this.serverProxy, beatReactor, this.cacheDir, isLoadCacheAtStart(properties), isPushEmptyProtect(properties), initPollingThreadCount(properties)); } /** NacosNamingService服務訂閱方法 */ public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException { hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener); } /** HostReactor的服務訂閱方法,並開啟監聽器*/ public void subscribe(String serviceName, String clusters, EventListener eventListener) { /** 1.注冊監聽器,存入InstanceChangeNotifier對象的Map中,key是服務名稱和集群,value是監聽器集合 */ notifier.registerListener(serviceName, clusters, eventListener); /** 2.根據服務名稱獲取服務器信息 */ getServiceInfo(serviceName, clusters); }
方法執行到HostReactor對象的subscribe方法,首先是將監聽器存入InstanceChangeNotifier對象的Map中,根據服務名稱和集群名稱作為key存儲,value是監聽器的集合,存儲起來之后調用getServiceInfo方法從nacos服務器獲取服務實例信息,邏輯如下:
1 /** HostReactor類 獲取服務實例信息方法 */ 2 public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { 3 String key = ServiceInfo.getKey(serviceName, clusters); 4 if (failoverReactor.isFailoverSwitch()) { 5 return failoverReactor.getService(key); 6 } 7 /** 從本地緩存中獲取ServiceInfo對象 */ 8 ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); 9 10 if (null == serviceObj) {// 如果本地緩存中沒有服務實例 11 serviceObj = new ServiceInfo(serviceName, clusters); 12 serviceInfoMap.put(serviceObj.getKey(), serviceObj); 13 updatingMap.put(serviceName, new Object()); 14 /** 立即更新服務實例*/ 15 updateServiceNow(serviceName, clusters); 16 updatingMap.remove(serviceName); 17 18 } else if (updatingMap.containsKey(serviceName)) {//判斷當前服務實例是否正在更新 19 if (UPDATE_HOLD_INTERVAL > 0) { 20 synchronized (serviceObj) { 21 try { 22 serviceObj.wait(UPDATE_HOLD_INTERVAL); 23 } catch (InterruptedException e) { 24 NAMING_LOGGER 25 .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); 26 } 27 } 28 } 29 } 30 31 /** 定時更新服務實例信息 */ 32 scheduleUpdateIfAbsent(serviceName, clusters); 33 return serviceInfoMap.get(serviceObj.getKey()); 34 }
核心邏輯是先從本地獲取服務實例信息,如果不存在那么立即執行updateServiceNow方法進行更新;如果已經存在那么先執行scheuleUpdateIfAbsent方法定時更新。updateServiceNow方法也就是當前線程立即更新服務實例,執行了updateService方法,
而定時更新邏輯是先構建一個UpdateTask,然后提交給線程池來執行,定時每1秒執行一次,邏輯如下:
/** HostReactor類 */ public void scheduleUpdateIfAbsent(String serviceName, String clusters) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } synchronized (futureMap) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } /** 創建UpdateTask,並添加定時任務 */ ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters)); futureMap.put(ServiceInfo.getKey(serviceName, clusters), future); } } /** HostReactor類添加任務*/ public synchronized ScheduledFuture<?> addTask(UpdateTask task) { /** 線程池執行,每1秒執行一次*/ return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS); }
所以更新的邏輯主要在UpdateTask執行體類,且邏輯肯定包含了updateService方法的邏輯,源碼核心邏輯如下:
/** HostReactor 更新服務實例方法 */ public void updateService(String serviceName, String clusters) throws NacosException { /** 1.從本地獲取舊的服務實例 */ ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { /** 2.從服務器查詢最新服務實例列表 */ String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { /** 3.刷新本地緩存 */ processServiceJson(result); } } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } } } /** UpdateTask 執行體*/ public void run() { long delayTime = DEFAULT_DELAY; try { /** 1.從緩存中獲取服務實例*/ ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); if (serviceObj == null) { /** 2.如果緩存中沒有,則執行updateService方法查詢*/ updateService(serviceName, clusters); return; } /** 2.如果本地服務實例更新時間延遲,那么就執行updateService方法刷新*/ if (serviceObj.getLastRefTime() <= lastRefTime) { updateService(serviceName, clusters); serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); } else { refreshOnly(serviceName, clusters); } lastRefTime = serviceObj.getLastRefTime(); if (!notifier.isSubscribed(serviceName, clusters) && !futureMap .containsKey(ServiceInfo.getKey(serviceName, clusters))) { // abort the update task NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters); return; } if (CollectionUtils.isEmpty(serviceObj.getHosts())) { /** 3.如果查詢失敗,那么失敗次數自增*/ incFailCount(); return; } delayTime = serviceObj.getCacheMillis(); /** 4.如果查詢成功,那么重置失敗次數*/ resetFailCount(); } catch (Throwable e) { incFailCount(); NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e); } finally { /** 5.提交下一次延遲任務*/ executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); } }
可以發現更新邏輯就是執行updateService方法,首先從服務器查詢最新的服務實例列表,然后將查詢結果刷新到本地緩存中,然后開啟下一次定時任務繼續執行。默認是1秒鍾執行一次,如果查詢不到任何記錄(服務器異常或無可用實例),那么就增加失敗次數,每
增加一次失敗次數延遲執行時間就翻倍,最長會1分鍾執行一次。
另外當執行updateService方法刷新服務實例時,如果觸發了服務更新,就需要更新本地緩存並且寫入磁盤的持久化文件中保持,並且還會調用NotifyCenter的publishEvent方法發布服務實例變更事件,邏輯如下:
/** HostReactor 處理查詢服務實例結果方法*/ public ServiceInfo processServiceJson(String json) { //...... boolean changed = false; if (oldService != null) { //...... } else { changed = true; /** 刷新內存中緩存*/ serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); /** 發布服務實例變更事件*/ NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); serviceInfo.setJsonFromServer(json); /** 寫入磁盤本地數據*/ DiskCache.write(serviceInfo, cacheDir); } //...... return serviceInfo; } /** NotifyCenter 發布事件方法*/ public static boolean publishEvent(Event event) { try { return publishEvent(event.getClass(), event); } catch (Throwable var2) { LOGGER.error("There was an exception to the message publishing : {}", var2); return false; } } private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) { if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) { return INSTANCE.sharePublisher.publish(event); } final String topic = ClassUtils.getCanonicalName(eventType); EventPublisher publisher = INSTANCE.publisherMap.get(topic); if (publisher != null) { /** 執行EventPublisher對象publish方法*/ return publisher.publish(event); } LOGGER.warn("There are no [{}] publishers for this event, please register", topic); return false; }
實際是調用了EventPublisher對象的publish方法,默認實現是DefaultPublisher類,DefaultPublisher會先將通知事件存入本地隊列,然后采用線程異步通知,邏輯如下:
1 /** DefaultPublisher類 發布事件方法*/ 2 public boolean publish(Event event) { 3 /** 1.檢查並開啟線程 */ 4 checkIsStart(); 5 /** 2.將事件存入隊列*/ 6 boolean success = this.queue.offer(event); 7 if (!success) { 8 LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event); 9 /** 3.如果存入隊列失敗,那么立即通知*/ 10 receiveEvent(event); 11 return true; 12 } 13 return true; 14 } 15 16 public void run() { 17 openEventHandler(); 18 } 19 20 void openEventHandler() { 21 try { 22 23 // This variable is defined to resolve the problem which message overstock in the queue. 24 int waitTimes = 60; 25 // To ensure that messages are not lost, enable EventHandler when 26 // waiting for the first Subscriber to register 27 for (; ; ) { 28 if (shutdown || hasSubscriber() || waitTimes <= 0) { 29 break; 30 } 31 ThreadUtils.sleep(1000L); 32 waitTimes--; 33 } 34 35 for (; ; ) { 36 if (shutdown) { 37 break; 38 } 39 final Event event = queue.take(); 40 receiveEvent(event); 41 UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence())); 42 } 43 } catch (Throwable ex) { 44 LOGGER.error("Event listener exception : {}", ex); 45 } 46 } 47 48 void receiveEvent(Event event) { 49 final long currentEventSequence = event.sequence(); 50 /** 遍歷所有訂閱者,*/ 51 for (Subscriber subscriber : subscribers) { 52 // Whether to ignore expiration events 53 if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) { 54 LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", 55 event.getClass()); 56 continue; 57 } 58 /** 通知訂閱者,執行訂閱者的onEvent方法 */ 59 notifySubscriber(subscriber, event); 60 } 61 }
DefaultPublisher先將事件存入隊列,然后通過異步線程從隊列中取任務,遍歷事件所有訂閱者,依次遍歷執行訂閱者的onEvent方法實現事件回調通知。
總結:
服務管理的實現和配置管理實現原理基本一致,啟動時首先會調用Nacos服務器的HTTP接口初始化一次,並且在本地內存中緩存一份,磁盤中持久化一份。然后開啟定時任務輪訓查詢服務器最新數據,如果數據發生變化,那么就更新內存中緩存,重新寫入磁盤,
然后再由線程池異步遍歷所有訂閱者,回調執行訂閱者的回調函數實現變更通知的邏輯。
3.3、心跳檢測
作為服務提供者,需要和nacos服務器保持心跳,服務提供者在注冊實例時會創建心跳任務,邏輯如下:
1 /** 服務提供者 注冊實例*/ 2 public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { 3 NamingUtils.checkInstanceIsLegal(instance); 4 String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); 5 /** 如果實例是臨時節點*/ 6 if (instance.isEphemeral()) { 7 /** 構建心跳任務交給BeatReactor處理 */ 8 BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); 9 beatReactor.addBeatInfo(groupedServiceName, beatInfo); 10 } 11 serverProxy.registerService(groupedServiceName, groupName, instance); 12 }
調用BeatReactor的addBeatInfo方法提交心跳任務
public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; //fix #1733 if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); /** 創建並提交心跳定時任務,默認是5秒執行一次*/ executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); } /** 心跳定時任務執行體 */ class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { if (beatInfo.isStopped()) { return; } long nextTime = beatInfo.getPeriod(); try { /** 發送心跳給Nacos服務器 * 調用Nacos服務器的 /instance/beat 接口 */ JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); long interval = result.get("clientBeatInterval").asLong(); boolean lightBeatEnabled = false; if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0) { nextTime = interval; } int code = NamingResponseCode.OK; if (result.has(CommonParams.CODE)) { code = result.get(CommonParams.CODE).asInt(); } if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { Instance instance = new Instance(); instance.setPort(beatInfo.getPort()); instance.setIp(beatInfo.getIp()); instance.setWeight(beatInfo.getWeight()); instance.setMetadata(beatInfo.getMetadata()); instance.setClusterName(beatInfo.getCluster()); instance.setServiceName(beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true); try { /** 如果返回404,那么就重新注冊實例*/ serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance); } catch (Exception ignore) { } } } catch (NacosException ex) { NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg()); } /** 開啟下一次心跳定時任務*/ executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); } }
核心邏輯就是構建心跳定時任務交給NacosNamingService的線程池,默認每5秒發送一次心跳,實際就是調用nacos服務器的 /instance/beat接口發送心跳,心跳發送完成再開啟下一次的定時任務,整體邏輯比較簡單。
總結:
雖然nacos實現了配置中心和服務發現、服務訂閱、健康檢測等功能,但是nacos客戶端實際上並沒有和nacos服務器保持長連接,而是采用HTTP請求的方式來實現。
配置中心就是調用查詢配置HTTP接口查詢並緩存在本地,然后開啟定時任務輪訓查詢,如果發送變更就刷新本地緩存,並觸發回調通知監聽器;
服務發布就是調用注冊服務HTTP接口實現注冊,然后開啟定時任務每5秒向nacos調用一次HTTP接口發送心跳數據,nacos根據心跳來管理服務提供者的健康狀態;
服務訂閱就是調用查詢服務HTTP接口實現服務訂閱並將服務實例信息緩存在本地,然后開啟定時任務輪訓查詢並和本地數據進行比較,如果有更新那么就異步觸發回調通知所有服務訂閱者;