SpringCloud 源碼系列(4)—— 負載均衡 Ribbon(上)


SpringCloud 源碼系列(1)—— 注冊中心 Eureka(上)

SpringCloud 源碼系列(2)—— 注冊中心 Eureka(中)

SpringCloud 源碼系列(3)—— 注冊中心 Eureka(下)

SpringCloud 源碼系列(4)—— 負載均衡 Ribbon(上)

SpringCloud 源碼系列(5)—— 負載均衡 Ribbon(下)

SpringCloud 源碼系列(6)—— 聲明式服務調用 Feign

 

一、負載均衡

1、RestTemplate

在研究 eureka 源碼上篇中,我們在 demo-consumer 消費者服務中定義了用 @LoadBalanced 標記的 RestTemplate,然后使用 RestTemplate 通過服務名的形式來調用遠程服務 demo-producer,然后請求會輪詢到兩個 demo-producer 實例上。

RestTemplate 是 Spring Resources 中一個訪問第三方 RESTful API 接口的網絡請求框架。RestTemplate 是用來消費 REST 服務的,所以 RestTemplate 的主要方法都與 REST 的 Http協議的一些方法緊密相連,例如 HEAD、GET、POST、PUT、DELETE 和 OPTIONS 等方法,這些方法在 RestTemplate 類對應的方法為 headForHeaders()、getForObject()、postForObject()、put() 和 delete() 等。

RestTemplate 本身是不具備負載均衡的能力的,如果 RestTemplate 未使用 @LoadBalanced 標記,就通過服務名的形式來調用,必然會報錯。用 @LoadBalanced 標記后,調用 RestTemplate 的 REST 方法就會通過負載均衡的方式通過一定的策略路由到某個服務實例上,底層負責負載均衡的組件就是 Ribbon。后面我們再來看 @LoadBalanced 是如何讓 RestTemplate 具備負載均衡的能力的。

 1 @SpringBootApplication
 2 public class ConsumerApplication {
 3 
 4     @Bean
 5     @LoadBalanced
 6     public RestTemplate restTemplate() {
 7         return new RestTemplate();
 8     }
 9 
10     public static void main(String[] args) {
11         SpringApplication.run(ConsumerApplication.class, args);
12     }
13 }
14 
15 @RestController
16 public class DemoController {
17     private final Logger logger = LoggerFactory.getLogger(getClass());
18 
19     @Autowired
20     private RestTemplate restTemplate;
21 
22     @GetMapping("/v1/id")
23     public ResponseEntity<String> getId() {
24         ResponseEntity<String> result = restTemplate.getForEntity("http://demo-producer/v1/uuid", String.class);
25         String uuid = result.getBody();
26         logger.info("request id: {}", uuid);
27         return ResponseEntity.ok(uuid);
28     }
29 }

2、Ribbon 與負載均衡

① 負載均衡

負載均衡是指將負載分攤到多個執行單元上,負載均衡主要可以分為集中式負載均衡與進程內負載均衡:

  • 集中式負載均衡指位於因特網與執行單元之間,並負責把網絡請求轉發到各個執行單元上,比如 Nginx、F5。集中式負載均衡也可以稱為服務端負載均衡。
  • 進程內負載均衡是將負載均衡邏輯集成到客戶端上,客戶端維護了一份服務提供者的實例列表,實例列表一般會從注冊中心比如 Eureka 中獲取。有了實例列表,就可以通過負載均衡策略將請求分攤給多個服務提供者,從而達到負載均衡的目的。進程內負載均衡一般也稱為客戶端負載均衡。

Ribbon 是一個客戶端負載均衡器,可以很好地控制 HTTP 和 TCP 客戶端的負載均衡行為。Ribbon 是 Netflix 公司開源的一個負載均衡組件,已經整合到 SpringCloud 生態中,它在 Spring Cloud 生態內是一個不可缺少的組件,少了它,服務便不能橫向擴展。

② Ribbon 模塊

Ribbon 有很多子模塊,官方文檔中說明,目前 Netflix 公司主要用於生產環境的 Ribbon 子模塊如下:

  • ribbon-loadbalancer:可以獨立使用或與其他模塊一起使用的負載均衡器 API。
  • ribbon-eureka:Ribbon 結合 Eureka 客戶端的 API,為負載均衡器提供動態服務注冊列表信息。
  • ribbon-core:Ribbon 的核心API。

③ springcloud 與 ribbon 整合

與 eureka 整合到 springcloud 類似,springcloud 提供了對應的 spring-cloud-starter-netflix-eureka-client(server) 依賴包,ribbon 則整合到了 spring-cloud-starter-netflix-ribbon 中。一般也不需要單獨引入 ribbon 的依賴包,spring-cloud-starter-netflix-eureka-client 中已經依賴了 spring-cloud-starter-netflix-ribbon。因此我們引入了 spring-cloud-starter-netflix-eureka-client 就可以使用 Ribbon 的功能了。

④ Ribbon 與 RestTemplate 整合使用

在 Spring Cloud 構建的微服務系統中,Ribbon 作為服務消費者的負載均衡器,有兩種使用方式,一種是和 RestTemplate 相結合,另一種是和 Feign 相結合。前面已經演示了帶有負載均衡的 RestTemplate 的使用,下面用一張圖來看看 RestTemplate 基於 Ribbon 的遠程調用。

二、RestTemplate 負載均衡

1、@LoadBalanced 注解

以 RestTemplate 為切入點,來看 Ribbon 的負載均衡核心原理。那么首先就要先看看 @LoadBalanced 注解如何讓 RestTemplate 具備負載均衡的能力了。

首先看 @LoadBalanced 這個注解的定義,可以得到如下信息:

  • 這個注解使用 @Qualifier 標記,其它地方就可以注入 LoadBalanced 注解的 bean 對象。
  • 從注釋中可以了解到,@LoadBalanced 標記的 RestTemplate 或 WebClient 將使用 LoadBalancerClient 來配置 bean 對象。
 1 /**
 2  * Annotation to mark a RestTemplate or WebClient bean to be configured to use a LoadBalancerClient.
 3  * @author Spencer Gibb
 4  */
 5 @Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
 6 @Retention(RetentionPolicy.RUNTIME)
 7 @Documented
 8 @Inherited
 9 @Qualifier
10 public @interface LoadBalanced {
11 
12 }

注意 @LoadBalanced 是 spring-cloud-commons 模塊下 loadbalancer 包下的。

2、RestTemplate  負載均衡自動化配置

在 @LoadBalanced 同包下,有一個 LoadBalancerAutoConfiguration 自動化配置類,從注釋也可以看出,這是客戶端負載均衡 Ribbon 的自動化配置類。

從這個自動化配置類可以得到如下信息:

  • 首先要有 RestTemplate 的依賴和定義了 LoadBalancerClient 對象的前提下才會觸發這個自動化配置類,這也對應了前面,RestTemplate 要用 LoadBalancerClient  來配置。
  • 接着可以看到這個類注入了帶有 @LoadBalanced 標識的 RestTemplate 對象,就是要對這部分對象增加負載均衡的能力。
  • 從 SmartInitializingSingleton 的構造中可以看到,就是在 bean 初始化完成后,用 RestTemplateCustomizer 定制化 RestTemplate。
  • 再往下可以看到,RestTemplateCustomizer 其實就是向 RestTemplate 中添加了 LoadBalancerInterceptor 這個攔截器。
  • 而 LoadBalancerInterceptor 的構建又需要 LoadBalancerClient 和 LoadBalancerRequestFactory,LoadBalancerRequestFactory 則通過 LoadBalancerClient 和 LoadBalancerRequestTransformer 構造完成。
 1 /**
 2  * Auto-configuration for Ribbon (client-side load balancing).
 3  */
 4 @Configuration(proxyBeanMethods = false)
 5 @ConditionalOnClass(RestTemplate.class) // 有 RestTemplate 的依賴
 6 @ConditionalOnBean(LoadBalancerClient.class) // 定義了 LoadBalancerClient 的 bean 對象
 7 @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
 8 public class LoadBalancerAutoConfiguration {
 9 
10     // 注入 @LoadBalanced 標記的 RestTemplate 對象
11     @LoadBalanced
12     @Autowired(required = false)
13     private List<RestTemplate> restTemplates = Collections.emptyList();
14 
15     @Autowired(required = false)
16     private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
17 
18     @Bean
19     public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
20             final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
21         return () -> restTemplateCustomizers.ifAvailable(customizers -> {
22             for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
23                 for (RestTemplateCustomizer customizer : customizers) {
24                     // 利用 RestTemplateCustomizer 定制化 restTemplate
25                     customizer.customize(restTemplate);
26                 }
27             }
28         });
29     }
30 
31     @Bean
32     @ConditionalOnMissingBean
33     public LoadBalancerRequestFactory loadBalancerRequestFactory(
34             LoadBalancerClient loadBalancerClient) {
35         return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
36     }
37 
38     @Configuration(proxyBeanMethods = false)
39     @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
40     static class LoadBalancerInterceptorConfig {
41 
42         // 創建 LoadBalancerInterceptor 需要 LoadBalancerClient 和 LoadBalancerRequestFactory
43         @Bean
44         public LoadBalancerInterceptor ribbonInterceptor(
45                 LoadBalancerClient loadBalancerClient,
46                 LoadBalancerRequestFactory requestFactory) {
47             return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
48         }
49 
50         @Bean
51         @ConditionalOnMissingBean
52         public RestTemplateCustomizer restTemplateCustomizer(
53                 final LoadBalancerInterceptor loadBalancerInterceptor) {
54             return restTemplate -> {
55                 List<ClientHttpRequestInterceptor> list = new ArrayList<>(
56                         restTemplate.getInterceptors());
57                 // 向 restTemplate 添加 LoadBalancerInterceptor 攔截器
58                 list.add(loadBalancerInterceptor);
59                 restTemplate.setInterceptors(list);
60             };
61         }
62 
63     }
64 }

3、RestTemplate 攔截器 LoadBalancerInterceptor

LoadBalancerAutoConfiguration 自動化配置主要就是給 RestTemplate 添加了一個負載均衡攔截器 LoadBalancerInterceptor。從 setInterceptors 的參數可以看出,攔截器的類型是 ClientHttpRequestInterceptor,如果我們想定制化 RestTemplate,就可以實現這個接口來定制化,然后還可以用 @Order 標記攔截器的先后順序。

1 public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) {
2     if (this.interceptors != interceptors) {
3         this.interceptors.clear();
4         this.interceptors.addAll(interceptors);
5         // 根據 @Order 注解的順序排序
6         AnnotationAwareOrderComparator.sort(this.interceptors);
7     }
8 }

interceptors 攔截器是在 RestTemplate 的父類 InterceptingHttpAccessor 中的, RestTemplate 的類結構如下圖所示。

從 restTemplate.getForEntity("http://demo-producer/v1/uuid", String.class) 這個GET請求進去看看,是如何使用 LoadBalancerInterceptor 的。一步步進去,可以看到最終是進入到 doExecute 這個方法了。

在 doExecute 方法中,首先根據 url、method 創建一個 ClientHttpRequest,然后利用 ClientHttpRequest 來發起請求。

 1 protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
 2         @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
 3     ClientHttpResponse response = null;
 4     try {
 5         // 創建一個 ClientHttpRequest
 6         ClientHttpRequest request = createRequest(url, method);
 7         if (requestCallback != null) {
 8             requestCallback.doWithRequest(request);
 9         }
10         // 調用 ClientHttpRequest 的 execute() 方法
11         response = request.execute();
12         // 處理返回結果
13         handleResponse(url, method, response);
14         return (responseExtractor != null ? responseExtractor.extractData(response) : null);
15     }
16     catch (IOException ex) {
17         // ...
18     }
19     finally {
20         if (response != null) {
21             response.close();
22         }
23     }
24 }
25 
26 //////////////////////////////////////
27 
28 protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
29     ClientHttpRequest request = getRequestFactory().createRequest(url, method);
30     initialize(request);
31     if (logger.isDebugEnabled()) {
32         logger.debug("HTTP " + method.name() + " " + url);
33     }
34     return request;
35 }

InterceptingHttpAccessor 中重寫了父類 HttpAccessor 的 getRequestFactory 方法,父類默認的 requestFactory 是 SimpleClientHttpRequestFactory。

重寫后的 getRequestFactory 方法中,如果攔截器不為空,則基於父類默認的 SimpleClientHttpRequestFactory 和攔截器創建了 InterceptingClientHttpRequestFactory。

 1 public ClientHttpRequestFactory getRequestFactory() {
 2     List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
 3     if (!CollectionUtils.isEmpty(interceptors)) {
 4         ClientHttpRequestFactory factory = this.interceptingRequestFactory;
 5         if (factory == null) {
 6             // 傳入 SimpleClientHttpRequestFactory 和 ClientHttpRequestInterceptor 攔截器
 7             factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
 8             this.interceptingRequestFactory = factory;
 9         }
10         return factory;
11     }
12     else {
13         return super.getRequestFactory();
14     }
15 }

也就是說調用了 InterceptingClientHttpRequestFactory 的 createRequest 方法來創建 ClientHttpRequest。進去可以看到,ClientHttpRequest 的實際類型就是 InterceptingClientHttpRequest。

1 protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
2     return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
3 }

InterceptingClientHttpRequest 的類結構如下:

RestTemplate 的 doExecute 中調用 request.execute() 其實是調用了 InterceptingClientHttpRequest 父類 AbstractClientHttpRequest 中的 execute 方法。一步步進去可以發現最終其實是調用了 InterceptingClientHttpRequest 的 executeInternal 方法。

在 InterceptingClientHttpRequest  的 executeInternal 方法中,創建了 InterceptingRequestExecution 來執行請求。在 InterceptingRequestExecution 的 execute 方法中,會先遍歷執行所有攔截器,然后通過 ClientHttpRequest 發起真正的 http 請求。

 1 protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
 2     // 創建 InterceptingRequestExecution
 3     InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
 4     // 請求調用
 5     return requestExecution.execute(this, bufferedOutput);
 6 }
 7 
 8 private class InterceptingRequestExecution implements ClientHttpRequestExecution {
 9 
10     private final Iterator<ClientHttpRequestInterceptor> iterator;
11 
12     public InterceptingRequestExecution() {
13         // 攔截器迭代器
14         this.iterator = interceptors.iterator();
15     }
16 
17     @Override
18     public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
19         if (this.iterator.hasNext()) {
20             ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
21             // 利用攔截器攔截處理,並傳入 InterceptingRequestExecution
22             return nextInterceptor.intercept(request, body, this);
23         }
24         else {
25             // 攔截器遍歷完后開始發起真正的 http 請求
26             HttpMethod method = request.getMethod();
27             Assert.state(method != null, "No standard HTTP method");
28             ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
29             request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
30             if (body.length > 0) {
31                 if (delegate instanceof StreamingHttpOutputMessage) {
32                     StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
33                     streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
34                 }
35                 else {
36                     StreamUtils.copy(body, delegate.getBody());
37                 }
38             }
39             return delegate.execute();
40         }
41     }
42 }

進入到 LoadBalancerInterceptor 的 intercept 攔截方法內,可以看到從請求的原始地址中獲取了服務名稱,然后調用了 loadBalancer 的 execute 方法,也就是 LoadBalancerClient。

到這里,其實已經可以想象,loadBalancer.execute 這行代碼就是根據服務名稱去獲取一個具體的實例,然后將原始地址替換為實例的IP地址。那這個 loadBalancer 又是什么呢?

 1 public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
 2     // 原始地址:http://demo-producer/v1/uuid
 3     final URI originalUri = request.getURI();
 4     // host 就是服務名:demo-producer
 5     String serviceName = originalUri.getHost();
 6     Assert.state(serviceName != null,
 7             "Request URI does not contain a valid hostname: " + originalUri);
 8     return this.loadBalancer.execute(serviceName,
 9             this.requestFactory.createRequest(request, body, execution));
10 }

4、負載均衡客戶端 LoadBalancerClient

在配置 LoadBalancerInterceptor 時,需要兩個參數,LoadBalancerClient 和 LoadBalancerRequestFactory,LoadBalancerRequestFactory前面已經知道是如何創建的了。LoadBalancerClient 又是在哪創建的呢?通過 IDEA 搜索,可以發現是在 spring-cloud-netflix-ribbon 模塊下的 RibbonAutoConfiguration 中配置的,可以看到 LoadBalancerClient 的實際類型是 RibbonLoadBalancerClient。

配置類的順序是 EurekaClientAutoConfiguration、RibbonAutoConfiguration、LoadBalancerAutoConfiguration,因為使 RestTemplate 具備負載均衡的能力需要 LoadBalancerInterceptor 攔截器,創建 LoadBalancerInterceptor 又需要 LoadBalancerClient,而 LoadBalancerClient 底層要根據服務名獲取某個實例,肯定又需要一個實例庫,比如從配置文件、注冊中心獲取。從這里就可以看出來,RibbonLoadBalancerClient 默認會從 Eureka 注冊中心獲取實例。

 1 @Configuration
 2 @Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
 3 @RibbonClients
 4 // 后於 EurekaClientAutoConfiguration 配置
 5 @AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
 6 // 先於 LoadBalancerAutoConfiguration 配置
 7 @AutoConfigureBefore({ LoadBalancerAutoConfiguration.class,    AsyncLoadBalancerAutoConfiguration.class })
 8 @EnableConfigurationProperties({ RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class })
 9 public class RibbonAutoConfiguration {
10 
11     @Autowired(required = false)
12     private List<RibbonClientSpecification> configurations = new ArrayList<>();
13 
14     @Bean
15     @ConditionalOnMissingBean
16     public SpringClientFactory springClientFactory() {
17         SpringClientFactory factory = new SpringClientFactory();
18         factory.setConfigurations(this.configurations);
19         return factory;
20     }
21 
22     @Bean
23     @ConditionalOnMissingBean(LoadBalancerClient.class)
24     public LoadBalancerClient loadBalancerClient() {
25         return new RibbonLoadBalancerClient(springClientFactory());
26     }
27 }

LoadBalancerClient 主要提供了三個接口:

 1 public interface LoadBalancerClient extends ServiceInstanceChooser {
 2 
 3     // 從 LoadBalancer 找一個 Server 來發送請求
 4     <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
 5 
 6     // 從傳入的 ServiceInstance 取 Server 來發送請求  
 7     <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
 8 
 9     // 對原始 URI 重構
10     URI reconstructURI(ServiceInstance instance, URI original);
11 }

進入到 RibbonLoadBalancerClient 的 execute 方法中可以看到:

  • 首先根據服務名獲取服務對應的負載均衡器 ILoadBalancer。
  • 然后從 ILoadBalancer 中根據一定策略選出一個實例 Server。
  • 然后將 server、serviceId 等信息封裝到 RibbonServer 中,也就是一個服務實例 ServiceInstance。
  • 最后調用了 LoadBalancerRequest 的 apply,並傳入 ServiceInstance,將地址中的服務名替換為真實的IP地址。
 1 public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
 2     return execute(serviceId, request, null);
 3 }
 4 
 5 public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
 6         throws IOException {
 7     // 根據服務名獲取一個負載均衡器 ILoadBalancer
 8     ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
 9     // 利用負載均衡器獲取實例 Server
10     Server server = getServer(loadBalancer, hint);
11     if (server == null) {
12         throw new IllegalStateException("No instances available for " + serviceId);
13     }
14     // 封裝實例信息:RibbonServer 的父類是 ServiceInstance
15     RibbonServer ribbonServer = new RibbonServer(serviceId, server,
16             isSecure(server, serviceId),
17             serverIntrospector(serviceId).getMetadata(server));
18     return execute(serviceId, ribbonServer, request);
19 }
20 
21 @Override
22 public <T> T execute(String serviceId, ServiceInstance serviceInstance,
23         LoadBalancerRequest<T> request) throws IOException {
24     Server server = null;
25     if (serviceInstance instanceof RibbonServer) {
26         server = ((RibbonServer) serviceInstance).getServer();
27     }
28     if (server == null) {
29         throw new IllegalStateException("No instances available for " + serviceId);
30     }
31 
32     try {
33         // 處理地址,將服務名替換為真實的IP地址
34         T returnVal = request.apply(serviceInstance);
35         return returnVal;
36     } catch (Exception ex) {
37         // ...
38     }
39     return null;
40 }

這個 LoadBalancerRequest 其實就是 LoadBalancerInterceptor 的 intercept 中創建的一個匿名類,在它的函數式接口內,主要是用裝飾器 ServiceRequestWrapper 將 request 包了一層。

 1 public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) {
 2     return instance -> {
 3         // 封裝 HttpRequest,ServiceRequestWrapper 重載了 getURI 方法。
 4         HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
 5         if (this.transformers != null) {
 6             for (LoadBalancerRequestTransformer transformer : this.transformers) {
 7                 serviceRequest = transformer.transformRequest(serviceRequest, instance);
 8             }
 9         }
10         // 繼續執行攔截器
11         return execution.execute(serviceRequest, body);
12     };
13 }

ServiceRequestWrapper 主要就是重寫了 getURI 方法,在重寫的 getURI 方法內,它用 loadBalancer 對 URI 進行了重構,進去可以發現,就是將原始地址中的服務名替換為 Server 的真實IP、端口地址。

1 @Override
2 public URI getURI() {
3     // 重構 URI
4     URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
5     return uri;
6 }
 1 public URI reconstructURI(ServiceInstance instance, URI original) {
 2     Assert.notNull(instance, "instance can not be null");
 3     // 服務名
 4     String serviceId = instance.getServiceId();
 5     RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
 6 
 7     URI uri;
 8     Server server;
 9     if (instance instanceof RibbonServer) {
10         RibbonServer ribbonServer = (RibbonServer) instance;
11         server = ribbonServer.getServer();
12         uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
13     }
14     else {
15         server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
16         IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
17         ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
18         uri = updateToSecureConnectionIfNeeded(original, clientConfig, serverIntrospector, server);
19     }
20     // 重構地址
21     return context.reconstructURIWithServer(server, uri);
22 }

reconstructURIWithServer:

 1 public URI reconstructURIWithServer(Server server, URI original) {
 2     String host = server.getHost();
 3     int port = server.getPort();
 4     String scheme = server.getScheme();
 5     
 6     if (host.equals(original.getHost()) 
 7             && port == original.getPort()
 8             && scheme == original.getScheme()) {
 9         return original;
10     }
11     if (scheme == null) {
12         scheme = original.getScheme();
13     }
14     if (scheme == null) {
15         scheme = deriveSchemeAndPortFromPartialUri(original).first();
16     }
17 
18     try {
19         StringBuilder sb = new StringBuilder();
20         sb.append(scheme).append("://");
21         if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
22             sb.append(original.getRawUserInfo()).append("@");
23         }
24         sb.append(host);
25         if (port >= 0) {
26             sb.append(":").append(port);
27         }
28         sb.append(original.getRawPath());
29         if (!Strings.isNullOrEmpty(original.getRawQuery())) {
30             sb.append("?").append(original.getRawQuery());
31         }
32         if (!Strings.isNullOrEmpty(original.getRawFragment())) {
33             sb.append("#").append(original.getRawFragment());
34         }
35         URI newURI = new URI(sb.toString());
36         return newURI;            
37     } catch (URISyntaxException e) {
38         throw new RuntimeException(e);
39     }
40 }
View Code

5、RestTemplate 負載均衡總結

到這里,我們基本就弄清楚了一個簡單的 @LoadBalanced 注解如何讓 RestTemplate 具備了負載均衡的能力了,這一節來做個小結。

① RestTemplate 如何獲得負載均衡的能力

  • 1)首先 RestTemplate 是 spring-web 模塊下一個訪問第三方 RESTful API 接口的網絡請求框架
  • 2)在 spring cloud 微服務架構中,用 @LoadBalanced 對 RestTemplate 做個標記,就可以使 RestTemplate 具備負載均衡的能力
  • 3)使 RestTemplate 具備負載均衡的核心組件就是 LoadBalancerAutoConfiguration 配置類中向其添加的 LoadBalancerInterceptor 負載均衡攔截器
  • 4)RestTemplate 在發起 http 調用前,會遍歷所有攔截器來對 RestTemplate 定制化,LoadBalancerInterceptor 就是在這時將URI中的服務名替換為實例的真實IP地址。定制完成后,就會發起真正的 http 請求。
  • 5)LoadBalancerInterceptor 又主要是使用負載均衡客戶端 LoadBalancerClient 來完成URI的重構的,LoadBalancerClient 就可以根據服務名查找一個可用的實例,然后重構URI。

② 核心組件

這里會涉及多個模塊,下面是核心組件的所屬模塊:

spring-web:

  • RestTemplate
  • InterceptingClientHttpRequest:執行攔截器,並發起最終http調用

spring-cloud-commons:

  • @LoadBalanced
  • LoadBalancerAutoConfiguration
  • LoadBalancerRequestFactory:創建裝飾類 ServiceRequestWrapper 替換原來的 HttpRequest,重載 getURI 方法。
  • LoadBalancerInterceptor:負載均衡攔截器
  • LoadBalancerClient:負載均衡客戶端接口

spring-cloud-netflix-ribbon:

  • RibbonLoadBalancerClient:LoadBalancerClient 的實現類,Ribbon 的負載均衡客戶端
  • RibbonAutoConfiguration

ribbon-loadbalancer:

  • ILoadBalancer:負載均衡器
  • Server:實例

③ 最后再用一張圖把 RestTemplate 這塊的關系捋一下

三、ILoadBalancer 獲取 Server

從前面 RestTemplate 那張圖可以看出,使 RestTemplate 具備負載均衡的能力,最重要的一個組件之一就是 ILoadBalancer,因為要用它來獲取能調用的 Server,有了 Server 才能對原始帶有服務名的 URI 進行重構。這節就來看下 Ribbon 的負載均衡器 ILoadBalancer 是如何創建的以及如何通過它獲取 Server。

1、創建負載均衡器 ILoadBalancer

① SpringClientFactory與上下文

ILoadBalancer 是用 SpringClientFactory 的 getLoadBalancer 方法根據服務名獲取的,從 getInstance 一步步進去可以發現,每個服務都會創建一個 AnnotationConfigApplicationContext,也就是一個應用上下文 ApplicationContext。相當於就是一個服務綁定一個 ILoadBalancer。

1 public <C> C getInstance(String name, Class<C> type) {
2     C instance = super.getInstance(name, type);
3     if (instance != null) {
4         return instance;
5     }
6     IClientConfig config = getInstance(name, IClientConfig.class);
7     return instantiateWithConfig(getContext(name), type, config);
8 }
1 public <T> T getInstance(String name, Class<T> type) {
2     // 根據名稱獲取
3     AnnotationConfigApplicationContext context = getContext(name);
4     if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context, type).length > 0) {
5         return context.getBean(type);
6     }
7     return null;
8 }
 1 protected AnnotationConfigApplicationContext getContext(String name) {
 2     // contexts => Map<String, AnnotationConfigApplicationContext>
 3     if (!this.contexts.containsKey(name)) {
 4         synchronized (this.contexts) {
 5             if (!this.contexts.containsKey(name)) {
 6                 this.contexts.put(name, createContext(name));
 7             }
 8         }
 9     }
10     return this.contexts.get(name);
11 }

調試看下 AnnotationConfigApplicationContext 上下文,可以看到放入了與這個服務綁定的 ILoadBalancer、IClientConfig、RibbonLoadBalancerContext 等。

它這里為什么要每個服務都綁定一個 ApplicationContext 呢?我猜想應該是因為服務實例列表可以有多個來源,比如可以從 eureka 注冊中心獲取、可以通過代碼配置、可以通過配置文件配置,另外每個服務還可以有很多個性化的配置,有默認的配置、定制的全局配置、個別服務的特定配置等,它這樣做就便於用戶定制每個服務的負載均衡策略。

② Ribbon的飢餓加載

而且這個Ribbon客戶端的應用上下文默認是懶加載的,並不是在啟動的時候就加載上下文,而是在第一次調用的時候才會去初始化。

如果想服務啟動時就初始化,可以指定Ribbon客戶端的具體名稱,在啟動的時候就加載配置項的上下文:

1 ribbon:
2   eager-load:
3     enabled: true
4     clients: demo-producer,demo-xxx

在 RibbonAutoConfiguration 配置類中可以找到這個飢餓配置,如果開啟了飢餓加載,就會創建 RibbonApplicationContextInitializer 來在啟動時初始化上下文。

 1 @Bean
 2 @ConditionalOnProperty("ribbon.eager-load.enabled")
 3 public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {
 4     return new RibbonApplicationContextInitializer(springClientFactory(), ribbonEagerLoadProperties.getClients());
 5 }
 6 
 7 public class RibbonApplicationContextInitializer implements ApplicationListener<ApplicationReadyEvent> {
 8     private final SpringClientFactory springClientFactory;
 9 
10     // List of Ribbon client names
11     private final List<String> clientNames;
12 
13     public RibbonApplicationContextInitializer(SpringClientFactory springClientFactory, List<String> clientNames) {
14         this.springClientFactory = springClientFactory;
15         this.clientNames = clientNames;
16     }
17 
18     protected void initialize() {
19         if (clientNames != null) {
20             for (String clientName : clientNames) {
21                 // 提前初始化上下文
22                 this.springClientFactory.getContext(clientName);
23             }
24         }
25     }
26 
27     @Override
28     public void onApplicationEvent(ApplicationReadyEvent event) {
29         initialize();
30     }
31 }
View Code

③ RibbonClientConfiguration

ILoadBalancer 的創建在哪呢?看 RibbonClientConfiguration,這個配置類提供了 ILoadBalancer 的默認創建方法,ILoadBalancer 的默認實現類為 ZoneAwareLoadBalancer。

 1 public class RibbonClientConfiguration {
 2 
 3     public static final int DEFAULT_CONNECT_TIMEOUT = 1000;
 4 
 5     public static final int DEFAULT_READ_TIMEOUT = 1000;
 6 
 7     public static final boolean DEFAULT_GZIP_PAYLOAD = true;
 8 
 9     @RibbonClientName
10     private String name = "client";
11 
12     @Autowired
13     private PropertiesFactory propertiesFactory;
14 
15     @Bean
16     @ConditionalOnMissingBean
17     public IClientConfig ribbonClientConfig() {
18         DefaultClientConfigImpl config = new DefaultClientConfigImpl();
19         config.loadProperties(this.name);
20         config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
21         config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
22         config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
23         return config;
24     }
25 
26     @Bean
27     @ConditionalOnMissingBean
28     public IRule ribbonRule(IClientConfig config) {
29         if (this.propertiesFactory.isSet(IRule.class, name)) {
30             return this.propertiesFactory.get(IRule.class, config, name);
31         }
32         ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
33         rule.initWithNiwsConfig(config);
34         return rule;
35     }
36 
37     @Bean
38     @ConditionalOnMissingBean
39     public IPing ribbonPing(IClientConfig config) {
40         if (this.propertiesFactory.isSet(IPing.class, name)) {
41             return this.propertiesFactory.get(IPing.class, config, name);
42         }
43         return new DummyPing();
44     }
45 
46     @Bean
47     @ConditionalOnMissingBean
48     @SuppressWarnings("unchecked")
49     public ServerList<Server> ribbonServerList(IClientConfig config) {
50         if (this.propertiesFactory.isSet(ServerList.class, name)) {
51             return this.propertiesFactory.get(ServerList.class, config, name);
52         }
53         ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
54         serverList.initWithNiwsConfig(config);
55         return serverList;
56     }
57 
58     @Bean
59     @ConditionalOnMissingBean
60     public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
61         return new PollingServerListUpdater(config);
62     }
63 
64     @Bean
65     @ConditionalOnMissingBean
66     @SuppressWarnings("unchecked")
67     public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
68         if (this.propertiesFactory.isSet(ServerListFilter.class, name)) {
69             return this.propertiesFactory.get(ServerListFilter.class, config, name);
70         }
71         ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter();
72         filter.initWithNiwsConfig(config);
73         return filter;
74     }
75 
76     @Bean
77     @ConditionalOnMissingBean
78     public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
79             ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
80             IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
81         // 先判斷配置文件中是否配置了負載均衡器
82         if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
83             // 通過反射創建
84             return this.propertiesFactory.get(ILoadBalancer.class, config, name);
85         }
86         return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
87                 serverListFilter, serverListUpdater);
88     }
89 }

可以看到創建 ILoadBalancer 需要 IClientConfig、ServerList<Server>、ServerListFilter<Server>、IRule、IPing、ServerListUpdater,其實這6個接口加上 ILoadBalancer 就是 Ribbon 的核心接口,它們共同定義了 Ribbon 的行為特性。

這7個核心接口和默認實現類如下:

2、客戶端 Ribbon 定制

可以看到在 RibbonClientConfiguration 中創建 IRule、IPing、ServerList<Server>、ServerListFilter<Server>、ILoadBalancer 時,都先通過 propertiesFactory.isSet 判斷是否已配置了對應類型的實現類,沒有才使用默認的實現類。

也就是說針對特定的服務,這幾個類可以自行定制化,也可以通過配置指定其它的實現類。

① 全局策略配置

如果想要全局更改配置,需要加一個配置類,比如像下面這樣:

 1 @Configuration
 2 public class GlobalRibbonConfiguration {
 3 
 4     @Bean
 5     public IRule ribbonRule() {
 6         return new RandomRule();
 7     }
 8 
 9     @Bean
10     public IPing ribbonPing() {
11         return new NoOpPing();
12     }
13 }

② 基於注解的配置

如果想針對某一個服務定制配置,可以通過 @RibbonClients 來配置特定服務的配置類。

需要先定義一個服務配置類:

 1 @Configuration
 2 public class ProducerRibbonConfiguration {
 3 
 4     @Bean
 5     public IRule ribbonRule() {
 6         return new RandomRule();
 7     }
 8 
 9     @Bean
10     public IPing ribbonPing() {
11         return new NoOpPing();
12     }
13 }

用 @RibbonClients 注解為服務指定特定的配置類,並排除掉,不讓 Spring 掃描,否則就變成了全局配置了。

1 @RibbonClients({
2     @RibbonClient(name = "demo-producer", configuration = ProducerRibbonConfiguration.class)
3 })
4 @ComponentScan(excludeFilters = {
5     @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = ProducerRibbonConfiguration.class)
6 })

③ 配置文件配置

通過配置文件的方式來配置,配置的格式就是 <服務名稱>.ribbon.<屬性>:

 1 demo-producer:
 2   ribbon:
 3     # ILoadBalancer
 4     NFLoadBalancerClassName: com.netflix.loadbalancer.NoOpLoadBalancer
 5     # IRule
 6     NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
 7     # IPing
 8     NFLoadBalancerPingClassName:
 9     # ServerList<Server>
10     NIWSServerListClassName:
11     # ServerListFilter<Server>
12     NIWSServerListFilterClassName:

④ 優先級順序

這幾種配置方式的優先級順序是 配置文件配置 > @RibbonClients 配置 > 全局配置 > 默認配置。

3、ZoneAwareLoadBalancer 選擇 Server

獲取到 ILoadBalancer 后,就要去獲取 Server 了,可以看到,就是用 ILoadBalancer 來獲取 Server。

1 protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
2     if (loadBalancer == null) {
3         return null;
4     }
5     // Use 'default' on a null hint, or just pass it on?
6     return loadBalancer.chooseServer(hint != null ? hint : "default");
7 }

ILoadBalancer  的默認實現類是 ZoneAwareLoadBalancer,進入它的 chooseServer 方法內,如果只配置了一個 zone,就走父類的 chooseServer,否則從多個 zone 中去選擇實例。

 1 public Server chooseServer(Object key) {
 2     // ENABLED => ZoneAwareNIWSDiscoveryLoadBalancer.enabled 默認 true
 3     // AvailableZones 配置的只有一個 defaultZone
 4     if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
 5         logger.debug("Zone aware logic disabled or there is only one zone");
 6         // 走父類獲取 Server 的邏輯
 7         return super.chooseServer(key);
 8     }
 9     
10     // 多 zone 邏輯....
11 }

先看下 ZoneAwareLoadBalancer 的類繼承結構,ZoneAwareLoadBalancer 的直接父類是 DynamicServerListLoadBalancer,DynamicServerListLoadBalancer 的父類又是 BaseLoadBalancer。

ZoneAwareLoadBalancer 調用父類的 chooseServer 方法是在 BaseLoadBalancer 中的,進去可以看到,它主要是用 IRule 來選擇實例,最終選擇實例的策略就交給了 IRule 接口。

 1 public Server chooseServer(Object key) {
 2     if (counter == null) {
 3         counter = createCounter();
 4     }
 5     counter.increment();
 6     if (rule == null) {
 7         return null;
 8     } else {
 9         try {
10             // IRule
11             return rule.choose(key);
12         } catch (Exception e) {
13             logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
14             return null;
15         }
16     }
17 }

4、ZoneAvoidanceRule 斷言篩選、輪詢選擇 Server

IRule 的默認實現類是 ZoneAvoidanceRule,先看下 ZoneAvoidanceRule 的繼承結構,ZoneAvoidanceRule 的直接父類是 PredicateBasedRule。

rule.choose 的邏輯在 PredicateBasedRule 中,getPredicate() 返回的是 ZoneAvoidanceRule 創建的一個組合斷言 CompositePredicate,就是用這個斷言來過濾出可用的 Server,並通過輪詢的策略返回一個 Server。

 1 public Server choose(Object key) {
 2     ILoadBalancer lb = getLoadBalancer();
 3     // getPredicate() Server斷言 => CompositePredicate
 4     // RoundRobin 輪詢方式獲取實例
 5     Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
 6     if (server.isPresent()) {
 7         return server.get();
 8     } else {
 9         return null;
10     }       
11 }

在初始化 ZoneAvoidanceRule 配置時,創建了 CompositePredicate,可以看到這個組合斷言主要有兩個斷言,一個是斷言 Server 的 zone 是否可用,一個斷言 Server 本身是否可用,例如 Server 無法 ping 通。

 1 public void initWithNiwsConfig(IClientConfig clientConfig) {
 2     // 斷言 Server 的 zone 是否可用,只有一個 defaultZone 的情況下都是可用的
 3     ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
 4     // 斷言 Server 是否可用
 5     AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
 6     // 封裝組合斷言
 7     compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
 8 }
 9 
10 private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
11     // 建造者模式創建斷言
12     return CompositePredicate.withPredicates(p1, p2)
13                          .addFallbackPredicate(p2)
14                          .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
15                          .build();
16     
17 }

接着看選擇Server的 chooseRoundRobinAfterFiltering,參數 servers 是通過 ILoadBalancer 獲取的所有實例,可以看到它其實就是返回了 ILoadBalancer 在內存中緩存的服務所有 Server。這個 Server 從哪來的我們后面再來看。

1 public List<Server> getAllServers() {
2     // allServerList => List<Server>
3     return Collections.unmodifiableList(allServerList);
4 }

先對所有實例通過斷言過濾掉不可用的 Server,然后是通過輪詢的方式獲取一個 Server 返回。這就是默認配置下 ILoadBalancer(ZoneAwareLoadBalancer) 通過 IRule(ZoneAvoidanceRule) 選擇 Server 的流程了。

 1 public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
 2     // 斷言獲取可用的 Server
 3     List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
 4     if (eligible.size() == 0) {
 5         return Optional.absent();
 6     }
 7     // 通過取模的方式輪詢 Server
 8     return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
 9 }
10 
11 public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
12     if (loadBalancerKey == null) {
13         return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));            
14     } else {
15         List<Server> results = Lists.newArrayList();
16         // 對每個 Server 斷言
17         for (Server server: servers) {
18             if (this.apply(new PredicateKey(loadBalancerKey, server))) {
19                 results.add(server);
20             }
21         }
22         return results;            
23     }
24 }
25 
26 private int incrementAndGetModulo(int modulo) {
27     for (;;) {
28         int current = nextIndex.get();
29         // 模運算取余數
30         int next = (current + 1) % modulo;
31         // CAS 更新 nextIndex
32         if (nextIndex.compareAndSet(current, next) && current < modulo)
33             return current;
34     }
35 }

四、Ribbon 整合 Eureka Client 拉取Server列表

前面在通過 IRule 選擇 Server 的時候,首先通過 lb.getAllServers() 獲取了所有的 Server,那這些 Server 從哪里來的呢,這節就來看下。

1、ILoadBalancer 初始化

ILoadBalancer 的默認實現類是 ZoneAwareLoadBalancer,先從 ZoneAwareLoadBalancer 的構造方法進去看看都做了些什么事情。

 1 @Bean
 2 @ConditionalOnMissingBean
 3 public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
 4         ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
 5         IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
 6     if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
 7         return this.propertiesFactory.get(ILoadBalancer.class, config, name);
 8     }
 9     return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
10             serverListFilter, serverListUpdater);
11 }

可以看到,ZoneAwareLoadBalancer 直接調用了父類 DynamicServerListLoadBalancer 的構造方法,DynamicServerListLoadBalancer 先調用父類 BaseLoadBalancer 初始化,然后又做了一些剩余的初始化工作。

 1 public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
 2                              IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
 3                              ServerListUpdater serverListUpdater) {
 4     // DynamicServerListLoadBalancer
 5     super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
 6 }
 7 
 8 public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
 9                                      ServerList<T> serverList, ServerListFilter<T> filter,
10                                      ServerListUpdater serverListUpdater) {
11     // BaseLoadBalancer
12     super(clientConfig, rule, ping);
13     this.serverListImpl = serverList;
14     this.filter = filter;
15     this.serverListUpdater = serverListUpdater;
16     if (filter instanceof AbstractServerListFilter) {
17         ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
18     }
19     // 剩余的一些初始化
20     restOfInit(clientConfig);
21 }
22 
23 public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) {
24     // createLoadBalancerStatsFromConfig => LoadBalancerStats 統計
25     initWithConfig(config, rule, ping, createLoadBalancerStatsFromConfig(config));
26 

看 BaseLoadBalancer 的 initWithConfig,主要做了如下的初始化:

  • 設置 IPing 和 IRule,ping 的間隔時間是 30 秒,setPing 會啟動一個后台定時任務,然后每隔30秒運行一次 PingTask 任務。
  • 設置了 ILoadBalancer 的 統計器 LoadBalancerStats,對 ILoadBalancer 的 Server 狀態進行統計,比如連接失敗、成功、熔斷等信息。
  • 在啟用 PrimeConnections 請求預熱的情況下,創建 PrimeConnections 來預熱客戶端 與 Server 的鏈接。默認是關閉的。
  • 最后是注冊了一些監控、開啟請求預熱。
 1 void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
 2     this.config = clientConfig;
 3     String clientName = clientConfig.getClientName();
 4     this.name = clientName;
 5     // ping 間隔時間,默認30秒
 6     int pingIntervalTime = Integer.parseInt(""
 7             + clientConfig.getProperty(
 8                     CommonClientConfigKey.NFLoadBalancerPingInterval,
 9                     Integer.parseInt("30")));
10     // 沒看到用的地方                
11     int maxTotalPingTime = Integer.parseInt(""
12             + clientConfig.getProperty(
13                     CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
14                     Integer.parseInt("2")));
15     // 設置 ping 間隔時間,並重新設置了 ping 任務
16     setPingInterval(pingIntervalTime);
17     setMaxTotalPingTime(maxTotalPingTime);
18 
19     // 設置 IRule、IPing
20     setRule(rule);
21     setPing(ping);
22 
23     setLoadBalancerStats(stats);
24     rule.setLoadBalancer(this);
25     if (ping instanceof AbstractLoadBalancerPing) {
26         ((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
27     }
28     logger.info("Client: {} instantiated a LoadBalancer: {}", name, this);
29     
30     // PrimeConnections,請求預熱,默認關閉
31     // 作用主要用於解決那些部署環境(如讀EC2)在實際使用實時請求之前,從防火牆連接/路徑進行預熱(比如先加白名單、初始化等等動作比較耗時,可以用它先去打通)。
32     boolean enablePrimeConnections = clientConfig.get(
33             CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS);
34     if (enablePrimeConnections) {
35         this.setEnablePrimingConnections(true);
36         PrimeConnections primeConnections = new PrimeConnections(
37                 this.getName(), clientConfig);
38         this.setPrimeConnections(primeConnections);
39     }
40     // 注冊一些監控
41     init();
42 }
43 
44 protected void init() {
45     Monitors.registerObject("LoadBalancer_" + name, this);
46     // register the rule as it contains metric for available servers count
47     Monitors.registerObject("Rule_" + name, this.getRule());
48     // 默認關閉
49     if (enablePrimingConnections && primeConnections != null) {
50         primeConnections.primeConnections(getReachableServers());
51     }
52 }

再看下 DynamicServerListLoadBalancer 的初始化,核心的初始化邏輯在 restOfInit 中,主要就是做了兩件事情:

  • 開啟動態更新 Server 的特性,比如實例上線、下線、故障等,要能夠更新 ILoadBalancer 的 Server 列表。
  • 然后就全量更新一次本地的 Server 列表。
 1 void restOfInit(IClientConfig clientConfig) {
 2     boolean primeConnection = this.isEnablePrimingConnections();
 3     // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
 4     this.setEnablePrimingConnections(false);
 5     
 6     // 開啟動態更新 Server 的特性
 7     enableAndInitLearnNewServersFeature();
 8 
 9     // 更新 Server 列表
10     updateListOfServers();
11     
12     // 開啟請求預熱的情況下,對可用的 Server 進行預熱
13     if (primeConnection && this.getPrimeConnections() != null) {
14         this.getPrimeConnections()
15                 .primeConnections(getReachableServers());
16     }
17     this.setEnablePrimingConnections(primeConnection);
18     LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
19 }

2、全量更新Server列表

先看下 updateListOfServers() 是如何更新 Server 列表的,進而看下 ILoadBalancer 是如何存儲 Server 的。

  • 首先使用 ServerList 獲取所有的 Server 列表,在 RibbonClientConfiguration 中配置的是 ConfigurationBasedServerList,但和 eureka 集合和,就不是 ConfigurationBasedServerList 了,這塊下一節再來看。
  • 然后使用 ServerListFilter 對 Server 列表過濾,其默認實現類是 ZonePreferenceServerListFilter,它主要是過濾出當前 Zone(defaultZone)下的 Server。
  • 最后就是更新所有 Server 列表,先是設置 Server alive,然后調用父類(BaseLoadBalancer)的 setServersList 來更新Server列表,這說明 Server 是存儲在 BaseLoadBalancer 里的。
 1 public void updateListOfServers() {
 2     List<T> servers = new ArrayList<T>();
 3     if (serverListImpl != null) {
 4         // 從 ServerList 獲取所有 Server 列表
 5         servers = serverListImpl.getUpdatedListOfServers();
 6         LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers);
 7 
 8         if (filter != null) {
 9             // 用 ServerListFilter 過濾 Server
10             servers = filter.getFilteredListOfServers(servers);
11             LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers);
12         }
13     }
14     // 更新所有 Server 到本地緩存
15     updateAllServerList(servers);
16 }
17 
18 protected void updateAllServerList(List<T> ls) {
19     if (serverListUpdateInProgress.compareAndSet(false, true)) {
20         try {
21             for (T s : ls) {
22                 s.setAlive(true); // 設置 Server alive
23             }
24             setServersList(ls);
25             // 強制初始化 Ping
26             super.forceQuickPing();
27         } finally {
28             serverListUpdateInProgress.set(false);
29         }
30     }
31 }
32 
33 public void setServersList(List lsrv) {
34     // BaseLoadBalancer
35     super.setServersList(lsrv);
36     
37     // 將 Server 更新到 LoadBalancerStats 統計中 ....
38 }

接着看父類的 setServersList,可以看出,存儲所有 Server 的數據結構 allServerList 是一個加了 synchronized 的線程安全的容器,setServersList 就是直接將得到的 Server 列表替換  allServerList。

 1 public void setServersList(List lsrv) {
 2     Lock writeLock = allServerLock.writeLock();
 3     ArrayList<Server> newServers = new ArrayList<Server>();
 4     // 加寫鎖
 5     writeLock.lock();
 6     try {
 7         // for 循環將 lsrv 中的 Server 轉移到 allServers
 8         ArrayList<Server> allServers = new ArrayList<Server>();
 9         for (Object server : lsrv) {
10             if (server == null) {
11                 continue;
12             }
13             if (server instanceof String) {
14                 server = new Server((String) server);
15             }
16             if (server instanceof Server) {
17                 logger.debug("LoadBalancer [{}]:  addServer [{}]", name, ((Server) server).getId());
18                 allServers.add((Server) server);
19             } else {
20                 throw new IllegalArgumentException("Type String or Server expected, instead found:" + server.getClass());
21             }
22         }
23         
24         boolean listChanged = false;
25         // allServerList => volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>())
26         if (!allServerList.equals(allServers)) {
27             listChanged = true;
28             // 服務列表變更監聽器 ServerListChangeListener, 發出服務變更通知...
29         }
30         
31         // 啟用了服務預熱,開始 Server 預熱...
32         
33         // 直接替換
34         allServerList = allServers;
35         if (canSkipPing()) {
36             for (Server s : allServerList) {
37                 s.setAlive(true);
38             }
39             upServerList = allServerList;
40         } else if (listChanged) {
41             forceQuickPing();
42         }
43     } finally {
44         // 釋放寫鎖
45         writeLock.unlock();
46     }
47 }

前面 chooseRoundRobinAfterFiltering 獲取所有 Server 時就是返回的這個 allServerList列表。

1 public List<Server> getAllServers() {
2     return Collections.unmodifiableList(allServerList);
3 }

3、Eureka Ribbon 客戶端配置

獲取 Server 的組件是 ServerList,RibbonClientConfiguration 中配置的默認實現類是 ConfigurationBasedServerList。ConfigurationBasedServerList 默認是從配置文件中獲取,可以像下面這樣配置服務實例地址,多個 Server 地址用逗號隔開。

1 demo-producer:
2   ribbon:
3     listOfServers: http://10.215.0.92:8010,http://10.215.0.92:8011

但是和 eureka-client 結合后,也就是引入 spring-cloud-starter-netflix-eureka-client 的客戶端依賴,它會幫我們引入 spring-cloud-netflix-eureka-client 依賴,這個包中有一個 RibbonEurekaAutoConfiguration 自動化配置類,它通過 @RibbonClients 注解定義了全局的 Ribbon 客戶端配置類 為 EurekaRibbonClientConfiguration

1 @Configuration(proxyBeanMethods = false)
2 @EnableConfigurationProperties
3 @ConditionalOnRibbonAndEurekaEnabled
4 @AutoConfigureAfter(RibbonAutoConfiguration.class)
5 @RibbonClients(defaultConfiguration = EurekaRibbonClientConfiguration.class)
6 public class RibbonEurekaAutoConfiguration {
7 
8 }

進入 EurekaRibbonClientConfiguration  可以看到:

  • IPing 的默認實現類為 NIWSDiscoveryPing。
  • ServerList 的默認實現類為 DomainExtractingServerList,但是 DomainExtractingServerList 在構造時又傳入了一個類型為 DiscoveryEnabledNIWSServerList 的 ServerList。看名字大概也可以看出,DiscoveryEnabledNIWSServerList 就是從 EurekaClient 獲取 Server 的組件。
 1 @Configuration(proxyBeanMethods = false)
 2 public class EurekaRibbonClientConfiguration {
 3     @Value("${ribbon.eureka.approximateZoneFromHostname:false}")
 4     private boolean approximateZoneFromHostname = false;
 5 
 6     @RibbonClientName
 7     private String serviceId = "client";
 8     @Autowired
 9     private PropertiesFactory propertiesFactory;
10 
11     @Bean
12     @ConditionalOnMissingBean
13     public IPing ribbonPing(IClientConfig config) {
14         if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
15             return this.propertiesFactory.get(IPing.class, config, serviceId);
16         }
17         NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
18         ping.initWithNiwsConfig(config);
19         return ping;
20     }
21 
22     @Bean
23     @ConditionalOnMissingBean
24     public ServerList<?> ribbonServerList(IClientConfig config,
25             Provider<EurekaClient> eurekaClientProvider) {
26         if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
27             return this.propertiesFactory.get(ServerList.class, config, serviceId);
28         }
29         DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(config, eurekaClientProvider);
30         DomainExtractingServerList serverList = new DomainExtractingServerList(discoveryServerList, config, this.approximateZoneFromHostname);
31         return serverList;
32     }
33 }

4、從 DiscoveryClient 獲取Server列表

DynamicServerListLoadBalancer 中通過 ServerList 的 getUpdatedListOfServers 方法全量獲取服務列表,在 eureka-client 環境下,ServerList 默認實現類為 DomainExtractingServerList,那就先看下它的 getUpdatedListOfServers 方法。

可以看出,DomainExtractingServerList 先用 DomainExtractingServerList 獲取服務列表,然后根據 Ribbon 客戶端配置重新構造 Server 對象返回。獲取服務列表的核心在 DiscoveryEnabledNIWSServerList 中。

 1 @Override
 2 public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
 3     // list => DiscoveryEnabledNIWSServerList
 4     List<DiscoveryEnabledServer> servers = setZones(this.list.getUpdatedListOfServers());
 5     return servers;
 6 }
 7 
 8 private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
 9     List<DiscoveryEnabledServer> result = new ArrayList<>();
10     boolean isSecure = this.ribbon.isSecure(true);
11     boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
12     // 根據客戶端配置重新構造 DomainExtractingServer 返回
13     for (DiscoveryEnabledServer server : servers) {
14         result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr, this.approximateZoneFromHostname));
15     }
16     return result;
17 }

先看下 DiscoveryEnabledNIWSServerList  的構造初始化:

  • 主要是傳入了 Provider<EurekaClient> 用來獲取 EurekaClient
  • 另外還設置了客戶端名稱 clientName ,以及 vipAddresses 也是客戶端名稱,這個后面會用得上。
 1 public DiscoveryEnabledNIWSServerList(IClientConfig clientConfig, Provider<EurekaClient> eurekaClientProvider) {
 2     this.eurekaClientProvider = eurekaClientProvider;
 3     initWithNiwsConfig(clientConfig);
 4 }
 5 
 6 @Override
 7 public void initWithNiwsConfig(IClientConfig clientConfig) {
 8     // 客戶端名稱,就是服務名稱
 9     clientName = clientConfig.getClientName();
10     // vipAddresses 得到的也是客戶端名稱
11     vipAddresses = clientConfig.resolveDeploymentContextbasedVipAddresses();
12     
13     // 其它的一些配置....
14 }

接着看獲取實例的 getUpdatedListOfServers,可以看到它的核心邏輯就是根據服務名從 EurekaClient 獲取 InstanceInfo 實例列表,然后封裝 Server 信息返回。

 1 public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
 2     return obtainServersViaDiscovery();
 3 }
 4 
 5 private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
 6     List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
 7     if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
 8         return new ArrayList<DiscoveryEnabledServer>();
 9     }
10     // 得到 EurekaClient,實際類型是 CloudEurekaClient,其父類是 DiscoveryClient
11     EurekaClient eurekaClient = eurekaClientProvider.get();
12     if (vipAddresses!=null){
13         // 分割 vipAddresses,默認就是服務名稱
14         for (String vipAddress : vipAddresses.split(",")) {
15             // 根據服務名稱從 EurekaClient 獲取實例信息
16             List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
17             for (InstanceInfo ii : listOfInstanceInfo) {
18                 if (ii.getStatus().equals(InstanceStatus.UP)) {
19                     // ...
20                     // 根據實例信息 InstanceInfo 創建 Server
21                     DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
22                     serverList.add(des);
23                 }
24             }
25             if (serverList.size()>0 && prioritizeVipAddressBasedServers){
26                 break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
27             }
28         }
29     }
30     return serverList;
31 }

注意這里的 vipAddress 其實就是服務名:

最后看 EurekaClient 的 getInstancesByVipAddress,到這里就很清楚了,其實就是從 DiscoveryClient 的本地應用 Applications 中根據服務名取出所有的實例列表。

這里就和 Eureka 源碼那塊銜接上了,eureka-client 全量抓取注冊表以及每隔30秒增量抓取注冊表,都是合並到本地的 Applications 中。Ribbon 與 Eureka 結合后,Ribbon 獲取 Server 就從 DiscoveryClient 的 Applications 中獲取 Server 列表了。

 1 public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure, String region) {
 2     // ...
 3     Applications applications;
 4     if (instanceRegionChecker.isLocalRegion(region)) {
 5         // 取本地應用 Applications
 6         applications = this.localRegionApps.get();
 7     } else {
 8         applications = remoteRegionVsApps.get(region);
 9         if (null == applications) {
10             return Collections.emptyList();
11         }
12     }
13 
14     if (!secure) {
15         // 返回服務名對應的實例
16         return applications.getInstancesByVirtualHostName(vipAddress);
17     } else {
18         return applications.getInstancesBySecureVirtualHostName(vipAddress);
19     }
20 }

5、定時更新Server列表

DynamicServerListLoadBalancer 初始化時,有個方法還沒說,就是 enableAndInitLearnNewServersFeature()。這個方法只是調用 ServerListUpdater 啟動了一個 UpdateAction,這個 UpdateAction 又只是調用了一下 updateListOfServers 方法,就是前面講解過的全量更新 Server 的邏輯。

 1 public void enableAndInitLearnNewServersFeature() {
 2     serverListUpdater.start(updateAction);
 3 }
 4 
 5 protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
 6     @Override
 7     public void doUpdate() {
 8         // 調用 updateListOfServers
 9         updateListOfServers();
10     }
11 };

ServerListUpdater 的默認實現類是 PollingServerListUpdater,看下它的 start 方法:

其實就是以固定的頻率,每隔30秒調用一下 updateListOfServers 方法,將 DiscoveryClient 中 Applications 中緩存的實例同步到 ILoadBalancer 中的 allServerList 列表中。

 1 public synchronized void start(final UpdateAction updateAction) {
 2     if (isActive.compareAndSet(false, true)) {
 3         final Runnable wrapperRunnable = new Runnable() {
 4             @Override
 5             public void run() {
 6                 // ...
 7                 try {
 8                     // 執行一次 updateListOfServers
 9                     updateAction.doUpdate();
10                     // 設置最后更新時間
11                     lastUpdated = System.currentTimeMillis();
12                 } catch (Exception e) {
13                     logger.warn("Failed one update cycle", e);
14                 }
15             }
16         };
17         
18         // 固定頻率調度
19         scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
20                 wrapperRunnable,
21                 initialDelayMs, // 默認 1000
22                 refreshIntervalMs, // 默認 30 * 1000
23                 TimeUnit.MILLISECONDS
24         );
25     } else {
26         logger.info("Already active, no-op");
27     }
28 }

6、判斷Server是否存活

在創建 ILoadBalancer 時,IPing 還沒有看過是如何工作的。在初始化的時候,可以看到,主要就是設置了當前的 ping,然后重新設置了一個調度任務,默認每隔30秒調度一次 PingTask 任務。

 1 public void setPing(IPing ping) {
 2     if (ping != null) {
 3         if (!ping.equals(this.ping)) {
 4             this.ping = ping;
 5             // 設置 Ping 任務
 6             setupPingTask();
 7         }
 8     } else {
 9         this.ping = null;
10         // cancel the timer task
11         lbTimer.cancel();
12     }
13 }
14 
15 void setupPingTask() {
16     // ...
17     // 創建一個定時調度器
18     lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true);
19     // pingIntervalTime 默認為 30 秒,每隔30秒調度一次 PingTask
20     lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
21     // 立即發起以 Ping
22     forceQuickPing();
23 }

ShutdownEnabledTimer 可以簡單了解下,它是繼承自 Timer 的,它在創建的時候向 Runtime 注冊了一個回調,在 jvm 關閉的時候來取消 Timer 的執行,進而釋放資源。

 1 public class ShutdownEnabledTimer extends Timer {
 2     private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownEnabledTimer.class);
 3 
 4     private Thread cancelThread;
 5     private String name;
 6 
 7     public ShutdownEnabledTimer(String name, boolean daemon) {
 8         super(name, daemon);
 9         this.name = name;
10         // 取消定時器的線程
11         this.cancelThread = new Thread(new Runnable() {
12             public void run() {
13                 ShutdownEnabledTimer.super.cancel();
14             }
15         });
16 
17         LOGGER.info("Shutdown hook installed for: {}", this.name);
18         // 向 Runtime 注冊一個鈎子,在 jvm 關閉時,調用 cancelThread 取消定時任務
19         Runtime.getRuntime().addShutdownHook(this.cancelThread);
20     }
21 
22     @Override
23     public void cancel() {
24         super.cancel();
25         LOGGER.info("Shutdown hook removed for: {}", this.name);
26         try {
27             Runtime.getRuntime().removeShutdownHook(this.cancelThread);
28         } catch (IllegalStateException ise) {
29             LOGGER.info("Exception caught (might be ok if at shutdown)", ise);
30         }
31 
32     }
33 }
View Code

再來看下 PingTask,PingTask 核心邏輯就是遍歷 allServers 列表,使用 IPingStrategy 和 IPing 來判斷 Server 是否存活,並更新 Server 的狀態,以及將所有存活的 Server 更新到 upServerList 中,upServerList 緩存了所有存活的 Server。

 1 class PingTask extends TimerTask {
 2     public void run() {
 3         try {
 4             // pingStrategy => SerialPingStrategy
 5             new Pinger(pingStrategy).runPinger();
 6         } catch (Exception e) {
 7             logger.error("LoadBalancer [{}]: Error pinging", name, e);
 8         }
 9     }
10 }
11 
12 class Pinger {
13     private final IPingStrategy pingerStrategy;
14 
15     public Pinger(IPingStrategy pingerStrategy) {
16         this.pingerStrategy = pingerStrategy;
17     }
18 
19     public void runPinger() throws Exception {
20         if (!pingInProgress.compareAndSet(false, true)) { 
21             return; // Ping in progress - nothing to do
22         }
23 
24         Server[] allServers = null;
25         boolean[] results = null;
26 
27         Lock allLock = null;
28         Lock upLock = null;
29         
30         try {
31             allLock = allServerLock.readLock();
32             allLock.lock();
33             // 加讀鎖,取出 allServerList 中的 Server
34             allServers = allServerList.toArray(new Server[allServerList.size()]);
35             allLock.unlock();
36 
37             int numCandidates = allServers.length;
38             // 使用 IPingStrategy 和 IPing 對所有 Server 發起 ping 請求
39             results = pingerStrategy.pingServers(ping, allServers);
40 
41             final List<Server> newUpList = new ArrayList<Server>();
42             final List<Server> changedServers = new ArrayList<Server>();
43 
44             for (int i = 0; i < numCandidates; i++) {
45                 boolean isAlive = results[i];
46                 Server svr = allServers[i];
47                 boolean oldIsAlive = svr.isAlive();
48                 // 設置 alive 是否存活
49                 svr.setAlive(isAlive);
50 
51                 // 實例變更
52                 if (oldIsAlive != isAlive) {
53                     changedServers.add(svr);
54                     logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}",  name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
55                 }
56 
57                 // 添加存活的 Server
58                 if (isAlive) {
59                     newUpList.add(svr);
60                 }
61             }
62             upLock = upServerLock.writeLock();
63             upLock.lock();
64             // 更新 upServerList,upServerList 只保存了存活的 Server
65             upServerList = newUpList;
66             upLock.unlock();
67             // 通知變更
68             notifyServerStatusChangeListener(changedServers);
69         } finally {
70             pingInProgress.set(false);
71         }
72     }
73 }
View Code

IPingStrategy 的默認實現類是 SerialPingStrategy,進入可以發現它只是遍歷所有 Server,然后用 IPing 判斷 Server 是否存活。

 1 private static class SerialPingStrategy implements IPingStrategy {
 2     @Override
 3     public boolean[] pingServers(IPing ping, Server[] servers) {
 4         int numCandidates = servers.length;
 5         boolean[] results = new boolean[numCandidates];
 6 
 7         for (int i = 0; i < numCandidates; i++) {
 8             results[i] = false;
 9             try {
10                 if (ping != null) {
11                     // 使用 IPing 判斷 Server 是否存活
12                     results[i] = ping.isAlive(servers[i]);
13                 }
14             } catch (Exception e) {
15                 logger.error("Exception while pinging Server: '{}'", servers[i], e);
16             }
17         }
18         return results;
19     }
20 }

在集成 eureka-client 后,IPing默認實現類是 NIWSDiscoveryPing,看它的 isAlive 方法,其實就是判斷對應 Server 的實例 InstanceInfo 的狀態是否是 UP 狀態,UP狀態就表示 Server 存活。

 1 public boolean isAlive(Server server) {
 2     boolean isAlive = true;
 3     if (server!=null && server instanceof DiscoveryEnabledServer){
 4         DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;                
 5         InstanceInfo instanceInfo = dServer.getInstanceInfo();
 6         if (instanceInfo!=null){                    
 7             InstanceStatus status = instanceInfo.getStatus();
 8             if (status!=null){
 9                 // 判斷Server對應的實例狀態是否是 UP
10                 isAlive = status.equals(InstanceStatus.UP);
11             }
12         }
13     }
14     return isAlive;
15 }

7、一張圖總結 Ribbon 核心原理

① Ribbon 核心工作原理總結

  • 首先,Ribbon 的7個核心接口共同定義了 Ribbon 的行為特性,它們就是 Ribbon 的核心骨架。
  • 使用 Ribbon 來對客戶端做負載均衡,基本的用法就是用 @LoadBalanced 注解標注一個 RestTemplate 的 bean 對象,之后在 LoadBalancerAutoConfiguration 配置類中會對帶有 @LoadBalanced 注解的 RestTemplate 添加 LoadBalancerInterceptor 攔截器。
  • LoadBalancerInterceptor 會攔截 RestTemplate 的 HTTP 請求,將請求綁定進 Ribbon 負載均衡的生命周期,然后使用 LoadBalancerClient 的 execute 方法來處理請求。
  • LoadBalancerClient 首先會得到一個 ILoadBalancer,再使用它去得到一個 Server,這個 Server 就是具體某一個實例的信息封裝。得到 Server 之后,就用 Server 的 IP 和端口重構原始 URI。
  • ILoadBalancer 最終在選擇實例的時候,會通過 IRule 均衡策略來選擇一個 Server。
  • ILoadBalancer 的父類 BaseLoadBalancer 中有一個 allServerList 列表緩存了所有 Server,Ribbon 中 Server 的來源就是 allServerList。
  • 在加載Ribbon客戶端上下文時,ILoadBalancer 會用 ServerList 從 DiscoveryClient 的 Applications 中獲取客戶端對應的實例列表,然后使用 ServerListFilter 過濾,最后更新到 allServerList 中。
  • ILoadBalancer 還會開啟一個后台任務 ServerListUpdater ,每隔30秒運行一次,用 ServerList 將 DiscoveryClient 的 Applications 中的實例列表同步到 allServerList 中。
  • ILoadBalancer 還會開啟一個后台任務 PingTask,每隔30秒運行一次,用 IPing 判斷 Server 的存活狀態,EurekaClient 環境下,就是判斷 InstanceInfo 的狀態是否為 UP。

② 下面用一張圖來總結下 Ribbon 這塊獲取Server的核心流程以及對應的核心接口間的關系。

8、Ribbon 脫離 Eureka 使用

在默認情況下,Ribbon 客戶端會從 EurekaClient 獲取服務列表,其實就是間接從注冊中心讀取服務注冊信息列表,來達到動態負載均衡的功能。但如果不想從 EurekaClient 讀取,可以禁用 Ribbon 的 Eureka 功能。
在 Ribbon 中禁用Eureka功能,可以做如下配置:

1 ribbon:
2   eureka:
3     enabled: false

那 ribbon.eureka.enabled 是如何控制禁用 Eureka 的呢?看 RibbonEurekaAutoConfiguration:

1 @Configuration(proxyBeanMethods = false)
2 @EnableConfigurationProperties
3 @ConditionalOnRibbonAndEurekaEnabled
4 @AutoConfigureAfter(RibbonAutoConfiguration.class)
5 @RibbonClients(defaultConfiguration = EurekaRibbonClientConfiguration.class)
6 public class RibbonEurekaAutoConfiguration {
7 
8 }

這個配置類通過 @RibbonClients 指定了默認的客戶端配置類為 EurekaRibbonClientConfiguration,但生效的前提是 @ConditionalOnRibbonAndEurekaEnabled,進去可以看到這個條件注解就是判斷 Ribbon Eureka 是否啟用了,就可以設置 ribbon.eureka.enabled=false 來禁用 RIbbon Eureka。

 1 @Target({ ElementType.TYPE, ElementType.METHOD })
 2 @Retention(RetentionPolicy.RUNTIME)
 3 @Documented
 4 // 條件類 OnRibbonAndEurekaEnabledCondition
 5 @Conditional(ConditionalOnRibbonAndEurekaEnabled.OnRibbonAndEurekaEnabledCondition.class)
 6 public @interface ConditionalOnRibbonAndEurekaEnabled {
 7 
 8     class OnRibbonAndEurekaEnabledCondition extends AllNestedConditions {
 9 
10         OnRibbonAndEurekaEnabledCondition() {
11             super(ConfigurationPhase.REGISTER_BEAN);
12         }
13         
14         // 引入了類:DiscoveryEnabledNIWSServerList 
15         @ConditionalOnClass(DiscoveryEnabledNIWSServerList.class)
16         // 存在bean對象:SpringClientFactory
17         @ConditionalOnBean(SpringClientFactory.class)
18         // ribbon.eureka.enabled=true
19         @ConditionalOnProperty(value = "ribbon.eureka.enabled", matchIfMissing = true)
20         static class Defaults {
21 
22         }
23 
24         // 存在bean對象:EurekaClient
25         @ConditionalOnBean(EurekaClient.class)
26         static class EurekaBeans {
27 
28         }
29         
30         // eureka.client.enabled=true
31         @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
32         @ConditionalOnDiscoveryEnabled
33         static class OnEurekaClientEnabled {
34 
35         }
36     }
37 }

如果想從其它地方獲取服務列表,可以自定義接口實現 ServerList<Server> 來獲取,也可以在配置文件中設置地址列表:

1 <client-name>:
2   ribbon:
3     listOfServers: http://10.215.0.92:8010,http://10.215.0.92:8011

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM