Nacos服務心跳和健康檢查源碼介紹


服務心跳

Nacos Client會維護一個定時任務通過持續調用服務端的接口更新心跳時間,保證自己處於存活狀態,防止服務端將服務剔除,Nacos默認5秒向服務端發送一次,通過請求服務端接口/instance/beat發送心跳。
客戶端服務在注冊服務的時候會增加一個心跳的任務,如下圖所示:


首先看下BeatInfo這個類,重點看標注的字段,該字段是給周期任務設定時間,如下圖:

該方法內部定義的一個DEFAULT_HEART_BEAT_INTERVAL的常量,設定5秒:

接下來我們看下addBeatInfo方法,該方法內部主要是將BeatTask任務加入到線程池ScheduledExecutorService當中,如下圖:

重點部分就是看BeatTask,BeatTask繼承Runnable,run方法就是我們的重點,該方法調用了NamingProxy的sendBeat方法,服務端請求地址為/instance/beat的方法


接下來我們把目光放到服務端,找到InstanceController的beat方法,如果是參數beat信息的話,說明是第一次發起心跳,則會帶有服務實例信息,因為發起心跳成功則服務端會返回下次不要帶beat信息的參數,這樣客戶端第二次就不會攜帶beat信息了。如果發現沒有該服務,又沒帶beat信息,說明這個服務可能被移除過了,直接返回沒找到。如果沒有服務,但是發現有beat信息,那就從beat中獲取服務實例信息,進行注冊,整體執行流程如下圖:
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {

    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    //設置心跳間隔
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    //判斷有無心跳內容
    //如果存在心跳內容則不是輕量級心跳就轉化為RsInfo
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    //獲取實例的信息
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    //如果實例不存在
    if (instance == null) {
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }

        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        //根據您心跳內容創建一個實例信息
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
        //注冊實例
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    //獲取服務的信息
    Service service = serviceManager.getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    //不存在的話,要創建一個進行處理
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    //開啟心跳檢查任務
    service.processClientBeat(clientBeat);

    result.put(CommonParams.CODE, NamingResponseCode.OK);
    //5秒間隔
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    //告訴客戶端不需要帶上心跳信息了,變成輕量級心跳了
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
View Code

接下來我們看一下processClientBeat方法,該方法將ClientBeatProcessor放入到線程池中,接下來我們看下重點看下run方法,


該方法內部主要就是更新對應實例下心跳時間,整體上如下圖:

至此完成了從客戶端到服務端更新實例的心跳時間,下圖是整體的時序圖:

服務的健康檢查

Nacos Server會開啟一個定時任務來檢查注冊服務的健康情況,對於超過15秒沒收到客戶端的心跳實例會將它的 healthy屬性置為false,此時當客戶端不會將該實例的信息發現,如果某個服務的實例超過30秒沒收到心跳,則剔除該實例,如果剔除的實例恢復,發送心跳則會恢復。
當有實例注冊的時候,我們會看到有個service.init()的方法,該方法的實現主要是將ClientBeatCheckTask加入到線程池當中,如下圖:


ClientBeatCheckTask中的run方法主要做兩件事心跳時間超過15秒則設置該實例信息為不健康狀況和心跳時間超過30秒則刪除該實例信息,如下代碼:
public void run() {
    try {
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }

        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        //獲取服務所有實例信息
        List<Instance> instances = service.allIPs(true);

        // first set health status of instances:
        for (Instance instance : instances) {
            //如果心跳時間超過15秒則設置該實例信息為不健康狀況
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                                .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                        instance.getIp(), instance.getPort(), instance.getClusterName(),
                                        service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                        instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }

        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }

        // then remove obsolete instances:
        for (Instance instance : instances) {

            if (instance.isMarked()) {
                continue;
            }
            //如果心跳時間超過30秒則刪除該實例信息
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                deleteIp(instance);
            }
        }

    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }

}
View Code

首先我們來看一下deleteIp方法,該方法內部主要通過構建刪除請求,發送刪除請求,如下圖:


刪除實例的接口如下圖:

內部通過調用ServiceManager的removeInstance方法,如下圖:

重點看下substractIpAddresses內部通過調用updateIpAddresses,該方法內部主要就是移除到超過30秒的實例信息,如下圖:

到此完成刪除實例的過程,整體的時序圖如下:

接下來我們看標記不健康時候的代碼,這部分代碼在客戶端注冊的時候也出現相同的代碼,只是我們略過了,這部分也是觀察者模式的重要體現,從這里我們可以學習到的東西在於結合Spring的事件機制,輕松實現觀察者模式,當然這個里面也有部分我感覺寫的不太好,哈哈,大佬們看到勿噴。

首先我們看serviceChanged方法,該方法主要是發布一個服務不健康的事件,如下圖:

接下來我們看下如何處理這個事件,這個時候涉及PushService這個類,整體的繼承結構如下圖:

我們看到該類的繼承ApplicationListener接口,該接口是一個支持泛型的接口,傳入了ServiceChangeEvent的類,此處就是對事件的處理,如下圖:

接下來看一下onApplicationEvent方法,這個方法主要完成了准備數據,發送數據這幾件事情:
public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();

    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            //獲取所有需要推送的客戶端
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }

            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            for (PushClient client : clients.values()) {
                //超時的不刪除跳過處理
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }

                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;

                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }
                //准備UDP數據
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }

                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));
                //發送數據
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

        } finally {
            //發送完成刪除
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }

    }, 1000, TimeUnit.MILLISECONDS);
    //增加待推送的任務
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

}
View Code

接下里我們重點看下udpPush的方法,整個方法主要是通過一個Map對象來記錄UDP請求,如果沒收到就重試發送請求,整體如下:

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }

    //如果大於最大的嘗試次數
    //移除發送的數據和待確認的key
    //失敗推送的次數+1
    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }

    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        //記錄UDP請求的返回信息
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

        Loggers.PUSH.info("send udp packet: " + ackEntry.key);
        //發送UDP請求
        udpSocket.send(ackEntry.origin);

        ackEntry.increaseRetryTime();
        //如果UDP沒收到返回信息 每10秒嘗試一下
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);

        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                ackEntry.origin.getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;

        return null;
    }
}
View Code

服務端有發送,那么客戶端就有接收的,接收部分我理解上是服務發現部分,這里我們就不做過多介紹,待下一篇再來聊聊。

結束

歡迎大家點點關注,點點贊,感謝!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM