SpringCloud 源碼系列(2)—— 注冊中心 Eureka(中)


SpringCloud 源碼系列(1)—— 注冊中心 Eureka(上)

SpringCloud 源碼系列(2)—— 注冊中心 Eureka(中)

SpringCloud 源碼系列(3)—— 注冊中心 Eureka(下)

 

五、服務注冊

1、實例信息注冊器初始化

服務注冊的代碼位置不容易發現,我們看 DiscoveryClient 初始化調度任務的這個方法,這段代碼會去初始化一個實例信息復制器 InstanceInfoReplicator,這個復制器就包含了實例的注冊(明明是注冊卻叫 Replicator 感覺怪怪的)。

① DiscoveryClient 初始化調度器的流程

  • 先基於 DiscoveryClient、InstanceInfo 構造 InstanceInfoReplicator,然后還有兩個參數為實例信息復制間隔時間(默認30秒)、並發的數量(默認為2)。
  • 創建了一個實例狀態變更監聽器,並注冊到 ApplicationInfoManager。當實例狀態變更時,就會觸發這個監聽器,並調用 InstanceInfoReplicator 的 onDemandUpdate 方法。
  • 啟動 InstanceInfoReplicator,默認延遲40秒,也就是說服務啟動可能40秒之后才會注冊到注冊中心。
 1 private void initScheduledTasks() {
 2     // 省略定時刷新注冊表的任務...
 3 
 4     if (clientConfig.shouldRegisterWithEureka()) {
 5         // 省略定時心跳的任務...
 6 
 7         // 實例信息復制器,用於定時更新自己狀態,並向注冊中心注冊
 8         instanceInfoReplicator = new InstanceInfoReplicator(
 9                 this,
10                 instanceInfo,
11                 clientConfig.getInstanceInfoReplicationIntervalSeconds(),
12                 2); // burstSize
13 
14         // 實例狀態變更的監聽器
15         statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
16             @Override
17             public String getId() {
18                 return "statusChangeListener";
19             }
20 
21             @Override
22             public void notify(StatusChangeEvent statusChangeEvent) {
23                 if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) {
24                     logger.error("Saw local status change event {}", statusChangeEvent);
25                 } else {
26                     logger.info("Saw local status change event {}", statusChangeEvent);
27                 }
28                 instanceInfoReplicator.onDemandUpdate();
29             }
30         };
31 
32         // 向 ApplicationInfoManager 注冊狀態變更監聽器
33         if (clientConfig.shouldOnDemandUpdateStatusChange()) {
34             applicationInfoManager.registerStatusChangeListener(statusChangeListener);
35         }
36 
37         // 啟動實例信息復制器,默認延遲時間40秒
38         instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
39     } else {
40         logger.info("Not registering with Eureka server per configuration");
41     }
42 }

② InstanceInfoReplicator 的構造方法

  • 創建了一個單線程的調度器
  • 設置 started 為 false
  • 創建了以分鍾為單位的限流器,每分鍾默認最多只能調度4次
 1 InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
 2     this.discoveryClient = discoveryClient;
 3     this.instanceInfo = instanceInfo;
 4     // 單線程的調度器
 5     this.scheduler = Executors.newScheduledThreadPool(1,
 6             new ThreadFactoryBuilder()
 7                     .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
 8                     .setDaemon(true)
 9                     .build());
10 
11     this.scheduledPeriodicRef = new AtomicReference<Future>();
12     // started 設置為 false
13     this.started = new AtomicBoolean(false);
14     // 以分鍾為單位的限流器
15     this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
16     // 間隔時間,默認為30秒
17     this.replicationIntervalSeconds = replicationIntervalSeconds;
18     this.burstSize = burstSize;
19     // 允許每分鍾更新的頻率 60 * 2 / 30 = 4
20     this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
21     logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
22 }

③ 啟動 InstanceInfoReplicator

  • 將 started 設置為 true,代表已經啟動了
  • 調用 instanceInfo.setIsDirty() 方法,將實例設置為 dirty=true,並更新了最后一次設置 dirty 的時間戳
  • InstanceInfoReplicator 實現了 Runnable,它本身被當成任務來調度,然后延遲40秒開始調度當前任務,並將 Future 放到本地變量中
 1 public void start(int initialDelayMs) {
 2     // 啟動時 started 設置為 true
 3     if (started.compareAndSet(false, true)) {
 4         // 設置為 dirty,便於下一次心跳時同步到 eureka server
 5         instanceInfo.setIsDirty();
 6         // 延遲40秒后開始調度當前任務
 7         Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
 8         // 將 Future 放到本地變量中
 9         scheduledPeriodicRef.set(next);
10     }
11 }
12 
13 ///////
14 
15 public synchronized void setIsDirty() {
16     isInstanceInfoDirty = true;
17     lastDirtyTimestamp = System.currentTimeMillis();
18 }

2、客戶端實例注冊

① 實現注冊的run方法

接着看 InstanceInfoReplicator 的 run 方法,這個方法就是完成注冊的核心位置。

  • 首先會更新實例的信息,如果有變更就會設置 dirty=true
  • 如過是 dirty 的,就會調用 DiscoveryClient 的 register 方法注冊實例
  • 實例注冊后,就把 dirty 設置為 false
  • 最后在 finally 中繼續下一次的調度,默認是每隔30秒調度一次,注意他這里是把調度結果 Future 放到本地變量中
 1 public void run() {
 2     try {
 3         // 更新本地實例信息,如果實例信息有變更,則 dirty=true
 4         discoveryClient.refreshInstanceInfo();
 5 
 6         // 設置為 dirty 時的時間戳
 7         Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
 8         if (dirtyTimestamp != null) {
 9             // 注冊實例
10             discoveryClient.register();
11             // 設置 dirty=false
12             instanceInfo.unsetIsDirty(dirtyTimestamp);
13         }
14     } catch (Throwable t) {
15         logger.warn("There was a problem with the instance info replicator", t);
16     } finally {
17         // 30秒之后再調度
18         Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
19         scheduledPeriodicRef.set(next);
20     }
21 }

② 實例信息刷新

再來細看下 refreshInstanceInfo 刷新實例信息的方法:

  • 首先刷新了數據中心的信息
  • 然后刷新續約信息,主要就是將 EurekaClientConfig 的續約配置與本地的續約配置做對比,如果變更了就重新創建續約信息,並設置實例為dirty。這種情況一般就是運行期間動態更新實例的配置,然后重新注冊實例信息。
  • 接着使用健康檢查器檢查實例健康狀況,從 getHealthCheckHandler 這段代碼進去不難發現,我們可以自定義健康檢查器,例如當本地的一些資源未創建成功、某些核心線程池down了就認為實例不可用,這個時候就可以自定義健康檢查器。如果沒有自定義健康檢查器,那就直接返回實例當前的狀態。我們可以實現 HealthCheckHandler 接口自定義健康檢查器。
  • 最后就會調用 ApplicationInfoManager 的 setInstanceStatus 設置實例狀態,會判斷如果狀態發生變更,就會發出狀態變更的通知,這樣就會觸發前面定義的狀態變更監聽器,然后調用 InstanceInfoReplicator 的 onDemandUpdate 方法。
 1 void refreshInstanceInfo() {
 2     // 如果有必要,就更新數據中心的信息
 3     applicationInfoManager.refreshDataCenterInfoIfRequired();
 4     // 如果有必要,就更新續約信息,比如動態更新了配置文件,這時就更新續約信息 LeaseInfo,並將實例設置為 dirty
 5     applicationInfoManager.refreshLeaseInfoIfRequired();
 6 
 7     InstanceStatus status;
 8     try {
 9         // 用監控檢查器檢查實例的狀態
10         status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
11     } catch (Exception e) {
12         logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
13         status = InstanceStatus.DOWN;
14     }
15 
16     if (null != status) {
17         // 設置實例狀態,實例狀態變了會觸發狀態變更的監聽器
18         applicationInfoManager.setInstanceStatus(status);
19     }
20 }
21 
22 /////////////////////////////////
23 
24 public void refreshLeaseInfoIfRequired() {
25     // 當前實例續約信息
26     LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
27     if (leaseInfo == null) {
28         return;
29     }
30     // 從配置中獲取續約信息
31     int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
32     int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
33     // 如果續約信息變了,就重新創建續約信息,並設置實例為 dirty
34     if (leaseInfo.getDurationInSecs() != currentLeaseDuration || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) {
35         LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
36                 .setRenewalIntervalInSecs(currentLeaseRenewal)
37                 .setDurationInSecs(currentLeaseDuration)
38                 .build();
39         instanceInfo.setLeaseInfo(newLeaseInfo);
40         instanceInfo.setIsDirty();
41     }
42 }
43 
44 /////////////////////////////////
45 
46 public HealthCheckHandler getHealthCheckHandler() {
47     HealthCheckHandler healthCheckHandler = this.healthCheckHandlerRef.get();
48     if (healthCheckHandler == null) {
49         // 可以自定義 HealthCheckHandler 實現健康檢查
50         if (null != healthCheckHandlerProvider) {
51             healthCheckHandler = healthCheckHandlerProvider.get();
52         } else if (null != healthCheckCallbackProvider) {
53             // 可以自定義 HealthCheckCallback 實現健康檢查,HealthCheckCallback 已過期,建議使用 HealthCheckHandler
54             healthCheckHandler = new HealthCheckCallbackToHandlerBridge(healthCheckCallbackProvider.get());
55         }
56 
57         if (null == healthCheckHandler) {
58             // 沒有自定義的就是用默認的橋接類
59             healthCheckHandler = new HealthCheckCallbackToHandlerBridge(null);
60         }
61         this.healthCheckHandlerRef.compareAndSet(null, healthCheckHandler);
62     }
63 
64     return this.healthCheckHandlerRef.get();
65 }
66 
67 //////////////////////////////////////
68 
69 public synchronized void setInstanceStatus(InstanceStatus status) {
70     InstanceStatus next = instanceStatusMapper.map(status);
71     if (next == null) {
72         return;
73     }
74 
75     // 如果狀態變更了,才會返回之前的狀態,然后觸發狀態變更監聽器
76     InstanceStatus prev = instanceInfo.setStatus(next);
77     if (prev != null) {
78         for (StatusChangeListener listener : listeners.values()) {
79             try {
80                 listener.notify(new StatusChangeEvent(prev, next));
81             } catch (Exception e) {
82                 logger.warn("failed to notify listener: {}", listener.getId(), e);
83             }
84         }
85     }
86 }
View Code

③ 向 eureka server 注冊

在 run 方法里調用了 discoveryClient.register() 方法實現了客戶端實例向注冊中心的注冊,進入到 register 方法可以看到,他就是使用前面構造的 EurekaTransport 來發起遠程調用。

一層層進去,很容易發現就是調用了 eureka-server 的 POST /apps/{appName} 接口,后面我們就從 eureka-core 中找這個接口就可以找到注冊中心實現服務注冊的入口了。

 1 boolean register() throws Throwable {
 2     logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
 3     EurekaHttpResponse<Void> httpResponse;
 4     try {
 5         // registrationClient => JerseyReplicationClient
 6         httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
 7     } catch (Exception e) {
 8         logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
 9         throw e;
10     }
11     if (logger.isInfoEnabled()) {
12         logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
13     }
14     return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
15 }
16 
17 /////////////////////////////////
18 
19 public EurekaHttpResponse<Void> register(InstanceInfo info) {
20     // 調用的是 POST apps/{appName} 接口
21     String urlPath = "apps/" + info.getAppName();
22     ClientResponse response = null;
23     try {
24         Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
25         addExtraHeaders(resourceBuilder);
26         response = resourceBuilder
27                 .header("Accept-Encoding", "gzip")
28                 .type(MediaType.APPLICATION_JSON_TYPE)
29                 .accept(MediaType.APPLICATION_JSON)
30                 // post 方法
31                 .post(ClientResponse.class, info);
32         return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
33     } finally {
34         if (logger.isDebugEnabled()) {
35             logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
36                     response == null ? "N/A" : response.getStatus());
37         }
38         if (response != null) {
39             response.close();
40         }
41     }
42 }
View Code

④ 注冊中心設置實例狀態為已啟動

再回想下注冊中心的初始化流程,在最后調用 openForTraffic 方法時,最后也會調用 ApplicationInfoManager 的 setInstanceStatus 方法,將實例狀態設置為已啟動,這個時候就會觸發客戶端注冊到注冊中心的動作。

applicationInfoManager.setInstanceStatus(InstanceStatus.UP);

⑤ 完成監聽實例變更的方法

狀態變更器會調用 onDemandUpdate 方法來完成實例狀態變更后的邏輯。

  • 它這里一個是用到了限流器來限制每分鍾這個方法只能被調用4次,即避免了頻繁的注冊行為
  • 然后在調度時,它會從本地變量中取出上一次調度的 Future,如果任務還沒執行完,它會直接取消掉
  • 最后就是調用 run 方法,完成服務的注冊
 1 public boolean onDemandUpdate() {
 2     // 限流控制
 3     if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
 4         if (!scheduler.isShutdown()) {
 5             scheduler.submit(new Runnable() {
 6                 @Override
 7                 public void run() {
 8                     logger.debug("Executing on-demand update of local InstanceInfo");
 9 
10                     // 如果上一次的任務還沒有執行完,直接取消掉,然后執行注冊的任務
11                     Future latestPeriodic = scheduledPeriodicRef.get();
12                     if (latestPeriodic != null && !latestPeriodic.isDone()) {
13                         logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
14                         latestPeriodic.cancel(false);
15                     }
16 
17                     InstanceInfoReplicator.this.run();
18                 }
19             });
20             return true;
21         } else {
22             logger.warn("Ignoring onDemand update due to stopped scheduler");
23             return false;
24         }
25     } else {
26         logger.warn("Ignoring onDemand update due to rate limiter");
27         return false;
28     }
29 }
View Code

⑥ 限流器

最后簡單看下限流器 RateLimiter 的設計:

  • 從它的注釋中可以看出,eureka 的 RateLimiter 是基於令牌桶算法實現的限流器
  • acquire 方法有兩個參數:
    • burstSize:允許以突發方式進入系統的最大請求數
    • averageRate:設置的時間窗口內允許進入的請求數
 1 /**
 2  * Rate limiter implementation is based on token bucket algorithm. There are two parameters:
 3  * <ul>
 4  * <li>
 5  *     burst size - maximum number of requests allowed into the system as a burst
 6  * </li>
 7  * <li>
 8  *     average rate - expected number of requests per second (RateLimiters using MINUTES is also supported)
 9  * </li>
10  * </ul>
11  *
12  * @author Tomasz Bak
13  */
14 public class RateLimiter {
15 
16     private final long rateToMsConversion;
17 
18     private final AtomicInteger consumedTokens = new AtomicInteger();
19     private final AtomicLong lastRefillTime = new AtomicLong(0);
20 
21     @Deprecated
22     public RateLimiter() {
23         this(TimeUnit.SECONDS);
24     }
25 
26     public RateLimiter(TimeUnit averageRateUnit) {
27         switch (averageRateUnit) {
28             case SECONDS:
29                 rateToMsConversion = 1000;
30                 break;
31             case MINUTES:
32                 rateToMsConversion = 60 * 1000;
33                 break;
34             default:
35                 throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported");
36         }
37     }
38 
39     public boolean acquire(int burstSize, long averageRate) {
40         return acquire(burstSize, averageRate, System.currentTimeMillis());
41     }
42 
43     public boolean acquire(int burstSize, long averageRate, long currentTimeMillis) {
44         if (burstSize <= 0 || averageRate <= 0) { // Instead of throwing exception, we just let all the traffic go
45             return true;
46         }
47 
48         refillToken(burstSize, averageRate, currentTimeMillis);
49         return consumeToken(burstSize);
50     }
51 
52     private void refillToken(int burstSize, long averageRate, long currentTimeMillis) {
53         // 上一次填充 token 的時間
54         long refillTime = lastRefillTime.get();
55         // 時間差
56         long timeDelta = currentTimeMillis - refillTime;
57         // 固定生成令牌的速率,即每分鍾4次
58         // 例如剛好間隔15秒進來一個請求,就是 15000 * 4 / 60000 = 1,newTokens 代表間隔了多少次,如果等於0,說明間隔不足15秒
59         long newTokens = timeDelta * averageRate / rateToMsConversion;
60         if (newTokens > 0) {
61             long newRefillTime = refillTime == 0
62                     ? currentTimeMillis
63                     // 注意這里不是直接設置的當前時間戳,而是根據 newTokens 重新計算的,因為有可能同一周期內同時有多個請求進來,這樣可以保持一個固定的周期
64                     : refillTime + newTokens * rateToMsConversion / averageRate;
65             if (lastRefillTime.compareAndSet(refillTime, newRefillTime)) {
66                 while (true) {
67                     // 調整令牌的數量
68                     int currentLevel = consumedTokens.get();
69                     int adjustedLevel = Math.min(currentLevel, burstSize);
70                     // currentLevel 可能為2,重置為了 0 或 1
71                     int newLevel = (int) Math.max(0, adjustedLevel - newTokens);
72                     if (consumedTokens.compareAndSet(currentLevel, newLevel)) {
73                         return;
74                     }
75                 }
76             }
77         }
78     }
79 
80     private boolean consumeToken(int burstSize) {
81         while (true) {
82             int currentLevel = consumedTokens.get();
83             // 突發數量為2,也就是允許15秒內最多有兩次請求進來
84             if (currentLevel >= burstSize) {
85                 return false;
86             }
87             if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {
88                 return true;
89             }
90         }
91     }
92 
93     public void reset() {
94         consumedTokens.set(0);
95         lastRefillTime.set(0);
96     }
97 }
View Code

3、Eureka Server 接收注冊請求

① 找到實例注冊的API入口

從前面的分析中,我們知道服務端注冊的API是 POST /apps/{appName},由於 eureka 是基於 jersey 來通信的,想找到API入口還是有點費勁的,至少沒有 springmvc 那么容易。

先看 ApplicationsResource 這個類,可以找到 getApplicationResource 這個方法的路徑是符合 /apps/{appName} 這個規則的。然后可以看到它里面創建了 ApplicationResource,再進入到這個類里面,就可以找到 @Post 標注的 addInstance 方法,這就是注冊的入口了。可以看到它是調用了注冊表的 register 方法來注冊實例的。

 1 @Path("/{version}/apps")
 2 @Produces({"application/xml", "application/json"})
 3 public class ApplicationsResource {
 4     private final EurekaServerConfig serverConfig;
 5     private final PeerAwareInstanceRegistry registry;
 6     private final ResponseCache responseCache;
 7 
 8     // 符合規則 /apps/{appName}
 9     @Path("{appId}")
10     public ApplicationResource getApplicationResource(
11             @PathParam("version") String version,
12             @PathParam("appId") String appId) {
13         CurrentRequestVersion.set(Version.toEnum(version));
14         try {
15             // 真正的入口
16             return new ApplicationResource(appId, serverConfig, registry);
17         } finally {
18             CurrentRequestVersion.remove();
19         }
20     }
21 }
22 
23 /////////////////////////////////
24 
25 @Produces({"application/xml", "application/json"})
26 public class ApplicationResource {
27 
28     private final PeerAwareInstanceRegistry registry;
29 
30     @POST
31     @Consumes({"application/json", "application/xml"})
32     public Response addInstance(InstanceInfo info,
33                                 @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
34         logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
35 
36         registry.register(info, "true".equals(isReplication));
37         return Response.status(204).build();  // 204 to be backwards compatible
38     }
39 }

addInstance 接口有兩個參數:

  • InstanceInfo:服務實例,主要有兩塊數據:
    • 基本信息:主機名、IP地址、端口號、URL地址
    • 租約信息:保持心跳的間隔時間、最近心跳的時間、服務注冊的時間、服務啟動的時間
  • isReplication:這個參數是從請求頭中取的,表示是否是在同步 server 節點的實例。在集群模式下,因為客戶端實例注冊到注冊中心后,會同步到其它 server節點,所以如果是eureka-server之間同步信息,這個參數就為 true,避免循環同步。

② 實例注冊

進入到注冊表的 register 方法,可以看到主要就是調用父類的 register 方法注冊實例,然后同步到 eureka server 集群中的其它 server 節點。集群同步放到后面來看,現在只需要知道注冊實例時會同步到其它server節點即可。

 1 @Override
 2 public void register(final InstanceInfo info, final boolean isReplication) {
 3     int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
 4     // 如果實例中沒有周期的配置,就設置為默認的 90 秒
 5     if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
 6         leaseDuration = info.getLeaseInfo().getDurationInSecs();
 7     }
 8     // 注冊實例
 9     super.register(info, leaseDuration, isReplication);
10     // 復制到集群其它 server 節點
11     replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
12 }

接着看父類的注冊方法,它的主要流程如下:

  • 首先可以看到eureka server保存注冊表(registry)的數據結構是 ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>,key 就是服務名稱,value 就是對應的實例,因為一個服務可能會部署多個實例。
  • 根據服務名稱從注冊表拿到實例表,然后根據實例ID拿到實例的租約信息 Lease<InstanceInfo>
  • 如果租約信息存在,說明已經注冊過相同的實例了,然后就對比已存在實例和新注冊實例的最后更新時間,如果新注冊的是舊的,就替換為已存在的實例來完成注冊
  • 如果租約信息不存在,說明是一個新注冊的實例,這時會更新兩個閾值:
    • 期望續約的客戶端數量 +1
    • 每分鍾續約次數的閾值,如果低於這個值,說明有很多客戶端沒有發送心跳,這時eureka就認為可能網絡出問題了,就會有另一些機制,這個后面再說
  • 然后就根據注冊的實例信息和續約周期創建新的租約,並放入注冊表中去
  • 接着根據當前時間戳、服務名稱、實例ID封裝一個 Pair,然后放入到最近注冊的隊列中 recentRegisteredQueue,先記住這個隊列就行了
  • 根據實例的 overriddenStatus 判斷,不為空的話,可能就只是要更新實例的狀態,這個時候就會只變更實例的狀態,而不會改變 dirty
  • 然后是設置了實例的啟動時間戳,設置了實例的 ActionType 為 ADDED
  • 將租約加入到最近變更的隊列 recentlyChangedQueue,先記住這個隊列
  • 最后一步失效緩存,一步步進去可以發現,主要就是將讀寫緩存 readWriteCacheMap 中與這個實例相關的緩存失效掉,這個緩存后面分析抓取注冊表的時候再來細看
 1 public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
 2     read.lock();
 3     try {
 4         // registry => ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
 5         Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
 6         REGISTER.increment(isReplication);
 7         if (gMap == null) {
 8             // 初次注冊時,創建一個 ConcurrentHashMap,key 為 appName
 9             final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
10             gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
11             if (gMap == null) {
12                 gMap = gNewMap;
13             }
14         }
15         Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
16         // Retain the last dirty timestamp without overwriting it, if there is already a lease
17         if (existingLease != null && (existingLease.getHolder() != null)) {
18             // 已存在的實例的最后更新時間
19             Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
20             // 新注冊的實例的最后更新時間
21             Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
22             logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
23 
24             // 如果存在的實例比新注冊盡量的實例后更新,就直接把新注冊的實例設置為已存在的實例
25             if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
26                 logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
27                         " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
28                 logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
29                 registrant = existingLease.getHolder();
30             }
31         } else {
32             // 新注冊時,續約信息不存在
33             synchronized (lock) {
34                 if (this.expectedNumberOfClientsSendingRenews > 0) {
35                     // Since the client wants to register it, increase the number of clients sending renews
36                     // 期望續約的客戶端數量 + 1
37                     this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
38                     // 更新每分鍾續約請求次數的閥值,這個閥值在后面很多地方都會用到
39                     updateRenewsPerMinThreshold();
40                 }
41             }
42             logger.debug("No previous lease information found; it is new registration");
43         }
44         // 創建新的續約
45         Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
46         if (existingLease != null) {
47             lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
48         }
49         gMap.put(registrant.getId(), lease);
50         // 放入最近注冊的隊列
51         recentRegisteredQueue.add(new Pair<Long, String>(
52                 System.currentTimeMillis(),
53                 registrant.getAppName() + "(" + registrant.getId() + ")"));
54         // 覆蓋狀態
55         if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
56             logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
57                             + "overrides", registrant.getOverriddenStatus(), registrant.getId());
58             if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
59                 logger.info("Not found overridden id {} and hence adding it", registrant.getId());
60                 overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
61             }
62         }
63         InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
64         if (overriddenStatusFromMap != null) {
65             logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
66             registrant.setOverriddenStatus(overriddenStatusFromMap);
67         }
68 
69         // Set the status based on the overridden status rules
70         InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
71         // 僅僅是變更實例狀態,不會設置為 dirty
72         registrant.setStatusWithoutDirty(overriddenInstanceStatus);
73 
74         // If the lease is registered with UP status, set lease service up timestamp
75         if (InstanceStatus.UP.equals(registrant.getStatus())) {
76             // UP 時設置 Lease 的時間戳
77             lease.serviceUp();
78         }
79         // 設置動作是 ADDED,這個在后面會做 switch 判斷
80         registrant.setActionType(ActionType.ADDED);
81         // 添加到最近變更的隊列
82         recentlyChangedQueue.add(new RecentlyChangedItem(lease));
83         // 設置最后更新時間
84         registrant.setLastUpdatedTimestamp();
85         // 失效緩存
86         invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
87         logger.info("Registered instance {}/{} with status {} (replication={})",
88                 registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
89     } finally {
90         read.unlock();
91     }
92 }
View Code

更新每分鍾續約次數的閾值:

1 protected void updateRenewsPerMinThreshold() {
2     // 每分鍾續約閾值 = 期望續約的客戶端數量 * (60 / 續約間隔時間) * 續約百分比
3     // 例如,一共注冊了 10 個實例,那么期望續約的客戶端數量為 10,間隔時間默認為 30秒,就是每個客戶端應該每30秒發送一次心跳,續約百分比默認為 0.85
4     // 每分鍾續約次數閾值 = 10 * (60.0 / 30) * 0.85 = 17,也就是說每分鍾至少要接收到 17 此續約請求
5     this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
6             * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
7             * serverConfig.getRenewalPercentThreshold());
8 }

這就是注冊表 registry 緩存服務實例信息的結構,可以看出 eureka 是基於內存來組織注冊表的,使用的是 ConcurrentHashMap 來保證多線程並發安全。

4、Eureka Server 控制台

前面已經將服務實例注冊上去了,現在來看下 eureka server 的控制台頁面是怎么獲取這些數據的。

前面已經分析過 eureka-server 的 web.xml 中配置了歡迎頁為 status.jsp ,這就是控制台的頁面。

從 status.jsp 可以看出,其實就是從 EurekaServerContext 上下文獲取注冊表,然后讀取注冊表注冊的服務實例,然后遍歷展示到表格中。

  1 <%@ page language="java" import="java.util.*,java.util.Map.Entry,com.netflix.discovery.shared.Pair,
  2 com.netflix.discovery.shared.*,com.netflix.eureka.util.*,com.netflix.appinfo.InstanceInfo.*,
  3 com.netflix.appinfo.DataCenterInfo.*,com.netflix.appinfo.AmazonInfo.MetaDataKey,com.netflix.eureka.resources.*,
  4 com.netflix.eureka.*,com.netflix.appinfo.*,com.netflix.eureka.util.StatusUtil" pageEncoding="UTF-8" %>
  5 <%
  6 String path = request.getContextPath();
  7 String basePath = request.getScheme()+"://"+request.getServerName()+":"+request.getServerPort()+path+"/";
  8 %>
  9 
 10 <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
 11 
 12 <html>
 13   <head>
 14     <base href="<%=basePath%>">
 15 
 16     <title>Eureka</title>
 17     <link rel="stylesheet" type="text/css" href="./css/main.css">
 18     <script type="text/javascript" src="./js/jquery-1.11.1.js" ></script>
 19     <script type="text/javascript" src="./js/jquery.dataTables.js" ></script>
 20     <script type="text/javascript" >
 21        $(document).ready(function() {
 22            $('table.stripeable tr:odd').addClass('odd');
 23            $('table.stripeable tr:even').addClass('even');
 24            $('#instances thead th').each(function () {
 25                var title = $('#instances thead th').eq($(this).index()).text();
 26                $(this).html(title + '</br><input type="text" placeholder="Search ' + title + '" />');
 27            });
 28            // DataTable
 29            var table = $('#instances').DataTable({"paging": false, "bInfo": false, "sDom": 'ltipr', "bSort": false});
 30            // Apply the search
 31            table.columns().eq(0).each(function (colIdx) {
 32                $('input', table.column(colIdx).header()).on('keyup change', function () {
 33                    table.column(colIdx).search(this.value).draw();
 34                });
 35            });
 36        });
 37     </script>
 38   </head>
 39 
 40   <body id="one">
 41     <jsp:include page="header.jsp" />
 42     <jsp:include page="navbar.jsp" />
 43     <div id="content">
 44       <div class="sectionTitle">Instances currently registered with Eureka</div>
 45         <table id='instances' class="stripeable">
 46            <thead><tr><th>Application</th><th>AMIs</th><th>Availability Zones</th><th>Status</th></tr></thead>
 47            <tfoot><tr><th>Application</th><th>AMIs</th><th>Availability Zones</th><th>Status</th></tr></tfoot>
 48            <tbody>
 49            <%
 50                // 獲取 eureka server 上下文 EurekaServerContext
 51            EurekaServerContext serverContext = (EurekaServerContext) pageContext.getServletContext()
 52                    .getAttribute(EurekaServerContext.class.getName());
 53            // 從上下文中取出注冊表,
 54            for(Application app : serverContext.getRegistry().getSortedApplications()) {
 55                out.print("<tr><td><b>" + app.getName() + "</b></td>");
 56                Map<String, Integer> amiCounts = new HashMap<String, Integer>();
 57                Map<InstanceStatus,List<Pair<String, String>>> instancesByStatus =
 58                    new HashMap<InstanceStatus, List<Pair<String,String>>>();
 59                Map<String,Integer> zoneCounts = new HashMap<String, Integer>();
 60 
 61                for(InstanceInfo info : app.getInstances()){
 62                    String id = info.getId();
 63                    String url = info.getStatusPageUrl();
 64                    InstanceStatus status = info.getStatus();
 65                    String ami = "n/a";
 66                    String zone = "";
 67                    if(info.getDataCenterInfo().getName() == Name.Amazon){
 68                        AmazonInfo dcInfo = (AmazonInfo)info.getDataCenterInfo();
 69                        ami = dcInfo.get(MetaDataKey.amiId);
 70                        zone = dcInfo.get(MetaDataKey.availabilityZone);
 71                    }
 72 
 73                    Integer count = amiCounts.get(ami);
 74                    if(count != null){
 75                        amiCounts.put(ami, Integer.valueOf(count.intValue()+1));
 76                    }else {
 77                        amiCounts.put(ami, Integer.valueOf(1));
 78                    }
 79 
 80                    count = zoneCounts.get(zone);
 81                    if(count != null){
 82                        zoneCounts.put(zone, Integer.valueOf(count.intValue()+1));
 83                    }else {
 84                        zoneCounts.put(zone, Integer.valueOf(1));
 85                    }
 86                    List<Pair<String, String>> list = instancesByStatus.get(status);
 87 
 88                    if(list == null){
 89                        list = new ArrayList<Pair<String,String>>();
 90                        instancesByStatus.put(status, list);
 91                    }
 92                    list.add(new Pair<String, String>(id, url));
 93                }
 94                StringBuilder buf = new StringBuilder();
 95                for (Iterator<Entry<String, Integer>> iter =
 96                    amiCounts.entrySet().iterator(); iter.hasNext();) {
 97                    Entry<String, Integer> entry = iter.next();
 98                    buf.append("<b>").append(entry.getKey()).append("</b> (").append(entry.getValue()).append("), ");
 99                }
100                out.println("<td>" + buf.toString() + "</td>");
101                buf = new StringBuilder();
102                for (Iterator<Entry<String, Integer>> iter =
103                    zoneCounts.entrySet().iterator(); iter.hasNext();) {
104                    Entry<String, Integer> entry = iter.next();
105                    buf.append("<b>").append(entry.getKey()).append("</b> (").append(entry.getValue()).append("), ");
106                }
107                out.println("<td>" + buf.toString() + "</td>");
108                buf = new StringBuilder();
109                for (Iterator<Entry<InstanceStatus, List<Pair<String,String>>>> iter =
110                    instancesByStatus.entrySet().iterator(); iter.hasNext();) {
111                    Entry<InstanceStatus, List<Pair<String,String>>> entry = iter.next();
112                    List<Pair<String, String>> value = entry.getValue();
113                    InstanceStatus status = entry.getKey();
114                    if(status != InstanceStatus.UP){
115                        buf.append("<font color=red size=+1><b>");
116                    }
117                    buf.append("<b>").append(status.name()).append("</b> (").append(value.size()).append(") - ");
118                    if(status != InstanceStatus.UP){
119                        buf.append("</font></b>");
120                    }
121 
122                    for(Pair<String,String> p : value) {
123                        String id = p.first();
124                        String url = p.second();
125                        if(url != null && url.startsWith("http")){
126                            buf.append("<a href=\"").append(url).append("\">");
127                        }else {
128                            url = null;
129                        }
130                        buf.append(id);
131                        if(url != null){
132                            buf.append("</a>");
133                        }
134                        buf.append(", ");
135                    }
136                }
137                out.println("<td>" + buf.toString() + "</td></tr>");
138            }
139            %>
140            </tbody>
141            </table>
142       </div>
143       <div>
144       <div class="sectionTitle">General Info</div>
145       <table id='generalInfo' class="stripeable">
146           <tr><th>Name</th><th>Value</th></tr>
147            <%
148            StatusInfo statusInfo = (new StatusUtil(serverContext)).getStatusInfo();
149            Map<String,String> genMap = statusInfo.getGeneralStats();
150            for (Map.Entry<String,String> entry : genMap.entrySet()) {
151              out.print("<tr>");
152              out.print("<td>" + entry.getKey() +  "</td><td>" + entry.getValue() + "</td>");
153              out.print("</tr>");
154            }
155            Map<String,String> appMap = statusInfo.getApplicationStats();
156            for (Map.Entry<String,String> entry : appMap.entrySet()) {
157              out.print("<tr>");
158              out.print("<td>" + entry.getKey() +  "</td><td>" + entry.getValue() + "</td>");
159              out.print("</tr>");
160            }
161            %>
162            </table>
163       </div>
164       <div>
165       <div class="sectionTitle">Instance Info</div>
166         <table id='instanceInfo' class="stripeable">
167           <tr><th>Name</th><th>Value</th></tr>
168            <%
169            InstanceInfo instanceInfo = statusInfo.getInstanceInfo();
170            Map<String,String> instanceMap = new HashMap<String,String>();
171            instanceMap.put("ipAddr", instanceInfo.getIPAddr());
172            instanceMap.put("status", instanceInfo.getStatus().toString());
173            if(instanceInfo.getDataCenterInfo().getName() == DataCenterInfo.Name.Amazon) {
174                AmazonInfo info = (AmazonInfo) instanceInfo.getDataCenterInfo();
175                instanceMap.put("availability-zone", info.get(AmazonInfo.MetaDataKey.availabilityZone));
176                instanceMap.put("public-ipv4", info.get(AmazonInfo.MetaDataKey.publicIpv4));
177                instanceMap.put("instance-id", info.get(AmazonInfo.MetaDataKey.instanceId));
178                instanceMap.put("public-hostname", info.get(AmazonInfo.MetaDataKey.publicHostname));
179                instanceMap.put("ami-id", info.get(AmazonInfo.MetaDataKey.amiId));
180                instanceMap.put("instance-type", info.get(AmazonInfo.MetaDataKey.instanceType));
181            }
182            for (Map.Entry<String,String> entry : instanceMap.entrySet()) {
183              out.print("<tr>");
184              out.print("<td>" + entry.getKey() +  "</td><td>" + entry.getValue() + "</td>");
185              out.print("</tr>");
186            }
187            %>
188            </table>
189     </div>
190 
191   </body>
192 </html>
View Code

5、服務注冊的整體流程圖

下面通過一張圖來看看服務實例注冊的整個流程。

六、抓取注冊表

1、Eureka Client 啟動時全量抓取注冊表

客戶端啟動初始化 DiscoveryClient 時,其中有段代碼如下:這一步調用 fetchRegistry 就是在啟動時全量抓取注冊表緩存到本地中。

 1 if (clientConfig.shouldFetchRegistry()) {
 2     try {
 3         // 拉取注冊表:全量抓取和增量抓取
 4         boolean primaryFetchRegistryResult = fetchRegistry(false);
 5         if (!primaryFetchRegistryResult) {
 6             logger.info("Initial registry fetch from primary servers failed");
 7         }
 8         boolean backupFetchRegistryResult = true;
 9         if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
10             backupFetchRegistryResult = false;
11             logger.info("Initial registry fetch from backup servers failed");
12         }
13         if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
14             throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
15         }
16     } catch (Throwable th) {
17         logger.error("Fetch registry error at startup: {}", th.getMessage());
18         throw new IllegalStateException(th);
19     }
20 }

進入 fetchRegistry 方法,可以看到,首先獲取本地的 Applications,如果為空就會調用 getAndStoreFullRegistry 方法全量抓取注冊表並緩存到本地。

 1 private boolean fetchRegistry(boolean forceFullRegistryFetch) {
 2     Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
 3 
 4     try {
 5         // 獲取本地的應用實例
 6         Applications applications = getApplications();
 7 
 8         if (clientConfig.shouldDisableDelta()
 9                 || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
10                 || forceFullRegistryFetch
11                 || (applications == null)
12                 || (applications.getRegisteredApplications().size() == 0)
13                 || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
14         {
15             // 全量抓取注冊表
16             getAndStoreFullRegistry();
17         } else {
18             // 增量更新注冊表
19             getAndUpdateDelta(applications);
20         }
21         applications.setAppsHashCode(applications.getReconcileHashCode());
22         logTotalInstances();
23     } catch (Throwable e) {
24         logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
25                 appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
26         return false;
27     } finally {
28         if (tracer != null) {
29             tracer.stop();
30         }
31     }
32 
33     // 發出緩存刷新的通知
34     onCacheRefreshed();
35 
36     // Update remote status based on refreshed data held in the cache
37     updateInstanceRemoteStatus();
38 
39     // registry was fetched successfully, so return true
40     return true;
41 }
View Code

進入 getAndStoreFullRegistry  方法可以發現,就是調用 GET /apps 接口抓取全量注冊表,因此等會服務端就從這個入口進去看抓取全量注冊表的邏輯。注冊表抓取回來之后,就放到本地變量 localRegionApps 中。localRegionApps 的類型是 AtomicReference<Applications>,實例信息是存儲在 Applications 中的。

 1 private void getAndStoreFullRegistry() throws Throwable {
 2     long currentUpdateGeneration = fetchRegistryGeneration.get();
 3 
 4     logger.info("Getting all instance registry info from the eureka server");
 5 
 6     Applications apps = null;
 7     EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
 8             // 調用 server GET /apps 全量抓取注冊表
 9             ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
10             : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
11     if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
12         apps = httpResponse.getEntity();
13     }
14     logger.info("The response status is {}", httpResponse.getStatusCode());
15 
16     if (apps == null) {
17         logger.error("The application is null for some reason. Not storing this information");
18     } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
19         // 將 Applications 緩存到本地
20         localRegionApps.set(this.filterAndShuffle(apps));
21         logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
22     } else {
23         logger.warn("Not updating applications as another thread is updating it already");
24     }
25 }
View Code

2、Eureka Server 注冊表多級緩存機制

① 全量抓取注冊表的接口

全量抓取注冊表的接口是 GET /apps,跟找注冊接口是類似的,最終可以找到 ApplicationsResource 的 getContainers 方法就是全量抓取注冊表的入口。

  • 可以看出,我們可以通過請求頭來指定返回 xml 格式還是 json 格式,可以指定是否要壓縮返回等。
  • 然后創建了全量緩存的 Key
  • 接着根據緩存的 key 從 responseCache 中全量抓取注冊表
 1 @GET
 2 public Response getContainers(@PathParam("version") String version,
 3                               @HeaderParam(HEADER_ACCEPT) String acceptHeader,
 4                               @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
 5                               @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
 6                               @Context UriInfo uriInfo,
 7                               @Nullable @QueryParam("regions") String regionsStr) {
 8     // 省略部分代碼...
 9 
10     // JSON 類型
11     KeyType keyType = Key.KeyType.JSON;
12     String returnMediaType = MediaType.APPLICATION_JSON;
13     if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
14         keyType = Key.KeyType.XML;
15         returnMediaType = MediaType.APPLICATION_XML;
16     }
17 
18     // 全量注冊表的緩存key
19     Key cacheKey = new Key(Key.EntityType.Application,
20             ResponseCacheImpl.ALL_APPS,
21             keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
22     );
23 
24     Response response;
25     if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
26         // 壓縮返回
27         response = Response.ok(responseCache.getGZIP(cacheKey))
28                 .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
29                 .header(HEADER_CONTENT_TYPE, returnMediaType)
30                 .build();
31     } else {
32         // 根據緩存 key 從 responseCache 獲取全量注冊表
33         response = Response.ok(responseCache.get(cacheKey))
34                 .build();
35     }
36     CurrentRequestVersion.remove();
37     return response;
38 }

② ResponseCache 多級緩存讀取

ResponseCache 就是 eureka server 讀取注冊表的核心組件,它的內部采用了多級緩存的機制來快速響應客戶端抓取注冊表的請求,下面就來看看 ResponseCache。

緩存讀取的流程:

  • 如果設置了使用只讀緩存(默認true),就先從只讀緩存 readOnlyCacheMap 中讀取;readOnlyCacheMap 使用 ConcurrentHashMap 實現,ConcurrentHashMap 支持並發訪問,讀取速度很快。
  • 如果讀寫緩存中沒有,就從讀寫緩存 readWriteCacheMap 中讀取,讀取出來后並寫入到只讀緩存中;readWriteCacheMap 使用 google guava 的 LoadingCache 實現,LoadingCache 支持在沒有元素的時候使用 CacheLoader 加載元素。
  • 如果沒有開啟使用只讀緩存,就直接從讀寫緩存中獲取。
 1 public String get(final Key key) {
 2     return get(key, shouldUseReadOnlyResponseCache);
 3 }
 4 
 5 ////////////////////////////////////////////////////
 6 
 7 String get(final Key key, boolean useReadOnlyCache) {
 8     // => getValue
 9     Value payload = getValue(key, useReadOnlyCache);
10     if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
11         return null;
12     } else {
13         return payload.getPayload();
14     }
15 }
16 
17 ////////////////////////////////////////////////////
18 
19 Value getValue(final Key key, boolean useReadOnlyCache) {
20     Value payload = null;
21     try {
22         if (useReadOnlyCache) {
23             // 開啟使用只讀緩存,則先從只讀緩存讀取
24             // readOnlyCacheMap => ConcurrentHashMap<Key, Value>
25             final Value currentPayload = readOnlyCacheMap.get(key);
26             if (currentPayload != null) {
27                 payload = currentPayload;
28             } else {
29                 // 只讀緩存中沒有,則從讀寫緩存中讀取,然后放入只讀緩存中
30                 // readWriteCacheMap => LoadingCache<Key, Value>
31                 payload = readWriteCacheMap.get(key);
32                 readOnlyCacheMap.put(key, payload);
33             }
34         } else {
35             // 未開啟只讀緩存,就從讀寫緩存中讀取
36             payload = readWriteCacheMap.get(key);
37         }
38     } catch (Throwable t) {
39         logger.error("Cannot get value for key : {}", key, t);
40     }
41     return payload;
42 }

③ ResponseCache 初始化

分析 eureka server EurekaBootStrap 啟動初始化時,最后有一步去初始化 eureka server 上下文,它里面就會去初始化注冊表,初始化注冊表的時候就會初始化 ResponseCache,這里就來分析下這個初始化干了什么。

  • 主要就是使用 google guava cache 構造了一個讀寫緩存 readWriteCacheMap,初始容量為 1000。注意這個讀寫緩存的特性:每隔 180 秒定時過期,然后元素不存在的時候就會使用 CacheLoader 從注冊表中讀取。
  • 接着如果配置了使用只讀緩存,還會開啟一個定時任務,每隔30秒將讀寫緩存 readWriteCacheMap 的數據同步到只讀緩存 readOnlyCacheMap。
 1 ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
 2     this.serverConfig = serverConfig;
 3     this.serverCodecs = serverCodecs;
 4     // 是否使用只讀緩存,默認為 true
 5     this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
 6     // 保存注冊表
 7     this.registry = registry;
 8     // 緩存更新間隔時間,默認30秒
 9     long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
10     // 使用 google guava cache 構造一個讀寫緩存
11     this.readWriteCacheMap =
12             // 初始容量為1000
13             CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
14                     // 緩存的數據在寫入多久后過期,默認180秒,也就是說 readWriteCacheMap 會定時過期
15                     .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
16                     .removalListener(new RemovalListener<Key, Value>() {
17                         @Override
18                         public void onRemoval(RemovalNotification<Key, Value> notification) {
19                             Key removedKey = notification.getKey();
20                             if (removedKey.hasRegions()) {
21                                 Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
22                                 regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
23                             }
24                         }
25                     })
26                     // 當key對應的元素不存在時,使用定義 CacheLoader 加載元素
27                     .build(new CacheLoader<Key, Value>() {
28                         @Override
29                         public Value load(Key key) throws Exception {
30                             if (key.hasRegions()) {
31                                 Key cloneWithNoRegions = key.cloneWithoutRegions();
32                                 regionSpecificKeys.put(cloneWithNoRegions, key);
33                             }
34                             // 獲取元素
35                             Value value = generatePayload(key);
36                             return value;
37                         }
38                     });
39 
40     if (shouldUseReadOnlyResponseCache) {
41         // 如果配置了使用只讀緩存,就開啟一個定時任務,定期將 readWriteCacheMap 的數據同步到 readOnlyCacheMap 中
42         // 默認間隔時間是 30 秒
43         timer.schedule(getCacheUpdateTask(),
44                 new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
45                         + responseCacheUpdateIntervalMs),
46                 responseCacheUpdateIntervalMs);
47     }
48 
49     try {
50         Monitors.registerObject(this);
51     } catch (Throwable e) {
52         logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
53     }
54 }

generatePayload 方法:

 1 private Value generatePayload(Key key) {
 2     Stopwatch tracer = null;
 3     try {
 4         String payload;
 5         switch (key.getEntityType()) {
 6             case Application:
 7                 boolean isRemoteRegionRequested = key.hasRegions();
 8 
 9                 // 獲取所有應用
10                 if (ALL_APPS.equals(key.getName())) {
11                     if (isRemoteRegionRequested) {
12                         tracer = serializeAllAppsWithRemoteRegionTimer.start();
13                         payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
14                     } else {
15                         tracer = serializeAllAppsTimer.start();
16                         // 從注冊表讀取所有服務實例
17                         payload = getPayLoad(key, registry.getApplications());
18                     }
19                 }
20                 // 增量獲取應用
21                 else if (ALL_APPS_DELTA.equals(key.getName())) {
22                     if (isRemoteRegionRequested) {
23                         tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
24                         versionDeltaWithRegions.incrementAndGet();
25                         versionDeltaWithRegionsLegacy.incrementAndGet();
26                         payload = getPayLoad(key,
27                                 registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
28                     } else {
29                         tracer = serializeDeltaAppsTimer.start();
30                         versionDelta.incrementAndGet();
31                         versionDeltaLegacy.incrementAndGet();
32                         payload = getPayLoad(key, registry.getApplicationDeltas());
33                     }
34                 } else {
35                     tracer = serializeOneApptimer.start();
36                     payload = getPayLoad(key, registry.getApplication(key.getName()));
37                 }
38                 break;
39             case VIP:
40             case SVIP:
41                 tracer = serializeViptimer.start();
42                 payload = getPayLoad(key, getApplicationsForVip(key, registry));
43                 break;
44             default:
45                 logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
46                 payload = "";
47                 break;
48         }
49         return new Value(payload);
50     } finally {
51         if (tracer != null) {
52             tracer.stop();
53         }
54     }
55 }
View Code

3、Eureka Server 注冊表多級緩存過期機制

這節來總結下 eureka server 注冊表多級緩存的過期時機,其實前面都已經分析過了。

① 主動過期

分析服務注冊時已經說過,服務注冊完成后,調用了 invalidateCache 來失效緩存,進去可以看到就是將讀寫緩存 readWriteCacheMap 中的服務、所有服務、增量服務的緩存失效掉。

那這里就要注意了,如果服務注冊、下線、故障之類的,這里只是失效了讀寫緩存,然后可能要間隔30秒才能同步到只讀緩存 readOnlyCacheMap,那么其它客戶端可能要隔30秒后才能感知到。

1 private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
2     // invalidate cache
3     responseCache.invalidate(appName, vipAddress, secureVipAddress);
4 }

緩存失效:

 1 @Override
 2 public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
 3     for (Key.KeyType type : Key.KeyType.values()) {
 4         for (Version v : Version.values()) {
 5             invalidate(
 6                     // 失效服務的緩存
 7                     new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
 8                     new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
 9                     // 失效所有 APP 的緩存
10                     new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
11                     new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
12                     // 失效增量 APP 的緩存
13                     new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
14                     new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
15             );
16             if (null != vipAddress) {
17                 invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
18             }
19             if (null != secureVipAddress) {
20                 invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
21             }
22         }
23     }
24 }
25 
26 public void invalidate(Key... keys) {
27     for (Key key : keys) {
28         logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
29                 key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
30 
31         // 失效讀寫緩存
32         readWriteCacheMap.invalidate(key);
33         Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
34         if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
35             for (Key keysWithRegion : keysWithRegions) {
36                 logger.debug("Invalidating the response cache key : {} {} {} {} {}",
37                         key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
38                 readWriteCacheMap.invalidate(keysWithRegion);
39             }
40         }
41     }
42 }
View Code

② 定時過期

讀寫緩存 readWriteCacheMap 在構建的時候,指定了一個自動過期的時間,默認值是180秒,所以往 readWriteCacheMap 中放入一個數據過后,等180秒過后,它就自動過期了。然后下次讀取的時候發現緩存中沒有這個 key,就會使用 CacheLoader 重新加載到這個緩存中。

這種定時過期機制就是每隔一段時間來同步注冊表與緩存的數據。

③ 被動過期

初始化 ResponseCache 時,如果啟用了只讀緩存,就會創建一個定時任務(每隔30秒運行一次)來同步 readWriteCacheMap 與 readOnlyCacheMap 中的數據,對於 readOnlyCacheMap 來說這就是一種被動過期。

 1 private TimerTask getCacheUpdateTask() {
 2     return new TimerTask() {
 3         @Override
 4         public void run() {
 5             logger.debug("Updating the client cache from response cache");
 6             for (Key key : readOnlyCacheMap.keySet()) {
 7                 if (logger.isDebugEnabled()) {
 8                     logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
 9                             key.getEntityType(), key.getName(), key.getVersion(), key.getType());
10                 }
11                 try {
12                     CurrentRequestVersion.set(key.getVersion());
13                     // 獲取讀寫緩存中的數據
14                     Value cacheValue = readWriteCacheMap.get(key);
15                     // 獲取只讀緩存中的數據
16                     Value currentCacheValue = readOnlyCacheMap.get(key);
17                     // 如果 readOnlyCacheMap 中緩存的值與 readWriteCacheMap 緩存的值不同,就用 readWriteCacheMap 的值覆蓋 readOnlyCacheMap 的值
18                     if (cacheValue != currentCacheValue) {
19                         readOnlyCacheMap.put(key, cacheValue);
20                     }
21                 } catch (Throwable th) {
22                     logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
23                 } finally {
24                     CurrentRequestVersion.remove();
25                 }
26             }
27         }
28     };
29 }
View Code

4、Eureka Client 定時拉取增量注冊表

① 客戶端注冊表刷新定時任務

前面介紹 DiscoveryClient 初始化時,在初始化調度任務這一步,如果要抓取注冊表,就會創建一個調度器每隔 30 秒執行一次 cacheRefreshTask,它對 CacheRefreshThread 做了封裝,進去可以看到,它其實就是調用 refreshRegistry 方法刷新注冊表。

 1 private void initScheduledTasks() {
 2     if (clientConfig.shouldFetchRegistry()) {
 3         // 抓取注冊表的間隔時間,默認30秒
 4         int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
 5         // 刷新緩存調度器延遲時間擴大倍數,在任務超時的時候,將擴大延遲時間
 6         // 這在出現網絡抖動、eureka-sever 不可用時,可以避免頻繁發起無效的調度
 7         int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
 8         // 注冊表刷新的定時任務
 9         cacheRefreshTask = new TimedSupervisorTask(
10                 "cacheRefresh",
11                 scheduler,
12                 cacheRefreshExecutor,
13                 registryFetchIntervalSeconds,
14                 TimeUnit.SECONDS,
15                 expBackOffBound,
16                 new CacheRefreshThread() // 刷新注冊表的任務
17         );
18         // 30秒后開始調度刷新注冊表的任務
19         scheduler.schedule(
20                 cacheRefreshTask,
21                 registryFetchIntervalSeconds, TimeUnit.SECONDS);
22     }
23 }

refreshRegistry 方法:

 1 class CacheRefreshThread implements Runnable {
 2     public void run() {
 3         refreshRegistry();
 4     }
 5 }
 6 
 7 @VisibleForTesting
 8 void refreshRegistry() {
 9     try {
10         boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
11 
12         boolean remoteRegionsModified = false;
13         // This makes sure that a dynamic change to remote regions to fetch is honored.
14         String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
15         if (null != latestRemoteRegions) {
16             String currentRemoteRegions = remoteRegionsToFetch.get();
17             if (!latestRemoteRegions.equals(currentRemoteRegions)) {
18                 // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
19                 synchronized (instanceRegionChecker.getAzToRegionMapper()) {
20                     if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
21                         String[] remoteRegions = latestRemoteRegions.split(",");
22                         remoteRegionsRef.set(remoteRegions);
23                         instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
24                         remoteRegionsModified = true;
25                     } else {
26                         logger.info("Remote regions to fetch modified concurrently," +
27                                 " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
28                     }
29                 }
30             } else {
31                 // Just refresh mapping to reflect any DNS/Property change
32                 instanceRegionChecker.getAzToRegionMapper().refreshMapping();
33             }
34         }
35 
36         // 抓取注冊表
37         boolean success = fetchRegistry(remoteRegionsModified);
38         if (success) {
39             registrySize = localRegionApps.get().size();
40             lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
41         }
42 
43         if (logger.isDebugEnabled()) {
44             StringBuilder allAppsHashCodes = new StringBuilder();
45             allAppsHashCodes.append("Local region apps hashcode: ");
46             allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
47             allAppsHashCodes.append(", is fetching remote regions? ");
48             allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
49             for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
50                 allAppsHashCodes.append(", Remote region: ");
51                 allAppsHashCodes.append(entry.getKey());
52                 allAppsHashCodes.append(" , apps hashcode: ");
53                 allAppsHashCodes.append(entry.getValue().getAppsHashCode());
54             }
55             logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
56                     allAppsHashCodes);
57         }
58     } catch (Throwable e) {
59         logger.error("Cannot fetch registry from server", e);
60     }
61 }
View Code

refreshRegistry 里面又調用了 fetchRegistry 抓取注冊表,fetchRegistry 在前面分析全量抓取注冊表時已經展示過了。全量抓取注冊表之后,本地 applications 不為空了,這時就會走 getAndUpdateDelta 增量更新的方法。

 1 private boolean fetchRegistry(boolean forceFullRegistryFetch) {
 2     Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
 3 
 4     try {
 5         // 獲取本地的應用實例
 6         Applications applications = getApplications();
 7 
 8         if (clientConfig.shouldDisableDelta()
 9                 || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
10                 || forceFullRegistryFetch
11                 || (applications == null)
12                 || (applications.getRegisteredApplications().size() == 0)
13                 || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
14         {
15             logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
16             logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
17             logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
18             logger.info("Application is null : {}", (applications == null));
19             logger.info("Registered Applications size is zero : {}",
20                     (applications.getRegisteredApplications().size() == 0));
21             logger.info("Application version is -1: {}", (applications.getVersion() == -1));
22             // 全量抓取注冊表
23             getAndStoreFullRegistry();
24         } else {
25             // 增量更新注冊表
26             getAndUpdateDelta(applications);
27         }
28         applications.setAppsHashCode(applications.getReconcileHashCode());
29         logTotalInstances();
30     } catch (Throwable e) {
31         logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
32                 appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
33         return false;
34     } finally {
35         if (tracer != null) {
36             tracer.stop();
37         }
38     }
39 
40     // 發出刷新緩存的通知
41     onCacheRefreshed();
42 
43     // Update remote status based on refreshed data held in the cache
44     updateInstanceRemoteStatus();
45 
46     // registry was fetched successfully, so return true
47     return true;
48 }
View Code

② 增量更新本地注冊表

接着看 getAndUpdateDelta 增量更新方法:

  • 首先調用 eureka server GET /apps/delta 接口獲取增量的注冊表
  • 如果增量的注冊表為空,就會調用 getAndStoreFullRegistry 方法全量抓取注冊表
  • 增量注冊表不為空,就將其合並到本地注冊表中
  • 然后根據本地注冊表的 applications 重新計算一個 hash 值
  • eureka server 返回的 delta 中包含一個 appsHashCode,代表了 eureka server 端的注冊表的 hash 值,如果與本地計算的 hash 值不同,則說明本地注冊表與server端注冊表不一致,那就會全量拉取注冊表更新到本地緩存中

可以看到,eureka 增量抓取的思路來更新本地緩存,並使用了 hash 值來保證服務端與本地的數據一致性。在分布式系統里,要進行數據同步,采用 hash 值比對的思想,這是值得學習的一個思路。

 1 private void getAndUpdateDelta(Applications applications) throws Throwable {
 2     long currentUpdateGeneration = fetchRegistryGeneration.get();
 3 
 4     Applications delta = null;
 5     // 調用遠程接口增量抓取:GET apps/delta
 6     EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
 7     if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
 8         delta = httpResponse.getEntity();
 9     }
10 
11     // 如果增量抓取的數據為空,就會進行一次全量抓取
12     if (delta == null) {
13         logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
14                 + "Hence got the full registry.");
15         getAndStoreFullRegistry();
16     }
17 
18     else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
19         logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
20         String reconcileHashCode = "";
21         // 加鎖更新本地注冊表
22         if (fetchRegistryUpdateLock.tryLock()) {
23             try {
24                 // 抓取到增量的注冊表后,跟本地的注冊表合並
25                 updateDelta(delta);
26                 // 注冊表合並完成后,根據本地 applications 計算一個 hash 值
27                 reconcileHashCode = getReconcileHashCode(applications);
28             } finally {
29                 fetchRegistryUpdateLock.unlock();
30             }
31         } else {
32             logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
33         }
34         // delta 中會返回 server 端注冊表的 hash 值,如果和本地計算出來的 hash 值不一樣,
35         // 說明本地注冊表跟 server 端注冊表不一樣,就會從 server 全量拉取注冊表更新到本地緩存
36         if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
37             reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
38         }
39     } else {
40         logger.warn("Not updating application delta as another thread is updating it already");
41         logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
42     }
43 }

③ 增量注冊表合並到本地

再來看下增量注冊表合並到本地發方法 updateDelta,其實就是遍歷返回來的服務實例,然后根據實例的  ActionType 分別處理,比如前面分析實例注冊時 ActionType 就設置了 ADDED,后面分析實例下線時還可以看到設置了 ActionType 為 DELETED。

 1 private void updateDelta(Applications delta) {
 2     int deltaCount = 0;
 3     // 變量增量注冊的服務
 4     for (Application app : delta.getRegisteredApplications()) {
 5         // 遍歷實例
 6         for (InstanceInfo instance : app.getInstances()) {
 7             Applications applications = getApplications();
 8             String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
 9             if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
10                 Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
11                 if (null == remoteApps) {
12                     remoteApps = new Applications();
13                     remoteRegionVsApps.put(instanceRegion, remoteApps);
14                 }
15                 applications = remoteApps;
16             }
17 
18             ++deltaCount;
19             // ADDED 新增的實例:服務注冊
20             if (ActionType.ADDED.equals(instance.getActionType())) {
21                 Application existingApp = applications.getRegisteredApplications(instance.getAppName());
22                 if (existingApp == null) {
23                     applications.addApplication(app);
24                 }
25                 logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
26                 applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
27             }
28             // MODIFIED 變更的實例:續約,信息變更
29             else if (ActionType.MODIFIED.equals(instance.getActionType())) {
30                 Application existingApp = applications.getRegisteredApplications(instance.getAppName());
31                 if (existingApp == null) {
32                     applications.addApplication(app);
33                 }
34                 logger.debug("Modified instance {} to the existing apps ", instance.getId());
35 
36                 applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
37 
38             }
39             // DELETED 移除實例:實例下線、故障
40             else if (ActionType.DELETED.equals(instance.getActionType())) {
41                 Application existingApp = applications.getRegisteredApplications(instance.getAppName());
42                 if (existingApp != null) {
43                     logger.debug("Deleted instance {} to the existing apps ", instance.getId());
44                     existingApp.removeInstance(instance);
45                     /*
46                      * We find all instance list from application(The status of instance status is not only the status is UP but also other status)
47                      * if instance list is empty, we remove the application.
48                      */
49                     if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
50                         applications.removeApplication(existingApp);
51                     }
52                 }
53             }
54         }
55     }
56     logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
57 
58     getApplications().setVersion(delta.getVersion());
59     getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
60 
61     for (Applications applications : remoteRegionVsApps.values()) {
62         applications.setVersion(delta.getVersion());
63         applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
64     }
65 }
View Code

④ 客戶端實例存儲

可以看到,全量更新或者增量更新都是在更新本地的 Applications,可以看下 Applications 和 Application 的結構:

  • 應用實例是被封裝到 Application 中的,實例列表是一個 LinkedHashSet,並用一個 ConcurrentHashMap 緩存了實例ID和實例的關系,便於快速檢索。
  • 應用 Application 再被封裝到 Applications 中,應用列表是一個 ConcurrentLinkedQueue,並用 ConcurrentHashMap 緩存了應用名稱和 Application 的關系。
  • Applications 中還保存了這個應用的 appsHashCode  值,這個應該是和 eureka server 的 hash 是一致的。
 1 private final AbstractQueue<Application> applications;
 2 private final Map<String, Application> appNameApplicationMap;
 3 private final Map<String, VipIndexSupport> virtualHostNameAppMap;
 4 private final Map<String, VipIndexSupport> secureVirtualHostNameAppMap;
 5 
 6 public Applications(@JsonProperty("appsHashCode") String appsHashCode,
 7         @JsonProperty("versionDelta") Long versionDelta,
 8         @JsonProperty("application") List<Application> registeredApplications) {
 9     // 實例應用隊列
10     this.applications = new ConcurrentLinkedQueue<Application>();
11     // key 為 appName,Value 為對應的應用
12     this.appNameApplicationMap = new ConcurrentHashMap<String, Application>();
13     this.virtualHostNameAppMap = new ConcurrentHashMap<String, VipIndexSupport>();
14     this.secureVirtualHostNameAppMap = new ConcurrentHashMap<String, VipIndexSupport>();
15     // hash 值
16     this.appsHashCode = appsHashCode;
17     this.versionDelta = versionDelta;
18 
19     for (Application app : registeredApplications) {
20         this.addApplication(app);
21     }
22 }
 1 public class Application {
 2 
 3     private static Random shuffleRandom = new Random();
 4 
 5     private String name;
 6 
 7     // 是否變更
 8     private volatile boolean isDirty = false;
 9     // 實例信息
10     private final Set<InstanceInfo> instances;
11 
12     private final AtomicReference<List<InstanceInfo>> shuffledInstances;
13     // map 結構實例
14     private final Map<String, InstanceInfo> instancesMap;
15 
16     public Application() {
17         instances = new LinkedHashSet<InstanceInfo>();
18         instancesMap = new ConcurrentHashMap<String, InstanceInfo>();
19         shuffledInstances = new AtomicReference<List<InstanceInfo>>();
20     }
21 
22 }

5、Eureka Server 返回增量注冊表

① 抓取增量注冊表的入口

從前分析知道,增量抓取注冊表單接口為 GET/apps/delta,可以很容易找到位於 ApplicationsResource 下的 getContainerDifferential 就是抓取增量注冊表的入口。

可以看到,跟抓取注冊表類似,也是先構建一個緩存的Key,然后從多級緩存 ResponseCache 中獲取。這里的key是 ALL_APPS_DELTA。

 1 @Path("delta")
 2 @GET
 3 public Response getContainerDifferential(
 4         @PathParam("version") String version,
 5         @HeaderParam(HEADER_ACCEPT) String acceptHeader,
 6         @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
 7         @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
 8         @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
 9 
10     Key cacheKey = new Key(Key.EntityType.Application,
11             // 增量服務:ALL_APPS_DELTA
12             ResponseCacheImpl.ALL_APPS_DELTA,
13             keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
14     );
15 
16     final Response response;
17 
18     if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
19          response = Response.ok(responseCache.getGZIP(cacheKey))
20                 .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
21                 .header(HEADER_CONTENT_TYPE, returnMediaType)
22                 .build();
23     } else {
24         // 從多級緩存中獲取增量注冊表
25         response = Response.ok(responseCache.get(cacheKey)).build();
26     }
27 
28     CurrentRequestVersion.remove();
29     return response;
30 }

與全量抓取注冊表,讀取多級緩存的流程都是類似的,唯一的區別就是 Key 不同,全量抓取時是 ALL_APPS,增量抓取時 ALL_APPS_DELTA,區別就在於 readWriteCacheMap 加載數據到緩存中時走的邏輯不一樣,可以再看看下面的 generatePayload 方法就知道了。

 1 private Value generatePayload(Key key) {
 2     Stopwatch tracer = null;
 3     try {
 4         String payload;
 5         switch (key.getEntityType()) {
 6             case Application:
 7                 boolean isRemoteRegionRequested = key.hasRegions();
 8 
 9                 // 獲取所有應用
10                 if (ALL_APPS.equals(key.getName())) {
11                     if (isRemoteRegionRequested) {
12                         tracer = serializeAllAppsWithRemoteRegionTimer.start();
13                         payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
14                     } else {
15                         tracer = serializeAllAppsTimer.start();
16                         // 從注冊表讀取所有服務實例
17                         payload = getPayLoad(key, registry.getApplications());
18                     }
19                 }
20                 // 增量獲取應用
21                 else if (ALL_APPS_DELTA.equals(key.getName())) {
22                     if (isRemoteRegionRequested) {
23                         tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
24                         versionDeltaWithRegions.incrementAndGet();
25                         versionDeltaWithRegionsLegacy.incrementAndGet();
26                         payload = getPayLoad(key,
27                                 registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
28                     } else {
29                         tracer = serializeDeltaAppsTimer.start();
30                         versionDelta.incrementAndGet();
31                         versionDeltaLegacy.incrementAndGet();
32                         // 獲取增量注冊表
33                         payload = getPayLoad(key, registry.getApplicationDeltas());
34                     }
35                 } else {
36                     tracer = serializeOneApptimer.start();
37                     payload = getPayLoad(key, registry.getApplication(key.getName()));
38                 }
39                 break;
40             case VIP:
41             case SVIP:
42                 tracer = serializeViptimer.start();
43                 payload = getPayLoad(key, getApplicationsForVip(key, registry));
44                 break;
45             default:
46                 logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
47                 payload = "";
48                 break;
49         }
50         return new Value(payload);
51     } finally {
52         if (tracer != null) {
53             tracer.stop();
54         }
55     }
56 }
View Code

② 增量注冊表的設計

之后會調用 registry.getApplicationDeltas() 獲取增量注冊表,進去可以發現,增量的注冊表其實就是 recentlyChangedQueue 這個最近變更隊列里的數據,通過遍歷 recentlyChangedQueue 生成 Applications。

在返回 apps 之前,先獲取了本地所有應用,並計算了一個 hash 值,然后設置到 apps 中。這就和前一節對應起來了,抓取增量注冊表時,服務端會返回一個全量注冊表的 hash 值,然后客戶端將增量注冊表合並到本地后,再根據本地的全量注冊表計算一個 hash 值,然后將兩個 hash 值做對比,如果不一致,說明服務端和客戶端的數據是不一致的,這時客戶端就會重新向服務端全量拉取注冊表到本地。

 1 public Applications getApplicationDeltas() {
 2     GET_ALL_CACHE_MISS_DELTA.increment();
 3     Applications apps = new Applications();
 4     apps.setVersion(responseCache.getVersionDelta().get());
 5     Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
 6     write.lock();
 7     try {
 8         // 最近變更隊列 recentlyChangedQueue,這就是增量的注冊表
 9         // recentlyChangedQueue 只保留了最近3分鍾有變化的實例,如實例上線、下線、故障剔除
10         Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
11         logger.debug("The number of elements in the delta queue is : {}",
12                 this.recentlyChangedQueue.size());
13         while (iter.hasNext()) {
14             Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
15             InstanceInfo instanceInfo = lease.getHolder();
16             logger.debug(
17                     "The instance id {} is found with status {} and actiontype {}",
18                     instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
19             Application app = applicationInstancesMap.get(instanceInfo
20                     .getAppName());
21             if (app == null) {
22                 app = new Application(instanceInfo.getAppName());
23                 applicationInstancesMap.put(instanceInfo.getAppName(), app);
24                 apps.addApplication(app);
25             }
26             app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
27         }
28 
29         // 省略部分代碼...
30 
31         // 獲取所有應用實例
32         Applications allApps = getApplications(!disableTransparentFallback);
33         // 根據所有應用實例計算一個 hash 值,並設置到要返回的 apps 中
34         apps.setAppsHashCode(allApps.getReconcileHashCode());
35         return apps;
36     } finally {
37         write.unlock();
38     }
39 }

再來看看 recentlyChangedQueue 是如何設計來保存增量信息的。

再看看前面提到過的注冊表初始化的構造方法,最后創建了一個每隔30秒執行一次的定時調度任務。這個任務會遍歷 recentlyChangedQueue 這個隊列,判斷每個元素的最后更新時間是否超過了 180 秒,如果超過了,就會從隊列中移除這個元素。超過 180 秒的實例變更信息,就會認為這些變更信息都已經同步到客戶端了,因為客戶端是每隔30秒拉取一次增量注冊表的。因此客戶端多次拉取增量注冊表可能拉取到同樣的變更信息,不過最終合並到本地都是一樣的。

因此可以看出,eureka 利用 recentlyChangedQueue 這個最近變更隊列保存了最近3分鍾以內實例的變更信息,如新服務注冊、服務下線等,然后客戶端每次就是拉取這個變更隊列。

 1 protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
 2     this.serverConfig = serverConfig;
 3     this.clientConfig = clientConfig;
 4     this.serverCodecs = serverCodecs;
 5     // 最近下線的循環隊列
 6     this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
 7     // 最近注冊的循環隊列
 8     this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);
 9 
10     // 最近一分鍾續約的計數器
11     this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
12 
13     // 一個定時調度任務,定時剔除最近改變隊列中過期的實例
14     this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
15             // 調度任務延遲 30 秒開始執行
16             serverConfig.getDeltaRetentionTimerIntervalInMs(),
17             // 默認每隔 30 秒執行一次
18             serverConfig.getDeltaRetentionTimerIntervalInMs());
19 }
20 
21 /////////////////////////////////////////
22 
23 private TimerTask getDeltaRetentionTask() {
24     return new TimerTask() {
25 
26         @Override
27         public void run() {
28             // 最近變更的隊列
29             Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
30             while (it.hasNext()) {
31                 // 最近更新時間超過 180 秒就認為數據已經同步到各個客戶端了,就從隊列中移除
32                 if (it.next().getLastUpdateTime() <
33                         // retentionTimeInMSInDeltaQueue:delta隊列數據保留時間,默認 180 秒
34                         System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
35                     it.remove();
36                 } else {
37                     break;
38                 }
39             }
40         }
41 
42     };
43 }

7、Eureka 抓取注冊表總體流程圖

下面還是用一張圖整體展示下服務抓取注冊表的整理流程。

服務注冊、服務下線、實例故障剔除都會將讀寫緩存 readWriteCacheMap 中對應的實例失效掉,然后加入到最近變更隊列 recentlyChangedQueue 中,因此這三種情況下,增量抓取注冊表的邏輯都是類似的。

七、服務續約

在分布式系統中,服務續約機制是非常重要的,這樣能讓中心系統(注冊中心)知道客戶端還存活着。接下來就來看看服務續約的機制。

1、Eureka Client 定時發送心跳

在初始化 DiscoveryClient 的調度任務時,下面這部分代碼就是在創建定時發送心跳的任務,心跳每隔30秒發送一次。發送心跳的接口是 PUT /apps/{appName}/{instanceId}。

 1 private void initScheduledTasks() {
 2     // 定時刷新本地緩存...
 3 
 4     if (clientConfig.shouldRegisterWithEureka()) {
 5         // 續約間隔時間,默認30秒
 6         int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
 7         // 心跳調度器的延遲時間擴大倍數,默認10
 8         int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
 9         logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
10 
11         // 心跳的定時任務
12         heartbeatTask = new TimedSupervisorTask(
13                 "heartbeat",
14                 scheduler,
15                 heartbeatExecutor,
16                 renewalIntervalInSecs,
17                 TimeUnit.SECONDS,
18                 expBackOffBound,
19                 new HeartbeatThread()
20         );
21         // 30秒后開始調度心跳的任務
22         scheduler.schedule(
23                 heartbeatTask,
24                 renewalIntervalInSecs, TimeUnit.SECONDS);
25 
26         // 服務注冊...
27     } else {
28         logger.info("Not registering with Eureka server per configuration");
29     }
30 }
31 
32 //////////////////////////////////////////////
33 
34 private class HeartbeatThread implements Runnable {
35     public void run() {
36         if (renew()) {
37             lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
38         }
39     }
40 }
41 
42 //////////////////////////////////////////////
43 
44 boolean renew() {
45     EurekaHttpResponse<InstanceInfo> httpResponse;
46     try {
47         // 發送心跳的接口:PUT /apps/{appName}/{instanceId}
48         httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
49         logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
50         if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
51             REREGISTER_COUNTER.increment();
52             logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
53             long timestamp = instanceInfo.setIsDirtyWithTime();
54             // 服務端未找到對應的實例,就重新注冊
55             boolean success = register();
56             if (success) {
57                 instanceInfo.unsetIsDirty(timestamp);
58             }
59             return success;
60         }
61         // 續約成功
62         return httpResponse.getStatusCode() == Status.OK.getStatusCode();
63     } catch (Throwable e) {
64         logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
65         return false;
66     }
67 }

2、Eureka Server 接收心跳續約

順着 PUT /apps/{appName}/{instanceId} 找可以發現,服務端接收注冊的入口在 InstanceResource 的 renewLease 方法中,它調用了注冊表單 renew 方法進行服務續約。

 1 @PUT
 2 public Response renewLease(
 3         @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
 4         @QueryParam("overriddenstatus") String overriddenStatus,
 5         @QueryParam("status") String status,
 6         @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
 7     boolean isFromReplicaNode = "true".equals(isReplication);
 8     // 調用注冊表的 renew 進行服務續約
 9     boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
10 
11     // Not found in the registry, immediately ask for a register
12     if (!isSuccess) {
13         logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
14         return Response.status(Status.NOT_FOUND).build();
15     }
16     // Check if we need to sync based on dirty time stamp, the client
17     // instance might have changed some value
18     Response response;
19     if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
20         response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
21         // Store the overridden status since the validation found out the node that replicates wins
22         if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
23                 && (overriddenStatus != null)
24                 && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
25                 && isFromReplicaNode) {
26             registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
27         }
28     } else {
29         response = Response.ok().build();
30     }
31     logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
32     return response;
33 }
View Code

進去可以看到,調用了父類的 renew 方法續約,然后會判斷 isReplication ,如果是復制,說明是來自 eureka-server 集群中其它節點的同步請求,就復制到其它節點。復制到其它集群這塊代碼在前面已經提到過了,就不再展示。

1 public boolean renew(final String appName, final String id, final boolean isReplication) {
2     // 調用父類(AbstractInstanceRegistry)的 renew 續約
3     if (super.renew(appName, id, isReplication)) {
4         // 續約完成后同步到集群其它節點
5         replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
6         return true;
7     }
8     return false;
9 }

接着看父類的 renew 續約方法:

  • 首先根據服務名從注冊表中取出租約信息
  • 然后根據實例ID取出實例的租約信息
  • 然后判斷是否是覆蓋實例狀態
  • 將最近一分鍾續約次數計數器 renewsLastMin +1
  • 最后調用實例租約對象的 renew 方法進行續約,其內部只是更新了租約的最后更新時間 lastUpdateTimestamp ,更新為當前時間+續約間隔時間。
 1 public boolean renew(String appName, String id, boolean isReplication) {
 2     RENEW.increment(isReplication);
 3     // 根據服務名從注冊表取出租約信息
 4     Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
 5     Lease<InstanceInfo> leaseToRenew = null;
 6     if (gMap != null) {
 7         // 根據實例ID取出實例租約信息
 8         leaseToRenew = gMap.get(id);
 9     }
10     if (leaseToRenew == null) {
11         RENEW_NOT_FOUND.increment(isReplication);
12         logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
13         return false;
14     } else {
15         InstanceInfo instanceInfo = leaseToRenew.getHolder();
16         if (instanceInfo != null) {
17             // touchASGCache(instanceInfo.getASGName());
18             InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
19                     instanceInfo, leaseToRenew, isReplication);
20             if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
21                 logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
22                         + "; re-register required", instanceInfo.getId());
23                 RENEW_NOT_FOUND.increment(isReplication);
24                 return false;
25             }
26             if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
27                 logger.info(
28                         "The instance status {} is different from overridden instance status {} for instance {}. "
29                                 + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
30                                 overriddenInstanceStatus.name(),
31                                 instanceInfo.getId());
32                 instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
33 
34             }
35         }
36         // 最近一分鍾續約計數器+1
37         renewsLastMin.increment();
38         // 續約
39         leaseToRenew.renew();
40         return true;
41     }
42 }
43 
44 ////////////////////////////////////////////////
45 
46 public void renew() {
47     // 更新最后更新時間,在當前時間的基礎上加了周期時間,默認90秒
48     lastUpdateTimestamp = System.currentTimeMillis() + duration;
49 }

八、服務下線

1、Eureka Client 下線

eureka client 服務關閉停止時,會觸發 DiscoveryClient 的 shutdown 關閉 eureka-client,我們就從 shutdown 方法來看看 eureka-client 的下線。

  • 首先移除了注冊的狀態變更器,這個時候不再需要監聽實例狀態的變更了
  • 然后關閉了一系列的調度任務,停止與 eureka-server 的交互,比如定時發送心跳。同時也釋放了資源。
  • 之后調用了 unregister 取消注冊,其實就是調用了 server 端的 DELETE /apps/{appName}/{instanceId} 下線實例
  • 最后再關閉了一些其它資源,如 EurekaTransport。
 1 @PreDestroy
 2 @Override
 3 public synchronized void shutdown() {
 4     if (isShutdown.compareAndSet(false, true)) {
 5         logger.info("Shutting down DiscoveryClient ...");
 6 
 7         // 移除狀態變更監聽器
 8         if (statusChangeListener != null && applicationInfoManager != null) {
 9             applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
10         }
11 
12         // 停止調度任務,釋放資源:
13         //    instanceInfoReplicator、heartbeatExecutor、cacheRefreshExecutor
14         //    scheduler、cacheRefreshTask、heartbeatTask
15         cancelScheduledTasks();
16 
17         // If APPINFO was registered
18         if (applicationInfoManager != null
19                 && clientConfig.shouldRegisterWithEureka()
20                 && clientConfig.shouldUnregisterOnShutdown()) {
21             applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
22             // 調用 eureka-server 的下線接口下線實例
23             unregister();
24         }
25 
26         // 繼續釋放資源
27         if (eurekaTransport != null) {
28             eurekaTransport.shutdown();
29         }
30         heartbeatStalenessMonitor.shutdown();
31         registryStalenessMonitor.shutdown();
32 
33         Monitors.unregisterObject(this);
34 
35         logger.info("Completed shut down of DiscoveryClient");
36     }
37 }
38 
39 void unregister() {
40     // It can be null if shouldRegisterWithEureka == false
41     if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
42         try {
43             logger.info("Unregistering ...");
44             // 取消注冊:DELETE /apps/{appName}/{instanceId}
45             EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
46             logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
47         } catch (Exception e) {
48             logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
49         }
50     }
51 }

2、Eureka Server 服務下線

順着 DELETE /apps/{appName}/{instanceId} 接口可以找到 InstanceResouce 的 cancelLease 方法就是客戶端下線的入口。

進入注冊的 cancel 方法,可以看到與前面的一些接口是類似的,先調用服務的 cancel 方法下線實例,然后調用 replicateToPeers 復制到集群中其它節點。然后 cancel 方法其實是調用的 internalCancel 方法。

 1 @DELETE
 2 public Response cancelLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
 3     try {
 4         // 下線實例
 5         boolean isSuccess = registry.cancel(app.getName(), id,
 6             "true".equals(isReplication));
 7 
 8         if (isSuccess) {
 9             logger.debug("Found (Cancel): {} - {}", app.getName(), id);
10             return Response.ok().build();
11         } else {
12             logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
13             return Response.status(Status.NOT_FOUND).build();
14         }
15     } catch (Throwable e) {
16         logger.error("Error (cancel): {} - {}", app.getName(), id, e);
17         return Response.serverError().build();
18     }
19 }
20 
21 //////////////////////////////////////////
22 
23 public boolean cancel(final String appName, final String id,
24                       final boolean isReplication) {
25     if (super.cancel(appName, id, isReplication)) {
26         replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
27 
28         return true;
29     }
30     return false;
31 }
32 
33 //////////////////////////////////////////
34 
35 public boolean cancel(String appName, String id, boolean isReplication) {
36     return internalCancel(appName, id, isReplication);
37 }

再來看下 internalCancel 方法:

  • 首先根據服務名從注冊表取出服務所有實例的租約信息,再根據實例ID移除實例租約信息
  • 將移除的實例加入到最近下線的一個循環隊列 recentCanceledQueue
  • 下線實例,注意這里設置了實例的下線時間 evictionTimestamp 為當前時間
  • 然后設置實例的 ActionType 為 DELETED,再將下線的實例加入最近變更的隊列 recentlyChangedQueue
  • 之后失效掉緩存 readWriteCacheMap,服務實例變更了就必須清理緩存。不過  readWriteCacheMap 可能要30秒才會同步到 readOnlyCacheMap。
  • 最后將期望續約的客戶端數量-1,然后更新了每分鍾續約次數閾值
 1 protected boolean internalCancel(String appName, String id, boolean isReplication) {
 2     read.lock();
 3     try {
 4         CANCEL.increment(isReplication);
 5         // 根據服務名稱取出服務租約信息
 6         Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
 7         Lease<InstanceInfo> leaseToCancel = null;
 8         if (gMap != null) {
 9             // 根據實例ID移除實例租約信息
10             leaseToCancel = gMap.remove(id);
11         }
12         // 將移除的實例ID加入到最近下線的隊列中
13         recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
14         InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
15         if (instanceStatus != null) {
16             logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
17         }
18         if (leaseToCancel == null) {
19             CANCEL_NOT_FOUND.increment(isReplication);
20             logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
21             return false;
22         } else {
23             // 下線實例,設置了實例的下線時間 evictionTimestamp 為當前時間戳
24             leaseToCancel.cancel();
25             InstanceInfo instanceInfo = leaseToCancel.getHolder();
26             String vip = null;
27             String svip = null;
28             if (instanceInfo != null) {
29                 // 設置實例 ActionType 為 DELETED
30                 instanceInfo.setActionType(ActionType.DELETED);
31                 // 加入最近變更隊列中
32                 recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
33                 instanceInfo.setLastUpdatedTimestamp();
34                 vip = instanceInfo.getVIPAddress();
35                 svip = instanceInfo.getSecureVipAddress();
36             }
37             // 失效緩存
38             invalidateCache(appName, vip, svip);
39             logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
40         }
41     } finally {
42         read.unlock();
43     }
44 
45     synchronized (lock) {
46         // 期望續約的客戶端數量 - 1
47         if (this.expectedNumberOfClientsSendingRenews > 0) {
48             // Since the client wants to cancel it, reduce the number of clients to send renews.
49             this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
50             // 更新每分鍾續約次數的閾值
51             updateRenewsPerMinThreshold();
52         }
53     }
54 
55     return true;
56 }

九、服務故障

服務正常停止時,會觸發 DiscoveryClient 的 shutdown 方法關閉 eureka-client,並向注冊中心發送下線的通知。但如果客戶端宕機或非正常關機,注冊中心就無法接收到客戶端下線的通知了,這時注冊中心就會有一個定時任務,根據心跳來判斷客戶端實例是否故障下線了,然后摘除故障的實例。

1、摘除實例的定時任務初始化

在 EurekaBootStrap 初始化的最后幾步中,調用了注冊表的 openForTraffic 做一些最后的設置,其中最后一步調用了 super.postInit 方法做最后的初始化,里面就創建了定時摘除過期實例的調度任務。

1 registry.openForTraffic(applicationInfoManager, registryCount);
 1 public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
 2     // 期望的客戶端每分鍾的續約次數
 3     this.expectedNumberOfClientsSendingRenews = count;
 4     // 更新每分鍾續約閾值
 5     updateRenewsPerMinThreshold();
 6     logger.info("Got {} instances from neighboring DS node", count);
 7     logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
 8     this.startupTime = System.currentTimeMillis();
 9     if (count > 0) {
10         this.peerInstancesTransferEmptyOnStartup = false;
11     }
12     DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
13     boolean isAws = Name.Amazon == selfName;
14     if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
15         logger.info("Priming AWS connections for all replicas..");
16         primeAwsReplicas(applicationInfoManager);
17     }
18     logger.info("Changing status to UP");
19     // 設置實例狀態為已啟動
20     applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
21     // 調用父類的后置初始化
22     super.postInit();
23 }
View Code

postInit:

  • 首先是開啟了最近一分鍾續約次數的計數器
  • 然后創建了定時摘除過期實例的調度任務,調度任務每隔60秒執行一次
 1 protected void postInit() {
 2     // 啟動 統計最近一分鍾續約次數的計數器
 3     renewsLastMin.start();
 4     if (evictionTaskRef.get() != null) {
 5         evictionTaskRef.get().cancel();
 6     }
 7     // 定時剔除任務
 8     evictionTaskRef.set(new EvictionTask());
 9     evictionTimer.schedule(evictionTaskRef.get(),
10             serverConfig.getEvictionIntervalTimerInMs(),
11             // 每隔60秒執行一次
12             serverConfig.getEvictionIntervalTimerInMs());
13 }

2、定時摘除過期的實例

① 摘除實例的定時任務

可以看到,每次運行摘除實例的任務時,會先獲取一個補償時間,因為兩次 EvictionTask 執行的間隔時間可能超過了設置的60秒,比如 GC 導致的停頓或本地時間漂移導致計時不准確等。然后就調用了 evict 方法摘除實例。

在計算時間差的場景中,這種補償時間的思路是值得學習的,要考慮到時間差的不准確性。

 1 class EvictionTask extends TimerTask {
 2 
 3     private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
 4 
 5     @Override
 6     public void run() {
 7         try {
 8             // 獲取補償時間,因為兩次 EvictionTask 執行的間隔時間可能超過了設置的60秒,比如 GC 導致的停頓或本地時間漂移導致計時不准確
 9             long compensationTimeMs = getCompensationTimeMs();
10             logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
11             evict(compensationTimeMs);
12         } catch (Throwable e) {
13             logger.error("Could not run the evict task", e);
14         }
15     }
16 
17     long getCompensationTimeMs() {
18         long currNanos = getCurrentTimeNano();
19         long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
20         if (lastNanos == 0L) {
21             return 0L;
22         }
23         // 兩次任務運行的間隔時間
24         long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
25         // 補償時間 = 任務運行間隔時間 - 剔除任務的間隔時間(默認60秒)
26         long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
27         return compensationTime <= 0L ? 0L : compensationTime;
28     }
29 
30     long getCurrentTimeNano() {  // for testing
31         return System.nanoTime();
32     }
33 }

② 摘除實例

摘除實例的過程如下:

  • 首先判斷是否啟用了租約過期的機制(主要就是自我保護機制,下一章節再說)。
  • 遍歷注冊表,判斷實例是否過期,將過期的實例加入集合列表中。
  • 計算摘除實例的數量限制,主要就是出於自我保護機制,避免一次摘除過多實例。
  • 然后就是從要摘除的集合中隨機選擇限制數量內的過期實例來摘除掉。
  • 摘除實例其實就是調用了實例下線的方法 internalCancel,主要就是從注冊表中移除實例、加入最近變更的隊列、失效緩存等,具體可以回看服務下線那節。
 1 public void evict(long additionalLeaseMs) {
 2     logger.debug("Running the evict task");
 3 
 4     // 是否啟用了租約過期
 5     if (!isLeaseExpirationEnabled()) {
 6         logger.debug("DS: lease expiration is currently disabled.");
 7         return;
 8     }
 9 
10     // We collect first all expired items, to evict them in random order. For large eviction sets,
11     // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
12     // the impact should be evenly distributed across all applications.
13     List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
14     for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
15         Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
16         if (leaseMap != null) {
17             for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
18                 Lease<InstanceInfo> lease = leaseEntry.getValue();
19                 // 判斷實例租約是否過期
20                 if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
21                     // 加入到過期的集合列表中
22                     expiredLeases.add(lease);
23                 }
24             }
25         }
26     }
27 
28     // 先獲取注冊表已注冊的實例數量
29     int registrySize = (int) getLocalRegistrySize();
30     // 注冊表數量保留的閾值:已注冊的實例數 * 續約百分比閾值(默認0.85)
31     int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
32     // 剔除的數量限制,也就是說一次最多只能剔除 15% 的實例
33     int evictionLimit = registrySize - registrySizeThreshold;
34 
35     // 得到最小的剔除數量
36     int toEvict = Math.min(expiredLeases.size(), evictionLimit);
37     if (toEvict > 0) {
38         logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
39 
40         Random random = new Random(System.currentTimeMillis());
41         for (int i = 0; i < toEvict; i++) {
42             // 根據要剔除的數量從 expiredLeases 中隨機剔除 toEvict 個過期實例
43             int next = i + random.nextInt(expiredLeases.size() - i);
44             Collections.swap(expiredLeases, i, next);
45             Lease<InstanceInfo> lease = expiredLeases.get(i);
46 
47             String appName = lease.getHolder().getAppName();
48             // 實例ID
49             String id = lease.getHolder().getId();
50             EXPIRED.increment();
51             logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
52             // 調用下線的方法
53             internalCancel(appName, id, false);
54         }
55     }
56 }

③ 分批次摘取實例

可以看到,過期的實例並不是一次性摘除的,而是計算了一個閾值 toEvict,一次只隨機摘除 toEvict 個過期實例,采用了分批摘取+隨機摘取的機制。

比如注冊表一共有20個實例,那么最多可以摘除的實例數 toEvict = 20 - 20 * 0.85 = 3 個,也就是說就算有5個實例過期了,那這一次也只能隨機摘除其中3個,另外兩個要等到下一次執行摘除任務時再摘除。

這種分批摘取機制+隨機摘取機制可能會導致有些過期實例要過很久才能下線,尤其是在開發環境這種頻繁啟動、停止服務的場景中。

3、如何判斷實例是否過期

從上面可以看到,eureka 調用了 lease.isExpired(additionalLeaseMs) 來判斷實例是否過期。進入 isExpired 這個方法可以看到,如果已經設置了摘除時間,或者 當前時間 > (實例最后更新時間 + 續約周期(90秒) + 補償時間),就認為實例已經過期了,說明實例已經超過一個周期沒有續約了,就認為這個客戶端實例發生了故障,無法續約,要被摘除掉。

 1 /**
 2  * Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.
 3  *
 4  * Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than
 5  * what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect
 6  * instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will
 7  * not be fixed.
 8  *
 9  * @param additionalLeaseMs any additional lease time to add to the lease evaluation in ms.
10  */
11 public boolean isExpired(long additionalLeaseMs) {
12     // 已經設置過剔除時間,或者 當前時間 > (實例最后更新時間 + 續約周期(90秒) + 補償時間)
13     return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
14 }

這里其實要注意的是另外一個問題,可以看看 isExpired 的注釋,eureka 說這其實是一個bug,但不打算修復了,因為它的 duration 其實是被加了兩次的,下面來看看怎么回事。

先看下 lastUpdateTimestamp 是如何更新的,在客戶端續約的時候會更新 lastUpdateTimestamp,將其設置為 當前時間 + duration 間隔周期(默認90秒),

1 public void renew() {
2     // 更新最后更新時間,在當前時間的基礎上加了一個周期間隔時間,默認90秒
3     lastUpdateTimestamp = System.currentTimeMillis() + duration;
4 }

這個 duration 在注冊的時候也有設置,我們通過這個來看看它的含義是什么。可以看到,如果客戶端沒有配置 durationInSecs,就會設置為默認的 90 秒。

從 getDurationInSecs 的注釋可以看出,這個 duration 的意思是等待客戶端多久沒有續約之后就將其剔除,默認為 90 秒。比如客戶端每隔 30 秒續約一次,那可能超過3次沒有續約之后,就會認為客戶端實例故障了,就要將其摘除掉。

 1 public void register(final InstanceInfo info, final boolean isReplication) {
 2     int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
 3     // 如果實例中沒有周期的配置,就設置為默認的 90 秒
 4     if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
 5         leaseDuration = info.getLeaseInfo().getDurationInSecs();
 6     }
 7     // 注冊實例
 8     super.register(info, leaseDuration, isReplication);
 9     // 復制到集群其它 server 節點
10     replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
11 }
12 
13 //////////////////////////////////////////////
14 
15 /**
16  * Returns client specified setting for eviction (e.g. how long to wait w/o
17  * renewal event)
18  *
19  * @return time in milliseconds since epoch.
20  */
21 public int getDurationInSecs() {
22     return durationInSecs;
23 }

但實際上並不是90秒后摘除實例,可以看到 isExpired 里面將 lastUpdateTimestamp 又加了一個 duration,也就是 180 秒了。也就是說客戶端實例超過 180 秒未續約才被認為是故障了,然后要將其摘除。

isExpired 的注釋也說了,續約的方法 renew() 錯誤的計算了 lastUpdateTimestamp,實際的過期周期是 2 * duration,但是 eureka 並不打算修復這個bug,因為它的影響范圍很小。

所以這里得出一個結論,客戶端關閉了(非正常下線),注冊表中的實例並不是90秒后就摘除了,至少是 180 秒后才會被摘除。

十、自我保護機制

如果網段偶爾抖動或暫時不可用,就無法接收到客戶端的續約,因此 eureka server 為了保證可用性,就會去判斷最近一分鍾收到的心跳次數是否小於指定的閾值,是的就會觸發自我保護機制,關閉租約失效剔除,不再摘除實例,從而保護注冊信息。

1、摘除實例時的自我保護機制

摘除實例的 evict 方法調用了 isLeaseExpirationEnabled 這個方法判斷是否觸發自我保護機制,如果返回 false,就不會摘除實例了。

先看看 isLeaseExpirationEnabled 這個方法:

  • 首先,如果沒有啟用自我保護機制,就返回 true,那就可以摘除實例
  • 如果啟用了自我保護機制(默認啟用),就判斷每分鍾續約閾值 > 0 且 最近一分鍾續約次數 > 每分鍾續約閾值 就是啟用了租約過期
 1 public boolean isLeaseExpirationEnabled() {
 2     // 先判斷是否啟用了自我保護機制
 3     if (!isSelfPreservationModeEnabled()) {
 4         // The self preservation mode is disabled, hence allowing the instances to expire.
 5         return true;
 6     }
 7     // 每分鍾續約閾值大於0, 且 最近一分鍾續約次數 大於 每分鍾續約閾值
 8     return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
 9 }
10 
11 public boolean isSelfPreservationModeEnabled() {
12     return serverConfig.shouldEnableSelfPreservation();
13 }

這個每分鍾續約閾值 numberOfRenewsPerMinThreshold 在前面很多地方都看到過了,服務注冊、下線、openForTraffic、以及有個定時任務每隔15分鍾都會調用下面這個方法來更新 numberOfRenewsPerMinThreshold。

1 protected void updateRenewsPerMinThreshold() {
2     // 每分鍾續約閾值 = 期望續約的客戶端數量 * (60 / 續約間隔時間) * 續約閾值
3     // 例如,一共注冊了 10 個實例,那么期望續約的客戶端數量為 10,間隔時間默認為 30秒,就是每個客戶端應該每30秒發送一次心跳,續約百分比默認為 0.85
4     // 每分鍾續約次數閾值 = 10 * (60.0 / 30) * 0.85 = 17,也就是說每分鍾至少要接收到 17 此續約請求
5     this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
6             * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
7             * serverConfig.getRenewalPercentThreshold());
8 }

expectedNumberOfClientsSendingRenews 在實例注冊的時候會 + 1,在實例下線的時候會 - 1,其代表的就是期望續約的客戶端數量。

 1 /////////////// 實例注冊 ///////////////
 2 synchronized (lock) {
 3     if (this.expectedNumberOfClientsSendingRenews > 0) {
 4         // Since the client wants to register it, increase the number of clients sending renews
 5         // 期望續約的客戶端數量 + 1
 6         this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
 7         // 更新每分鍾續約請求次數的閾值,這個閾值在后面很多地方都會用到
 8         updateRenewsPerMinThreshold();
 9     }
10 }
11 
12 
13 /////////////// 實例下線(下線、故障摘除) ///////////////
14 synchronized (lock) {
15     // 期望續約的客戶端數量 - 1
16     if (this.expectedNumberOfClientsSendingRenews > 0) {
17         // Since the client wants to cancel it, reduce the number of clients to send renews.
18         this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
19         // 更新每分鍾續約次數的閾值
20         updateRenewsPerMinThreshold();
21     }
22 }

而最近一分鍾續約次數計數器 renewsLastMin 在每個客戶端續約的時候就會+1,可以回看下 renew 方法,最后調用了 renewsLastMin.increment() 增加一次續約次數。而 renewsLastMin.getCount() 返回的是上一分鍾總的續約次數。

1 public long getNumOfRenewsInLastMin() {
2     return renewsLastMin.getCount();
3 }

根據以上代碼舉個例子來看看實例故障時的自我保護機制:

  • 比如注冊了20個實例,實例默認發送心跳續約的間隔時間為30秒,續約的閾值為 0.85,並且開啟了自我保護機制。
  • 那么期望續約的客戶端數量 expectedNumberOfClientsSendingRenews = 20,每分鍾發送心跳的閾值 numberOfRenewsPerMinThreshold = 20 * (60 / 30 )* 0.85 = 34。
  • 正常來說20個實例每分鍾發送心跳的次數 renewsLastMin = 20 * (60 / 30)= 40 次。
  • 那么 numberOfRenewsPerMinThreshold(34) > 0 && renewsLastMin(40)> numberOfRenewsPerMinThreshold(34)就是滿足的,就允許摘除故障的實例。
  • 那如果有 3 個實例上一分鍾沒有發送續約了,這個時候 renewsLastMin = 17 * (60 / 30)= 34 次,而 numberOfRenewsPerMinThreshold 還是不變,因為注冊表的實例並未移除,因此這時條件就不滿足了,就算實例真的故障了,也不能摘除實例了。

這就是 eureka-server 的自我保護機制,他認為如果短時間內有過的的實例未發送心跳(超過15%),它會認為是自己網絡故障導致客戶端不能發送心跳,就進入自我保護機制,避免誤摘除實例。

2、自我保護機制導致實例未下線的情況

在開發環境中,因為會頻繁重啟服務,會發現有些服務已經是下線狀態了(DOWN),但服務實例一直未被摘除,就是因為 eureka-server 的自我保護機制導致的,下面來看下。

① 啟用自我保護機制的情況

首先 eureka-server 做了如下配置,啟用注冊中心:

1 eureka:
2   server:
3     # 是否啟用自我保護機制
4     enable-self-preservation: true

啟動幾個客戶端實例:

然后快速將 demo-consumer 停止掉(如果正常關閉,會調用 cancel 下線實例),這時就會看到 demo-consumer 已經DOWN了,但是實例一直未被移除。

可以看到,上一分鍾續約的次數為 4 次,期望每分鍾續約次數為6次,因為不滿足判斷的條件,所以就觸發了自我保護機制,導致一直無法摘除實例。

注意期望續約的客戶端數量為4,而實際注冊的客戶端實例是3個,這是因為 springcloud 在調用 openForTraffic 設置了初始值為 1。

② 關閉自我保護機制的情況

配置如下,關閉自我保護機制:

1 eureka:
2   server:
3     # 是否啟用自我保護機制
4     enable-self-preservation: false

這時注冊中心控制台會提示我們關閉了自我保護機制:

同樣的操作,快速停掉實例,發現實例還是未被摘除:

那其實是因為實例要180秒后才會被認為是過期的,所以等3分鍾以后,實例就會下線了。

1 public boolean isExpired(long additionalLeaseMs) {
2     return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
3 }

③ 快速關閉多個實例

這次同時關閉2個實例來看看,在2分鍾之后,發現只有一個實例下線了,這因為eureka-server一次只會摘除15%的實例。

④ DOWN 是怎么來的

那么DOWN這個狀態是怎么來的呢?由於我本地是用IDEA啟動的客戶端實例,其實在關閉的時候,會觸發狀態變更監聽器,然后就會觸發一次注冊的調用,注冊的狀態是 DOWN,因此實例狀態馬上就變為 DOWN 了。

如果直接 kill 這個進程,就不會觸發狀態變更監聽器了,注冊中心的實例就不會變為 DOWN 了,但是實例已經下線變為不可用的狀態了。

⑤ 實例快速下線

經過前面的測試可以總結出來,要想實例快速下線,可以調整如下一些參數。

eureka-server 配置:

1 eureka:
2   server:
3     # 是否啟用自我保護機制
4     enable-self-preservation: false
5     # 每分鍾續約閾值
6     renewal-percent-threshold: 0
7     # 摘除實例的定時任務的間隔時間
8     eviction-interval-timer-in-ms: 10000

eureka-client 配置:

1 eureka:
2   instance:
3     # 判斷實例多久未發送心跳就判定為過期
4     lease-expiration-duration-in-seconds: 60

3、最近一分鍾計數器的設計

來看下最近一分鍾續約次數計數器 renewsLastMin 是如何統計上一分鍾的續約次數的,renewsLastMin 的類型是 MeasuredRate,這個類的設計也是值得學習的一個點。

MeasuredRate 利用兩個桶來計數,一個保存上一間隔時間的計數,一個保存當前這一間隔時間的計數,然后使用定時任務每隔一定間隔時間就將當前這個桶的計數替換到上一個桶里。然后增加計數的時候增加當前桶,獲取數量的時候從上一個桶里獲取,就實現了獲取上一個間隔時間的計數。

 1 public class MeasuredRate {
 2     private static final Logger logger = LoggerFactory.getLogger(MeasuredRate.class);
 3     // 利用了兩個桶來計數,一個是上一分鍾,一個是當前這一分鍾
 4     private final AtomicLong lastBucket = new AtomicLong(0);
 5     private final AtomicLong currentBucket = new AtomicLong(0);
 6 
 7     private final long sampleInterval;
 8     private final Timer timer;
 9 
10     private volatile boolean isActive;
11 
12     /**
13      * @param sampleInterval in milliseconds
14      */
15     public MeasuredRate(long sampleInterval) {
16         // 間隔時間
17         this.sampleInterval = sampleInterval;
18         // 定時器
19         this.timer = new Timer("Eureka-MeasureRateTimer", true);
20         this.isActive = false;
21     }
22 
23     public synchronized void start() {
24         if (!isActive) {
25             timer.schedule(new TimerTask() {
26                 @Override
27                 public void run() {
28                     try {
29                         // 每分鍾執行一次,將當前這一分鍾的次數設置到上一分鍾的桶里
30                         lastBucket.set(currentBucket.getAndSet(0));
31                     } catch (Throwable e) {
32                         logger.error("Cannot reset the Measured Rate", e);
33                     }
34                 }
35             }, sampleInterval, sampleInterval);
36 
37             isActive = true;
38         }
39     }
40 
41     public synchronized void stop() {
42         if (isActive) {
43             timer.cancel();
44             isActive = false;
45         }
46     }
47 
48     /**
49      * Returns the count in the last sample interval.
50      */
51     public long getCount() {
52         // 獲取計數時是獲取的上一分鍾這個桶的計數
53         return lastBucket.get();
54     }
55 
56     /**
57      * Increments the count in the current sample interval.
58      */
59     public void increment() {
60         // 增加計數的時候是增加的當前這個桶的計數
61         currentBucket.incrementAndGet();
62     }
63 }

4、服務故障摘除和自我保護機制圖

下面用一張圖來總結下服務故障摘除和自我保護機制。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM