【一起學源碼-微服務】Nexflix Eureka 源碼九:服務續約源碼分析


前言

前情回顧

上一講 我們講解了服務發現的相關邏輯,所謂服務發現 其實就是注冊表抓取,服務實例默認每隔30s去注冊中心抓取一下注冊表增量數據,然后合並本地注冊表數據,最后有個hash對比的操作。

本講目錄

今天主要是看下服務續約的邏輯,服務續約就是client端給server端發送心跳檢測,告訴對方我還活着。現在很多分布式系統都會有心跳檢查的機制,這里一起來學習下Eureka是怎么做心跳檢查的。

目錄如下:

  1. client端心跳檢查調度任務
  2. server端接收心跳檢查,設置最后renew時間

這一講內容不太多,因為上一篇文章寫全量和增量注冊表信息內容有點多,所以這里將博客盡量一篇保持一個知識點,后面還會講服務實例下線、摘除、注冊中心自我保護等機制的實現原理。

說明

原創不易,如若轉載 請標明來源:一枝花算不算浪漫

源碼分析

client端心跳檢查調度任務

服務實例續約代碼比較簡單,這里還是從DiscovertClient.java 開始,很多源碼的入口都是在這里,因為client端初始化、注冊 都是走的這里,因為前幾篇文章對這個類已經分析很多了,這里只截取部分重要代碼:

DiscovertClient.java 初始化后 會繼續初始化一些調度任務:

private void initScheduledTasks() {
    if (clientConfig.shouldRegisterWithEureka()) {
    	//  默認也是30s
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer
		// 執行heartbeatExecutor心跳檢查,默認是30s
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // 執行線程
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

private class HeartbeatThread implements Runnable {

    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}

public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
    String urlPath = "apps/" + appName + '/' + id;
    Response response = null;
    try {
        WebTarget webResource = jerseyClient.target(serviceUrl)
                .path(urlPath)
                .queryParam("status", info.getStatus().toString())
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
        if (overriddenStatus != null) {
            webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
        Builder requestBuilder = webResource.request();
        addExtraProperties(requestBuilder);
        addExtraHeaders(requestBuilder);
        requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE);
        response = requestBuilder.put(Entity.entity("{}", MediaType.APPLICATION_JSON_TYPE)); // Jersey2 refuses to handle PUT with no body
        EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
        if (response.hasEntity()) {
            eurekaResponseBuilder.entity(response.readEntity(InstanceInfo.class));
        }
        return eurekaResponseBuilder.build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey2 HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

這里的流程很簡單,初始化DiscoveryClient 后會新建一個調度任務,然后執行HeartbeatThread中的run方法,默認是renewalIntervalInSecs 30s執行一次。
具體就是給Server端發送一個http請求,類似於:http://localhost:8080/v2/apps/ServiceA/i-000000-1, 走的是put請求。
最后拿到響應結果,續約成功后會更新lastSuccessfulHeartbeatTimestamp 最近成功心跳檢測的時間戳。

server端接收心跳檢查請求

前幾篇文章已經說過,Server端接收http請求的入口在eureka-core模塊下的 resource包里面,這里直接找到ApplicationResource.java中的getInstanceInfo 方法,這里直接請求的InstanceResource 類的構造方法,找到這個方法中的@PUT請求。可以直接看下代碼:

InstanceResource.renewLease +AbstractInstanceRegistry.renew 方法:

@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

    // 省略部分代碼

    logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
    return response;
}


public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id);
    }
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                Object[] args = {
                        instanceInfo.getStatus().name(),
                        instanceInfo.getOverriddenStatus().name(),
                        instanceInfo.getId()
                };
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", args);
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }
        renewsLastMin.increment();
        leaseToRenew.renew();
        return true;
    }
}

這里主要看renew方法, 這里看到registry 是一個注冊表,通過appName獲取對應的服務注冊表信息。

這里主要還是看leaseToRenew.renew() 其實很簡單,就是設置當前示例注冊表的renew屬性的lastUpdateTimestamp 為最新時間+duration。

至於這里的duration 我們下一講會詳細講解,duration 和服務實例摘除有關。

總結

(1)DiscoveryClient初始化的時候,會去調度一堆定時任務,其中有一個就是HeartbeatThread,心跳線程

(2)在這里可以看到,默認是每隔30秒去發送一次心跳,每隔30秒執行一次HeartbeatTHread線程的邏輯,發送心跳

(3)這邊的話就是去發送這個心跳,走的是EurekaHttpClient的sendHeartbeat()方法,http://localhost:8080/v2/apps/ServiceA/i-000000-1,走的是put請求

(4)負責承接服務實例的心跳相關的這些操作的,是ApplicationsResource,服務相關的controller。找到ApplicationResource,再次找到InstanceResource,通過PUT請求,可以找到renewLease方法。

(5)通過注冊表的renew()方法,進去完成服務續約,實際進入AbstractInstanceRegistry的renew()方法

(6)從注冊表的map中,根據服務名和實例id,獲取一個Lease ,實際的服務續約的邏輯,其實就是在Lease對象中,更新一下lastUpdateTimestamp這個時間戳,每次續約,就更新一下這個時間戳就ok了。

申明

本文章首發自本人博客:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!

感興趣的小伙伴可關注個人公眾號:壹枝花算不算浪漫

22.jpg


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM