Eureka客戶端續約及服務端過期租約清理源碼...


Eureka客戶端續約及服務端過期租約清理源碼解析
在之前的文章: EurekaClient自動裝配及啟動流程解析中,我們提到了在構造DiscoveryClient時除了包含注冊流程之外,還調度了一個心跳線程:
scheduler.schedule(                    new TimedSupervisorTask(                            "heartbeat",                            scheduler,                            heartbeatExecutor,                            renewalIntervalInSecs,                            TimeUnit.SECONDS,                            expBackOffBound,                            new HeartbeatThread()                    ),                    renewalIntervalInSecs, TimeUnit.SECONDS);                    
其中HeartbeatThread線程如下:
    private class HeartbeatThread implements Runnable {        public void run() {        //續約            if (renew()) {              //續約成功時間戳更新                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();            }        }    } boolean renew() {        EurekaHttpResponse<InstanceInfo> httpResponse;        try {          //發送續約請求            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());            if (httpResponse.getStatusCode() == 404) {                REREGISTER_COUNTER.increment();                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());                long timestamp = instanceInfo.setIsDirtyWithTime();              //重新注冊                boolean success = register();                if (success) {                    instanceInfo.unsetIsDirty(timestamp);                }                return success;            }            return httpResponse.getStatusCode() == 200;        } catch (Throwable e) {            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);            return false;        }    }
這里直接發出了續約請求,如果續約請求失敗則會嘗試再次去注冊
服務端接受續約請求
服務端接受續約請求的Controller在InstanceResource類中
@PUT    public Response renewLease(            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,            @QueryParam("overriddenstatus"String overriddenStatus,            @QueryParam("status"String status,            @QueryParam("lastDirtyTimestamp"String lastDirtyTimestamp) {        boolean isFromReplicaNode = "true".equals(isReplication);      //續約        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);        // 續約失敗        if (!isSuccess) {            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);            return Response.status(Status.NOT_FOUND).build();        }        // 校驗客戶端與服務端的時間差異,如果存在問題則需要重新發起注冊        Response response = null;        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()                    && (overriddenStatus != null)                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))                    && isFromReplicaNode) {                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));            }        } else {            response = Response.ok().build();        }        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());        return response;    }
可以看到續約之后還有一個檢查時間差的問題,這個不詳細展開,繼續往下看續約的相關信息
    public boolean renew(final String appName, final String id, final boolean isReplication) {        if (super.renew(appName, id, isReplication)) {          //集群同步            replicateToPeers(Action.Heartbeat, appName, id, nullnull, isReplication);            return true;        }        return false;    }
這里集群同步的相關內容在之前的文章已經說過了,不再展開,續約的核心處理在下面
public boolean renew(String appName, String id, boolean isReplication) {        RENEW.increment(isReplication);   //獲取已存在的租約        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);        Lease<InstanceInfo> leaseToRenew = null;        if (gMap != null) {            leaseToRenew = gMap.get(id);        }   //租約不存在        if (leaseToRenew == null) {            RENEW_NOT_FOUND.increment(isReplication);            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);            return false;        } else {          //獲取客戶端            InstanceInfo instanceInfo = leaseToRenew.getHolder();          //設置客戶端的狀態            if (instanceInfo != null) {                // touchASGCache(instanceInfo.getASGName());                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(                        instanceInfo, leaseToRenew, isReplication);                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"                            + "; re-register required", instanceInfo.getId());                    RENEW_NOT_FOUND.increment(isReplication);                    return false;                }                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {                    logger.info(                            "The instance status {} is different from overridden instance status {} for instance {}. "                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),                                    instanceInfo.getOverriddenStatus().name(),                                    instanceInfo.getId());                  //覆蓋當前狀態                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);                }            }            renewsLastMin.increment();          //設置租約最后更新時間            leaseToRenew.renew();            return true;        }    }
對於看過之前文章的同學來說整體流程比較簡單
服務端過期租約清理
在文章 Eureka應用注冊與集群數據同步源碼解析一文中大家應該對下面這行代碼比較熟悉
int registryCount = registry.syncUp();
上面這行代碼發起了集群數據同步,而緊接着這行代碼的就是服務端的過期租約清理邏輯
registry.openForTraffic(applicationInfoManager, registryCount);
openForTraffic方法的最后調用了一個方法postInit,而在postInit方法中啟動了一個線程EvictionTask,這個線程就負責清理已經過期的租約
evictionTimer.schedule(evictionTaskRef.get(),       serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs());
看一下這個線程
class EvictionTask extends TimerTask {   @Override   public void run() {       try {           //補償時間毫秒數           long compensationTimeMs = getCompensationTimeMs();           logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);           // 清理邏輯           evict(compensationTimeMs);       } catch (Throwable e) {           logger.error("Could not run the evict task", e);       }   }}
其中補償時間的獲取是這樣的:
long getCompensationTimeMs() {            long currNanos = getCurrentTimeNano();            long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);            if (lastNanos == 0l) {                return 0l;            }            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);            //當前時間 - 最后任務執行時間 - 任務執行頻率            long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();            return compensationTime <= 0l ? 0l : compensationTime;        }
接着看清理的核心邏輯
public void evict(long additionalLeaseMs) {        logger.debug("Running the evict task");        if (!isLeaseExpirationEnabled()) {            logger.debug("DS: lease expiration is currently disabled.");            return;        }        // 1. 獲得所有的過期租約        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();        for (Entry<StringMap<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();            if (leaseMap != null) {                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {                    Lease<InstanceInfo> lease = leaseEntry.getValue();                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {                        expiredLeases.add(lease);                    }                }            }        }        // 2. 計算允許清理的數量        int registrySize = (int) getLocalRegistrySize();        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());        int evictionLimit = registrySize - registrySizeThreshold;        int toEvict = Math.min(expiredLeases.size(), evictionLimit);                // 3. 過期        if (toEvict > 0) {            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);            Random random = new Random(System.currentTimeMillis());            for (int i = 0; i < toEvict; i++) {                // Pick a random item (Knuth shuffle algorithm)                int next = i + random.nextInt(expiredLeases.size() - i);                Collections.swap(expiredLeases, i, next);                Lease<InstanceInfo> lease = expiredLeases.get(i);                String appName = lease.getHolder().getAppName();                String id = lease.getHolder().getId();                EXPIRED.increment();                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);                internalCancel(appName, id, false);            }        }    }
整個過期的執行過程主要分為以下3個步驟:
  • 獲得所有的過期租約
    過期租約的計算方法為isExpired
public boolean isExpired(long additionalLeaseMs) {        return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));}
服務下線時間>0||當前時間>(最后更新時間+租約持續時間+補償時間)
  • 計算允許清理的數量
    getRenewalPercentThreshold()默認值為0.85,也是就說默認情況下每次清理最大允許過期數量和15%的所有注冊數量兩者之間的最小值
  • 過期
    過期的清理是隨機進行的,這樣設計也是為了避免單個應用全部過期的。
    過期的處理則和注冊的處理正好是相反的:
protected boolean internalCancel(String appName, String id, boolean isReplication) {        try {            read.lock();            CANCEL.increment(isReplication);            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);            Lease<InstanceInfo> leaseToCancel = null;            if (gMap != null) {                leaseToCancel = gMap.remove(id);            }            synchronized (recentCanceledQueue) {                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));            }            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);            if (instanceStatus != null) {                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());            }            if (leaseToCancel == null) {                CANCEL_NOT_FOUND.increment(isReplication);                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);                return false;            } else {                leaseToCancel.cancel();                InstanceInfo instanceInfo = leaseToCancel.getHolder();                String vip = null;                String svip = null;                if (instanceInfo != null) {                    instanceInfo.setActionType(ActionType.DELETED);                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));                    instanceInfo.setLastUpdatedTimestamp();                    vip = instanceInfo.getVIPAddress();                    svip = instanceInfo.getSecureVipAddress();                }                invalidateCache(appName, vip, svip);                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);                return true;            }        } finally {            read.unlock();        }    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM