Eureka 系列(04)客戶端源碼分析
0. Spring Cloud 系列目錄 - Eureka 篇
在上一篇 Eureka 系列(01)最簡使用姿態 中對 Eureka 的簡單用法做了一個講解,本節分析一下 EurekaClient 的實現 DiscoveryClient。本文的源碼是基於 Eureka-1.9.8。
1)服務注冊(發送注冊請求到注冊中心)
2)服務發現(本質就是獲取調用服務名所對應的服務提供者實例信息,包括IP、port等)
3)服務續約(本質就是發送當前應用的心跳請求到注冊中心)
4)服務下線(本質就是發送取消注冊的HTTP請求到注冊中心)
1. DiscoveryClient 基本功能簡介
總結: DiscoveryClient 構造時會初始化一個 scheduler 定時任務調度器,兩個線程池 heartbeatExecutor 和 cacheRefreshExecutor,分別執行 CacheRefreshThread 和 HeartbeatThread 定時任務,前者定時(默認 30s)從 Eureka Server 更新服務列表,后者定時(默認 30s)上報心跳。
1.1 DiscoveryClient 初始化
DiscoveryClient 初始化最核心就是:一是服務發現定時任務,二是心跳發送定時任務。
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// default size of 2 - 1 each for heartbeat and cacheRefresh
// 1. scheduler 是 CacheRefreshThread 和 HeartbeatThread 任務調度器
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// 2. 執行 HeartbeatThread 線程池,定時發送心跳
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// 3. 執行 CacheRefreshThread 線程池,定時刷新服務列表
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// 4. Eureka Server 服務端,用於 HTTP 通信
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
...
// 5. 啟動定時任務
initScheduledTasks();
}
總結: DiscoveryClient 代碼有刪減,只保留了最核心的功能,從上面的代碼來看還是很簡單的。下面再看一下 initScheduledTasks 干了些什么。至於每 4 步裝配 Http Client 會在每 5 小章具體講解。
1.2 initScheduledTasks 啟動定時任務
initScheduledTasks 啟動了以下幾個任務:一每 30s 同步一次服務列表;二每 30s 發送一次心跳信息;三是如果當前 InstanceInfo 發生變更,同步到 Eureka Server,默認 40s。
private void initScheduledTasks() {
// 1. 定時刷新服務列表,服務發現,默認 30s
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 2. 定時發送心跳,默認 30s
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this, instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// 3. 監聽 instance 狀態
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 4. 定時同步當前 Eureka Client 信息(變更時)給 Eureka Server,默認 40s
instanceInfoReplicator.start(
clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}
}
總結: Eureka DiscoveryClient 通過 CacheRefreshThread 和 HeartbeatThread 這兩個定時任務保證的服務的有效性。
2. 服務注冊與下線
總結: 服務的注冊與下線 OPEN API:
- 服務注冊(POST):
http://{ip}:{port}/eureka/apps/{appName} - 服務下線(DELETE):
http://{ip}:{port}/eureka/apps/{appName}/{id}
boolean register() throws Throwable {
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
throw e;
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
3. 服務發現
總結: 服務發現默認每 30s 同步一次數據,更新到本地緩存 localRegionApps 中。數據同步分兩種情況:
-
全量同步(GET):
http://{ip}:{port}/eureka/apps/,參數是 regions。這個 API 會獲取該 regions 下的全部服務實例 InstanceInfo,如果實例數很多會對網絡造成壓力,最好是按需要拉取,即 Client 需要訂閱那個服務就返回那個服務的實例。全量同步很簡單,getAndStoreFullRegistry 方法調用上述 API,獲取全量的 Applications 數據,直接設置給本地緩存 localRegionApps 即可。
-
增量同步(GET):
http://{ip}:{port}/eureka/apps/delta,參數是 regions。返回發生變化的服務實例,eg: ADDED、MODIFIED、DELETED。增量同步比全量同步要麻煩一些,getAndUpdateDelta 調用上述 API 返回發生變化的服務實例信息,與本地緩存 localRegionApps 進行對比,更新本地緩存。
思考: 增量同步失敗,返回數據為空,或者由於網絡等原因導致本地緩存和 Eureka Server 無法通過增量同步保持數據一致性時怎么辦?
DiscoveryClient 在進行增量同步時,有對應的補償機制,當增量同步失敗時回滾到全量同步。那如何判斷本地緩存和服務端數據不一致呢?Eureka DiscoveryClient 通過計算本地緩存和服務端的 hashcode,如果出現不一致的情況,則同樣回滾到全量同步。
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
Applications applications = getApplications();
// 1. 全量同步,基本上除了配置選項,第一次同步時全量同步,之后增量同步
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) {
getAndStoreFullRegistry();
// 2. 增量同步
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
return false;
} finally {
}
// 發布事件CacheRefreshedEvent,同時更新狀態
onCacheRefreshed();
updateInstanceRemoteStatus();
return true;
}
總結: 參數 forceFullRegistryFetch 表示強制全量同步。除了配置選項,基本第一次同步是全量同步,之后都增量同步。全量同步很簡單就不看了,看一下增量同步是怎么做的?
private void getAndUpdateDelta(Applications applications) throws Throwable {
// 1. 通過增量同步,獲取改變的服務實例列表
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
// 2. 增量同步失敗,回滾到全量同步
if (delta == null) {
getAndStoreFullRegistry();
// 3. 增量同步,對比本地緩存和delta信息,更新本地緩存
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
// 3. 增量同步,對比本地緩存和delta信息,更新本地緩存
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
}
// 4. 由於未知原因導致實例數不一致(此時hashcode會不一致)
// 無法通過增量同步,回滾到全量同步
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
}
}
總結: 增量同步考慮到了增量同步失敗或數據出現不一致的情況,進行了補償。其實這種補償機制也很簡單,以后做設計時可以考慮這種補償機制,提高代碼的健壯性。
4. 心跳檢測
總結: 健康檢測,一般都是 TTL(Time To Live) 機制。eg: 客戶端每 5s 發送心跳,服務端 15s 沒收到心跳包,更新實例狀態為不健康, 30s 未收到心跳包,從服務列表中刪除。 Eureka Server 是每 30s 發送心跳包,90s 未收心跳則刪除。
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
// 1. 發送心跳包
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
// 2. 如果服務端實例不存在,則重新注冊實例
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
return false;
}
}
5. 高可用客戶端(HA Client)
高可用客戶端(HA Client)多用於生產環境,客戶端應用關聯或配置注冊中心服務器集群,避免注冊中心單點故障。
常見配置手段:①多注冊中心主機;②注冊中心 DNS;③廣播
如果 Eureka 客戶端應用配置多個 Eureka 注冊服務器,那么默認情況只有第一台可用的服務器,存在注冊信息。如果第一台可用的 Eureka 服務器 Down 掉了,那么 Eureka 客戶端應用將會選擇下台可用的 Eureka 服務器。
客戶端配置如下:
eureka.client.service-url.defaultZone= \
http://peer1:10001/eureka,http://peer2:10001/eureka
思考: 那 Eureka Client 到底是訪問那台 Eureka Server 呢?如果其中一台 Eureka Server 宕機后怎么處理呢?
5.1 EurekaHttpClient 初始化
DiscoveryClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// 4. Eureka Server 服務端,用於 HTTP 通信
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
...
}
scheduleServerEndpointTask 最終初始化 EurekaTransport,EurekaTransport 最重要的屬性有兩個:一是 ClosableResolver,用於 Eureka Server 發現;二是 EurekaHttpClient 用於與 Eureka Server 通信。
private static final class EurekaTransport {
// Eureka Server 地址發現
private ClosableResolver bootstrapResolver;
private TransportClientFactory transportClientFactory;
// Eureka 注冊
private EurekaHttpClient registrationClient;
private EurekaHttpClientFactory registrationClientFactory;
// Eureka 查詢
private EurekaHttpClient queryClient;
private EurekaHttpClientFactory queryClientFactory;
}
總結: scheduleServerEndpointTask 的方法很長,我們只看最核心的代碼,即 bootstrapResolver 和 queryClient 的初始化。
private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
AbstractDiscoveryClientOptionalArgs args) {
// 1. ClusterResolver#getClusterEndpoints 可以獲取所的 endpoints
eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
clientConfig,
transportConfig,
eurekaTransport.transportClientFactory,
applicationInfoManager.getInfo(),
applicationsSource
);
// 2. 初始化 queryClient,默認實現是 RetryableEurekaHttpClient
// registrationClient 初始化類似,就省略了
if (clientConfig.shouldFetchRegistry()) {
EurekaHttpClientFactory newQueryClientFactory = null;
EurekaHttpClient newQueryClient = null;
try {
newQueryClientFactory = EurekaHttpClients.queryClientFactory(
eurekaTransport.bootstrapResolver,
eurekaTransport.transportClientFactory,
clientConfig,
transportConfig,
applicationInfoManager.getInfo(),
applicationsSource
);
newQueryClient = newQueryClientFactory.newClient();
} catch (Exception e) {
}
eurekaTransport.queryClientFactory = newQueryClientFactory;
eurekaTransport.queryClient = newQueryClient;
}
...
}
總結: scheduleServerEndpointTask 方法是重要的工作:
-
一是初始化 ClusterResolver,用於獲取所有的 Eureka Server。默認實現是 ConfigClusterResolver,調用 EurekaClientConfig#getEurekaServerServiceUrls() 方法獲取配置的 Eureka 地址。
-
二是初始化 EurekaHttpClient,用於發送請求。RetryableEurekaHttpClient 會通過輪詢的方式 Eureka Server。需要注意的是:只有第一台宕機時,才會輪詢,否則正常情況下永遠只訪問第一台。
-
EurekaHttpClients.queryClientFactory 創建 EurekaHttpClient 的責任鏈。默認情況下:
graph LR SessionedEurekaHttpClient --> RetryableEurekaHttpClient RetryableEurekaHttpClient --> RedirectingEurekaHttpClient RedirectingEurekaHttpClient --> JerseyApplicationClient其中最終用於發送請求的 JerseyApplicationClient,由
eurekaTransport.transportClientFactory.newClient()構建。默認實現如下:graph LR Jersey1TransportClientFactories -- newTransportClientFactory --> JerseyEurekaHttpClientFactory JerseyEurekaHttpClientFactory --newClient --> JerseyApplicationClient
5.2 EurekaHttpClient 執行流程
RetryableEurekaHttpClient 通過 ConfigClusterResolver 解析獲取所有配置的 Eureka ServerUrls,默認只會調用每一台 Eureka Server,只有當第一台宕機時才會調用下一台。 也就是通過 EurekaClientConfig#getEurekaServerServiceUrls 獲取 eureka.client.service-url.defaultZone=http://peer1:10001/eureka,http://peer2:10001/eureka 配置的集群地址。
總結: RetryableEurekaHttpClient 通過輪詢的方式保證客戶端的高可用,主要的執行流程分三步:
- 獲取所有的 Eureka Server。ConfigClusterResolver 獲取所有的地址后,通過輪詢算法選擇一台 Server。
- 根據這個 Server 構建一個真實發送請求的 EurekaHttpClient。
- EurekaHttpClient 發送請求,如果失敗則重試。
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
List<EurekaEndpoint> candidateHosts = null;
int endpointIdx = 0;
for (int retry = 0; retry < numberOfRetries; retry++) {
EurekaHttpClient currentHttpClient = delegate.get();
EurekaEndpoint currentEndpoint = null;
if (currentHttpClient == null) {
// 獲取所有的 Endpoint
if (candidateHosts == null) {
candidateHosts = getHostCandidates();
if (candidateHosts.isEmpty()) {
throw new TransportException("There is no known eureka server; cluster server list is empty");
}
}
if (endpointIdx >= candidateHosts.size()) {
throw new TransportException("Cannot execute request on any known server");
}
// 2. 輪詢獲取 currentEndpoint,注意只有每一台無法訪問時才會訪問下一台
// currentHttpClient 才是真實發送請求的 EurekaHttpClient
// 在 spring cloud(sc) 中默認的實現是 RestTemplateEurekaHttpClient
currentEndpoint = candidateHosts.get(endpointIdx++);
currentHttpClient = clientFactory.newClient(currentEndpoint);
}
// 3. 發送請求,成功則返回,失敗則是重試
try {
EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
delegate.set(currentHttpClient);
return response;
}
} catch (Exception e) {
}
delegate.compareAndSet(currentHttpClient, null);
if (currentEndpoint != null) {
quarantineSet.add(currentEndpoint);
}
}
throw new TransportException("Retry limit reached; giving up on completing the request");
}
總結: RetryableEurekaHttpClient 通過輪詢的方式保證高可用客戶端(HA Client)
- RetryableEurekaHttpClient 繼承關系:RetryableEurekaHttpClient -> EurekaHttpClientDecorator -> EurekaHttpClient。EurekaHttpClientDecorator 只是一個包裝類,具體的發送請求過程都委托給子類 RetryableEurekaHttpClient#execute(EurekaHttpClient delegate) 完成。
- RetryableEurekaHttpClient 通過輪詢的方式獲取 currentEndpoint,再通過
clientFactory.newClient(currentEndpoint)構建一個真正用於發送請求的 EurekaHttpClient,在 Spring Cloud(SC) 中的默認實現是 RestTemplateEurekaHttpClient。
6. 總結
6.1 Eureka OPEN API
| 操作 | 請求方式 | 路徑 | 參數 |
|---|---|---|---|
| 注冊(register) | POST | apps/{appName} |
instanceInfo |
| 下線(unregister) | DELETE | apps/{appName}/{id} |
-- |
| 全量同步(unregister) | GET | apps/ |
regions |
| 增量同步(unregister) | GET | apps/delta |
regions |
| 心跳(sendHeartBeat) | PUT | apps/{appName}/{id} |
-- |
6.2 實例注冊
Eureka DiscoveryClient 默認延遲 40s 注冊實例信息,之后如果實例信息發生變化,則每 30s 同步一次數據。
| 參數 | 功能 | 默認值 |
|---|---|---|
| registerWithEureka | 是否將本機實例注冊到 Eureka Server 上 | true |
| initialInstanceInfoReplicationIntervalSeconds | 初始化注冊的延遲時間 | 40s |
| instanceInfoReplicationIntervalSeconds | 定時更新本機實例信息到 Eureka Server 的時間間隔 | 30s |
6.3 數據同步 - 服務發現
Eureka DiscoveryClient 默認每 30s 同步一次數據,更新本地緩存 localRegionApps 。數據同步分為全量同步和增量同步。
-
全量同步:獲取該 regions 下的全部服務實例 InstanceInfo,如果實例數很多會對網絡造成壓力,最好是按需要拉取,即 Client 需要訂閱那個服務就返回那個服務的實例。
-
增量同步:返回發生變化的服務實例,eg: ADDED、MODIFIED、DELETED。
增量同步比全量同步要麻煩一些,getAndUpdateDelta 調用上述 API 返回發生變化的服務實例信息,與本地緩存 localRegionApps 進行對比,更新本地緩存。如果增量同步失敗回滾到全量同步。
表3:Eureka 服務發現配置參數
| 參數 | 功能 | 默認值 |
|---|---|---|
| fetchRegistry | 是否從 Eureka Server 獲取注冊信息 | true |
| registryFetchIntervalSeconds | 定時同步本地的服務實例信息緩存的時間間隔 | 30s |
6.4 健康檢查 - 心跳機制
Eureka DiscoveryClient 默認每 30s 發送心跳包,Server 如果 90s 未收心跳則刪除。
| 參數 | 功能 | 默認值 | 來源 |
|---|---|---|---|
| renewalIntervalInSecs | 心跳的時間間隔 | 30s | LeaseInfo |
| durationInSecs | 定時同步本地的服務實例信息緩存的時間間隔 | 90s | LeaseInfo |
6.5 思考
- 當注冊應用之間存在相互關聯時,那么上層應用如何感知下層服務的存在?
- 如果上層應用感知到下層服務,那么它是怎么同步下層服務信息?
- 如果應用需要實時地同步信息,那么確保一致性?
每天用心記錄一點點。內容也許不重要,但習慣很重要!
