溫馨提示:
本文內容基於個人學習Nacos 2.0.1版本代碼總結而來,因個人理解差異,不保證完全正確。如有理解錯誤之處歡迎各位拍磚指正,相互學習;轉載請注明出處。
在《Distro協議概覽》這篇文章內簡要的從全局角度來分析了Distro協議的整體面貌。若還未閱讀過的同學可以去看一下,大腦中對相關的組件有個映像。在DistroProtocol章節介紹了Distro協議是從這里開始的,本文將圍繞這個入口展開分析。
Distro協議的初始任務
在DistroProtocol章節介紹道,從構造方法中它啟動了一個startDistroTask()
任務,內部又分為驗證任務和同步任務。
private void startDistroTask() {
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
startVerifyTask();
startLoadTask();
}
在真正開始分析任務的具體操作之前,請允許我先介紹【DistroProtocol】內部的一些屬性:
/**
* 節點管理器
*/
private final ServerMemberManager memberManager;
/**
* Distro組件持有者
*/
private final DistroComponentHolder distroComponentHolder;
/**
* Distro任務引擎持有者
*/
private final DistroTaskEngineHolder distroTaskEngineHolder;
其中【DistroComponentHolder】和【DistroTaskEngineHolder】的基本概念已經在《Distro協議概覽》一文中介紹過,請點擊名稱跳轉查看。
此處介紹一下他們的初始化。
初始化Distro協議的組件
DistroHttpRegistry
是v1版本中的組件,這里介紹v1相關內容是想讓各位讀者直觀的對比一下v1和v2的巨大差異。因為此篇文章先於《Distro協議概覽》完成,並且當初學習的時候並未注意即便使用了2.0.1版本的代碼,它還是默認開啟了對v1的兼容。因此花了大量時間研究的卻是v1相關的操作。不過通過對v1和v2的相關處理的分析可以更加明確v2的提升點。
DistroHttpRegistry
v1版本的組件注冊器
package com.alibaba.nacos.naming.consistency.ephemeral.distro;
@Component
public class DistroHttpRegistry {
// 一些Distro組件的集合
private final DistroComponentHolder componentHolder;
// 一些任務執行引擎的集合
private final DistroTaskEngineHolder taskEngineHolder;
// Distro協議http請求方式的數據對象
private final DataStore dataStore;
// 協議映射器
private final DistroMapper distroMapper;
// 一些全局配置
private final GlobalConfig globalConfig;
// Distro 一致性協議服務
private final DistroConsistencyServiceImpl consistencyService;
// Nacos節點管理器
private final ServerMemberManager memberManager;
public DistroHttpRegistry(DistroComponentHolder componentHolder, DistroTaskEngineHolder taskEngineHolder,
DataStore dataStore, DistroMapper distroMapper, GlobalConfig globalConfig,
DistroConsistencyServiceImpl consistencyService, ServerMemberManager memberManager) {
this.componentHolder = componentHolder;
this.taskEngineHolder = taskEngineHolder;
this.dataStore = dataStore;
this.distroMapper = distroMapper;
this.globalConfig = globalConfig;
this.consistencyService = consistencyService;
this.memberManager = memberManager;
}
/**
* Register necessary component to distro protocol for HTTP implement.
*/
@PostConstruct
public void doRegister() {
// 注冊com.alibaba.nacos.naming.iplist.類型數據的數據倉庫實現
componentHolder.registerDataStorage(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroDataStorageImpl(dataStore, distroMapper));
// 注冊com.alibaba.nacos.naming.iplist.類型數據的數據傳輸代理對象實現
componentHolder.registerTransportAgent(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpAgent(memberManager));
// 注冊com.alibaba.nacos.naming.iplist.類型的失敗任務處理器
componentHolder.registerFailedTaskHandler(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpCombinedKeyTaskFailedHandler(taskEngineHolder));
// 注冊com.alibaba.nacos.naming.iplist.類型的任務處理器
taskEngineHolder.registerNacosTaskProcessor(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpDelayTaskProcessor(globalConfig, taskEngineHolder));
// 注冊com.alibaba.nacos.naming.iplist.類型的DistroData數據處理器
componentHolder.registerDataProcessor(consistencyService);
}
}
DistroClientComponentRegistry
v2版本的組件注冊器,請對比v1的異同。
package com.alibaba.nacos.naming.consistency.ephemeral.distro.v2;
@Component
public class DistroClientComponentRegistry {
// Nacos節點管理器
private final ServerMemberManager serverMemberManager;
// Distro協議對象
private final DistroProtocol distroProtocol;
// 一些Distro組件的集合
private final DistroComponentHolder componentHolder;
// 一些任務執行引擎的集合
private final DistroTaskEngineHolder taskEngineHolder;
// Nacos 客戶端管理器
private final ClientManager clientManager;
// 集群rpc客戶端代理對象
private final ClusterRpcClientProxy clusterRpcClientProxy;
// 版本升級判斷
private final UpgradeJudgement upgradeJudgement;
public DistroClientComponentRegistry(ServerMemberManager serverMemberManager, DistroProtocol distroProtocol,
DistroComponentHolder componentHolder, DistroTaskEngineHolder taskEngineHolder,
ClientManagerDelegate clientManager, ClusterRpcClientProxy clusterRpcClientProxy,
UpgradeJudgement upgradeJudgement) {
this.serverMemberManager = serverMemberManager;
this.distroProtocol = distroProtocol;
this.componentHolder = componentHolder;
this.taskEngineHolder = taskEngineHolder;
this.clientManager = clientManager;
this.clusterRpcClientProxy = clusterRpcClientProxy;
this.upgradeJudgement = upgradeJudgement;
}
/**
* Register necessary component to distro protocol for v2 {@link com.alibaba.nacos.naming.core.v2.client.Client}
* implement.
*/
@PostConstruct
public void doRegister() {
DistroClientDataProcessor dataProcessor = new DistroClientDataProcessor(clientManager, distroProtocol, upgradeJudgement);
DistroTransportAgent transportAgent = new DistroClientTransportAgent(clusterRpcClientProxy, serverMemberManager);
DistroClientTaskFailedHandler taskFailedHandler = new DistroClientTaskFailedHandler(taskEngineHolder);
// 注冊Nacos:Naming:v2:ClientData類型數據的數據倉庫實現
componentHolder.registerDataStorage(DistroClientDataProcessor.TYPE, dataProcessor);
// 注冊Nacos:Naming:v2:ClientData類型的DistroData數據處理器
componentHolder.registerDataProcessor(dataProcessor);
// 注冊Nacos:Naming:v2:ClientData類型數據的數據傳輸代理對象實現
componentHolder.registerTransportAgent(DistroClientDataProcessor.TYPE, transportAgent);
// 注冊Nacos:Naming:v2:ClientData類型的失敗任務處理器
componentHolder.registerFailedTaskHandler(DistroClientDataProcessor.TYPE, taskFailedHandler);
}
}
DistroClientComponentRegistry注冊了一些Nacos:Naming:v2:ClientData
類型的操作對象。他們的功能和上一節的DistroHttpRegistry
一樣。區別就是實現類不同。根據DistroComponentHolder
注冊的內容來看,它所操作的數據和《Distro協議概覽》中介紹的【Distro協議重要角色】剛好對應。它明確了v2版本中數據保存在哪里,由什么數據處理器來處理,用什么DistroTransportAgent來發送。同時請注意這幾個注冊方法在v1和v2版本注冊時傳遞的key,在v1中它是com.alibaba.nacos.naming.iplist.
,v2中它是Nacos:Naming:v2:ClientData
,后面你會經常看見他們的。
節點數據驗證
現在正式開始分析DistroProtocol在構造方法中啟動的任務。
驗證的執行流程
前面說道有v1和v2的實現,即便在2.0.*版本中也依然會兼容v1的邏輯,除非你關閉了這個兼容性。從這點來看,接下來的代碼結構的良好設計為升級也提供了很好的基礎。本節內容主要分析的就是面向接口編程
。
private void startVerifyTask() {
GlobalExecutor.schedulePartitionDataTimedSync(
new DistroVerifyTimedTask(memberManager, distroComponentHolder, distroTaskEngineHolder.getExecuteWorkersManager()),
DistroConfig.getInstance().getVerifyIntervalMillis()
);
}
驗證功能從startVerifyTask()
方法開始啟動,此處它構建了一個名為DistroVerifyTimedTask的定時任務,延遲5秒開始,間隔5秒輪詢。
// DistroVerifyTimedTask.java
@Override
public void run() {
try {
// 獲取除自身節點之外的其他節點
List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("server list is: {}", targetServer);
}
// 每一種類型的數據,都要向其他節點發起驗證
for (String each : distroComponentHolder.getDataStorageTypes()) {
// 對dataStorage內的數據進行驗證
verifyForDataStorage(each, targetServer);
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
}
}
private void verifyForDataStorage(String type, List<Member> targetServer) {
// 獲取數據類型
DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
// 若數據還未同步完畢則不處理
if (!dataStorage.isFinishInitial()) {
Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data", dataStorage.getClass().getSimpleName());
return;
}
// ① 獲取驗證數據
List<DistroData> verifyData = dataStorage.getVerifyData();
if (null == verifyData || verifyData.isEmpty()) {
return;
}
// 對每個節點開啟一個異步的線程來執行
for (Member member : targetServer) {
DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
if (null == agent) {
continue;
}
executeTaskExecuteEngine.addTask(member.getAddress() + type, new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
}
}
在DistroVerifyTimedTask
任務中,對每一個節點的所有驗證數據都創建了一個新的任務 DistroVerifyExecuteTask,由它來執行具體的驗證工作。
重點:
被驗證的數據集合從接口DistroDataStorage
中被獲取,並被傳入DistroVerifyExecuteTask
任務中。
// DistroVerifyExecuteTask.java
@Override
public void run() {
for (DistroData each : verifyData) {
try {
// 判斷傳輸對象是否支持回調(若是http的則不支持,實際上沒區別,當前2.0.1版本沒有實現回調的實質內容)
if (transportAgent.supportCallbackTransport()) {
doSyncVerifyDataWithCallback(each);
} else {
doSyncVerifyData(each);
}
} catch (Exception e) {
Loggers.DISTRO
.error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, targetServer, e);
}
}
}
/**
* 支持回調的同步數據驗證
* @param data
*/
private void doSyncVerifyDataWithCallback(DistroData data) {
// 回調實際上,也沒啥。。。基本算是空對象
transportAgent.syncVerifyData(data, targetServer, new DistroVerifyCallback());
}
/**
* 不支持回調的同步數據驗證
* @param data
*/
private void doSyncVerifyData(DistroData data) {
transportAgent.syncVerifyData(data, targetServer);
}
每一個DistroVerifyExecuteTask
任務都持有一組驗證數據List<DistroData>
和數據發送的目的地ServertargetServer
,它將使用DistroTransportAgent
為每一個DistroData執行一次驗證操作。這里僅僅描述它的執行流程,不對細節作過多描述。
數據驗證流程:
從DistroDataStorage獲取驗證數據 -> 使用DistroTransportAgent開始驗證。
請注意他們都是接口,對應不同類型的數據將會有不同的實現。
通過下圖可以比較直觀的看到驗證任務的創建流程。
驗證流程中的任務產生說明:
- 當前節點的
DistroVerifyTimedTask
會根據節點的數量來創建DistroVerifyExecuteTask
,並向其傳遞自身負責的所有Client的clientId集合(clientId最終被包裝成DistroData)。- 每一個
DistroVerifyExecuteTask
會為傳入的List中的每一個DistroData創建一個異步的rpc請求。
疑問:
這里是將為本節點中的所有Client單獨都創建一個rpc請求,為何不一次性將所有client發送出去?難道是為了性能考量?
小技巧:
當你看到紅色部分的Task它是被放入一個阻塞隊列里面的時候,你就應該要想到它一定會有一個從隊列不斷獲取任務來執行的操作。這樣在你要分析這些產生的任務如何執行的時候就變的相當容易了
驗證數據的類型
前面我們知道Distro協議數據交互的對象是DistroData
。其內部的content保存的數據才是真正的需要驗證的對象。在v1版本中的DistroData.content保存的是序列化后的Map<String, String>
, key為serviceName,value為Service下的所有Instance的checksum值;v2版本中的DistroData.content保存的是序列化后的DistroClientVerifyInfo
。
public class DistroClientVerifyInfo implements Serializable {
private static final long serialVersionUID = 2223964944788737629L;
// 客戶端ID
private String clientId;
// 修訂版本號,驗證時固定為0
private long revision;
// ...省略getter/setter
}
執行驗證任務
相信通過前面的章節你已經知道驗證任務會處理不同的數據類型,而不同的數據類型的處理方式也不相同。這里假定你已經充分理解不同類型數據的處理流程和實際的處理對象。本節的分析將直接使用對應的處理方法而不再重復它屬於哪個具體的實例(這里的實例指的是接口的實例並非業務中的那個Instance)。
注意:
當你使用2.0.X版本的代碼,並且關閉了雙寫服務,就表示你的Nacos集群會徹底使用2.0版本的特性,它將不再兼容1.版本,同時1.版本中的檢查邏輯將不再被執行,也就是com.alibaba.nacos.naming.iplist.*
類型的數據檢查將不被執行。取而代之的是Nacos:Naming:v2:ClientData
。
v1版本的數據驗證
在v1版本中數據驗證是驗證com.alibaba.nacos.naming.iplist.*
類型, 驗證主要目的是在v1版本中向其他節點發送checksum請求。用於報告自身節點內部的服務列表狀態。發送方不用關注,只管將節點內的服務信息發送出去即可。但當前節點也會接收到其他節點發送來的checksum請求。
從DistroDataStorage獲取驗證數據
// DistroDataStorageImpl.java
@Override
public List<DistroData> getVerifyData() {
// 用於保存屬於當前節點處理的,且數據真實有效的Service
Map<String, String> keyChecksums = new HashMap<>(64);
// 遍歷當前節點已有的datastore
for (String key : dataStore.keys()) {
// 若當前的服務不是本機處理,則排除
if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
continue;
}
// 若當key對應的數據為空,則排除
Datum datum = dataStore.get(key);
if (datum == null) {
continue;
}
keyChecksums.put(key, datum.value.getChecksum());
}
if (keyChecksums.isEmpty()) {
return Collections.emptyList();
}
// 構建DistroData
DistroKey distroKey = new DistroKey("checksum", KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
DistroData data = new DistroData(distroKey, ApplicationUtils.getBean(Serializer.class).serialize(keyChecksums));
// 設置當前操作類型為Verify
data.setType(DataOperation.VERIFY);
return Collections.singletonList(data);
}
注意:
keyChecksums內部存儲的是每個服務所對應實例列表的checksum驗證字符串
使用DistroTransportAgent發送驗證數據
// DistroHttpAgent.java
@Override
public boolean syncVerifyData(DistroData verifyData, String targetServer) {
// 若本機節點緩存中沒有targetServer,說明此節點已不具備服務能力,也沒有報告的必要。
if (!memberManager.hasMember(targetServer)) {
return true;
}
// 發送checksum請求
NamingProxy.syncCheckSums(verifyData.getContent(), targetServer);
return true;
}
// NamingProxy.java
/**
* Synchronize check sums.
*
* @param checksums checksum map bytes
* @param server server address
*/
public static void syncCheckSums(byte[] checksums, String server) {
try {
Map<String, String> headers = new HashMap<>(128);
headers.put(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.version);
headers.put(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION);
headers.put(HttpHeaderConsts.CONNECTION, "Keep-Alive");
// 請求示例:http://10.53.126.16:8848/nacos/v1/ns/distro/checksum?source=10.53.155.22:8848
HttpClient.asyncHttpPutLarge("http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + TIMESTAMP_SYNC_URL + "?source=" + NetUtils.localServer(), headers, checksums,
new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.DISTRO.error("failed to req API: {}, code: {}, msg: {}", "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + TIMESTAMP_SYNC_URL, result.getCode(), result.getMessage());
}
}
@Override
public void onError(Throwable throwable) {
Loggers.DISTRO.error("failed to req API:" + "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + TIMESTAMP_SYNC_URL, throwable);
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
Loggers.DISTRO.warn("NamingProxy", e);
}
}
提示:
請求路徑為:http://其他節點的IP地址:其他節點的端口號/nacos/v1/ns/distro/checksum?source=本機的IP地址:本機的端口號
參數為:DistroData,DistroData內部包裝的則是一個Map<服務名稱,服務下實例的checksum驗證字符串>
處理DistroTransportAgent的驗證請求
naming
模塊下的DistroController
用於接收HTTP方式的請求。請注意,此處的驗證請求是處理其他節點發送來的請求,寫在這里是為了便於理解具體的驗證功能。一定要將此小節的內容看做是接收方,因為他們的角色不同,內部緩存的服務數據歸屬也不同。發送方發送的服務數據是屬於在發送方注冊的,而此處作為接收方它自己內部也有
自己負責的服務。弄清楚角色的關系才能理解接下來的驗證流程。
// DistroController.java
/**
* Checksum.
*
* @param source source server
* @param dataMap checksum map
* @return 'ok'
*/
@PutMapping("/checksum")
public ResponseEntity syncChecksum(@RequestParam String source, @RequestBody Map<String, String> dataMap) {
// 構建驗證數據對象
DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(source), dataMap);
// 開始驗證
distroProtocol.onVerify(distroHttpData, source);
return ResponseEntity.ok("ok");
}
// DistroProtocol.java
/**
* Receive verify data, find processor to process.
* @param distroData verify data
* @param sourceAddress source server address, might be get data from source server
* @return true if verify data successfully, otherwise false
*/
public boolean onVerify(DistroData distroData, String sourceAddress) {
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(), distroData.getDistroKey());
}
// 根據不同類型獲取不同的數據處理器
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
return false;
}
return dataProcessor.processVerifyData(distroData, sourceAddress);
}
// DistroConsistencyServiceImpl.java
@Override
public boolean processVerifyData(DistroData distroData, String sourceAddress) {
DistroHttpData distroHttpData = (DistroHttpData) distroData;
Map<String, String> verifyData = (Map<String, String>) distroHttpData.getDeserializedContent();
onReceiveChecksums(verifyData, sourceAddress);
return true;
}
篩選需要處理的服務
服務篩選的目的可以用7個字總結:個人自掃門前雪。
/**
* Check sum when receive checksums request.
* Service檢查服務,用於從其他節點更新已變更的Service
* @param checksumMap map of checksum 某個服務對應的checksum
* @param server source server request checksum checksum請求的來源
*/
public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
// 若已包含此節點的信息,說明正在處理(處理完畢之后會清空)
if (syncChecksumTasks.containsKey(server)) {
// Already in process of this server:
Loggers.DISTRO.warn("sync checksum task already in process with {}", server);
return;
}
// 將當前要處理的節點信息暫存,表示當前正在處理
syncChecksumTasks.put(server, "1");
try {
List<String> toUpdateKeys = new ArrayList<>();
List<String> toRemoveKeys = new ArrayList<>();
// 第一個for循環
/**
* 判斷當前節點中哪些服務需要去遠程更新
* 需要更新的服務將被添加至toUpdateKeys
*/
for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
/**
* 判斷當前的entry(指的是服務Service)是否由本節點處理, 若是本機負責的則沒必要從其他節點發送過來
* 因為在本節點注冊的服務最終任何新的操作都會被路由到本機,那么它的狀態在本機就是最新的
*/
if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
// this key should not be sent from remote server:
Loggers.DISTRO.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
// abort the procedure:
return;
}
// 若當前節點不存在此服務,或服務是空的,或服務的實例列表(checksum驗證字符串)跟傳入的不一致,則標記此服務需要更新
if (!dataStore.contains(entry.getKey()) ||
dataStore.get(entry.getKey()).value == null ||
!dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
// 添加到待更新列表
toUpdateKeys.add(entry.getKey());
}
}
// 第二個for循環
/**
* 此處用於判斷當前節點已有的服務是不是當前接收的請求的來源節點負責處理的,如果是的話,就說明他是最新的不應該被刪除
*/
for (String key : dataStore.keys()) {
// 不是請求方處理的服務就不處理
if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
continue;
}
// 是請求方處理的服務但不在請求列表中,說明此服務在請求方已經被刪除了
if (!checksumMap.containsKey(key)) {
toRemoveKeys.add(key);
}
}
Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}", toRemoveKeys, toUpdateKeys, server);
// 刪掉已經變更的服務
for (String key : toRemoveKeys) {
// 移除服務,並發送變更通知
onRemove(key);
}
// 若沒有需要更新的終止此流程
if (toUpdateKeys.isEmpty()) {
return;
}
// 經過上述流程的處理,目前本機緩存的DataStorage中保留的服務是:在其他節點注冊的、服務。
try {
DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, server);
distroKey.getActualResourceTypes().addAll(toUpdateKeys);
// 更新變更了的服務
DistroData remoteData = distroProtocol.queryFromRemote(distroKey);
if (null != remoteData) {
processData(remoteData.getContent());
}
} catch (Exception e) {
Loggers.DISTRO.error("get data from " + server + " failed!", e);
}
} finally {
// 全部處理完畢之后,將syncChecksumTasks中標記正在處理的節點移除
// Remove this 'in process' flag:
syncChecksumTasks.remove(server);
}
}
請各位看官務必詳細閱讀onReceiveChecksums(Map<String, String> checksumMap, String server)
方法內的注釋。當前的方法主要處理服務的3種場景:不需要處理的、需要更新的、需要刪除的。
checksum的請求發送方發送的是它自身節點負責的數據,那么接收方接收到的也都不是自己負責的數據。因此在onReciveChecksums方法的第一個for循環中就對數據進行篩選,避免本機負責的數據由其他節點誤發過來(這種可能性也有可能存在,那就是當本機暫時不可用,某服務本來是本機負責,又被轉移到其他節點的時候,此觀點暫未實際測試),這里處理的是不需要處理
的數據。
接着在第一個for循環內部的第二個if語句中判斷傳遞過來的服務信息是否存在,是否為空,是否和傳遞過來的不一致。若條件成立則將其放入需要更新
的toUPdateKeys
集合內。第二個for循環則處理的是需要刪除
的服務,這里是處理本機內部緩存的服務,首先判斷本機內的哪些服務不是由傳遞請求的這個節點處理的,對於這類服務忽略掉,因為它所屬的節點也會來發送checksum請求,放在那次處理即可。若是傳遞請求的節點處理的但又沒有在這次請求中帶過來則需要標記刪除,接着對標記刪除的服務進行移除。
服務篩選示意圖:
- 假設現在有3個節點,A、B、C他們以不同顏色區分,每個節點內部DataStore緩存的服務列表假設都是一致每個節點的服務名稱和顏色都與節點相關聯。
- 這里虛擬了一個服務的名稱,用於描述它所屬的節點、狀態、以及它的實例數量以#分隔。實例數量用於表示某個服務在某一時刻的狀態。
- 每個節點內部保留着本節點的服務和其他節點的服務,但每個節點只保證自己的服務是最新的(和節點顏色相同的服務)。
- 在發送checksum請求的時候,以服務名稱作為key。
上圖表示了節點A向節點B發送checksum請求的時候所帶參數和接收方的篩選結果
- 節點A請求的時候攜帶了自身負責處理的紅色服務,有多少發多少。
- 節點B接收到請求之后發現節點A的3、4、5服務實例數量有變更,並且A節點傳遞過來的6服務節點B中沒有,因此將3、4、5、6服務添加到
toUpdateKeys
列表中稍后用於更新- 節點B同時發現節點A此次沒有攜帶2服務,說明2服務在節點A已經被刪除了,因此將其添加到
toRemoveKeys
列表中稍后要從本機的DataStore中移除- 節點B中保存的C節點的服務,不管他,因為它們需要等待節點C向節點B發送checksum。
更新已變更服務
在對傳入的服務進行篩選之后就開始更新。
// 組裝DistroData查詢對象
DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, server);
distroKey.getActualResourceTypes().addAll(toUpdateKeys);
// 從請求發送方獲取最新數據
DistroData remoteData = distroProtocol.queryFromRemote(distroKey);
if (null != remoteData) {
// 處理查詢結果
processData(remoteData.getContent());
}
// DistroProtocol.java
/**
* Query data from specified server.
* 從指定的節點獲取DistroData
* @param distroKey data key
* @return data
*/
public DistroData queryFromRemote(DistroKey distroKey) {
if (null == distroKey.getTargetServer()) {
Loggers.DISTRO.warn("[DISTRO] Can't query data from empty server");
return null;
}
String resourceType = distroKey.getResourceType();
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
if (null == transportAgent) {
Loggers.DISTRO.warn("[DISTRO] Can't find transport agent for key {}", resourceType);
return null;
}
// 使用DistroTransportAgent獲取數據
return transportAgent.getData(distroKey, distroKey.getTargetServer());
}
根據當前處理的服務類型,可以得知這里的DistroTransportAgent為DistroHttpAgent。
// DistroTransportAgent.java
@Override
public DistroData getData(DistroKey key, String targetServer) {
try {
List<String> toUpdateKeys = null;
if (key instanceof DistroHttpCombinedKey) {
toUpdateKeys = ((DistroHttpCombinedKey) key).getActualResourceTypes();
} else {
toUpdateKeys = new ArrayList<>(1);
toUpdateKeys.add(key.getResourceKey());
}
// 使用NamingProxy獲取數據
byte[] queriedData = NamingProxy.getData(toUpdateKeys, key.getTargetServer());
return new DistroData(key, queriedData);
} catch (Exception e) {
throw new DistroException(String.format("Get data from %s failed.", key.getTargetServer()), e);
}
}
// NamingProxy.java
/**
* Get Data from other server.
*
* @param keys keys of datum
* @param server target server address
* @return datum byte array
* @throws Exception exception
*/
public static byte[] getData(List<String> keys, String server) throws Exception {
// 組裝http請求參數
Map<String, String> params = new HashMap<>(8);
params.put("keys", StringUtils.join(keys, ","));
// 發送http請求
RestResult<String> result = HttpClient.httpGetLarge("http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_GET_URL, new HashMap<>(8), JacksonUtils.toJson(params));
// 處理請求結果
if (result.ok()) {
return result.getData().getBytes();
}
throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_GET_URL + ". code: " + result.getCode() + " msg: "
+ result.getMessage());
}
成功獲取遠端節點的DistroData數據之后將進行解析, 對於Service類型的數據這里做了兩個操作:一是將獲取的最新數據放入當前節點的DataStore內;二是對獲取的最新服務開啟健康檢查。
// DistroConsistencyServiceImpl.java
private boolean processData(byte[] data) throws Exception {
if (data.length > 0) {
// 序列化將要解析的值
Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
// 此for循環主要功能是用於將獲取到的Service添加到監聽器列表
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
// 遍歷並放入數據倉庫中
dataStore.put(entry.getKey(), entry.getValue());
// ① 判斷是否有當前服務的監聽器,若有則觸發監聽器用於更新服務信息
if (!listeners.containsKey(entry.getKey())) {
// 因為當前的類是用於處理臨時節點的同步信息的(詳情請看v1版本ServiceManager內部的邏輯)
// pretty sure the service not exist:
if (switchDomain.isDefaultInstanceEphemeral()) {
// 根據獲取的數據信息構建一個v1版本的Service對象
// create empty service
Loggers.DISTRO.info("creating service {}", entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
// 尤其是要注意此處的key類型,因為listener集合內部可能存放多種類型的監聽器
// ② 獲取ServiceManager,因為其也是一個監聽器
// The Listener corresponding to the key value must not be empty
RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
if (Objects.isNull(listener)) {
return false;
}
// 觸發ServiceManager的onChange事件,
listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
}
}
}
// 此for循環主要用於觸發監聽器列表中的Service監聽器
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
if (!listeners.containsKey(entry.getKey())) {
// Should not happen:
Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
continue;
}
try {
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(), entry.getValue().value);
}
} catch (Exception e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
continue;
}
// Update data store if listener executed successfully:
dataStore.put(entry.getKey(), entry.getValue());
}
}
return true;
}
提示:
- 方法中標記的①②處主要就是用於在拿到其他節點的數據時判斷當前節點有沒有處理過這個服務(Service),若有處理過那么ServiceManager就會將本服務放入ConsistencyService的監聽器列表屬性中(當前類型的數據處理器是DistroConsistencyServiceImpl)。因為Service本身它也是一個監聽器,請結合代碼中的注釋來理解此段代碼的兩個for循環。
- ServiceManager 本身作為監聽器加入listener時的key為:com.alibaba.nacos.naming.domains.meta.
- Service 本身作為監聽器加入listener時的key為:com.alibaba.nacos.naming.iplist.ephemeral.命名空間##分組名稱@@服務名稱
監聽器列表示例數據:
listeners = {ConcurrentHashMap@13820} size = 2
"com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@TO_REMOTE_65_T001" -> {ConcurrentLinkedQueue@13869} size = 1
key = "com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@TO_REMOTE_65_T001"
value = {ConcurrentLinkedQueue@13869} size = 1
0 = {Service@10668} "Service{name='DEFAULT_GROUP@@TO_REMOTE_65_T001', protectThreshold=0.0, appName='null', groupName='DEFAULT_GROUP', metadata={}}"
"com.alibaba.nacos.naming.domains.meta." -> {ConcurrentLinkedQueue@13871} size = 1
key = "com.alibaba.nacos.naming.domains.meta."
value = {ConcurrentLinkedQueue@13871} size = 1
0 = {ServiceManager@10671}
ServiceManager.onChange()
// ServiceManager.java
@Override
public void onChange(String key, Service service) throws Exception {
try {
if (service == null) {
Loggers.SRV_LOG.warn("received empty push from raft, key: {}", key);
return;
}
// 檢查namespace
if (StringUtils.isBlank(service.getNamespaceId())) {
service.setNamespaceId(Constants.DEFAULT_NAMESPACE_ID);
}
Loggers.RAFT.info("[RAFT-NOTIFIER] datum is changed, key: {}, value: {}", key, service);
// 從ServiceManager內部的serviceMap緩存中獲取
Service oldDom = getService(service.getNamespaceId(), service.getName());
// 若不為空,則更新
if (oldDom != null) {
oldDom.update(service);
// 重新將其放入監聽器列表
// re-listen to handle the situation when the underlying listener is removed:
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), oldDom);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), oldDom);
} else {
// 若之前沒有緩存,添加並初始化
putServiceAndInit(service);
}
} catch (Throwable e) {
Loggers.SRV_LOG.error("[NACOS-SERVICE] error while processing service update", e);
}
}
private void putServiceAndInit(Service service) throws NacosException {
// 插入緩存列表
putService(service);
service = getService(service.getNamespaceId(), service.getName());
// 初始化
service.init();
// 放入監聽器列表
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
public void putService(Service service) {
// 判斷namespace是否存在,若不存在創建一個namespace,並new一個新的Map用於存放namespace對應的service
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
// 將service存放於它所屬的namespace容器內部
serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
}
在ServiceManager的onChange方法內部主要就是更新本次獲取的Service信息,並將其添加到監聽器列表內部。最重要的是,當此次傳遞進來的Service不在緩存中的時候它觸發了一個Service.init()方法。它將會開啟Service和Cluster的心跳檢查。不過他們開啟的是針對v1版本的心跳檢查,若版本是2.0.x的話,心跳檢查將不會執行。
// Service.java
private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);
public void init() {
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
// 還開啟了Cluster的健康檢查
entry.getValue().init();
}
}
// Cluster.java
/**
* Init cluster.
*/
public void init() {
if (inited) {
return;
}
checkTask = new HealthCheckTask(this);
HealthCheckReactor.scheduleCheck(checkTask);
inited = true;
}
// ClientBeatCheckTask.java
public void run() {
try {
// If upgrade to 2.0.X stop health check with v1
if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
return;
}
// 省略部分代碼...
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
// HealthCheckTask.java
@Override
public void run() {
try {
// If upgrade to 2.0.X stop health check with v1
if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
return;
}
} catch (Throwable e) {
Loggers.SRV_LOG.error("[HEALTH-CHECK] error while process health check for {}:{}", cluster.getService().getName(), cluster.getName(), e);
}
}
Service.onChange()
v2版本的數據驗證
在v2版本中Nacos:Naming:v2:ClientData
類型作為驗證數據類型,相較於v1版本以Service為維度的數據驗證在整體處理流程上簡潔了許多,一次驗證一個客戶端提供的所有服務在邏輯上的完整性也很強。
從DistroDataStorage獲取驗證數據
// DistroClientDataProcessor.java
@Override
public List<DistroData> getVerifyData() {
List<DistroData> result = new LinkedList<>();
// 遍歷當前節點緩存的所有client
for (String each : clientManager.allClientId()) {
Client client = clientManager.getClient(each);
if (null == client || !client.isEphemeral()) {
continue;
}
// 是本機負責的Client才進行處理
if (clientManager.isResponsibleClient(client)) {
// TODO add revision for client.
DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
DistroData data = new DistroData(distroKey, ApplicationUtils.getBean(Serializer.class).serialize(verifyData));
data.setType(DataOperation.VERIFY);
result.add(data);
}
}
return result;
}
注意:
- 在當前節點執行
Nacos:Naming:v2:ClientData
類型數據的驗證任務時,它只會向集群中的其他節點發送自己負責的,且未被移除的數據。
使用DistroTransportAgent發送驗證數據
提示:
- 執行發送數據的這個方法調用的本身是在上層
DistroVerifyExecuteTask
的run方法的for循環內部的。也就是說為每個Client發送一次驗證請求。
// DistroClientTransportAgent.java
@Override
public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
// 若此節點不在當前節點緩存中,直接返回,因為可能下線、或者過期,不需要驗證了
if (isNoExistTarget(targetServer)) {
callback.onSuccess();
}
// 構建請求對象
DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
Member member = memberManager.find(targetServer);
try {
// 創建一個回調對象(Wrapper實現了RequestCallBack接口)
DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer, verifyData.getDistroKey().getResourceKey(), callback, member);
// 使用集群Rpc請求對象發送異步任務
clusterRpcClientProxy.asyncRequest(member, request, wrapper);
} catch (NacosException nacosException) {
callback.onFailed(nacosException);
}
}
// ClusterRpcClientProxy.java
/**
* aync send request to member with callback.
* 其他節點發送異步請求並回調
* @param member member of server.
* @param request request.
* @param callBack RequestCallBack.
* @throws NacosException exception may throws.
*/
public void asyncRequest(Member member, Request request, RequestCallBack callBack) throws NacosException {
// 獲取目標節點對應的RpcClient
RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
if (client != null) {
// 通過Client發送異步請求
client.asyncRequest(request, callBack);
} else {
throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);
}
}
// RpcClient.java
/**
* send async request.
*
* @param request request.
*/
public void asyncRequest(Request request, RequestCallBack callback) throws NacosException {
int retryTimes = 0;
Exception exceptionToThrow = null;
long start = System.currentTimeMillis();
// 重試次數少於3次,且總執行時間小於3秒
while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < start + callback.getTimeout()) {
boolean waitReconnect = false;
try {
if (this.currentConnection == null || !isRunning()) {
waitReconnect = true;
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "Client not connected.");
}
// 使用GrpcConnection發送異步請求
this.currentConnection.asyncRequest(request, callback);
return;
} catch (Exception e) {
if (waitReconnect) {
try {
//wait client to re connect.
Thread.sleep(Math.min(100, callback.getTimeout() / 3));
} catch (Exception exception) {
//Do nothing.
}
}
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Send request fail, request={}, retryTimes={},errorMessage={}", name, request, retryTimes, e.getMessage());
exceptionToThrow = e;
}
retryTimes++;
}
// 判斷RpcClientStatus 是不是RUNNING狀態,若不是則設置其為UNHEALTHY
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
// 若成功設置了UNHEALTHY,則置為失敗
switchServerAsyncOnRequestFail();
}
if (exceptionToThrow != null) {
throw (exceptionToThrow instanceof NacosException) ? (NacosException) exceptionToThrow : new NacosException(SERVER_ERROR, exceptionToThrow);
} else {
throw new NacosException(SERVER_ERROR, "AsyncRequest fail, unknown error");
}
}
// GrpcConnection.java
@Override
public void asyncRequest(Request request, final RequestCallBack requestCallBack) throws NacosException {
// ① 轉換為Grpc的請求載體對象
Payload grpcRequest = GrpcUtils.convert(request);
// 發送Grpc請求
ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
//set callback .
Futures.addCallback(requestFuture, new FutureCallback<Payload>() {
@Override
public void onSuccess(@Nullable Payload grpcResponse) {
Response response = (Response) GrpcUtils.parse(grpcResponse);
if (response != null) {
if (response instanceof ErrorResponse) {
requestCallBack.onException(new NacosException(response.getErrorCode(), response.getMessage()));
} else {
// 若成功返回,觸發回調的onResponse對象,由回調函數處理后續邏輯
requestCallBack.onResponse(response);
}
} else {
requestCallBack.onException(new NacosException(ResponseCode.FAIL.getCode(), "response is null"));
}
}
@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof CancellationException) {
requestCallBack.onException(
new TimeoutException("Timeout after " + requestCallBack.getTimeout() + " milliseconds."));
} else {
requestCallBack.onException(throwable);
}
}
}, requestCallBack.getExecutor() != null ? requestCallBack.getExecutor() : this.executor);
// set timeout future.
ListenableFuture<Payload> payloadListenableFuture = Futures
.withTimeout(requestFuture, requestCallBack.getTimeout(), TimeUnit.MILLISECONDS,
RpcScheduledExecutor.TIMEOUT_SCHEDULER);
}
這里不同於v1版本的checksum
,在v2版本的驗證任務中使用了rpc請求方式。和v1版本的請求邏輯相同,都是發送當前節點所負責的Client信息到其他節點。
注意:
在①標注的地方,它在轉換request的時候作了一些必要的操作,包括構建Grpc請求對象(Payload)所需要的元數據,其中最需要注意的就是Metadata內部的type屬性,接收方將根據這個屬性進行判斷,分別處理不同類型的請求。
處理DistroTransportAgent的驗證請求
其他節點接收到驗證請求,是如何處理呢?GrpcRequestAcceptor
用於接收rpc請求。需要注意的是,它將接收所有類型的rpc請求,例如:ServerCheckRequest
、DistroDataRequest
、HealthCheckRequest
等,具體可查看com.alibaba.nacos.api.remote.request
包下的請求對象。此處我們只關注驗證請求DistroDataRequest
。
// GrpcRequestAcceptor.java
@Override
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
// 如果有必要跟蹤的話
traceIfNecessary(grpcRequest, true);
// 獲取請求對象中的請求類型
String type = grpcRequest.getMetadata().getType();
// 若當前節點還未啟動完畢,則直接返回
//server is on starting.
if (!ApplicationUtils.isStarted()) {
Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(NacosException.INVALID_SERVER_STATUS, "Server is starting,please try later."));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
// 判斷請求是否為ServerCheckRequest
// server check.
if (ServerCheckRequest.class.getSimpleName().equals(type)) {
// 構建一個ServerCheckResponse作為返回對象,並包裝為Payload返回給調用方
Payload serverCheckResponseP = GrpcUtils.convert(new ServerCheckResponse(CONTEXT_KEY_CONN_ID.get()));
traceIfNecessary(serverCheckResponseP, false);
responseObserver.onNext(serverCheckResponseP);
responseObserver.onCompleted();
return;
}
// 根據請求的類型,找到對應的處理器
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
//no handler found.
if (requestHandler == null) {
// 若未找到類型對應的處理器,返回給調用方一個ErrorResponse
Loggers.REMOTE_DIGEST.warn(String.format("[%s] No handler for request type : %s :", "grpc", type));
Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(NacosException.NO_HANDLER, "RequestHandler Not Found"));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
// 檢查連接狀態
//check connection status.
String connectionId = CONTEXT_KEY_CONN_ID.get();
boolean requestValid = connectionManager.checkValid(connectionId);
if (!requestValid) {
Loggers.REMOTE_DIGEST.warn("[{}] Invalid connection Id ,connection [{}] is un registered ,", "grpc", connectionId);
Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(NacosException.UN_REGISTER, "Connection is unregistered."));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
// 接下來開始真正的處理請求邏輯
// 解析請求對象
Object parseObj = null;
try {
parseObj = GrpcUtils.parse(grpcRequest);
} catch (Exception e) {
// 解析失敗返回
Loggers.REMOTE_DIGEST.warn("[{}] Invalid request receive from connection [{}] ,error={}", "grpc", connectionId, e);
Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(NacosException.BAD_GATEWAY, e.getMessage()));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
// 請求對象為空,返回
if (parseObj == null) {
Loggers.REMOTE_DIGEST.warn("[{}] Invalid request receive ,parse request is null", connectionId);
Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(NacosException.BAD_GATEWAY, "Invalid request"));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
}
// 請求對象貨不對板,返回
if (!(parseObj instanceof Request)) {
Loggers.REMOTE_DIGEST.warn("[{}] Invalid request receive ,parsed payload is not a request,parseObj={}", connectionId, parseObj);
Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(NacosException.BAD_GATEWAY, "Invalid request"));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
// 將接收的對象轉換為請求對象
Request request = (Request) parseObj;
try {
Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
// 使用對應的處理器進行處理
Response response = requestHandler.handleRequest(request, requestMeta);
Payload payloadResponse = GrpcUtils.convert(response);
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
} catch (Throwable e) {
Loggers.REMOTE_DIGEST.error("[{}] Fail to handle request from connection [{}] ,error message :{}", "grpc", connectionId, e);
Payload payloadResponse = GrpcUtils.convert(buildErrorResponse((e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(), e.getMessage()));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
}
最終的DistroData數據將會由對應的Handler來處理,並返回結果給調用方。
/**
* Distro data request handler.
* Distro協議數據的請求處理器,用於處理客戶端發送來的Distro協議 rpc 請求
* @author xiweng.yy
*/
@Component
public class DistroDataRequestHandler extends RequestHandler<DistroDataRequest, DistroDataResponse> {
private final DistroProtocol distroProtocol;
public DistroDataRequestHandler(DistroProtocol distroProtocol) {
this.distroProtocol = distroProtocol;
}
@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
try {
switch (request.getDataOperation()) {
case VERIFY:
return handleVerify(request.getDistroData(), meta);
case SNAPSHOT:
return handleSnapshot();
case ADD:
case CHANGE:
case DELETE:
return handleSyncData(request.getDistroData());
case QUERY:
return handleQueryData(request.getDistroData());
default:
return new DistroDataResponse();
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
DistroDataResponse result = new DistroDataResponse();
result.setErrorCode(ResponseCode.FAIL.getCode());
result.setMessage("handle distro request with exception");
return result;
}
}
/**
* 處理驗證信息
* @param distroData 請求方發送來的驗證信息
* @param meta 請求元數據
* @return
*/
private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {
DistroDataResponse result = new DistroDataResponse();
// 使用DistroProtocol來處理
if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {
result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");
}
return result;
}
private DistroDataResponse handleSnapshot() {
DistroDataResponse result = new DistroDataResponse();
DistroData distroData = distroProtocol.onSnapshot(DistroClientDataProcessor.TYPE);
result.setDistroData(distroData);
return result;
}
private DistroDataResponse handleSyncData(DistroData distroData) {
DistroDataResponse result = new DistroDataResponse();
if (!distroProtocol.onReceive(distroData)) {
result.setErrorCode(ResponseCode.FAIL.getCode());
result.setMessage("[DISTRO-FAILED] distro data handle failed");
}
return result;
}
private DistroDataResponse handleQueryData(DistroData distroData) {
DistroDataResponse result = new DistroDataResponse();
DistroKey distroKey = distroData.getDistroKey();
DistroData queryData = distroProtocol.onQuery(distroKey);
result.setDistroData(queryData);
return result;
}
}
根據DistroDataRequestHandler.handle()方法可以看出來,它不止可以處理VERIFY請求,這里我們只分析VERIFY類型的請求處理。可以發現它調用了DistroProtocol.onVerify()來處理。
// DistroProtocol.java
/**
* Receive verify data, find processor to process.
* @param distroData verify data
* @param sourceAddress source server address, might be get data from source server
* @return true if verify data successfully, otherwise false
*/
public boolean onVerify(DistroData distroData, String sourceAddress) {
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(), distroData.getDistroKey());
}
// 根據此次處理的數據類型獲取對應的處理器,此處我們處理的類型是Client類型(Nacos:Naming:v2:ClientData)
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
return false;
}
return dataProcessor.processVerifyData(distroData, sourceAddress);
}
// DistroClientDataProcessor.java
@Override
public boolean processVerifyData(DistroData distroData, String sourceAddress) {
DistroClientVerifyInfo verifyData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), DistroClientVerifyInfo.class);
// 調用ClientManager來驗證
if (clientManager.verifyClient(verifyData.getClientId())) {
return true;
}
Loggers.DISTRO.info("client {} is invalid, get new client from {}", verifyData.getClientId(), sourceAddress);
return false;
}
// EphemeralIpPortClientManager.java
@Override
public boolean verifyClient(String clientId) {
// 從客戶端管理器中獲取客戶端實例
IpPortBasedClient client = clients.get(clientId);
if (null != client) {
// 若不為空,啟動一個心跳更新任務
NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(clientId, new ClientBeatUpdateTask(client));
return true;
}
return false;
}
public class ClientBeatUpdateTask extends AbstractExecuteTask {
private final IpPortBasedClient client;
public ClientBeatUpdateTask(IpPortBasedClient client) {
this.client = client;
}
@Override
public void run() {
// 獲取當前時間,更新Client和Client下的Instance的最新活躍時間
long currentTime = System.currentTimeMillis();
for (InstancePublishInfo each : client.getAllInstancePublishInfo()) {
((HealthCheckInstancePublishInfo) each).setLastHeartBeatTime(currentTime);
}
// 更新client的最新更新時間
client.setLastUpdatedTime();
}
}
通過最后一步的ClientBeatUpdateTask
可以發現,其他節點發送來的verify請求,實際上就是發送端的狀態報告請求。Client.getAllInstancePublishInfo()返回的是某一個客戶端下的所有實例信息列表,最后再更新Client自身的最新活躍時間。
驗證任務功能總結:
- 每個節點在應用啟動完畢之后延遲
5
秒,之后間隔5
秒向其他節點發送Verify請求。- 每個節點只發送自己負責的、且未被移除的Client,且每一個Client都會發送一次請求,請求參數里面只附帶了Client的clientId屬性。(意味着當前節點只會告訴其他節點我目前有這個Client正在提供服務,並未提供服務的具體信息。)
- 接收Verify請求的節點從請求參數中獲取clientId,並檢查自身是否有這個Client,若此Client存在,則更新Client下的所有Instance、以及Client自身的最新活躍時間為當前時間。
節點數據同步
除了開啟驗證任務之外,還開啟了一個數據加載的任務startLoadTask()
,用於從其他節點同步數據到本節點。同時它也維護着本機節點是否初始化完成的一個狀態isInitialized
,當前節點從其他節點加載數據成功之后,將這個狀態設置為true。表示當前節點已經同步完畢數據,可以參與正常服務;同時又設置了DistroDataStorage
的isFinishInitial
屬性為true,表示數據已准備好,驗證任務可以執行了。
數據同步的執行流程
// DistroProtocol.java
private void startLoadTask() {
DistroCallback loadCallback = new DistroCallback() {
@Override
public void onSuccess() {
isInitialized = true;
}
@Override
public void onFailed(Throwable throwable) {
isInitialized = false;
}
};
GlobalExecutor.submitLoadDataTask(new DistroLoadDataTask(memberManager, distroComponentHolder, DistroConfig.getInstance(), loadCallback));
}
加載任務的一開始創建了一個 DistroLoadDataTask 任務,並傳入了一個在加載完畢之后更改當前節點Distro協議完成狀態的回調函數。
// DistroLoadDataTask.java
private void load() throws Exception {
// 若出自身之外沒有其他節點,則休眠1秒,可能其他節點還未啟動完畢
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
// 若數據類型為空,說明distroComponentHolder的組件注冊器還未初始化完畢(v1版本為DistroHttpRegistry, v2版本為DistroClientComponentRegistry)
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
// 加載每個類型的數據
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
// 調用加載方法,並標記已處理
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
/**
* 從其他節點獲取同步數據
* @param resourceType
* @return
*/
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
// 獲取數據傳輸對象
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
// 獲取數據處理器
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}", resourceType, transportAgent, dataProcessor);
return false;
}
// 向每個節點請求數據
for (Member each : memberManager.allMembersWithoutSelf()) {
try {
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
// 獲取到數據
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
// 解析數據
boolean result = dataProcessor.processSnapshot(distroData);
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(), result);
// 若解析成功,標記此類型數據已加載完畢
if (result) {
distroComponentHolder.findDataStorage(resourceType).finishInitial();
return true;
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
}
}
return false;
}
加載的流程實際上和驗證類似,都是通過獲取需要加載的數據類型,使用DistroTransportAgent獲取數據,使用DistroDataProcessor來處理數據。
執行同步任務
提示:
對於v1版本的相關操作,后續將不再分析,感興趣的同學可以自行研究。此處已經給出了數據加載流程,無非就是他們的處理邏輯不同。
v2版本的數據同步
真正的同步任務從DistroLoadDataTask
的loadAllDataSnapshotFromRemote
方法開始。
使用DistroTransportAgent獲取數據
// DistroClientTransportAgent.java
public DistroData getDatumSnapshot(String targetServer) {
// 從節點管理器獲取目標節點信息
Member member = memberManager.find(targetServer);
// 判斷目標服務器是否健康
if (checkTargetServerStatusUnhealthy(member)) {
throw new DistroException(String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
}
// 構建請求參數
DistroDataRequest request = new DistroDataRequest();
// 設置請求的操作類型為DataOperation.SNAPSHOT
request.setDataOperation(DataOperation.SNAPSHOT);
try {
// 使用Rpc代理對象發送同步rpc請求
Response response = clusterRpcClientProxy.sendRequest(member, request);
if (checkResponse(response)) {
return ((DistroDataResponse) response).getDistroData();
} else {
throw new DistroException(String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s", targetServer, response.getErrorCode(), response.getMessage()));
}
} catch (NacosException e) {
throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
}
}
使用DistroDataProcessor處理數據
// DistroClientDataProcessor.java
public boolean processSnapshot(DistroData distroData) {
// 反序列化獲取的DistroData為ClientSyncDatumSnapshot
ClientSyncDatumSnapshot snapshot = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncDatumSnapshot.class);
// 處理結果集,這里將返回遠程節點負責的所有client以及client下面的service、instance信息
for (ClientSyncData each : snapshot.getClientSyncDataList()) {
// 每次處理一個client
handlerClientSyncData(each);
}
return true;
}
private void handlerClientSyncData(ClientSyncData clientSyncData) {
Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
// 因為是同步數據,因此創建IpPortBasedClient,並緩存
clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
Client client = clientManager.getClient(clientSyncData.getClientId());
// 升級此客戶端的服務信息
upgradeClient(client, clientSyncData);
}
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
// 當前處理的遠端節點中的數據集合
// 獲取所有的namespace
List<String> namespaces = clientSyncData.getNamespaces();
// 獲取所有的groupNames
List<String> groupNames = clientSyncData.getGroupNames();
// 獲取所有的serviceNames
List<String> serviceNames = clientSyncData.getServiceNames();
// 獲取所有的instance
List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
// 已同步的服務集合
Set<Service> syncedService = new HashSet<>();
// ①
for (int i = 0; i < namespaces.size(); i++) {
// 從獲取的數據中構建一個Service對象
Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
Service singleton = ServiceManager.getInstance().getSingleton(service);
// 標記此service已被處理
syncedService.add(singleton);
// 獲取當前的實例
InstancePublishInfo instancePublishInfo = instances.get(i);
// 判斷是否已經包含當前實例
if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
// 不包含則添加
client.addServiceInstance(singleton, instancePublishInfo);
// 當前節點發布服務注冊事件
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
}
}
// 若當前client內部已發布的service不在本次同步的列表內,說明已經過時了,要刪掉
for (Service each : client.getAllPublishedService()) {
if (!syncedService.contains(each)) {
client.removeServiceInstance(each);
// 發布客戶端下線事件
NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
}
}
}
注釋①標記處的處理邏輯和ClientSyncData
這個對象存儲的對象有關系,此處是存放的以Service為維度的信息,它將一個Service的全部信息分別保存,並保證所有列表中的數據順序一致。
ClientSyncData示例數據:
clientSyncData = {ClientSyncData@12737}
clientId = "10.55.56.1:8888#true"
attributes = {ClientSyncAttributes@12740}
namespaces = {ArrayList@12741} size = 2
0 = "public"
1 = "public"
groupNames = {ArrayList@12742} size = 2
0 = "DEFAULT_GROUP"
1 = "DEFAULT_GROUP"
serviceNames = {ArrayList@12743} size = 2
0 = "SERVICE_01"
1 = "SERVICE_02"
instancePublishInfos = {ArrayList@12744} size = 2
0 = {InstancePublishInfo@12941} "InstancePublishInfo{ip='10.55.56.1', port=8888, healthy=false}"
1 = {InstancePublishInfo@12942} "InstancePublishInfo{ip='10.55.56.1', port=8888, healthy=false}"
通過示例數據可以看出在10.55.56.1
這個客戶端中有兩個服務,他們都在同一個namespace、同一個group中,因為InstancePublishInfo是和Service一對一的關系,而一個客戶端下的服務IP一定和客戶端的IP是一致的,所以也會存在兩條instance信息。upgradeClient的主要功能就是,將從其他節點獲取的所有注冊的服務注冊到當前節點內。
TODO 這里有個疑問,首次同步數據只會執行一次拉取(若拉取失敗則會再次拉取,若拉取過后沒有數據也不會再次拉取),而且拉取的是某個節點負責的服務數據,為何當前節點要發布事件呢?服務的狀態維護不是應該由它負責的節點來維護嘛,比如我拉取的A服務是B節點的,我同步過來就OK了,如果A服務下線了,B節點來觸發變更不就行了。然后再通知其他節點下線。
同步數據功能總結:
在一個節點啟動之后,會主動向集群中的其他節點發送數據同步請求,接收到結果之后將其注冊到自身節點內。
DistroProtocol 功能總結:
- 節點狀態報告:能用於向其他節點報告自身的服務狀態。
- 數據同步: 從其他節點拉取服務數據注冊。