eureka 心跳機制 源碼解析


今天看了一篇文件,介紹了一個eureka心跳中的一個續租間隔時間的問題,看了文章中的一些內容想了解一下eureka心跳機制的原理以及源碼

 

首先我從DiscoveryClient這個類開始,講這個類之前,我們先來了解一下幾個概念

Register:服務注冊
當Eureka客戶端向Eureka Server注冊時,它提供自身的元數據,比如IP地址、端口,運行狀況指示符URL,主頁等。

Renew:服務續約
Eureka客戶會每隔30秒發送一次心跳來續約。 通過續約來告知Eureka Server該Eureka客戶仍然存在,沒有出現問題。 正常情況下,如果Eureka Server在90秒沒有收到Eureka客戶的續約,它會將實例從其注冊表中刪除。 建議不要更改續約間隔。

Fetch Registries:獲取注冊列表信息
Eureka客戶端從服務器獲取注冊表信息,並將其緩存在本地。客戶端會使用該信息查找其他服務,從而進行遠程調用。該注冊列表信息定期(每30秒鍾)更新一次。每次返回注冊列表信息可能與Eureka客戶端的緩存信息不同, Eureka客戶端自動處理。如果由於某種原因導致注冊列表信息不能及時匹配,Eureka客戶端則會重新獲取整個注冊表信息。 Eureka服務器緩存注冊列表信息,整個注冊表以及每個應用程序的信息進行了壓縮,壓縮內容和沒有壓縮的內容完全相同。Eureka客戶端和Eureka 服務器可以使用JSON / XML格式進行通訊。在默認的情況下Eureka客戶端使用壓縮JSON格式來獲取注冊列表的信息。

Cancel:服務下線
Eureka客戶端在程序關閉時向Eureka服務器發送取消請求。 發送請求后,該客戶端實例信息將從服務器的實例注冊表中刪除。該下線請求不會自動完成,它需要調用以下內容:
DiscoveryManager.getInstance().shutdownComponent();

Eviction 服務剔除
在默認的情況下,當Eureka客戶端連續90秒沒有向Eureka服務器發送服務續約,即心跳,Eureka服務器會將該服務實例從服務注冊列表刪除,即服務剔除。

再我們來看一下在DiscoveryClient類,這個類對服務注冊,服務續約,服務列表獲取等相關操作都有相應的方法,可以說是一個比較重要的類。下面我開始給大家講一下這幾個比較重要的方法

 

第一個:自身服務注冊方法

boolean register() throws Throwable {
logger.info("DiscoveryClient_" + this.appPathIdentifier + ": registering service...");

EurekaHttpResponse httpResponse;
try {
httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo);
} catch (Exception var3) {
logger.warn("{} - registration failed {}", new Object[]{"DiscoveryClient_" + this.appPathIdentifier, var3.getMessage(), var3});
throw var3;
}

if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", "DiscoveryClient_" + this.appPathIdentifier, httpResponse.getStatusCode());
}

return httpResponse.getStatusCode() == 204;
}
AbstractJerseyEurekaHttpClient:
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;

EurekaHttpResponse var5;
try {
Builder resourceBuilder = this.jerseyClient.resource(this.serviceUrl).path(urlPath).getRequestBuilder();
this.addExtraHeaders(resourceBuilder);
response = (ClientResponse)((Builder)((Builder)((Builder)resourceBuilder.header("Accept-Encoding", "gzip")).type(MediaType.APPLICATION_JSON_TYPE)).accept(new String[]{"application/json"})).post(ClientResponse.class, info);
var5 = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", new Object[]{this.serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()});
}

if (response != null) {
response.close();
}

}

return var5;
}
 

InstanceInfo在我的理解是注冊到注冊中心的元素信息,包括ip,port等,AbstractJerseyEurekaHttpClient方法中的register方法中其實大致的可以理解成將自身的一些信息通過http請求注冊到注冊中心,上面的請求的地址也就是自己在配置文件配置的eureka-server地址
 

第二:服務列表獲取方法(獲取服務列表分為兩種:全量獲取和增量獲取)

下面這個是全量獲取

首先我們來看一下這個方法的源碼

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = this.FETCH_REGISTRY_TIMER.start();

label122: {
boolean var4;
try {
Applications applications = this.getApplications();
// 下面是判斷是否將注冊服務列表的二級緩存禁用或者並且是第一次獲取服務列表的時候
// 如果是就直接拿獲取調用更新接口
//https://www.cnblogs.com/fangfuhai/p/7070325.html 這個地址是一些eurek的配置詳情
if (!this.clientConfig.shouldDisableDelta() && Strings.isNullOrEmpty(this.clientConfig.getRegistryRefreshSingleVipAddress()) && !forceFullRegistryFetch && applications != null && applications.getRegisteredApplications().size() != 0 && applications.getVersion().longValue() != -1L) {
this.getAndUpdateDelta(applications);
} else {
// 否則就進入第一次獲取服務列表的方法
logger.info("Disable delta property : {}", this.clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", this.clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", applications == null);
logger.info("Registered Applications size is zero : {}", applications.getRegisteredApplications().size() == 0);
logger.info("Application version is -1: {}", applications.getVersion().longValue() == -1L);
//下面這個方法就是直接發http請求到eureka server去獲取服務列表
this.getAndStoreFullRegistry();
}
//設置AppsHashCode(值是applications計算出來的hash值),客戶端讀取到之后更新好自己的Apps
// 緩存之后進行的增量獲取會對比這個AppsHashCode,如果不一樣,就會進行一次全量Apps信息請求
applications.setAppsHashCode(applications.getReconcileHashCode());
this.logTotalInstances();
break label122;
} catch (Throwable var8) {
logger.error("DiscoveryClient_" + this.appPathIdentifier + " - was unable to refresh its cache! status = " + var8.getMessage(), var8);
var4 = false;
} finally {
if (tracer != null) {
tracer.stop();
}

}

return var4;
}

this.onCacheRefreshed();
this.updateInstanceRemoteStatus();
return true;
}
接下來就看一下getAndStoreFullRegistry這個方法,主要就是發請求到eureka-server去獲取注冊的服務列表

private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = this.fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
// 發請求到eureka-server獲取到服務列表applications
EurekaHttpResponse<Applications> httpResponse = this.clientConfig.getRegistryRefreshSingleVipAddress() == null ? this.eurekaTransport.queryClient.getApplications((String[])this.remoteRegionsRef.get()) : this.eurekaTransport.queryClient.getVip(this.clientConfig.getRegistryRefreshSingleVipAddress(), (String[])this.remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = (Applications)httpResponse.getEntity();
}

logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (this.fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1L)) {
this.localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}

}
第三:續約方法 Renew


private void initScheduledTasks() {
int renewalIntervalInSecs;
int expBackOffBound;
//此客戶端是否獲取eureka服務器注冊表上的注冊信息,默認為true
if (this.clientConfig.shouldFetchRegistry()) {
renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();
expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//這是心跳機制的主要代碼,主要是將TimedSupervisorTask放到定時任務的線程池,定時的運行heatbeat線程去發送請求
this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
}
// 實例是否在eureka服務器上注冊自己的信息以供其他服務發現,默認為true
if (this.clientConfig.shouldRegisterWithEureka()) {
renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: renew interval is: " + renewalIntervalInSecs);
this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread(null)), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);
this.statusChangeListener = new StatusChangeListener() {
public String getId() {
return "statusChangeListener";
}

public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) {
DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);
} else {
DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent);
}

DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();
}
};
if (this.clientConfig.shouldOnDemandUpdateStatusChange()) {
this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener);
}

this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}

}
到此,我這邊大概了講了一下一些服務注冊以及獲取服務的流程,想繼續在深入的,自己可以再看深究
來自:https://blog.csdn.net/qq_18416057/article/details/82463783 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM