Service 實現了 RecordListener 接口,在 service 的 instance 發生變化時,調用 onChange 方法。
nacos 在創建 service 對象的時候,會把 service 放入 DistroConsistencyServiceImpl#listeners 中(這里只考慮服務的臨時實例)
// com.alibaba.nacos.naming.core.ServiceManager#putServiceAndInit private void putServiceAndInit(Service service) throws NacosException { putService(service); service.init(); consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON()); }
當 service 有新的實例注冊時
1. 修改內存中的數據,推送服務實例變化給 nacos 客戶端
Notifier 增加 Task
// com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#onPut public void onPut(String key, Record value) { // 更新 Datum if (KeyBuilder.matchEphemeralInstanceListKey(key)) { Datum<Instances> datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } // 創建 task 放入隊列 notifier.addTask(key, ApplyAction.CHANGE); }
從隊列取出任務執行,觸發 Service.onChange
// com.alibaba.nacos.naming.core.Service#onChange
1.1 更新 Service 對象的內容
1.2 推送變化給 udp 客戶端
// com.alibaba.nacos.naming.push.PushService#serviceChanged public void serviceChanged(Service service) { // merge some change events to reduce the push frequency: if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) { return; } this.applicationContext.publishEvent(new ServiceChangeEvent(this, service)); } // com.alibaba.nacos.naming.push.PushService#onApplicationEvent
客戶端在查詢服務實例的時候,如果提供 udp 端口,則 server 會創建 udpClient,當服務實例發生變化時,推送更新
// com.alibaba.nacos.naming.push.PushService#addClient
udp 推送實例變化,是一個思路,首先確保 nacos 主動連接服務實例網絡通暢,同時通信雙方需要協調好數據包大小。
2. 當前 nacos 要同步數據給其他 nacos 節點
// com.alibaba.nacos.naming.consistency.ephemeral.distro.DataSyncer#submit // com.alibaba.nacos.naming.misc.NamingProxy#syncData
URL 是 /v1/ns/distro/datum
處理請求是 DistroController#onSyncDatum
3. nacos 節點之間還有一種定時同步數據的方式
// com.alibaba.nacos.naming.consistency.ephemeral.distro.DataSyncer.TimedSync
當前 nacos 節點把自己負責的 service 的 key 和實例的 md5 簽名發送給其他 nacos 節點。
其他節點收到 key 和簽名后,對比本地數據,篩選出需要刪除和更新的服務
// com.alibaba.nacos.naming.controllers.DistroController#syncChecksum
需要刪除的直接刪除,需要更新的,發起更新請求
// com.alibaba.nacos.naming.misc.NamingProxy#getData
所以會有多個 nacos 節點推送 udp 報文給同一個客戶端嗎?