服務心跳
Nacos Client會維護一個定時任務通過持續調用服務端的接口更新心跳時間,保證自己處於存活狀態,防止服務端將服務剔除,Nacos默認5秒向服務端發送一次,通過請求服務端接口/instance/beat發送心跳。
客戶端服務在注冊服務的時候會增加一個心跳的任務,如下圖所示:

首先看下BeatInfo這個類,重點看標注的字段,該字段是給周期任務設定時間,如下圖:

該方法內部定義的一個DEFAULT_HEART_BEAT_INTERVAL的常量,設定5秒:

接下來我們看下addBeatInfo方法,該方法內部主要是將BeatTask任務加入到線程池ScheduledExecutorService當中,如下圖:

重點部分就是看BeatTask,BeatTask繼承Runnable,run方法就是我們的重點,該方法調用了NamingProxy的sendBeat方法,服務端請求地址為/instance/beat的方法


接下來我們把目光放到服務端,找到InstanceController的beat方法,如果是參數beat信息的話,說明是第一次發起心跳,則會帶有服務實例信息,因為發起心跳成功則服務端會返回下次不要帶beat信息的參數,這樣客戶端第二次就不會攜帶beat信息了。如果發現沒有該服務,又沒帶beat信息,說明這個服務可能被移除過了,直接返回沒找到。如果沒有服務,但是發現有beat信息,那就從beat中獲取服務實例信息,進行注冊,整體執行流程如下圖:

@CanDistro @PutMapping("/beat") @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public ObjectNode beat(HttpServletRequest request) throws Exception { ObjectNode result = JacksonUtils.createEmptyJsonNode(); //設置心跳間隔 result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval()); String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY); RsInfo clientBeat = null; //判斷有無心跳內容 //如果存在心跳內容則不是輕量級心跳就轉化為RsInfo if (StringUtils.isNotBlank(beat)) { clientBeat = JacksonUtils.toObj(beat, RsInfo.class); } String clusterName = WebUtils .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME); String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY); int port = Integer.parseInt(WebUtils.optional(request, "port", "0")); if (clientBeat != null) { if (StringUtils.isNotBlank(clientBeat.getCluster())) { clusterName = clientBeat.getCluster(); } else { // fix #2533 clientBeat.setCluster(clusterName); } ip = clientBeat.getIp(); port = clientBeat.getPort(); } String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName); //獲取實例的信息 Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); //如果實例不存在 if (instance == null) { if (clientBeat == null) { result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND); return result; } Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, " + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName); //根據您心跳內容創建一個實例信息 instance = new Instance(); instance.setPort(clientBeat.getPort()); instance.setIp(clientBeat.getIp()); instance.setWeight(clientBeat.getWeight()); instance.setMetadata(clientBeat.getMetadata()); instance.setClusterName(clusterName); instance.setServiceName(serviceName); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(clientBeat.isEphemeral()); //注冊實例 serviceManager.registerInstance(namespaceId, serviceName, instance); } //獲取服務的信息 Service service = serviceManager.getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId); } //不存在的話,要創建一個進行處理 if (clientBeat == null) { clientBeat = new RsInfo(); clientBeat.setIp(ip); clientBeat.setPort(port); clientBeat.setCluster(clusterName); } //開啟心跳檢查任務 service.processClientBeat(clientBeat); result.put(CommonParams.CODE, NamingResponseCode.OK); //5秒間隔 if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) { result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval()); } //告訴客戶端不需要帶上心跳信息了,變成輕量級心跳了 result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); return result; }
接下來我們看一下processClientBeat方法,該方法將ClientBeatProcessor放入到線程池中,接下來我們看下重點看下run方法,

該方法內部主要就是更新對應實例下心跳時間,整體上如下圖:

至此完成了從客戶端到服務端更新實例的心跳時間,下圖是整體的時序圖:

服務的健康檢查
Nacos Server會開啟一個定時任務來檢查注冊服務的健康情況,對於超過15秒沒收到客戶端的心跳實例會將它的 healthy屬性置為false,此時當客戶端不會將該實例的信息發現,如果某個服務的實例超過30秒沒收到心跳,則剔除該實例,如果剔除的實例恢復,發送心跳則會恢復。
當有實例注冊的時候,我們會看到有個service.init()的方法,該方法的實現主要是將ClientBeatCheckTask加入到線程池當中,如下圖:

ClientBeatCheckTask中的run方法主要做兩件事心跳時間超過15秒則設置該實例信息為不健康狀況和心跳時間超過30秒則刪除該實例信息,如下代碼:

public void run() { try { if (!getDistroMapper().responsible(service.getName())) { return; } if (!getSwitchDomain().isHealthCheckEnabled()) { return; } //獲取服務所有實例信息 List<Instance> instances = service.allIPs(true); // first set health status of instances: for (Instance instance : instances) { //如果心跳時間超過15秒則設置該實例信息為不健康狀況 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { if (!instance.isMarked()) { if (instance.isHealthy()) { instance.setHealthy(false); Loggers.EVT_LOG .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}", instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(), UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat()); getPushService().serviceChanged(service); ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance)); } } } } if (!getGlobalConfig().isExpireInstance()) { return; } // then remove obsolete instances: for (Instance instance : instances) { if (instance.isMarked()) { continue; } //如果心跳時間超過30秒則刪除該實例信息 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { // delete instance Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JacksonUtils.toJson(instance)); deleteIp(instance); } } } catch (Exception e) { Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e); } }
首先我們來看一下deleteIp方法,該方法內部主要通過構建刪除請求,發送刪除請求,如下圖:

刪除實例的接口如下圖:

內部通過調用ServiceManager的removeInstance方法,如下圖:

重點看下substractIpAddresses內部通過調用updateIpAddresses,該方法內部主要就是移除到超過30秒的實例信息,如下圖:

到此完成刪除實例的過程,整體的時序圖如下:

接下來我們看標記不健康時候的代碼,這部分代碼在客戶端注冊的時候也出現相同的代碼,只是我們略過了,這部分也是觀察者模式的重要體現,從這里我們可以學習到的東西在於結合Spring的事件機制,輕松實現觀察者模式,當然這個里面也有部分我感覺寫的不太好,哈哈,大佬們看到勿噴。

首先我們看serviceChanged方法,該方法主要是發布一個服務不健康的事件,如下圖:

接下來我們看下如何處理這個事件,這個時候涉及PushService這個類,整體的繼承結構如下圖:

我們看到該類的繼承ApplicationListener接口,該接口是一個支持泛型的接口,傳入了ServiceChangeEvent的類,此處就是對事件的處理,如下圖:

接下來看一下onApplicationEvent方法,這個方法主要完成了准備數據,發送數據這幾件事情:

public void onApplicationEvent(ServiceChangeEvent event) { Service service = event.getService(); String serviceName = service.getName(); String namespaceId = service.getNamespaceId(); Future future = GlobalExecutor.scheduleUdpSender(() -> { try { Loggers.PUSH.info(serviceName + " is changed, add it to push queue."); //獲取所有需要推送的客戶端 ConcurrentMap<String, PushClient> clients = clientMap .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); if (MapUtils.isEmpty(clients)) { return; } Map<String, Object> cache = new HashMap<>(16); long lastRefTime = System.nanoTime(); for (PushClient client : clients.values()) { //超時的不刪除跳過處理 if (client.zombie()) { Loggers.PUSH.debug("client is zombie: " + client.toString()); clients.remove(client.toString()); Loggers.PUSH.debug("client is zombie: " + client.toString()); continue; } Receiver.AckEntry ackEntry; Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString()); String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent()); byte[] compressData = null; Map<String, Object> data = null; if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) { org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key); compressData = (byte[]) (pair.getValue0()); data = (Map<String, Object>) pair.getValue1(); Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr()); } //准備UDP數據 if (compressData != null) { ackEntry = prepareAckEntry(client, compressData, data, lastRefTime); } else { ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime); if (ackEntry != null) { cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data)); } } Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key)); //發送數據 udpPush(ackEntry); } } catch (Exception e) { Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e); } finally { //發送完成刪除 futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); } }, 1000, TimeUnit.MILLISECONDS); //增加待推送的任務 futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future); }
接下里我們重點看下udpPush的方法,整個方法主要是通過一個Map對象來記錄UDP請求,如果沒收到就重試發送請求,整體如下:

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) { if (ackEntry == null) { Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null."); return null; } //如果大於最大的嘗試次數 //移除發送的數據和待確認的key //失敗推送的次數+1 if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) { Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key); ackMap.remove(ackEntry.key); udpSendTimeMap.remove(ackEntry.key); failedPush += 1; return ackEntry; } try { if (!ackMap.containsKey(ackEntry.key)) { totalPush++; } //記錄UDP請求的返回信息 ackMap.put(ackEntry.key, ackEntry); udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis()); Loggers.PUSH.info("send udp packet: " + ackEntry.key); //發送UDP請求 udpSocket.send(ackEntry.origin); ackEntry.increaseRetryTime(); //如果UDP沒收到返回信息 每10秒嘗試一下 GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS); return ackEntry; } catch (Exception e) { Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e); ackMap.remove(ackEntry.key); udpSendTimeMap.remove(ackEntry.key); failedPush += 1; return null; } }
服務端有發送,那么客戶端就有接收的,接收部分我理解上是服務發現部分,這里我們就不做過多介紹,待下一篇再來聊聊。
結束
歡迎大家點點關注,點點贊,感謝!
