SpringCloud 源碼系列(1)—— 注冊中心 Eureka(上)


SpringCloud 源碼系列(1)—— 注冊中心 Eureka(上)

SpringCloud 源碼系列(2)—— 注冊中心 Eureka(中)

SpringCloud 源碼系列(3)—— 注冊中心 Eureka(下)

 

Eureka 是 Netflix 公司開源的一個服務注冊與發現的組件,和其他 Netflix 公司的服務組件(例如負載均衡、熔斷器、網關等)一起,被 Spring Cloud 整合為 Spring Cloud Netflix 模塊。不過 Eureka 2.0 開始閉源了,但 1.x 還在繼續維護中,可以繼續使用。這篇文章就來深入學習下 Eureka 注冊中心,便於我們更好的使用和調優注冊中心。

關於版本:本文章使用的 Spring cloud 版本為 Hoxton.SR8,Spring boot 版本為 2.3.3.RELEASE,依賴的 eureka 版本則為 1.9.25。

一、Eureka 初體驗

Eureka 分為 Eureka Server 和 Eureka Client,Eureka Server 為 Eureka 注冊中心,Eureka Client 為 Eureka 客戶端。這節先通過demo把注冊中心的架子搭起來,看看注冊中心的基礎架構。

1、Eureka Server

① 創建注冊中心服務:sunny-register

首先創建一個 maven 工程,服務名稱為 sunny-register,並在 pom.xml 中引入注冊中心服務端的依賴。

1 <dependencies>
2     <dependency>
3         <groupId>org.springframework.cloud</groupId>
4         <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
5     </dependency>
6 </dependencies>

② 添加配置文件

在 resources 下添加 application.yml 配置文件,並添加注冊中心相關配置。

 1 server:
 2   port: 8000
 3 spring:
 4   application:
 5     name: sunny-register
 6 
 7 eureka:
 8   instance:
 9     hostname: dev.lyyzoo.com
10   client:
11     # 是否向注冊中心注冊自己
12     register-with-eureka: false
13     # 是否檢索服務
14     fetch-registry: false
15     service-url:
16       defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/

③ 添加啟動類

添加啟動類,並在啟動類上加上 @EnableEurekaServer 注解,啟用注冊中心。

 1 package com.lyyzoo.sunny.register;
 2 
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5 import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
 6 
 7 @EnableEurekaServer
 8 @SpringBootApplication
 9 public class RegisterApplication {
10 
11     public static void main(String[] args) {
12         SpringApplication.run(RegisterApplication.class, args);
13     }
14 }

④ 啟動注冊中心

啟動注冊中心后,訪問 http://dev.lyyzoo.com:8000/,就可以看到注冊中心的頁面了,現在還沒有實例注冊上來。(dev.lyyzoo.com 在本地 hosts 文件中映射到 127.0.0.1)

2、Eureka Client

創建兩個 demo 服務,demo-producer 服務作為生產者提供一個接口,demo-consumer 服務作為消費者去調用 demo-producer 的接口。

① 創建客戶端服務:demo-producer

創建maven工程,服務名稱為 demo-producer,在 pom.xml 中引入注冊中心客戶端的依賴,並添加了 web 的依賴。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

② 添加配置文件

在 resouces 下添加 application.yml 配置文件,添加注冊中心客戶端相關的配置。

 1 server:
 2   port: 8010
 3 spring:
 4   application:
 5     name: demo-producer
 6 
 7 eureka:
 8   client:
 9     serviceUrl:
10       defaultZone: ${EUREKA_DEFAULT_ZONE:http://dev.lyyzoo.com:8000/eureka}

③ 添加啟動類

添加啟動類,並在啟動類上加上 @EnableEurekaClient 注解,啟用客戶端。

1 @EnableEurekaClient
2 @SpringBootApplication
3 public class ProducerApplication {
4 
5     public static void main(String[] args) {
6         SpringApplication.run(ProducerApplication.class, args);
7     }
8 }

④ 添加一個 rest 接口

添加一個接口用於測試調用:

 1 @RestController
 2 public class DemoController {
 3 
 4     private final Logger logger = LoggerFactory.getLogger(getClass());
 5 
 6     @GetMapping("/v1/uuid")
 7     public ResponseEntity<String> getUUID() {
 8         String uuid = UUID.randomUUID().toString();
 9         logger.info("generate uuid: {}", uuid);
10         return ResponseEntity.ok(uuid);
11     }
12 }

⑤  創建客戶端服務:demo-consumer

類似的方式,再創建消費者服務:demo-producer,這個服務中添加一個消費者接口,通過 RestTemplate 負載均衡的方式來調用 demo-producer 的接口。

因此需要先配置一個帶有負載均衡的 RestTemplate:

 1 @EnableEurekaClient
 2 @SpringBootApplication
 3 public class ConsumerApplication {
 4 
 5     @Bean
 6     @LoadBalanced
 7     public RestTemplate restTemplate() {
 8         return new RestTemplate();
 9     }
10 
11     public static void main(String[] args) {
12         SpringApplication.run(ConsumerApplication.class, args);
13     }
14 }

添加消費者接口,注意這里 url 是寫的服務名稱,並不是具體的 ip 地址或端口,在微服務場景下,服務間調用也不可能寫死某個具體的地址。

 1 @RestController
 2 public class DemoController {
 3 
 4     private final Logger logger = LoggerFactory.getLogger(getClass());
 5 
 6     @Autowired
 7     private RestTemplate restTemplate;
 8 
 9     @GetMapping("/v1/id")
10     public ResponseEntity<String> getId() {
11         ResponseEntity<String> result = restTemplate.getForEntity("http://demo-producer/v1/uuid", String.class);
12         String uuid = result.getBody();
13         logger.info("request id: {}", uuid);
14         return ResponseEntity.ok(uuid);
15     }
16 }

⑥ 啟動注冊中心客戶端

以兩個不同的端口啟動 demo-producer,可以通過環境變量的方式制定端口。然后再啟動 demo-consumer。

啟動完成之后,就可以在注冊中心看到注冊上來的兩個 demo-producer 實例和一個 demo-consumer 實例,並且狀態都為 UP。

⑦ 測試接口

調用消費者服務的接口,多次訪問 http://dev.lyyzoo.com:8020/v1/id 接口,會發現生產者服務 demo-consumer 兩個實例的控制台會交替的輸出日志信息。這就說明消費者客戶端通過服務名稱訪問到生產者了。

3、Eureka 基礎架構

通過前面的體驗,可以發現,服務間調用只需知道某個服務的名稱就可以調用這個服務的api了,而不需要指定具體的ip地址和端口,那這是怎么做到的呢?

不難看出,Eureka 的基礎架構包含三種角色:

  • 服務注冊中心:Eureka Server,提供服務注冊和發現的功能
  • 服務提供者:Eureka Client,提供服務(本身也可以作為消費者)
  • 服務消費者:Eureka Client,消費服務(本身也可以作為提供者)

首先需要一個服務注冊中心,客戶端則向注冊中心注冊,將自己的信息(比如服務名、服務的 IP 地址和端口信息等)提交給注冊中心。客戶端向注冊中心獲取一份服務注冊列表的信息,該列表包含了所有向注冊中心注冊的服務信息。獲取服務注冊列表信息之后,客戶端服務就可以根據服務名找到服務的所有實例,然后通過負載均衡選擇其中一個實例,根據其 IP 地址和端口信息,就可以調用服務的API接口了。

這就是注冊中心最基礎的架構和功能了,提供服務注冊和發現,為各個客戶端提供服務注冊列表信息。但為了完成這些工作,Eureka 有很多的機制來實現以及保證其高可用,如服務注冊、服務續約、獲取服務注冊列表、服務下線、服務剔除等等。Eureka 也提供了很多參數讓我們可以根據實際的場景來優化它的一些功能和配置,比如維持心跳的時間、拉取注冊表的間隔時間、自我保護機制等等。下面我們就從 eureka 的源碼層面來分析下 eureka 的這些功能以及參數,理解其原理,學習它的一些設計。

二、Eureka 源碼准備

雖然我們在 pom.xml 中依賴的是 spring-cloud-starter-netflix-eureka-server 和 spring-cloud-starter-netflix-eureka-client,但 spring-cloud-starter-netflix 只是對 eureka 做了封裝,使得其可以通過 springboot 的方式來啟動和初始化,其底層其實是 netflix 的 eureka-core、eureka-client 等。所以我們先分析 netflix eureka 的源碼,最后再看看 spring-cloud-starter-netflix 的源碼。

1、源碼環境准備

① 下載源碼

Netflix Eureka:https://github.com/Netflix/eureka

Spring Cloud Netflix:https://github.com/spring-cloud/spring-cloud-netflix

克隆 eureka 的源碼到本地:

$ git clone https://github.com/Netflix/eureka.git

由於我們依賴的是 1.9.25 版本,將代碼克隆到本地后,將其切換到 1.9.25:

$ git checkout -b 1.9.25

然后到 eureka 根目錄下執行構建的命令:

$ ./gradlew clean build -x test

② IDEA 打開源碼

由於 eureka 使用 gradle 管理依賴,所以本地需要先安裝 gradle,之后 IDEA 中也需要安裝 gradle 的插件,跟 maven 都是類似的,安裝教程可自行百度。

2、Eureka 工程結構

Eureka 主要包含如下模塊:

  • eureka-client:eureka 客戶端
  • eureka-core:eureka 服務端,注冊中心的核心功能
  • eureka-resources:基於jsp的eureka控制台,可以查看注冊了哪些服務實例
  • eureka-server:注冊中心,集成了 eureka-client、eureka-core、eureka-resources,因為依賴了 eureka-client,因此 eureka-server 也是一個客戶端,在 eureka server 集群模式下,eureka-server 也會作為客戶端注冊到其它注冊中心上
  • eureka-examples:eureka 例子
  • eureka-test-utils:eureka 單元測試工具
  • eureka-core|client-jersey2:對 jersey 框架的封裝,jersey 類似於 spring mvc,支持 http restful 請求,eureka-client 和 eureka-server 之間的通信就是基於 jersey 框架來的

三、Eureka Server 啟動初始化

首先要看的是 eureka-server,注冊中心啟起來之后,客戶端才能來注冊服務和發現服務。

1、eureka-server 模塊

① eureka-server 目錄

  • resources 目錄中主要是 eureka client 和 server 的配置文件
  • webapp 下有一個 web.xml 配置文件,這里面就配置了啟動初始化的入口,從這也可以看出,eureka-server 會被打包成 war 包來運行
  • test 下有個單元測試類 EurekaClientServerRestIntegrationTest,這里面就包含了服務注冊、續約、下線等單元測試,我們就可以運行這些單元測試來調試代碼

② web.xml

web.xml 的內容:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <web-app version="2.5"
 3          xmlns="http://java.sun.com/xml/ns/javaee"
 4          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 5          xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
 6     http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">
 7   <!-- eureka 啟動初始化類 -->
 8   <listener>
 9     <listener-class>com.netflix.eureka.EurekaBootStrap</listener-class>
10   </listener>
11 
12   <!-- 狀態過濾器 -->
13   <filter>
14     <filter-name>statusFilter</filter-name>
15     <filter-class>com.netflix.eureka.StatusFilter</filter-class>
16   </filter>
17 
18   <!-- 認證過濾器 -->
19   <filter>
20     <filter-name>requestAuthFilter</filter-name>
21     <filter-class>com.netflix.eureka.ServerRequestAuthFilter</filter-class>
22   </filter>
23 
24   <!-- 限流過濾器 -->
25   <filter>
26     <filter-name>rateLimitingFilter</filter-name>
27     <filter-class>com.netflix.eureka.RateLimitingFilter</filter-class>
28   </filter>
29   <filter>
30     <filter-name>gzipEncodingEnforcingFilter</filter-name>
31     <filter-class>com.netflix.eureka.GzipEncodingEnforcingFilter</filter-class>
32   </filter>
33 
34   <!-- jersey 容器 -->
35   <filter>
36     <filter-name>jersey</filter-name>
37     <filter-class>com.sun.jersey.spi.container.servlet.ServletContainer</filter-class>
38     <init-param>
39       <param-name>com.sun.jersey.config.property.WebPageContentRegex</param-name>
40       <param-value>/(flex|images|js|css|jsp)/.*</param-value>
41     </init-param>
42     <init-param>
43       <param-name>com.sun.jersey.config.property.packages</param-name>
44       <param-value>com.sun.jersey;com.netflix</param-value>
45     </init-param>
46 
47     <!-- GZIP content encoding/decoding -->
48     <init-param>
49       <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
50       <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
51     </init-param>
52     <init-param>
53       <param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name>
54       <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
55     </init-param>
56   </filter>
57 
58   <filter-mapping>
59     <filter-name>statusFilter</filter-name>
60     <url-pattern>/*</url-pattern>
61   </filter-mapping>
62 
63   <filter-mapping>
64     <filter-name>requestAuthFilter</filter-name>
65     <url-pattern>/*</url-pattern>
66   </filter-mapping>
67 
68   <!-- Uncomment this to enable rate limiter filter.
69   <filter-mapping>
70     <filter-name>rateLimitingFilter</filter-name>
71     <url-pattern>/v2/apps</url-pattern>
72     <url-pattern>/v2/apps/*</url-pattern>
73   </filter-mapping>
74   -->
75 
76   <filter-mapping>
77     <filter-name>gzipEncodingEnforcingFilter</filter-name>
78     <url-pattern>/v2/apps</url-pattern>
79     <url-pattern>/v2/apps/*</url-pattern>
80   </filter-mapping>
81 
82   <filter-mapping>
83     <filter-name>jersey</filter-name>
84     <url-pattern>/*</url-pattern>
85   </filter-mapping>
86 
87   <!-- 歡迎頁 -->
88   <welcome-file-list>
89     <welcome-file>jsp/status.jsp</welcome-file>
90   </welcome-file-list>
91 
92 </web-app>
View Code

web.xml 中可以得知如下信息:

  • eureka server 啟動時首先通過 com.netflix.eureka.EurekaBootStrap 類來進行啟動初始化相關的工作
  • 配置了 StatusFilter(server 狀態過濾器)、ServerRequestAuthFilter(認證過濾器)、RateLimitingFilter(限流過濾器) 等過濾器,但 RateLimitingFilter 默認沒有啟用
  • 配置了 jersey 的 servlet 容器,其實就跟 springframework 的 DispatcherServlet 是類似的,用來攔截處理 http restful 請求,這塊我們不用過於關注
  • 最后還配置了 eureka server 的歡迎頁為 jsp/status.jsp 頁面,這個頁面在 eureka-resources 模塊下,也就是前面看到的 eureka 控制台頁面

③ 單元測試類 EurekaClientServerRestIntegrationTest

首先看 setUp 方法,每個測試用例運行之前都會先運行 setUp 方法來初始化運行環境。

 1 @BeforeClass
 2 public static void setUp() throws Exception {
 3     // 初始化 eureka 配置
 4     injectEurekaConfiguration();
 5     // 啟動 eureka server,會找 build/libs 目錄下的 eureka-server.*.war 包來運行
 6     // 這一步啟動時,就會加載 web.xm 配置文件,然后進入 EurekaBootStrap 初始化類
 7     startServer();
 8     // eureka server 配置
 9     createEurekaServerConfig();
10 
11     // 創建 jersey 客戶端,使用 jersey 客戶端來調用資源
12     httpClientFactory = JerseyEurekaHttpClientFactory.newBuilder()
13             .withClientName("testEurekaClient")
14             .withConnectionTimeout(1000)
15             .withReadTimeout(1000)
16             .withMaxConnectionsPerHost(1)
17             .withMaxTotalConnections(1)
18             .withConnectionIdleTimeout(1000)
19             .build();
20 
21     jerseyEurekaClient = httpClientFactory.newClient(new DefaultEndpoint(eurekaServiceUrl));
22 
23     ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
24     jerseyReplicationClient = JerseyReplicationClient.createReplicationClient(
25             eurekaServerConfig,
26             serverCodecs,
27             eurekaServiceUrl
28     );
29 }

這個類提供了如下的一些測試用例,我們可以運行這些測試用例來進行調試。

2、EurekaBootStrap 初始化

EurekaBootStrap 是監聽器的入口,實現了 ServletContextListener 接口,主要完成了 eureka server 的啟動初始化。

從 contextInitialized 方法進去,整體上來說,分為 eureka 環境初始化和 eureka server 上下文初始化。

 1 @Override
 2 public void contextInitialized(ServletContextEvent event) {
 3     try {
 4         // eureka 環境初始化
 5         initEurekaEnvironment();
 6         // eureka server 上下文初始化
 7         initEurekaServerContext();
 8 
 9         ServletContext sc = event.getServletContext();
10         sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
11     } catch (Throwable e) {
12         logger.error("Cannot bootstrap eureka server :", e);
13         throw new RuntimeException("Cannot bootstrap eureka server :", e);
14     }
15 }

① eureka環境初始化

initEurekaEnvironment 方法內主要是設置數據中心和運行環境參數:

  • archaius.deployment.datacenter = default
  • archaius.deployment.environment = test

② eureka server 上下文初始化

initEurekaServerContext 上下文初始化則包含了很多階段:

  • 構造 eureka 注冊中心配置:EurekaServerConfig
  • 構造 eureka 實例配置:EurekaInstanceConfig
  • 構造實例信息:InstanceInfo
  • 構造實例管理器:ApplicationInfoManager 
  • 構造 eureka 客戶端配置:EurekaClientConfig
  • 創建 eureka 客戶端:EurekaClient(DiscoveryClient)
  • 創建注冊表(可以感知eureka集群的注冊表):PeerAwareInstanceRegistry
  • 創建集群:PeerEurekaNodes
  • 將信息封裝到eureka上下文:EurekaServerContext
  • 將eureka上下文放到一個全局容器中:EurekaServerContextHolder
  • 初始化eureka上下文
  • 同步eureka server的注冊表
  • 開啟追蹤
  • 注冊監控統計
 1 protected void initEurekaServerContext() throws Exception {
 2     // 1、eureka 注冊中心配置
 3     EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
 4 
 5     // For backward compatibility
 6     JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
 7     XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
 8 
 9     logger.info("Initializing the eureka client...");
10     logger.info(eurekaServerConfig.getJsonCodecName());
11     ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
12 
13     ApplicationInfoManager applicationInfoManager = null;
14 
15     if (eurekaClient == null) {
16         // 2、eureka 實例配置
17         EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
18                 ? new CloudInstanceConfig()
19                 : new MyDataCenterInstanceConfig();
20 
21         // 3、構造 InstanceInfo 實例信息
22         // 4、構造 ApplicationInfoManager 應用管理器
23         applicationInfoManager = new ApplicationInfoManager(
24                 instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
25 
26         // 5、eureka 客戶端配置
27         EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
28         // 6、構造 EurekaClient,DiscoveryClient 封裝了客戶端相關的操作
29         eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
30     } else {
31         applicationInfoManager = eurekaClient.getApplicationInfoManager();
32     }
33 
34     PeerAwareInstanceRegistry registry;
35     if (isAws(applicationInfoManager.getInfo())) {
36         registry = new AwsInstanceRegistry(
37                 eurekaServerConfig,
38                 eurekaClient.getEurekaClientConfig(),
39                 serverCodecs,
40                 eurekaClient
41         );
42         awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
43         awsBinder.start();
44     } else {
45         // 7、構造感知eureka集群的注冊表
46         registry = new PeerAwareInstanceRegistryImpl(
47                 eurekaServerConfig,
48                 eurekaClient.getEurekaClientConfig(),
49                 serverCodecs,
50                 eurekaClient
51         );
52     }
53 
54     // 8、構造eureka-server集群信息
55     PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
56             registry,
57             eurekaServerConfig,
58             eurekaClient.getEurekaClientConfig(),
59             serverCodecs,
60             applicationInfoManager
61     );
62 
63     // 9、基於前面構造的對象創建 EurekaServerContext
64     serverContext = new DefaultEurekaServerContext(
65             eurekaServerConfig,
66             serverCodecs,
67             registry,
68             peerEurekaNodes,
69             applicationInfoManager
70     );
71 
72     // 將 serverContext 放到 EurekaServerContextHolder 上下文中,
73     // 這樣其它地方都可以通過 EurekaServerContextHolder 拿到 EurekaServerContext
74     EurekaServerContextHolder.initialize(serverContext);
75 
76     // 10、初始化eureka-server上下文
77     serverContext.initialize();
78     logger.info("Initialized server context");
79 
80     // 11、從相鄰的eureka-server同步注冊表
81     int registryCount = registry.syncUp();
82     ///12、啟動注冊表,啟動一些定時任務
83     registry.openForTraffic(applicationInfoManager, registryCount);
84 
85     ///13、注冊監控統計
86     EurekaMonitors.registerAllStats();
87 }

3、面向接口的配置讀取

初始化中有三個配置接口,EurekaServerConfig、EurekaInstanceConfig、EurekaClientConfig,分別對應了注冊中心、eureka實例、eureka客戶端的配置獲取。

從它們默認實現類的構造方法進去可以看到,EurekaServerConfig 是讀取的 eureka-server.properties 配置文件,命名前綴是 eureka.server;EurekaInstanceConfig、EurekaClientConfig 是讀取的 eureka-client.properties 配置文件,命名前綴分別是 eureka.instance、eureka.client。

這里可以看到,eureka 在代碼中獲取配置的方式是通過接口方法的形式來獲取的,在其默認的實現類里通過硬編碼的方式定義了配置的編碼以及默認值。這種基於接口的配置讀取方式是可以借鑒的,這種方式讀取配置更易於維護,不用維護一堆常量,如果配置編碼變了只需更改實現類即可。

例如下面的配置:

 1 @Override
 2 public int getExpectedClientRenewalIntervalSeconds() {
 3     final int configured = configInstance.getIntProperty(
 4             namespace + "expectedClientRenewalIntervalSeconds",
 5             30).get();
 6     return configured > 0 ? configured : 30;
 7 }
 8 
 9 @Override
10 public double getRenewalPercentThreshold() {
11     return configInstance.getDoubleProperty(
12             namespace + "renewalPercentThreshold", 0.85).get();
13 }
14 
15 @Override
16 public boolean shouldEnableReplicatedRequestCompression() {
17     return configInstance.getBooleanProperty(
18             namespace + "enableReplicatedRequestCompression", false).get();
19 }

4、基於建造者模式構造服務實例

看 new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get() 這段代碼,在 get 方法中完成了服務實例信息的構造。它這里主要用到了建造者設計模式來構建 LeaseInfo 和 InstanceInfo,以 InstanceInfo 為例,它的內部有一個靜態的 Builder 類,通過 newBuilder() 方法創建了 InstanceInfo 對象,然后可以調用 Builder 的屬性設置方法來設置屬性,在設置這些屬性的時候,會做一些關聯性的校驗,在設置完成后,就調用 build() 方法返回對象,也可以在 build 方法中再做一些最終的校驗。建造者模式就很適合用於構建這種復雜的對象。

 1 public synchronized InstanceInfo get() {
 2     if (instanceInfo == null) {
 3         // 續約信息:主要有續約間隔時間(默認30秒)和續約過期時間(默認90秒)
 4         LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
 5                 .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
 6                 .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());
 7 
 8         if (vipAddressResolver == null) {
 9             vipAddressResolver = new Archaius1VipAddressResolver();
10         }
11 
12         // 基於建造者模式來創建 InstanceInfo
13         InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder(vipAddressResolver);
14 
15         // set the appropriate id for the InstanceInfo, falling back to datacenter Id if applicable, else hostname
16         String instanceId = config.getInstanceId();
17         if (instanceId == null || instanceId.isEmpty()) {
18             DataCenterInfo dataCenterInfo = config.getDataCenterInfo();
19             if (dataCenterInfo instanceof UniqueIdentifier) {
20                 instanceId = ((UniqueIdentifier) dataCenterInfo).getId();
21             } else {
22                 instanceId = config.getHostName(false);
23             }
24         }
25 
26         String defaultAddress;
27         if (config instanceof RefreshableInstanceConfig) {
28             // Refresh AWS data center info, and return up to date address
29             defaultAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(false);
30         } else {
31             defaultAddress = config.getHostName(false);
32         }
33 
34         // fail safe
35         if (defaultAddress == null || defaultAddress.isEmpty()) {
36             defaultAddress = config.getIpAddress();
37         }
38 
39         // 設置屬性
40         builder.setNamespace(config.getNamespace())
41                 .setInstanceId(instanceId)
42                 .setAppName(config.getAppname())
43                 .setAppGroupName(config.getAppGroupName())
44                 .setDataCenterInfo(config.getDataCenterInfo())
45                 .setIPAddr(config.getIpAddress())
46                 .setHostName(defaultAddress)
47                 .setPort(config.getNonSecurePort())
48                 .enablePort(PortType.UNSECURE, config.isNonSecurePortEnabled())
49                 .setSecurePort(config.getSecurePort())
50                 .enablePort(PortType.SECURE, config.getSecurePortEnabled())
51                 .setVIPAddress(config.getVirtualHostName())
52                 .setSecureVIPAddress(config.getSecureVirtualHostName())
53                 .setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
54                 .setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl())
55                 .setASGName(config.getASGName())
56                 .setHealthCheckUrls(config.getHealthCheckUrlPath(),
57                         config.getHealthCheckUrl(), config.getSecureHealthCheckUrl());
58 
59 
60         // Start off with the STARTING state to avoid traffic
61         if (!config.isInstanceEnabledOnit()) {
62             InstanceStatus initialStatus = InstanceStatus.STARTING;
63             LOG.info("Setting initial instance status as: {}", initialStatus);
64             builder.setStatus(initialStatus);
65         } else {
66             LOG.info("Setting initial instance status as: {}. This may be too early for the instance to advertise "
67                      + "itself as available. You would instead want to control this via a healthcheck handler.",
68                      InstanceStatus.UP);
69         }
70 
71         // Add any user-specific metadata information
72         for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {
73             String key = mapEntry.getKey();
74             String value = mapEntry.getValue();
75             // only add the metadata if the value is present
76             if (value != null && !value.isEmpty()) {
77                 builder.add(key, value);
78             }
79         }
80 
81         // 調用 build 方法做屬性校驗並創建 InstanceInfo 實例
82         instanceInfo = builder.build();
83         instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
84     }
85     return instanceInfo;
86 }
View Code

LeaseInfo 就是續約信息,可以看到主要的兩個配置就是續約間隔時間和多久未續約認為實例過期,實例過期就會被剔除。然后就是基於 config 設置 InstanceInfo,就是實例信息,包含了實例ID、主機名稱、端口、LeaseInfo 等等。

5、注冊中心構造客戶端 DiscoveryClient

在集群模式下,eureka server 也會作為客戶端注冊到其它注冊中心,此時,它本身就是一個 eureka client。因此會去構建 EurekaClient,其默認實現類是 DiscoveryClient。DiscoveryClient 包含了 eureka 客戶端的大部分核心功能,比如服務注冊、續約、維持心跳、拉取注冊表等。

一步步進入到DiscoveryClient最復雜的那個構造方法,我們先整體分析下做了哪些事情,抓大放小,很多組件的細節等后面分析具體功能的時候再來看。

  • 將 EurekaClientConfig、EurekaInstanceConfig、EurekaTransportConfig、InstanceInfo、ApplicationInfoManager 等保存到本地變量中
  • 如果要獲取注冊表,就創建一個注冊表狀態度量器
  • 如果要注冊到注冊中心,就創建一個心跳狀態度量器
  • 如果不獲取注冊表且不注冊到注冊中心,就不會創建調度器、心跳線程池這些了,釋放一些資源
  • 如果要注冊到注冊中心且要抓取注冊表,就初始化一些調度的資源:
    • 創建了支持調度的線程池,有兩個核心線程,從后面可以看出,主要就是處理心跳和緩存刷新的任務
    • 創建了維持心跳的線程池,核心線程數為1,最大線程數配置默認為5
    • 創建了刷新緩存的線程池,核心線程數為1,最大線程數配置默認為5
    • 創建了eureka client 與 eureka server 進行網絡通信的組件 EurekaTransport,並進行了一些初始化 。EurekaTransport 里的客戶端主要就是封裝了對 server 的 api 調用接口,便於調用
    • 接着,如果要抓取注冊表,就會抓取注冊表了,fetchRegistry 里面可以看到是分為全量抓取和增量抓取的,第一次啟動的時候就是全量抓取注冊表
  • 開始初始化調度任務:
    • 如果要抓取注冊表,就創建刷新緩存的任務,並開始調度,默認每隔30秒抓取一次注冊表
    • 如果要注冊到注冊中心,就創建發送心跳的任務,並開始調度,默認每隔30秒發送一次心跳
    • 如果要注冊到注冊中心,還會創建實例副本傳播器(內部也是一個定時調度任務)、實例狀態變更的監聽器
  1 DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
  2                 Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
  3     if (args != null) {
  4         this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
  5         this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
  6         this.eventListeners.addAll(args.getEventListeners());
  7         this.preRegistrationHandler = args.preRegistrationHandler;
  8     } else {
  9         this.healthCheckCallbackProvider = null;
 10         this.healthCheckHandlerProvider = null;
 11         this.preRegistrationHandler = null;
 12     }
 13 
 14     // 將實例信息、配置信息保存到本地
 15     this.applicationInfoManager = applicationInfoManager;
 16     InstanceInfo myInfo = applicationInfoManager.getInfo();
 17     clientConfig = config;
 18     staticClientConfig = clientConfig;
 19     transportConfig = config.getTransportConfig();
 20     instanceInfo = myInfo;
 21     if (myInfo != null) {
 22         appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
 23     } else {
 24         logger.warn("Setting instanceInfo to a passed in null value");
 25     }
 26 
 27     this.backupRegistryProvider = backupRegistryProvider;
 28     this.endpointRandomizer = endpointRandomizer;
 29     this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
 30     localRegionApps.set(new Applications());
 31 
 32 
 33     fetchRegistryGeneration = new AtomicLong(0);
 34     remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
 35     // 從遠程拉取注冊表的地址數組,使用的原子類,在運行中可能會動態更新地址
 36     remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
 37 
 38     // 如果要獲取注冊表,就會注冊狀態監視器
 39     if (config.shouldFetchRegistry()) {
 40         this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
 41     } else {
 42         this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
 43     }
 44 
 45     // 如果要注冊到 eureka-server,就會創建心跳狀態監視器
 46     if (config.shouldRegisterWithEureka()) {
 47         this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
 48     } else {
 49         this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
 50     }
 51 
 52     logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
 53 
 54     // 如果不注冊到注冊中心,且不拉取注冊表,就不創建調度器、線程池等資源了
 55     if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
 56         logger.info("Client configured to neither register nor query for data.");
 57         scheduler = null;
 58         heartbeatExecutor = null;
 59         cacheRefreshExecutor = null;
 60         eurekaTransport = null;
 61         instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
 62 
 63         // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
 64         // to work with DI'd DiscoveryClient
 65         DiscoveryManager.getInstance().setDiscoveryClient(this);
 66         DiscoveryManager.getInstance().setEurekaClientConfig(config);
 67 
 68         initTimestampMs = System.currentTimeMillis();
 69         initRegistrySize = this.getApplications().size();
 70         registrySize = initRegistrySize;
 71         logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
 72                 initTimestampMs, initRegistrySize);
 73 
 74         return;  // no need to setup up an network tasks and we are done
 75     }
 76 
 77     try {
 78         // 創建定時調度器,默認有2個核心線程,主要處理心跳任務和緩存刷新任務
 79         scheduler = Executors.newScheduledThreadPool(2,
 80                 new ThreadFactoryBuilder()
 81                         .setNameFormat("DiscoveryClient-%d")
 82                         .setDaemon(true)
 83                         .build());
 84 
 85         // 維持心跳的線程池,一個核心線程,最大線程數默認5。
 86         // 注意其使用的隊列是 SynchronousQueue 隊列,這個隊列只能放一個任務,一個線程將任務取走后,才能放入下一個任務,否則只能阻塞。
 87         heartbeatExecutor = new ThreadPoolExecutor(
 88                 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
 89                 new SynchronousQueue<Runnable>(),
 90                 new ThreadFactoryBuilder()
 91                         .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
 92                         .setDaemon(true)
 93                         .build()
 94         );  // use direct handoff
 95 
 96         // 刷新緩存的線程池,一個核心線程,最大線程數據默認為5
 97         cacheRefreshExecutor = new ThreadPoolExecutor(
 98                 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
 99                 new SynchronousQueue<Runnable>(),
100                 new ThreadFactoryBuilder()
101                         .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
102                         .setDaemon(true)
103                         .build()
104         );  // use direct handoff
105 
106         // eureka http 調用客戶端,支持 eureka client 與 eureka server 之間的通信
107         eurekaTransport = new EurekaTransport();
108         // 初始化 eurekaTransport
109         scheduleServerEndpointTask(eurekaTransport, args);
110 
111         AzToRegionMapper azToRegionMapper;
112         if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
113             azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
114         } else {
115             azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
116         }
117         if (null != remoteRegionsToFetch.get()) {
118             azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
119         }
120         instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
121     } catch (Throwable e) {
122         throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
123     }
124 
125     if (clientConfig.shouldFetchRegistry()) {
126         try {
127             // 拉取注冊表:全量抓取和增量抓取
128             boolean primaryFetchRegistryResult = fetchRegistry(false);
129             if (!primaryFetchRegistryResult) {
130                 logger.info("Initial registry fetch from primary servers failed");
131             }
132             boolean backupFetchRegistryResult = true;
133             if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
134                 backupFetchRegistryResult = false;
135                 logger.info("Initial registry fetch from backup servers failed");
136             }
137             if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
138                 throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
139             }
140         } catch (Throwable th) {
141             logger.error("Fetch registry error at startup: {}", th.getMessage());
142             throw new IllegalStateException(th);
143         }
144     }
145 
146     // call and execute the pre registration handler before all background tasks (inc registration) is started
147     if (this.preRegistrationHandler != null) {
148         this.preRegistrationHandler.beforeRegistration();
149     }
150 
151     if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
152         try {
153             if (!register() ) {
154                 throw new IllegalStateException("Registration error at startup. Invalid server response.");
155             }
156         } catch (Throwable th) {
157             logger.error("Registration error at startup: {}", th.getMessage());
158             throw new IllegalStateException(th);
159         }
160     }
161 
162     // 初始化一些調度任務:刷新緩存的調度任務、發送心跳的調度任務、實例副本傳播器
163     initScheduledTasks();
164 
165     try {
166         Monitors.registerObject(this);
167     } catch (Throwable e) {
168         logger.warn("Cannot register timers", e);
169     }
170 
171     // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
172     // to work with DI'd DiscoveryClient
173     DiscoveryManager.getInstance().setDiscoveryClient(this);
174     DiscoveryManager.getInstance().setEurekaClientConfig(config);
175 
176     // 初始化的時間
177     initTimestampMs = System.currentTimeMillis();
178     initRegistrySize = this.getApplications().size();
179     registrySize = initRegistrySize;
180     logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
181             initTimestampMs, initRegistrySize);
182 }
183 
184 ////////////////////////////////////////////////////////////////////
185 
186 private void initScheduledTasks() {
187     if (clientConfig.shouldFetchRegistry()) {
188         // 抓取注冊表的間隔時間,默認30秒
189         int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
190         // 刷新緩存調度器延遲時間擴大倍數,在任務超時的時候,將擴大延遲時間
191         // 這在出現網絡抖動、eureka-sever 不可用時,可以避免頻繁發起無效的調度
192         int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
193         // 注冊表刷新的定時任務
194         cacheRefreshTask = new TimedSupervisorTask(
195                 "cacheRefresh",
196                 scheduler,
197                 cacheRefreshExecutor,
198                 registryFetchIntervalSeconds,
199                 TimeUnit.SECONDS,
200                 expBackOffBound,
201                 new CacheRefreshThread() // 刷新注冊表的任務
202         );
203         // 30秒后開始調度刷新注冊表的任務
204         scheduler.schedule(
205                 cacheRefreshTask,
206                 registryFetchIntervalSeconds, TimeUnit.SECONDS);
207     }
208 
209     if (clientConfig.shouldRegisterWithEureka()) {
210         // 續約間隔時間,默認30秒
211         int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
212         // 心跳調度器的延遲時間擴大倍數,默認10
213         int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
214         logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
215 
216         // 心跳的定時任務
217         heartbeatTask = new TimedSupervisorTask(
218                 "heartbeat",
219                 scheduler,
220                 heartbeatExecutor,
221                 renewalIntervalInSecs,
222                 TimeUnit.SECONDS,
223                 expBackOffBound,
224                 new HeartbeatThread()
225         );
226         // 30秒后開始調度心跳的任務
227         scheduler.schedule(
228                 heartbeatTask,
229                 renewalIntervalInSecs, TimeUnit.SECONDS);
230 
231         // 實例副本傳播器,用於定時更新自己狀態
232         instanceInfoReplicator = new InstanceInfoReplicator(
233                 this,
234                 instanceInfo,
235                 clientConfig.getInstanceInfoReplicationIntervalSeconds(),
236                 2); // burstSize
237 
238         // 實例狀態變更的監聽器
239         statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
240             @Override
241             public String getId() {
242                 return "statusChangeListener";
243             }
244 
245             @Override
246             public void notify(StatusChangeEvent statusChangeEvent) {
247                 if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) {
248                     logger.error("Saw local status change event {}", statusChangeEvent);
249                 } else {
250                     logger.info("Saw local status change event {}", statusChangeEvent);
251                 }
252                 instanceInfoReplicator.onDemandUpdate();
253             }
254         };
255 
256         // 向 ApplicationInfoManager 注冊監聽器
257         if (clientConfig.shouldOnDemandUpdateStatusChange()) {
258             applicationInfoManager.registerStatusChangeListener(statusChangeListener);
259         }
260 
261         // 啟動副本傳播器,默認延遲時間40秒
262         instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
263     } else {
264         logger.info("Not registering with Eureka server per configuration");
265     }
266 }
View Code

6、定時任務監管器的設計

可以看到,eureka client 為了定時發送心跳以及定時抓取注冊表,使用了定時任務和調度器,我覺得它這里的定時調度的設計思想是可以參考和借鑒的。

以心跳任務的這段代碼為例:

 1 if (clientConfig.shouldFetchRegistry()) {
 2     // 抓取注冊表的間隔時間,默認30秒
 3     int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
 4     // 刷新緩存調度器延遲時間擴大倍數,在任務超時的時候,將擴大延遲時間
 5     // 這在出現網絡抖動、eureka-sever 不可用時,可以避免頻繁發起無效的調度
 6     int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
 7     // 注冊表刷新的定時任務
 8     cacheRefreshTask = new TimedSupervisorTask(
 9             "cacheRefresh",
10             scheduler,
11             cacheRefreshExecutor,
12             registryFetchIntervalSeconds,
13             TimeUnit.SECONDS,
14             expBackOffBound,
15             new CacheRefreshThread() // 刷新注冊表的任務
16     );
17     // 30秒后開始調度刷新注冊表的任務
18     scheduler.schedule(
19             cacheRefreshTask,
20             registryFetchIntervalSeconds, TimeUnit.SECONDS);
21 }

上面這段代碼其實並不復雜,主要就是創建了一個定時任務,然后使用調度器在一定的延遲之后開始調度。但它這里並不是直接使用調度器調度任務(CacheRefreshThread),也不是以一個固定的頻率調度(每隔30秒)。它定義了一個任務的監管器 TimedSupervisorTask,在創建這個監管器的時候,傳入了調度器、要執行的任務、以及間隔時間等參數,然后調度器調度 TimedSupervisorTask。

看 TimedSupervisorTask 的構造方法,主要有以下幾個點:

  • 任務的超時時間等於間隔時間,也就是默認30秒的超時時間,然后延遲時間默認等於超時時間  如果 eureka server down 了,或者網絡問題,就有可能出現超時
  • 設置了最大的延遲時間,默認在超時時間的基礎上擴大10倍,即300秒
  • 最后創建了一些計數器,分別統計成功、超時、拒絕、異常的次數,可以看到,它這里對任務的調度是有做統計的
 1 public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
 2                            int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
 3     this.name = name;
 4     this.scheduler = scheduler;
 5     this.executor = executor;
 6     // 任務超時時間就等於任務調度的間隔時間
 7     this.timeoutMillis = timeUnit.toMillis(timeout);
 8     this.task = task;
 9     // 延遲時間默認為超時時間
10     this.delay = new AtomicLong(timeoutMillis);
11     // 最大延遲時間,默認在超時時間的基礎上擴大10倍
12     this.maxDelay = timeoutMillis * expBackOffBound;
13 
14     // 初始化計數器並注冊
15     successCounter = Monitors.newCounter("success");
16     timeoutCounter = Monitors.newCounter("timeouts");
17     rejectedCounter = Monitors.newCounter("rejectedExecutions");
18     throwableCounter = Monitors.newCounter("throwables");
19     threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
20     Monitors.registerObject(name, this);
21 }

再看 TimedSupervisorTask 的 run 方法:

  • 1)首先將任務異步提交到線程池去執行,它這里並不是直接運行任務,而是異步提交到線程池中,這樣可以實現超時等待,不影響主任務
  • 2)任務如果超時,比如出現網絡延遲、eureka server 不可用等情況,超時了,它這個時候就會認為如果還是30秒后調度,可能 eureka server 還是不可用的狀態,那么就增大延遲時間,那么第一次超時就會在300秒后再調度。如果300秒內 eureka server 可用了,然后有新的服務實例注冊上去了,那這個客戶端就不能及時感知到了,因此我覺得可以將 getCacheRefreshExecutorExponentialBackOffBound 對應的參數適當設置小一點(默認10倍)。
  • 3)如果任務沒有超時,在調度成功后,就會重置延遲時間為默認的超時時間。最后在 finally 中進行下一次的調度。
 1 public void run() {
 2     Future<?> future = null;
 3     try {
 4         // 提交任務到線程池
 5         future = executor.submit(task);
 6         threadPoolLevelGauge.set((long) executor.getActiveCount());
 7         // 阻塞直到任務完成或超時
 8         future.get(timeoutMillis, TimeUnit.MILLISECONDS);
 9         // 任務完成后,重置延遲時間為超時時間,即30秒
10         delay.set(timeoutMillis);
11         threadPoolLevelGauge.set((long) executor.getActiveCount());
12         // 成功次數+1
13         successCounter.increment();
14     } catch (TimeoutException e) {
15         logger.warn("task supervisor timed out", e);
16         // 超時次數+1
17         timeoutCounter.increment();
18 
19         // 如果任務超時了,就會增大延遲時間,當前延遲時間*2,然后取一個最大值
20         long currentDelay = delay.get();
21         long newDelay = Math.min(maxDelay, currentDelay * 2);
22         // 設置為最大的一個延遲時間
23         delay.compareAndSet(currentDelay, newDelay);
24 
25     } catch (RejectedExecutionException e) {
26         if (executor.isShutdown() || scheduler.isShutdown()) {
27             logger.warn("task supervisor shutting down, reject the task", e);
28         } else {
29             logger.warn("task supervisor rejected the task", e);
30         }
31 
32         rejectedCounter.increment();
33     } catch (Throwable e) {
34         if (executor.isShutdown() || scheduler.isShutdown()) {
35             logger.warn("task supervisor shutting down, can't accept the task");
36         } else {
37             logger.warn("task supervisor threw an exception", e);
38         }
39 
40         throwableCounter.increment();
41     } finally {
42         if (future != null) {
43             future.cancel(true);
44         }
45 
46         if (!scheduler.isShutdown()) {
47             // 延遲 delay 時間后,繼續調度任務
48             scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
49         }
50     }
51 }

總結一下這塊設計:

  • 1)首先在遠程調用的時候要考慮到網絡不可用、server 端 down 了等情況導致調用超時,可以使用線程池異步提交任務,實現等待超時機制。
  • 2)超時之后,可以假想服務恢復可用狀態可能需要一定的時間,如果還是按原來的時間間隔調度,可能還是會超時,因此增大延遲時間。如果調用成功,說明已經恢復了,則重置延遲時間。
  • 3)定時任務的調度以一定的延遲時間來循環調度(schedule),延遲時間可以根據實際情況變化,而不是一開始就按一個固定的頻率來調度(scheduleAtFixedRate)。
  • 4)定時任務、線程池里的任務,最好做好任務執行狀態的統計,便於觀察任務的調度情況。

7、構造注冊表

接着構造 PeerAwareInstanceRegistry,從命名來看,這是一個可以感知 eureka 集群的注冊表,就是在集群模式下,eureka server 從其它 server 節點拉取注冊表。它的默認實現類是 PeerAwareInstanceRegistryImpl,繼承自 AbstractInstanceRegistry,就是實例注冊表。

① 構造 PeerAwareInstanceRegistry

進入 PeerAwareInstanceRegistryImpl 的構造方法:

  • 首先是將前面構造的 EurekaServerConfig、EurekaClientConfig、EurekaClient 等傳入構造方法來構造 PeerAwareInstanceRegistry
  • 調用了 super 的構造方法,主要初始化了如下幾個東西:
    • 保存最近下線實例的循環隊列
    • 保存最近注冊實例的循環隊列
    • 最近一分鍾續約次數的計數器
    • 定時任務剔除 recentlyChangedQueue 中的實例
  • 然后創建了一個最近一分鍾集群同步次數的計數器 numberOfReplicationsLastMin。MeasuredRate 我們到后面再來分析它的設計。
 1 public PeerAwareInstanceRegistryImpl(
 2         EurekaServerConfig serverConfig,
 3         EurekaClientConfig clientConfig,
 4         ServerCodecs serverCodecs,
 5         EurekaClient eurekaClient
 6 ) {
 7     super(serverConfig, clientConfig, serverCodecs);
 8     this.eurekaClient = eurekaClient;
 9     // 最近一分鍾集群同步的次數計數器
10     this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
11     // We first check if the instance is STARTING or DOWN, then we check explicit overrides,
12     // then we check the status of a potentially existing lease.
13     this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
14             new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
15 }
16 
17 
18 ///////////////////////////////////////////////
19 
20 
21 protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
22     this.serverConfig = serverConfig;
23     this.clientConfig = clientConfig;
24     this.serverCodecs = serverCodecs;
25     // 最近下線的循環隊列
26     this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
27     // 最近注冊的循環隊列
28     this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);
29 
30     // 最近一分鍾續約的計數器
31     this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
32     // 一個定時調度任務,定時剔除最近改變隊列中過期的實例
33     this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
34             serverConfig.getDeltaRetentionTimerIntervalInMs(),
35             serverConfig.getDeltaRetentionTimerIntervalInMs());
36 }
View Code

這塊的具體細節等后面分析具體功能的時候再來看,我們先知道有這些隊列、計數器就行了。

② 循環隊列 CircularQueue 的設計

從構造方法可以看到,它使用了循環隊列來保存最近下線和最近注冊的實例信息,容量固定為1000,這樣就把最近的實例數量控制在1000以內。

CircularQueue 是它自定義的一個循環隊列,繼承自 AbstractQueue。其內部其實就是代理了 ArrayBlockingQueue,然后重寫了入隊的 offer 方法,當隊列滿了,就取出頭部的一個元素,然后再放到隊列尾部。

 1 static class CircularQueue<E> extends AbstractQueue<E> {
 2     private final ArrayBlockingQueue<E> delegate;
 3     private final int capacity;
 4 
 5     public CircularQueue(int capacity) {
 6         this.capacity = capacity;
 7         this.delegate = new ArrayBlockingQueue<>(capacity);
 8     }
 9 
10     @Override
11     public Iterator<E> iterator() {
12         return delegate.iterator();
13     }
14 
15     @Override
16     public int size() {
17         return delegate.size();
18     }
19 
20     @Override
21     public boolean offer(E e) {
22         // 如果隊列滿了,就取出頭部的一個元素,然后再放到尾部
23         while (!delegate.offer(e)) {
24             delegate.poll();
25         }
26         return true;
27     }
28 
29     @Override
30     public E poll() {
31         return delegate.poll();
32     }
33 
34     @Override
35     public E peek() {
36         return delegate.peek();
37     }
38 }

8、創建 Eureka Server 上下文並初始化

接下來先是創建了 PeerEurekaNodes,應該就是代表 eureka 集群的。然后基於前面創建的一些東西創建 eureka server 上下文 EurekaServerContext,從 DefaultEurekaServerContext 構造方法進去可以看到,只是將前面構造的東西封裝起來,便於全局使用。然后將 serverContext 放到 EurekaServerContextHolder 中,這樣其它地方就可以通過這個 holder 獲取 serverContext 了。

接着就是初始化eureka server上下文:

  • 啟動 eureka 集群:
    • 主要是啟動一個定時任務(間隔時間默認10分鍾)更新 eureka 集群節點的信息,根據配置的 eureka server 地址更新 PeerEurekaNode,這樣當有 eureka server 下線或上線后,就可以及時感知到其它 server 節點。PeerEurekaNode 主要就是用於集群節點間的數據同步,這塊后面分析集群的時候再具體分析。
  • 注冊表初始化:
    • 首先啟動了前面創建的計數器:numberOfReplicationsLastMin
    • 初始化響應緩存,eureka server 構造了一個多級緩存來響應客戶端抓取注冊表的請求,這個多級緩存的設計就是響應頻繁抓取注冊表請求的核心所在,等后面分析客戶端抓取注冊表的時候再具體分析
    • 定時調度任務更新續約閾值,主要就是更新 numberOfRenewsPerMinThreshold 這個值,即每分鍾續約次數,等分析續約的時候再來分析
    • 初始化 RemoteRegionRegistry,猜測是跟 eureka 多個區域(region)部署有關的
 1 public void initialize() {
 2     logger.info("Initializing ...");
 3     // 啟動eureka集群
 4     peerEurekaNodes.start();
 5     try {
 6         // 注冊表初始化
 7         registry.init(peerEurekaNodes);
 8     } catch (Exception e) {
 9         throw new RuntimeException(e);
10     }
11     logger.info("Initialized");
12 }

PeerEurekaNodes 的 start 方法:

 1 public void start() {
 2     // 單個線程的線程池
 3     taskExecutor = Executors.newSingleThreadScheduledExecutor(
 4             new ThreadFactory() {
 5                 @Override
 6                 public Thread newThread(Runnable r) {
 7                     Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
 8                     thread.setDaemon(true);
 9                     return thread;
10                 }
11             }
12     );
13     try {
14         // 根據集群地址更新 PeerEurekaNode,PeerEurekaNode 就包含了調度其它注冊中心的客戶端
15         updatePeerEurekaNodes(resolvePeerUrls());
16         Runnable peersUpdateTask = new Runnable() {
17             @Override
18             public void run() {
19                 try {
20                     updatePeerEurekaNodes(resolvePeerUrls());
21                 } catch (Throwable e) {
22                     logger.error("Cannot update the replica Nodes", e);
23                 }
24 
25             }
26         };
27         // 定時跟新集群信息 PeerEurekaNode,如果有eureka-server不可用了,就可以及時下線,或者新上線了eureka-server,可以及時感知到
28         taskExecutor.scheduleWithFixedDelay(
29                 peersUpdateTask,
30                 serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
31                 serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
32                 TimeUnit.MILLISECONDS
33         );
34     } catch (Exception e) {
35         throw new IllegalStateException(e);
36     }
37     for (PeerEurekaNode node : peerEurekaNodes) {
38         logger.info("Replica node URL:  {}", node.getServiceUrl());
39     }
40 }
View Code

PeerAwareInstanceRegistryImpl 的 init 方法:

 1 public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
 2     // 啟動計數器
 3     this.numberOfReplicationsLastMin.start();
 4     this.peerEurekaNodes = peerEurekaNodes;
 5     // 初始化響應緩存,eureka server 構造了一個多級緩存來響應客戶端抓取注冊表的請求
 6     initializedResponseCache();
 7     // 定時調度任務更新續約閥值,主要就是更新 numberOfRenewsPerMinThreshold 這個值,即每分鍾續約次數
 8     scheduleRenewalThresholdUpdateTask();
 9     // 初始化 RemoteRegionRegistry
10     initRemoteRegionRegistry();
11 
12     try {
13         Monitors.registerObject(this);
14     } catch (Throwable e) {
15         logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
16     }
17 }
View Code

9、完成 Eureka Server 初始化

接下來看最后幾步:

  • 首先調用 registry.syncUp() 將 EurekaClient 本地的實例同步到注冊表,在集群模式下,eureka server 也是一個客戶端,因此會獲取到其它注冊中心的注冊表同步到當前 server 的注冊表中。它默認會重試5次,每次間隔30秒。在單機模式下,應該將重試次數設置為 0。
  • 然后調用 registry.openForTraffic 做最后的一些初始化:
    • 更新每分鍾續約閾值
    • 設置實例狀態
    • 啟動統計最近一分鍾續約次數的計數器
    • 啟動定時任務剔除下線的實例,定時任務默認每隔60秒調度一次
  • 最后一步就是注冊 eureka 自身的一些監控統計

syncUp 方法:

 1 public int syncUp() {
 2     // Copy entire entry from neighboring DS node
 3     int count = 0;
 4 
 5     // 注冊表同步重試次數,默認5次
 6     for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
 7         if (i > 0) {
 8             try {
 9                 // 同步重試時間,默認30秒
10                 Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
11             } catch (InterruptedException e) {
12                 logger.warn("Interrupted during registry transfer..");
13                 break;
14             }
15         }
16         Applications apps = eurekaClient.getApplications();
17         for (Application app : apps.getRegisteredApplications()) {
18             for (InstanceInfo instance : app.getInstances()) {
19                 try {
20                     if (isRegisterable(instance)) {
21                         // 注冊實例
22                         register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
23                         count++;
24                     }
25                 } catch (Throwable t) {
26                     logger.error("During DS init copy", t);
27                 }
28             }
29         }
30     }
31     return count;
32 }
View Code

openForTraffic 方法:

 1 @Override
 2 public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
 3     // 期望的客戶端每分鍾的續約次數
 4     this.expectedNumberOfClientsSendingRenews = count;
 5     // 更新每分鍾續約閥值
 6     updateRenewsPerMinThreshold();
 7     logger.info("Got {} instances from neighboring DS node", count);
 8     logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
 9     this.startupTime = System.currentTimeMillis();
10     if (count > 0) {
11         this.peerInstancesTransferEmptyOnStartup = false;
12     }
13     DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
14     boolean isAws = Name.Amazon == selfName;
15     if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
16         logger.info("Priming AWS connections for all replicas..");
17         primeAwsReplicas(applicationInfoManager);
18     }
19     logger.info("Changing status to UP");
20     // 設置實例狀態為已啟動
21     applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
22     super.postInit();
23 }
24 
25 ///////////////////////////////////////
26 
27 protected void postInit() {
28     // 啟動 統計最近一分鍾續約次數的計數器
29     renewsLastMin.start();
30     if (evictionTaskRef.get() != null) {
31         evictionTaskRef.get().cancel();
32     }
33     // 定時剔除任務
34     evictionTaskRef.set(new EvictionTask());
35     evictionTimer.schedule(evictionTaskRef.get(),
36             serverConfig.getEvictionIntervalTimerInMs(),
37             serverConfig.getEvictionIntervalTimerInMs());
38 }
View Code

10、Eureka Server 啟動流程圖

下面通過一張圖來展示下 eureka server 的啟動初始化流程。

四、Eureka Client 啟動初始化

eureka client 的啟動初始化我們看 eureka-examples 模塊下的 ExampleEurekaClient 這個類,它的 main 方法中就模擬了作為一個 eureka client 啟動初始化,並向注冊中心發送請求。

eureka server 的初始化中其實已經包含了客戶端的初始化,可以看出,客戶端的初始化主要有如下的一些東西:

  • 讀取 eureka-client.properties 配置文件,創建 EurekaInstanceConfig
  • 基於 InstanceConfig 創建實例信息 InstanceInfo
  • 基於 InstanceConfig 和 InstanceInfo 創建應用實例管理器 ApplicationInfoManager
  • 讀取 eureka-client.properties 配置文件,創建 EurekaClientConfig
  • 基於應用實例管理器和 clientConfig 創建 EurekaClient(DiscoveryClient),初始化流程跟 eureka server 初始化流程中 DiscoveryClient 的創建是一樣的
 1 public static void main(String[] args) {
 2     ExampleEurekaClient sampleClient = new ExampleEurekaClient();
 3 
 4     // 基於實例配置和實例信息創建應用實例管理器
 5     ApplicationInfoManager applicationInfoManager = initializeApplicationInfoManager(new MyDataCenterInstanceConfig());
 6     // 基於應用實例管理器和客戶端配置創建 EurekaClient(DiscoveryClient)
 7     EurekaClient client = initializeEurekaClient(applicationInfoManager, new DefaultEurekaClientConfig());
 8 
 9     // use the client
10     sampleClient.sendRequestToServiceUsingEureka(client);
11 
12     // shutdown the client
13     eurekaClient.shutdown();
14 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM