Eureka服務續約(Renew)源碼分析


主要對Eureka的Renew(服務續約),從服務提供者發起續約請求開始分析,通過閱讀源碼和畫時序圖的方式,展示Eureka服務續約的整個生命周期。服務續約主要是把服務續約的信息更新到自身的Eureka Server中,然后再同步到其它Eureka Server中。

 

Renew(服務續約)操作由Service Provider定期調用,類似於heartbeat。目的是隔一段時間Service Provider調用接口,告訴Eureka Server它還活着沒掛,不要把它T了。通俗的說就是它們兩之間的心跳檢測,避免服務提供者被剔除掉。

 

Renew源碼分析

服務提供者實現細節

服務提供者發發起服務續約的時序圖
服務提供者發起續約時序圖

 

 

在com.netflix.discovery.DiscoveryClient.initScheduledTasks()中的1272行,TimedSupervisorTask會定時發起服務續約,代碼如下所示:

// Heartbeat timer
  scheduler.schedule(
     new TimedSupervisorTask(
            "heartbeat",
             scheduler,
             heartbeatExecutor,
             renewalIntervalInSecs,
             TimeUnit.SECONDS,
             expBackOffBound,
              new HeartbeatThread()
            ),
  renewalIntervalInSecs, TimeUnit.SECONDS);

 

 

2.在com.netflix.discovery.DiscoveryClient中的1393行,有一個HeartbeatThread線程發起續約操作

private class HeartbeatThread implements Runnable {
        public void run() {
            //調用eureka-client中的renew
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
}

 

renew()調用eureka-client-1.4.11.jarcom.netflix.discovery.DiscoveryClient中829行renew()發起PUT Reset請求,

調用com.netflix.eureka.resources.InstanceResource中的renewLease()續約。

 

Netflix中的Eureka Core實現細節

NetFlix中Eureka Core中的服務續約時序圖,如下圖所示。
服務續約時序圖

 

打開com.netflix.eureka.resources.InstanceResource中的106行的renewLease()方法,代碼如下:

private final PeerAwareInstanceRegistry registry
@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    //調用
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    //其余省略
}

 

 

點開registry.renew(app.getName(), id, isFromReplicaNode);我們可以看到,調用了org.springframework.cloud.netflix.eureka.server.InstanceRegistry中的renew()方法,代碼如下:

@Override
    public boolean renew(final String appName, final String serverId,
            boolean isReplication) {
        log("renew " + appName + " serverId " + serverId + ", isReplication {}"
                + isReplication);
        List<Application> applications = getSortedApplications();
        for (Application input : applications) {
            if (input.getName().equals(appName)) {
                InstanceInfo instance = null;
                for (InstanceInfo info : input.getInstances()) {
                    if (info.getHostName().equals(serverId)) {
                        instance = info;
                        break;
                    }
                }
                publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
                        instance, isReplication));
                break;
            }
        }
        //調用com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl中的renew方法
        return super.renew(appName, serverId, isReplication);
}

 

 

super.renew()看到調用了父類中的com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl420行的renew()方法

public boolean renew(final String appName, final String id, final boolean isReplication) {
        //服務續約成功,
        if (super.renew(appName, id, isReplication)) {
            //然后replicateToPeers同步其它Eureka Server中的數據
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
}

從上面代碼中super.renew(appName, id, isReplication)可以看出調用的是com.netflix.eureka.registry.AbstractInstanceRegistry中345行的renew()方法,

    /**
     * Marks the given instance of the given app name as renewed, and also marks whether it originated from
     * replication.
     *
     * @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)
     */
    public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    Object[] args = {
                            instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId()
                    };
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", args);
                    instanceInfo.setStatus(overriddenInstanceStatus);
                }
            }
            renewsLastMin.increment();
            leaseToRenew.renew();
            return true;
        }
    }

 

 

其中 leaseToRenew.renew()是調用com.netflix.eureka.lease.Lease中的62行的renew()方法

public void renew() {
    lastUpdateTimestamp = System.currentTimeMillis() + duration;
}

 

 

 replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);

調用自身的replicateToPeers()方法,在com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl中的618行,主要接口實現方式和register基本一致:首先更新自身Eureka Server中服務的狀態,再同步到其它Eureka Server中。

private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
            // 同步把續約信息同步到其它的Eureka Server中
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                //根據action做相應操作的同步
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM