Spring Cloud Eureka源碼分析之三級緩存的設計原理及源碼分析


Eureka源碼

Eureka Server 為了提供響應效率,提供了兩層的緩存結構,將 Eureka Client 所需要的注冊信息,直接存儲在緩存結構中,實現原理如下圖所示。

image-20211121172504080

第一層緩存:readOnlyCacheMap,本質上是 ConcurrentHashMap,依賴定時從 readWriteCacheMap 同步數據,默認時間為 30 秒。

readOnlyCacheMap : 是一個 CurrentHashMap 只讀緩存,這個主要是為了供客戶端獲取注冊信息時使用,其緩存更新,依賴於定時器的更新,通過和 readWriteCacheMap 的值做對比,如果數據不一致,則以 readWriteCacheMap 的數據為准。

第二層緩存:readWriteCacheMap,本質上是 Guava 緩存。

readWriteCacheMap:readWriteCacheMap 的數據主要同步於存儲層。當獲取緩存時判斷緩存中是否沒有數據,如果不存在此數據,則通過 CacheLoader 的 load 方法去加載,加載成功之后將數據放入緩存,同時返回數據。

readWriteCacheMap 緩存過期時間,默認為 180 秒,當服務下線、過期、注冊、狀態變更,都會來清除此緩存中的數據。

Eureka Client 獲取全量或者增量的數據時,會先從一級緩存中獲取;如果一級緩存中不存在,再從二級緩存中獲取;如果二級緩存也不存在,這時候先將存儲層的數據同步到緩存中,再從緩存中獲取。

通過 Eureka Server 的二層緩存機制,可以非常有效地提升 Eureka Server 的響應時間,通過數據存儲層和緩存層的數據切割,根據使用場景來提供不同的數據支持。

多級緩存的意義

這里為什么要設計多級緩存呢?原因很簡單,就是當存在大規模的服務注冊和更新時,如果只是修改一個ConcurrentHashMap數據,那么勢必因為鎖的存在導致競爭,影響性能。

而Eureka又是AP模型,只需要滿足最終可用就行。所以它在這里用到多級緩存來實現讀寫分離。注冊方法寫的時候直接寫內存注冊表,寫完表之后主動失效讀寫緩存。

獲取注冊信息接口先從只讀緩存取,只讀緩存沒有再去讀寫緩存取,讀寫緩存沒有再去內存注冊表里取(不只是取,此處較復雜)。並且,讀寫緩存會更新回寫只讀緩存

  • responseCacheUpdateIntervalMs : readOnlyCacheMap 緩存更新的定時器時間間隔,默認為30秒
  • responseCacheAutoExpirationInSeconds : readWriteCacheMap 緩存過期時間,默認為 180 秒 。

緩存初始化

readWriteCacheMap使用的是LoadingCache對象,它是guava中提供的用來實現內存緩存的一個api。創建方式如下

LoadingCache<Long, String> cache = CacheBuilder.newBuilder()
    //緩存池大小,在緩存項接近該大小時, Guava開始回收舊的緩存項
    .maximumSize(10000)
    //設置時間對象沒有被讀/寫訪問則對象從內存中刪除(在另外的線程里面不定期維護)
    .expireAfterAccess(10, TimeUnit.MINUTES)
    //移除監聽器,緩存項被移除時會觸發
    .removalListener(new RemovalListener <Long, String>() {
        @Override
        public void onRemoval(RemovalNotification<Long, String> rn) {
            //執行邏輯操作
        }
    })
    .recordStats()//開啟Guava Cache的統計功能
    .build(new CacheLoader<String, Object>() {
        @Override
        public Object load(String key) {
            //從 SQL或者NoSql 獲取對象
        }
    });//CacheLoader類 實現自動加載

其中,CacheLoader是用來實現緩存自動加載的功能,當觸發readWriteCacheMap.get(key)方法時,就會回調CacheLoader.load方法,根據key去服務注冊信息中去查找實例數據進行緩存

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
    this.registry = registry;

    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
    this.readWriteCacheMap =
        CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
        .removalListener(new RemovalListener<Key, Value>() {
            @Override
            public void onRemoval(RemovalNotification<Key, Value> notification) {
                Key removedKey = notification.getKey();
                if (removedKey.hasRegions()) {
                    Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                    regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                }
            }
        })
        .build(new CacheLoader<Key, Value>() {
            @Override
            public Value load(Key key) throws Exception {
                if (key.hasRegions()) {
                    Key cloneWithNoRegions = key.cloneWithoutRegions();
                    regionSpecificKeys.put(cloneWithNoRegions, key);
                }
                Value value = generatePayload(key);  //注意這里
                return value;
            }
        });

而緩存的加載,是基於generatePayload方法完成的,代碼如下。

private Value generatePayload(Key key) {
    Stopwatch tracer = null;
    try {
        String payload;
        switch (key.getEntityType()) {
            case Application:
                boolean isRemoteRegionRequested = key.hasRegions();

                if (ALL_APPS.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeAllAppsWithRemoteRegionTimer.start();
                        payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeAllAppsTimer.start();
                        payload = getPayLoad(key, registry.getApplications());
                    }
                } else if (ALL_APPS_DELTA.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                        versionDeltaWithRegions.incrementAndGet();
                        versionDeltaWithRegionsLegacy.incrementAndGet();
                        payload = getPayLoad(key,
                                             registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeDeltaAppsTimer.start();
                        versionDelta.incrementAndGet();
                        versionDeltaLegacy.incrementAndGet();
                        payload = getPayLoad(key, registry.getApplicationDeltas());
                    }
                } else {
                    tracer = serializeOneApptimer.start();
                    payload = getPayLoad(key, registry.getApplication(key.getName()));
                }
                break;
            case VIP:
            case SVIP:
                tracer = serializeViptimer.start();
                payload = getPayLoad(key, getApplicationsForVip(key, registry));
                break;
            default:
                logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
                payload = "";
                break;
        }
        return new Value(payload);
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
}

此方法接受一個 Key 類型的參數,返回一個 Value 類型。 其中 Key 中重要的字段有:

  • KeyType ,表示payload文本格式,有 JSONXML 兩種值。
  • EntityType ,表示緩存的類型,有 Application , VIP , SVIP 三種值。
  • entityName ,表示緩存的名稱,可能是單個應用名,也可能是 ALL_APPSALL_APPS_DELTA

Value 則有一個 String 類型的payload和一個 byte 數組,表示gzip壓縮后的字節。

緩存同步

ResponseCacheImpl這個類的構造實現中,初始化了一個定時任務,這個定時任務每個

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    //省略...
    if (shouldUseReadOnlyResponseCache) {
        timer.schedule(getCacheUpdateTask(),
                       new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                                + responseCacheUpdateIntervalMs),
                       responseCacheUpdateIntervalMs);
    }
}

默認每30s從readWriteCacheMap更新有差異的數據同步到readOnlyCacheMap中

private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) { //遍歷只讀集合
                if (logger.isDebugEnabled()) {
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                 key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    if (cacheValue != currentCacheValue) { //判斷差異信息,如果有差異,則更新
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                } finally {
                    CurrentRequestVersion.remove();
                }
            }
        }
    };
}

緩存失效

在AbstractInstanceRegistry.register這個方法中,當完成服務信息保存后,會調用invalidateCache失效緩存

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    //....
     invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
    //....
}

最終調用ResponseCacheImpl.invalidate方法,完成緩存的失效機制

public void invalidate(Key... keys) {
    for (Key key : keys) {
        logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
                     key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());

        readWriteCacheMap.invalidate(key);
        Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
        if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
            for (Key keysWithRegion : keysWithRegions) {
                logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                             key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
                readWriteCacheMap.invalidate(keysWithRegion);
            }
        }
    }
}

版權聲明:本博客所有文章除特別聲明外,均采用 CC BY-NC-SA 4.0 許可協議。轉載請注明來自 Mic帶你學架構
如果本篇文章對您有幫助,還請幫忙點個關注和贊,您的堅持是我不斷創作的動力。歡迎關注「跟着Mic學架構」公眾號公眾號獲取更多技術干貨!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM