前言
前情回顧
上一講 我們通過單元測試 來梳理了EurekaClient是如何注冊到server端,以及server端接收到請求是如何處理的,這里最重要的關注點是注冊表的一個數據結構:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>()
本講目錄
回頭看了下之前的博客,沒有一個總目錄說明,每篇都是直接源碼分析了。從此篇文章開始都會加上目錄,以及文章最后會加上總結及讀此篇源碼的感受。希望這個博客系列的文章會越來越好。
目錄如下:
- client端第一次注冊全量抓取注冊表的邏輯
- server端返回注冊表信息集合的多級緩存機制
- server端注冊表多級緩存過期機制:主動+定時+被動
- client端增量抓取注冊表邏輯
技術亮點:
- 注冊表抓取的多級緩存機制
- 增量抓取返回的全量數據hashCode,和本地數據hashCode對比,保證數據一致性
這里再啰嗦一點,之前一直吐槽EurekaClient注冊的邏輯,今天看了EurekaClient注冊表抓取的邏輯后,不由的感嘆設計的精妙之處,這里說的精妙是指EurekaServer端對於注冊表讀取邏輯的設計,緩存邏輯以及增量獲取時Hash一致性的判斷,真的很妙,感覺又學到了不少東西。讀完這段代碼 一大早就很興奮,哈哈哈,一起看看吧。
說明
原創不易,如若轉載 請標明來源:一枝花算不算浪漫
EurekaClient全量抓取注冊表邏輯
一直在想着怎么才能把自己看完代碼后的理解用文字表達出來,這里采用一種新模式吧,先畫圖,然后源碼,然后解讀。
圖片看起來很簡單,Client發送Http請求給Server端,Server端返回全量的注冊表信息給Client端。接下來就是跟進代碼一步步分析,這里先有個大概印象
源碼解析
- Client端發送獲取全量注冊表請求
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
// 省略很多無關代碼
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
}
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry();
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// 刪減掉一些代碼
// registry was fetched successfully, so return true
return true;
}
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
這里就不再贅述Client端是如何一步步跟進到發請求的代碼的,因為之前通過單元測試代碼已經搞清楚了Server端接受請求的類是ApplicationsResource.java
, Client端主要核心的代碼也在 DiscoveryClient.java
中。
代碼還是之前看了好多遍的祖傳代碼,只是省略了很多內容,只展示我們需要分析的地方。
clientConfig.shouldFetchRegistry()
這個配置默認是true,然后fetchRegistry
方法中getAndStoreFullRegistry()
,因為第一次都是獲取全量注冊表信息,繼續往后。
getAndStoreFullRegistry
方法中可以看到就是發送Http請求給Server端,然后等待Server端返回全量注冊表信息。
這里獲取全量請求執行的是eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
然后再一路往下,跟蹤到 AbstractJersey2EurekaHttpClient.java
中,getApplicationsInternal
方法,發下發送的是GET
請求,於是到Server端ApplicationsResource.java
中的GET
方法getContainers
中查看邏輯
server端返回注冊表信息集合的多級緩存機制
上面已經看了Client端 發送抓取全量注冊表的邏輯,到了Server端查看ApplicationsResource.java
中的GET
方法getContainers
,接着看看這部分的源碼
private final ResponseCache responseCache;
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
// 省略部分代碼
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
Response response;
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey))
.build();
}
CurrentRequestVersion.remove();
return response;
}
這里接收到Client端的請求后,會去responseCache
中去拿去全量的數據信息。
從屬性名字就可以看出來,這個是從緩存中獲取數據。
ResponseCacheImpl.java
String get(final Key key, boolean useReadOnlyCache) {
Value payload = getValue(key, useReadOnlyCache);
if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
return null;
} else {
return payload.getPayload();
}
}
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
這里主要關注getValue
方法,這里主要有兩個map,一個是readOnlyCacheMap
另一個是readWriteCacheMap
, 這里我們光看名字就可以知道一個是只讀緩存,一個是讀寫緩存,這里用了兩層的緩存結構,如果只讀緩存不為空 則直接返回,如果為空查詢可讀緩存。
關於緩存的講解 我們繼續往下看。
server端注冊表多級緩存過期機制:主動+定時+被動
繼續看緩存相關,用到了多級緩存這里可能就會存在一些疑問:
- 兩級緩存數據如何保存同步?
- 緩存數據如何過期?
帶着疑問我們來繼續看源代碼
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
private final LoadingCache<Key, Value> readWriteCacheMap;
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
// 省略部分代碼
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
// 省略部分代碼
}
-
readOnlyCacheMap
用的是ConcurrentHashMap,線程安全的。
readWriteCacheMap
用的是GuavaCache,不懂的小伙伴可以自己閱讀以下,我之前的博客也有講解這個,這個是谷歌開源的Guava項目基於內存的緩存,其內部也是實現的Map結構。 -
主要重點我們來看下GuavaCache,這里初始化大小是
serverConfig.getInitialCapacityOfResponseCache()
默認是1000,也是Map的初始大小。
expireAfterWrite
刷新時間是serverConfig.getResponseCacheAutoExpirationInSeconds()
默認時間是180s。
接着是build方法,這里獲取注冊表信息就是用的generatePayload
方法,如果查詢readWriteCacheMap中注冊表信息為空,這會執行build方法。
繼續跟進generatePayload
方法:
private Value generatePayload(Key key) {
Stopwatch tracer = null;
try {
String payload;
switch (key.getEntityType()) {
case Application:
boolean isRemoteRegionRequested = key.hasRegions();
if (ALL_APPS.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeAllAppsWithRemoteRegionTimer.start();
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeAllAppsTimer.start();
payload = getPayLoad(key, registry.getApplications());
}
} else if (ALL_APPS_DELTA.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = getPayLoad(key, registry.getApplicationDeltas());
}
}
break;
}
return new Value(payload);
} finally {
if (tracer != null) {
tracer.stop();
}
}
}
這個代碼刪減了一部分,到時增量抓取注冊表也會走這個邏輯,ALL_APPS
就是全量抓取,ALL_APPS_DELTA
就是增量抓取的意思,這里先插個眼,一會增量抓取注冊表的邏輯再回頭看。
上面的邏輯我們只需要關注registry.getApplicationsFromMultipleRegions
即可,這個是獲取注冊表的邏輯。接着繼續往下跟代碼:
AbstractInstanceRegistry.java
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
Applications apps = new Applications();
apps.setVersion(1L);
for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
Application app = null;
if (entry.getValue() != null) {
for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
if (app == null) {
app = new Application(lease.getHolder().getAppName());
}
app.addInstance(decorateInstanceInfo(lease));
}
}
if (app != null) {
apps.addApplication(app);
}
}
if (includeRemoteRegion) {
for (String remoteRegion : remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteApps = remoteRegistry.getApplications();
for (Application application : remoteApps.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
logger.info("Application {} fetched from the remote region {}",
application.getName(), remoteRegion);
Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(instanceInfo);
}
} else {
logger.debug("Application {} not fetched from the remote region {} as there exists a "
+ "whitelist and this app is not in the whitelist.",
application.getName(), remoteRegion);
}
}
} else {
logger.warn("No remote registry available for the remote region {}", remoteRegion);
}
}
}
apps.setAppsHashCode(apps.getReconcileHashCode());
return apps;
}
這里再看到 registry.entrySet()
是不是會特別親切?Map<String, Map<String, Lease<InstanceInfo>>
我們上一篇講Client注冊的時候 就是將注冊信息放入到registry對應這個數據結構中的,果不其然,這里拿到所有的注冊信息,然后封裝到Applications
對象中的。
這里最后apps.setAppsHashCode()
邏輯,先插個眼 后面講增量同步有類似的邏輯,后面再回頭看。接着再回頭看 返回數據后 readWriteCacheMap
的操作邏輯。
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
} finally {
CurrentRequestVersion.remove();
}
}
}
};
}
這里是起了一個調度任務,會去定時比較一級和二級緩存是否一致,如果不一致 就會用二級緩存覆蓋一級緩存。這就回答了上面的第一個問題,兩級緩存一致性的問題,默認30s執行一次。所以這里仍會有問題,可能緩存在30s內會存在不一致的情況,這里用的是最終一致的思想。
緊接着 讀寫緩存獲取到數據后再去回寫只讀緩存,這是上面ResponseCacheImpl.java
的邏輯,到了這里 全量抓取注冊表的代碼都已經看完了,這里主要的亮點是使用了兩級緩存策略來返回對應的數據。
接着整理下過期的幾個機制,也是回應上面拋出的第二個問題。
用一張圖作為總結:
-
主動過期
readWriteCacheMap,讀寫緩存有新的服務實例發生注冊、下線、故障的時候,就會去刷新readWriteCacheMap(在Client注冊的時候,AbstractInstanceRegistry中register方法最后會有一個invalidateCache()方法)
比如說現在有一個服務A,ServiceA,有一個新的服務實例,Instance010來注冊了,注冊完了之后,其實必須是得刷新這個緩存的,然后就會調用ResponseCache.invalidate(),將之前緩存好的ALL_APPS這個key對應的緩存,給他過期掉
將readWriteCacheMap中的ALL_APPS緩存key,對應的緩存給過期掉
-
定時過期
readWriteCacheMap在構建的時候,指定了一個自動過期的時間,默認值就是180秒,所以你往readWriteCacheMap中放入一個數據過后,自動會等180秒過后,就將這個數據給他過期了
-
被動過期
readOnlyCacheMap怎么過期呢?
默認是每隔30秒,執行一個定時調度的線程任務,TimerTask,有一個邏輯,會每隔30秒,對readOnlyCacheMap和readWriteCacheMap中的數據進行一個比對,如果兩塊數據是不一致的,那么就將readWriteCacheMap中的數據放到readOnlyCacheMap中來。比如說readWriteCacheMap中,ALL_APPS這個key對應的緩存沒了,那么最多30秒過后,就會同步到readOnelyCacheMap中去。
client端增量抓取注冊表邏輯
上面抓取全量注冊表的代碼已經說了,這里來講一下增量抓取,入口還是在DiscoverClient.java
中,當初始化完DiscoverClient.java
后會執行一個初始化定時任務的方法initScheduledTasks()
, 其中這個里面就會每隔30s 增量抓取一次注冊表信息。
這里就不跟着這里的邏輯一步步看了,看過上面的代碼后 應該會對這里比較清晰了,這里我們直接看Server端代碼了。
還記的我們上面插過的眼,獲取全量用的是ALL_APPS
增量用的是ALL_APPS_DELTA
, 所以我們這里只看增量的邏輯就行了。
else if (ALL_APPS_DELTA.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = getPayLoad(key, registry.getApplicationDeltas());
}
}
上面只是截取了部分代碼,這里直接看主要的邏輯registry.getApplicationDeltasFromMultipleRegions
即可,這個和全量的方法名只有一個Deltas的區別。
public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
if (null == remoteRegions) {
remoteRegions = allKnownRemoteRegions; // null means all remote regions.
}
boolean includeRemoteRegion = remoteRegions.length != 0;
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
} else {
GET_ALL_CACHE_MISS_DELTA.increment();
}
Applications apps = new Applications();
apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
try {
write.lock();
Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size());
while (iter.hasNext()) {
Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
InstanceInfo instanceInfo = lease.getHolder();
logger.debug("The instance id {} is found with status {} and actiontype {}",
instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
Application app = applicationInstancesMap.get(instanceInfo.getAppName());
if (app == null) {
app = new Application(instanceInfo.getAppName());
applicationInstancesMap.put(instanceInfo.getAppName(), app);
apps.addApplication(app);
}
app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
}
if (includeRemoteRegion) {
for (String remoteRegion : remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
if (null != remoteAppsDelta) {
for (Application application : remoteAppsDelta.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
Application appInstanceTillNow =
apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo));
}
}
}
}
}
}
}
Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
apps.setAppsHashCode(allApps.getReconcileHashCode());
return apps;
} finally {
write.unlock();
}
}
這里代碼還是比較多的,我們只需要抓住重點即可:
- 從
recentlyChangedQueue
中獲取注冊信息,從名字可以看出來 這是最近改變的client注冊信息的隊列 - 使用writeLock,因為這里是獲取增量注冊信息,是從隊列中獲取,如果不加寫鎖,那么獲取的時候又有新數據加入隊列中,新數據會獲取不到的
基於上面第一點,我們來看看這個隊列怎么做的:
- 數據結構:
ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue
AbstractInstanceRegistry.java
初始化的時候會啟動一個定時任務,默認30s中執行一次。如果注冊時間小於當前時間的180s,就會放到這個隊列中
AbstractInstanceRegistry.java
具體代碼如下:
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
this.serverConfig = serverConfig;
this.clientConfig = clientConfig;
this.serverCodecs = serverCodecs;
this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);
this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
serverConfig.getDeltaRetentionTimerIntervalInMs(),
serverConfig.getDeltaRetentionTimerIntervalInMs());
}
private TimerTask getDeltaRetentionTask() {
return new TimerTask() {
@Override
public void run() {
Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
while (it.hasNext()) {
if (it.next().getLastUpdateTime() <
System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
} else {
break;
}
}
}
};
}
這里就能看明白了,也就是說增量抓取會獲取EurekaServer端3分鍾內保存的變動的Client信息。
最后還有一個亮點,我們上面說過,無論是全量抓取還是增量抓取,最后都會返回一個全量注冊表的hash值,代碼是apps.setAppsHashCode(allApps.getReconcileHashCode());
, 其中apps就是返回的Applications
中的屬性,最后我們再看看這個hashCode的用法。
回到DiscoveryClient.java
, 找到refreshRegistry
方法,然后一路跟蹤到getAndUpdateDelta
方法,這里具體代碼我就不貼了,流程如下:
- 獲取delta增量數據
- 根據增量數據和本地注冊表數據進行合並
- 計算中本地注冊表信息的hashCode值
- 如果本地hashCode值和server端返回的hashCode值不一致則再全量獲取一次注冊表信息
最后一張圖總結增量注冊表抓取邏輯:
總結&感悟
這篇文章寫得有點長了,確實自己也很用心去寫了,我感覺這里多級緩存機制+增量數據Hash一致性的對比方案做的很優秀,如果要我做一個數據全量+增量同步 我也會借鑒這種方案。
看源碼 能夠學到的就是別人的設計思想。總結的部分可以看上面的一些圖,注冊表抓取的源碼學習就到這了,后面 還准備看下心跳機制、保護機制、集群等等一些的源碼。
這里讀完源碼之后會發下一個問題:
假設有服務實例注冊、下線、故障,要調用這個服務的其他服務,可能會過30秒之后才能感知倒,為什么呢?因為這里再獲取服務注冊表的時候,有一個多級緩存的機制,最多是30秒后才會去更新一級緩存。
申明
本文章首發自本人博客:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!
感興趣的小伙伴可關注個人公眾號:壹枝花算不算浪漫