前言
前情回顧
上一講 我們講解了服務發現的相關邏輯,所謂服務發現 其實就是注冊表抓取,服務實例默認每隔30s去注冊中心抓取一下注冊表增量數據,然后合並本地注冊表數據,最后有個hash對比的操作。
本講目錄
今天主要是看下服務續約的邏輯,服務續約就是client端給server端發送心跳檢測,告訴對方我還活着。現在很多分布式系統都會有心跳檢查的機制,這里一起來學習下Eureka是怎么做心跳檢查的。
目錄如下:
- client端心跳檢查調度任務
- server端接收心跳檢查,設置最后renew時間
這一講內容不太多,因為上一篇文章寫全量和增量注冊表信息內容有點多,所以這里將博客盡量一篇保持一個知識點,后面還會講服務實例下線、摘除、注冊中心自我保護等機制的實現原理。
說明
原創不易,如若轉載 請標明來源:一枝花算不算浪漫
源碼分析
client端心跳檢查調度任務
服務實例續約代碼比較簡單,這里還是從DiscovertClient.java
開始,很多源碼的入口都是在這里,因為client端初始化、注冊 都是走的這里,因為前幾篇文章對這個類已經分析很多了,這里只截取部分重要代碼:
DiscovertClient.java
初始化后 會繼續初始化一些調度任務:
private void initScheduledTasks() {
if (clientConfig.shouldRegisterWithEureka()) {
// 默認也是30s
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
// 執行heartbeatExecutor心跳檢查,默認是30s
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// 執行線程
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
String urlPath = "apps/" + appName + '/' + id;
Response response = null;
try {
WebTarget webResource = jerseyClient.target(serviceUrl)
.path(urlPath)
.queryParam("status", info.getStatus().toString())
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus != null) {
webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
}
Builder requestBuilder = webResource.request();
addExtraProperties(requestBuilder);
addExtraHeaders(requestBuilder);
requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE);
response = requestBuilder.put(Entity.entity("{}", MediaType.APPLICATION_JSON_TYPE)); // Jersey2 refuses to handle PUT with no body
EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
if (response.hasEntity()) {
eurekaResponseBuilder.entity(response.readEntity(InstanceInfo.class));
}
return eurekaResponseBuilder.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey2 HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
這里的流程很簡單,初始化DiscoveryClient
后會新建一個調度任務,然后執行HeartbeatThread
中的run方法,默認是renewalIntervalInSecs
30s執行一次。
具體就是給Server端發送一個http請求,類似於:http://localhost:8080/v2/apps/ServiceA/i-000000-1
, 走的是put請求。
最后拿到響應結果,續約成功后會更新lastSuccessfulHeartbeatTimestamp
最近成功心跳檢測的時間戳。
server端接收心跳檢查請求
前幾篇文章已經說過,Server端接收http請求的入口在eureka-core
模塊下的 resource
包里面,這里直接找到ApplicationResource.java
中的getInstanceInfo
方法,這里直接請求的InstanceResource
類的構造方法,找到這個方法中的@PUT
請求。可以直接看下代碼:
InstanceResource.renewLease
+AbstractInstanceRegistry.renew
方法:
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// 省略部分代碼
logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
return response;
}
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
Object[] args = {
instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId()
};
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", args);
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
leaseToRenew.renew();
return true;
}
}
這里主要看renew
方法, 這里看到registry
是一個注冊表,通過appName獲取對應的服務注冊表信息。
這里主要還是看leaseToRenew.renew()
其實很簡單,就是設置當前示例注冊表的renew屬性的lastUpdateTimestamp
為最新時間+duration。
至於這里的duration 我們下一講會詳細講解,duration 和服務實例摘除有關。
總結
(1)DiscoveryClient初始化的時候,會去調度一堆定時任務,其中有一個就是HeartbeatThread,心跳線程
(2)在這里可以看到,默認是每隔30秒去發送一次心跳,每隔30秒執行一次HeartbeatTHread線程的邏輯,發送心跳
(3)這邊的話就是去發送這個心跳,走的是EurekaHttpClient的sendHeartbeat()方法,http://localhost:8080/v2/apps/ServiceA/i-000000-1,走的是put請求
(4)負責承接服務實例的心跳相關的這些操作的,是ApplicationsResource,服務相關的controller。找到ApplicationResource,再次找到InstanceResource,通過PUT請求,可以找到renewLease方法。
(5)通過注冊表的renew()方法,進去完成服務續約,實際進入AbstractInstanceRegistry的renew()方法
(6)從注冊表的map中,根據服務名和實例id,獲取一個Lease
申明
本文章首發自本人博客:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!
感興趣的小伙伴可關注個人公眾號:壹枝花算不算浪漫