【一起學源碼-微服務】Nexflix Eureka 源碼八:EurekaClient注冊表抓取 精妙設計分析!


前言

前情回顧

上一講 我們通過單元測試 來梳理了EurekaClient是如何注冊到server端,以及server端接收到請求是如何處理的,這里最重要的關注點是注冊表的一個數據結構:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>()

本講目錄

回頭看了下之前的博客,沒有一個總目錄說明,每篇都是直接源碼分析了。從此篇文章開始都會加上目錄,以及文章最后會加上總結及讀此篇源碼的感受。希望這個博客系列的文章會越來越好。

目錄如下:

  1. client端第一次注冊全量抓取注冊表的邏輯
  2. server端返回注冊表信息集合的多級緩存機制
  3. server端注冊表多級緩存過期機制:主動+定時+被動
  4. client端增量抓取注冊表邏輯

技術亮點:

  1. 注冊表抓取的多級緩存機制
  2. 增量抓取返回的全量數據hashCode,和本地數據hashCode對比,保證數據一致性

這里再啰嗦一點,之前一直吐槽EurekaClient注冊的邏輯,今天看了EurekaClient注冊表抓取的邏輯后,不由的感嘆設計的精妙之處,這里說的精妙是指EurekaServer端對於注冊表讀取邏輯的設計,緩存邏輯以及增量獲取時Hash一致性的判斷,真的很妙,感覺又學到了不少東西。讀完這段代碼 一大早就很興奮,哈哈哈,一起看看吧。

說明

原創不易,如若轉載 請標明來源:一枝花算不算浪漫

EurekaClient全量抓取注冊表邏輯

一直在想着怎么才能把自己看完代碼后的理解用文字表達出來,這里采用一種新模式吧,先畫圖,然后源碼,然后解讀。

04_EurekaClient注冊表全量抓取邏輯.png

圖片看起來很簡單,Client發送Http請求給Server端,Server端返回全量的注冊表信息給Client端。接下來就是跟進代碼一步步分析,這里先有個大概印象

源碼解析

  1. Client端發送獲取全量注冊表請求
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {

    // 省略很多無關代碼

    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }

}

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                    (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // 刪減掉一些代碼

    // registry was fetched successfully, so return true
    return true;
}

private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    Applications apps = null;
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}

這里就不再贅述Client端是如何一步步跟進到發請求的代碼的,因為之前通過單元測試代碼已經搞清楚了Server端接受請求的類是ApplicationsResource.java, Client端主要核心的代碼也在 DiscoveryClient.java中。

代碼還是之前看了好多遍的祖傳代碼,只是省略了很多內容,只展示我們需要分析的地方。
clientConfig.shouldFetchRegistry() 這個配置默認是true,然后fetchRegistry方法中getAndStoreFullRegistry(),因為第一次都是獲取全量注冊表信息,繼續往后。

getAndStoreFullRegistry 方法中可以看到就是發送Http請求給Server端,然后等待Server端返回全量注冊表信息。

這里獲取全量請求執行的是eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())

然后再一路往下,跟蹤到 AbstractJersey2EurekaHttpClient.java中,getApplicationsInternal方法,發下發送的是GET請求,於是到Server端ApplicationsResource.java中的GET方法getContainers中查看邏輯

server端返回注冊表信息集合的多級緩存機制

上面已經看了Client端 發送抓取全量注冊表的邏輯,到了Server端查看ApplicationsResource.java中的GET方法getContainers,接着看看這部分的源碼

private final ResponseCache responseCache;

@GET
public Response getContainers(@PathParam("version") String version,
                              @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                              @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                              @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                              @Context UriInfo uriInfo,
                              @Nullable @QueryParam("regions") String regionsStr) {

    // 省略部分代碼

    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    Response response;
    if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        response = Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        response = Response.ok(responseCache.get(cacheKey))
                .build();
    }
    CurrentRequestVersion.remove();
    return response;
}

這里接收到Client端的請求后,會去responseCache 中去拿去全量的數據信息。
從屬性名字就可以看出來,這個是從緩存中獲取數據。

ResponseCacheImpl.java

String get(final Key key, boolean useReadOnlyCache) {
    Value payload = getValue(key, useReadOnlyCache);
    if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
        return null;
    } else {
        return payload.getPayload();
    }
}

Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key : {}", key, t);
    }
    return payload;
}

這里主要關注getValue方法,這里主要有兩個map,一個是readOnlyCacheMap 另一個是readWriteCacheMap, 這里我們光看名字就可以知道一個是只讀緩存,一個是讀寫緩存,這里用了兩層的緩存結構,如果只讀緩存不為空 則直接返回,如果為空查詢可讀緩存。

關於緩存的講解 我們繼續往下看。

server端注冊表多級緩存過期機制:主動+定時+被動

繼續看緩存相關,用到了多級緩存這里可能就會存在一些疑問:

  1. 兩級緩存數據如何保存同步?
  2. 緩存數據如何過期?

帶着疑問我們來繼續看源代碼

private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
private final LoadingCache<Key, Value> readWriteCacheMap;

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    // 省略部分代碼

    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
    this.readWriteCacheMap =
            CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                    .removalListener(new RemovalListener<Key, Value>() {
                        @Override
                        public void onRemoval(RemovalNotification<Key, Value> notification) {
                            Key removedKey = notification.getKey();
                            if (removedKey.hasRegions()) {
                                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                            }
                        }
                    })
                    .build(new CacheLoader<Key, Value>() {
                        @Override
                        public Value load(Key key) throws Exception {
                            if (key.hasRegions()) {
                                Key cloneWithNoRegions = key.cloneWithoutRegions();
                                regionSpecificKeys.put(cloneWithNoRegions, key);
                            }
                            Value value = generatePayload(key);
                            return value;
                        }
                    });

    // 省略部分代碼

}
  1. readOnlyCacheMap用的是ConcurrentHashMap,線程安全的。
    readWriteCacheMap用的是GuavaCache,不懂的小伙伴可以自己閱讀以下,我之前的博客也有講解這個,這個是谷歌開源的Guava項目基於內存的緩存,其內部也是實現的Map結構。

  2. 主要重點我們來看下GuavaCache,這里初始化大小是serverConfig.getInitialCapacityOfResponseCache() 默認是1000,也是Map的初始大小。
    expireAfterWrite 刷新時間是serverConfig.getResponseCacheAutoExpirationInSeconds()默認時間是180s。
    接着是build方法,這里獲取注冊表信息就是用的generatePayload方法,如果查詢readWriteCacheMap中注冊表信息為空,這會執行build方法。

繼續跟進generatePayload方法:

private Value generatePayload(Key key) {
    Stopwatch tracer = null;
    try {
        String payload;
        switch (key.getEntityType()) {
            case Application:
                boolean isRemoteRegionRequested = key.hasRegions();

                if (ALL_APPS.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeAllAppsWithRemoteRegionTimer.start();
                        payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeAllAppsTimer.start();
                        payload = getPayLoad(key, registry.getApplications());
                    }
                } else if (ALL_APPS_DELTA.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                        versionDeltaWithRegions.incrementAndGet();
                        versionDeltaWithRegionsLegacy.incrementAndGet();
                        payload = getPayLoad(key,
                                registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeDeltaAppsTimer.start();
                        versionDelta.incrementAndGet();
                        versionDeltaLegacy.incrementAndGet();
                        payload = getPayLoad(key, registry.getApplicationDeltas());
                    }
                }
                break;
        }
        return new Value(payload);
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
}

這個代碼刪減了一部分,到時增量抓取注冊表也會走這個邏輯,ALL_APPS就是全量抓取,ALL_APPS_DELTA就是增量抓取的意思,這里先插個眼,一會增量抓取注冊表的邏輯再回頭看。

上面的邏輯我們只需要關注registry.getApplicationsFromMultipleRegions 即可,這個是獲取注冊表的邏輯。接着繼續往下跟代碼:

AbstractInstanceRegistry.java

public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {

    Applications apps = new Applications();
    apps.setVersion(1L);
    for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
        Application app = null;

        if (entry.getValue() != null) {
            for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
                Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
                if (app == null) {
                    app = new Application(lease.getHolder().getAppName());
                }
                app.addInstance(decorateInstanceInfo(lease));
            }
        }
        if (app != null) {
            apps.addApplication(app);
        }
    }
    if (includeRemoteRegion) {
        for (String remoteRegion : remoteRegions) {
            RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
            if (null != remoteRegistry) {
                Applications remoteApps = remoteRegistry.getApplications();
                for (Application application : remoteApps.getRegisteredApplications()) {
                    if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                        logger.info("Application {}  fetched from the remote region {}",
                                application.getName(), remoteRegion);

                        Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                        if (appInstanceTillNow == null) {
                            appInstanceTillNow = new Application(application.getName());
                            apps.addApplication(appInstanceTillNow);
                        }
                        for (InstanceInfo instanceInfo : application.getInstances()) {
                            appInstanceTillNow.addInstance(instanceInfo);
                        }
                    } else {
                        logger.debug("Application {} not fetched from the remote region {} as there exists a "
                                        + "whitelist and this app is not in the whitelist.",
                                application.getName(), remoteRegion);
                    }
                }
            } else {
                logger.warn("No remote registry available for the remote region {}", remoteRegion);
            }
        }
    }
    apps.setAppsHashCode(apps.getReconcileHashCode());
    return apps;
}

這里再看到 registry.entrySet()是不是會特別親切?Map<String, Map<String, Lease<InstanceInfo>> 我們上一篇講Client注冊的時候 就是將注冊信息放入到registry對應這個數據結構中的,果不其然,這里拿到所有的注冊信息,然后封裝到Applications 對象中的。

這里最后apps.setAppsHashCode()邏輯,先插個眼 后面講增量同步有類似的邏輯,后面再回頭看。接着再回頭看 返回數據后 readWriteCacheMap 的操作邏輯。

if (shouldUseReadOnlyResponseCache) {
    timer.schedule(getCacheUpdateTask(),
            new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                    + responseCacheUpdateIntervalMs),
            responseCacheUpdateIntervalMs);
}

private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                            key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                } finally {
                    CurrentRequestVersion.remove();
                }
            }
        }
    };
}

這里是起了一個調度任務,會去定時比較一級和二級緩存是否一致,如果不一致 就會用二級緩存覆蓋一級緩存。這就回答了上面的第一個問題,兩級緩存一致性的問題,默認30s執行一次。所以這里仍會有問題,可能緩存在30s內會存在不一致的情況,這里用的是最終一致的思想。

image.png

緊接着 讀寫緩存獲取到數據后再去回寫只讀緩存,這是上面ResponseCacheImpl.java 的邏輯,到了這里 全量抓取注冊表的代碼都已經看完了,這里主要的亮點是使用了兩級緩存策略來返回對應的數據。

接着整理下過期的幾個機制,也是回應上面拋出的第二個問題。

用一張圖作為總結:

05_EurekaServer多節緩存過期機制.png

  1. 主動過期
    readWriteCacheMap,讀寫緩存

    有新的服務實例發生注冊、下線、故障的時候,就會去刷新readWriteCacheMap(在Client注冊的時候,AbstractInstanceRegistry中register方法最后會有一個invalidateCache()方法)

    比如說現在有一個服務A,ServiceA,有一個新的服務實例,Instance010來注冊了,注冊完了之后,其實必須是得刷新這個緩存的,然后就會調用ResponseCache.invalidate(),將之前緩存好的ALL_APPS這個key對應的緩存,給他過期掉

    將readWriteCacheMap中的ALL_APPS緩存key,對應的緩存給過期掉

  2. 定時過期

    readWriteCacheMap在構建的時候,指定了一個自動過期的時間,默認值就是180秒,所以你往readWriteCacheMap中放入一個數據過后,自動會等180秒過后,就將這個數據給他過期了

  3. 被動過期

    readOnlyCacheMap怎么過期呢?
    默認是每隔30秒,執行一個定時調度的線程任務,TimerTask,有一個邏輯,會每隔30秒,對readOnlyCacheMap和readWriteCacheMap中的數據進行一個比對,如果兩塊數據是不一致的,那么就將readWriteCacheMap中的數據放到readOnlyCacheMap中來。

    比如說readWriteCacheMap中,ALL_APPS這個key對應的緩存沒了,那么最多30秒過后,就會同步到readOnelyCacheMap中去。

client端增量抓取注冊表邏輯

上面抓取全量注冊表的代碼已經說了,這里來講一下增量抓取,入口還是在DiscoverClient.java
中,當初始化完DiscoverClient.java 后會執行一個初始化定時任務的方法initScheduledTasks(), 其中這個里面就會每隔30s 增量抓取一次注冊表信息。

這里就不跟着這里的邏輯一步步看了,看過上面的代碼后 應該會對這里比較清晰了,這里我們直接看Server端代碼了。

還記的我們上面插過的眼,獲取全量用的是ALL_APPS 增量用的是ALL_APPS_DELTA, 所以我們這里只看增量的邏輯就行了。

else if (ALL_APPS_DELTA.equals(key.getName())) {
    if (isRemoteRegionRequested) {
        tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
        versionDeltaWithRegions.incrementAndGet();
        versionDeltaWithRegionsLegacy.incrementAndGet();
        payload = getPayLoad(key,
                registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
    } else {
        tracer = serializeDeltaAppsTimer.start();
        versionDelta.incrementAndGet();
        versionDeltaLegacy.incrementAndGet();
        payload = getPayLoad(key, registry.getApplicationDeltas());
    }
}

上面只是截取了部分代碼,這里直接看主要的邏輯registry.getApplicationDeltasFromMultipleRegions即可,這個和全量的方法名只有一個Deltas的區別。

public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
    if (null == remoteRegions) {
        remoteRegions = allKnownRemoteRegions; // null means all remote regions.
    }

    boolean includeRemoteRegion = remoteRegions.length != 0;

    if (includeRemoteRegion) {
        GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
    } else {
        GET_ALL_CACHE_MISS_DELTA.increment();
    }

    Applications apps = new Applications();
    apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
    Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
    try {
        write.lock();
        Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
        logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size());
        while (iter.hasNext()) {
            Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
            InstanceInfo instanceInfo = lease.getHolder();
            logger.debug("The instance id {} is found with status {} and actiontype {}",
                    instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
            Application app = applicationInstancesMap.get(instanceInfo.getAppName());
            if (app == null) {
                app = new Application(instanceInfo.getAppName());
                applicationInstancesMap.put(instanceInfo.getAppName(), app);
                apps.addApplication(app);
            }
            app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
        }

        if (includeRemoteRegion) {
            for (String remoteRegion : remoteRegions) {
                RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                if (null != remoteRegistry) {
                    Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
                    if (null != remoteAppsDelta) {
                        for (Application application : remoteAppsDelta.getRegisteredApplications()) {
                            if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                                Application appInstanceTillNow =
                                        apps.getRegisteredApplications(application.getName());
                                if (appInstanceTillNow == null) {
                                    appInstanceTillNow = new Application(application.getName());
                                    apps.addApplication(appInstanceTillNow);
                                }
                                for (InstanceInfo instanceInfo : application.getInstances()) {
                                    appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo));
                                }
                            }
                        }
                    }
                }
            }
        }

        Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
        apps.setAppsHashCode(allApps.getReconcileHashCode());
        return apps;
    } finally {
        write.unlock();
    }
}

這里代碼還是比較多的,我們只需要抓住重點即可:

  1. recentlyChangedQueue中獲取注冊信息,從名字可以看出來 這是最近改變的client注冊信息的隊列
  2. 使用writeLock,因為這里是獲取增量注冊信息,是從隊列中獲取,如果不加寫鎖,那么獲取的時候又有新數據加入隊列中,新數據會獲取不到的

基於上面第一點,我們來看看這個隊列怎么做的:

  1. 數據結構:ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue
  2. AbstractInstanceRegistry.java初始化的時候會啟動一個定時任務,默認30s中執行一次。如果注冊時間小於當前時間的180s,就會放到這個隊列中

AbstractInstanceRegistry.java具體代碼如下:

protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
    this.serverConfig = serverConfig;
    this.clientConfig = clientConfig;
    this.serverCodecs = serverCodecs;
    this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
    this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);

    this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);

    this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
            serverConfig.getDeltaRetentionTimerIntervalInMs(),
            serverConfig.getDeltaRetentionTimerIntervalInMs());
}

private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {

        @Override
        public void run() {
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                if (it.next().getLastUpdateTime() <
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    break;
                }
            }
        }

    };
}

這里就能看明白了,也就是說增量抓取會獲取EurekaServer端3分鍾內保存的變動的Client信息。
最后還有一個亮點,我們上面說過,無論是全量抓取還是增量抓取,最后都會返回一個全量注冊表的hash值,代碼是apps.setAppsHashCode(allApps.getReconcileHashCode());, 其中apps就是返回的Applications中的屬性,最后我們再看看這個hashCode的用法。

回到DiscoveryClient.java, 找到refreshRegistry 方法,然后一路跟蹤到getAndUpdateDelta方法,這里具體代碼我就不貼了,流程如下:

  1. 獲取delta增量數據
  2. 根據增量數據和本地注冊表數據進行合並
  3. 計算中本地注冊表信息的hashCode值
  4. 如果本地hashCode值和server端返回的hashCode值不一致則再全量獲取一次注冊表信息

最后一張圖總結增量注冊表抓取邏輯:

06_EurekaClient增量抓取注冊表流程.png

總結&感悟

這篇文章寫得有點長了,確實自己也很用心去寫了,我感覺這里多級緩存機制+增量數據Hash一致性的對比方案做的很優秀,如果要我做一個數據全量+增量同步 我也會借鑒這種方案。

看源碼 能夠學到的就是別人的設計思想。總結的部分可以看上面的一些圖,注冊表抓取的源碼學習就到這了,后面 還准備看下心跳機制、保護機制、集群等等一些的源碼。

這里讀完源碼之后會發下一個問題:

假設有服務實例注冊、下線、故障,要調用這個服務的其他服務,可能會過30秒之后才能感知倒,為什么呢?因為這里再獲取服務注冊表的時候,有一個多級緩存的機制,最多是30秒后才會去更新一級緩存。

申明

本文章首發自本人博客:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!

感興趣的小伙伴可關注個人公眾號:壹枝花算不算浪漫

22.jpg


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM