前言
上一講eureka client是如何注冊的,一直跟到源碼發送http請求為止,當時看eureka client注冊時如此費盡,光是找一個regiter的地方就找了半天,那么client端發送了http請求給server端,server端是如何處理的呢?
帶着這么一個疑問 就開始今天源碼的解讀了。
如若轉載 請標明來源:一枝花算不算浪漫
源碼解讀
從何讀起?
上一講我們知道,跟進client注冊 一直到 AbstractJersey2EurekaHttpClient.register
方法,這里先看下其中的源碼:
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
Response response = null;
try {
// 發送請求,類似於:http://localhost:8080/v2/apps/ServiceA
// 發送的是post請求,服務實例的對象被打成了一個json發送,包括自己的主機、ip、端口號
// eureka server 就知道了這個ServiceA這個服務,有一個服務實例,比如是在192.168.31.109、host-01、8761端口
Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
addExtraProperties(resourceBuilder);
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.accept(MediaType.APPLICATION_JSON)
.acceptEncoding("gzip")
.post(Entity.json(info));
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
那這種情況我們肯定可以猜測,server端應該有個controller來接收此http請求,然后默默的去做一些注冊的邏輯。
緊接着我們從/apps/
這個關鍵詞入手,進行全局搜索:
全局搜索結果如下,這里可以看到很多test 調用,這里框起來的一個是不是類似於我們controller接口的調用呢?直接點進去查看,然后一步步跟進。
源碼分析
接着上面說的,跟進ApplicationResource
這個類,可以找到如下方法:
@Path("{appId}")
public ApplicationResource getApplicationResource(
@PathParam("version") String version,
@PathParam("appId") String appId) {
CurrentRequestVersion.set(Version.toEnum(version));
return new ApplicationResource(appId, serverConfig, registry);
}
這個appId可以理解為我們之前傳遞的appName,緊接着這里是直接構造了一個ApplicationResource
實例,接着跟進代碼,進入ApplicationResource
中我們可以看到很多@GET
、@POST
等restful接口,還記得上面我們register方法中,發送的http請求用的就是POST方法,所以我們這里直接看@POST
請求
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
由於代碼不是很長,這里都給截取出來了。其實這里做的事情就很簡單了。
- 做一些常規的chek,檢查注冊實例
InstanceInfo
的一些基本信息 - DataCenter的相關操作,這里還涉及到亞馬遜雲,我們直接跳過
registry.register(info, "true".equals(isReplication));
這里才是核心的注冊,我們繼續往下
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
到了這里東西就有點多了,我們慢慢梳理。
- reda.lock() 這里使用的是讀鎖,方便多個服務實例同時來注冊
- 這里關鍵信息是registry的數據結構,同時這也是保存注冊實例的對象。
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
ConcurrentHashMap的key是appName
第二層Map的key是appId,所以數據結構格式類似於:
{
“ServiceA”: {
“001”: Lease<InstanceInfo>,
“002”: Lease<InstanceInfo>,
“003”: Lease<InstanceInfo>
},
“ServiceB”: {
“001”: Lease<InstanceInfo>
}
}
- 這里面還有兩個隊列
recentRegisteredQueue
、recentlyChangedQueue
,其中registerQueue默認保存最近1000條注冊的實例信息。 - 后面就是一些狀態設置之類的操作
注冊表使用場景
我們注冊完成之后,打開eureka 后台配置頁面,可以看到自己的實例已經在頁面上了,那么這個東東是如何展示的呢?
我們都知道eureka-resources模塊下有很多jsp信息,點開status.jsp查看一下:
這里用到了 serverContext.getRegistry().getSortedApplications()
, 然后在通過獲取的Applicaiton
去執行app.getInstances()
等到了所有大的服務實例信息。
這里我們還需要回頭看下EurekaBootStrap
中的代碼,看看Application是如何來的。
從PeerAwareInstanceRegistryImpl.java
的getSortedApplications()
一直跟到 AbstractInstanceRegistry.java
的getApplicationsFromMultipleRegions()
,如下圖所示:
看到這里是不是就真相大白了?
這里再總結一下:
在jsp代碼中,拿到了EurekaServerContext,所以之前為什么要將這個東東放到一個Holder里面去,就是隨時都要從這個里面去獲取一些數據
然后會從EurekaServerContext,獲取到注冊表,PeerAwareInstanceRegistry,注冊表,從里面獲取所有的服務信息,從底層的map數據結構中,獲取所有的服務注冊的信息,遍歷,封裝到一個叫Application的東西里去,一個Application就代表了一個服務,里面包含很多個服務實例。
Eureka的服務注冊流程圖
申明
本文章首發自本人博客:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!
感興趣的小伙伴可關注個人公眾號:壹枝花算不算浪漫