spring-cloud-netflix-ribbon 源碼分析:
本文主要針對 spring-cloud-starter-netflix-ribbon 的 2.2.3.RELEASE 版本進行源碼的解析。
對於未接觸過 Ribbon 的小伙伴可以參考 https://www.cnblogs.com/wuzhenzhao/p/9468928.html 進行一些基礎知識的了解。
注解 @LoadBalanced:
進入正題,針對我們采用 RestTemplate 進行服務之間的調用的時候,在Ribbon 中是采用了一個注解 @LoadBalanced,首先我們縣來看一下該注解的定義:
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Qualifier public @interface LoadBalanced { }
我們可以理解為這個注解是一個標識作用,其作用在 LoadBalancerAutoConfiguration 自動裝配類中 ,該類中定義了一個屬性如下:
@LoadBalanced @Autowired(required = false) private List<RestTemplate> restTemplates = Collections.emptyList();
其作用是將存在注解 @LoadBalanced 的類進行注入。起到一個過濾的作用。然后在自動裝配的配置類中,進行包裝攔截。
自動裝配之 RibbonAutoConfiguration :
大家都知道。spring-cloud 微服務基於Springboot 的自動裝配機制。首先我們需要了解的就是就是 RibbonAutoConfiguration 自動裝配類。這里講解主要的一些類的作用:
@Configuration @Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class) @RibbonClients @AutoConfigureAfter( name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration") //定義bean的先后加載。 @AutoConfigureBefore({ LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class }) @EnableConfigurationProperties({ RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class }) public class RibbonAutoConfiguration { @Autowired(required = false) private List<RibbonClientSpecification> configurations = new ArrayList<>(); @Autowired private RibbonEagerLoadProperties ribbonEagerLoadProperties; @Bean public HasFeatures ribbonFeature() { return HasFeatures.namedFeature("Ribbon", Ribbon.class); } @Bean public SpringClientFactory springClientFactory() { SpringClientFactory factory = new SpringClientFactory(); factory.setConfigurations(this.configurations); return factory; } // 注入Ribbon負載均衡客戶端實例 @Bean @ConditionalOnMissingBean(LoadBalancerClient.class) public LoadBalancerClient loadBalancerClient() { return new RibbonLoadBalancerClient(springClientFactory()); } // ...... }
可以發現在這個自動裝配的類中會注入很多的Bean,我們需要特別注意 RibbonLoadBalancerClient ,在服務調用的時候會用到。然后另外一個重點是 LoadBalancerAutoConfiguration:
@Configuration(proxyBeanMethods = false) // 條件注入 @ConditionalOnClass(RestTemplate.class) @ConditionalOnBean(LoadBalancerClient.class) @EnableConfigurationProperties(LoadBalancerRetryProperties.class) public class LoadBalancerAutoConfiguration { //這個就是上文提到的需要將注解有@LoadBalanced 的RestTemplate 過濾裝配。 @LoadBalanced @Autowired(required = false) private List<RestTemplate> restTemplates = Collections.emptyList(); @Autowired(required = false) private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList(); // 將 restTemplate進行包裝,以便后續進行攔截包裝 @Bean public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated( final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) { return () -> restTemplateCustomizers.ifAvailable(customizers -> { for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) { for (RestTemplateCustomizer customizer : customizers) { customizer.customize(restTemplate); } } }); } // 創建請求工廠類,后續會通過該工廠類創建請求 request @Bean @ConditionalOnMissingBean public LoadBalancerRequestFactory loadBalancerRequestFactory( LoadBalancerClient loadBalancerClient) { return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers); } @Configuration(proxyBeanMethods = false) @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate") static class LoadBalancerInterceptorConfig { // 創建 restTemplate 攔截器實例 @Bean public LoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) { return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); } // 為滿足條件的 restTemplate 添加攔截器鏈 @Bean @ConditionalOnMissingBean public RestTemplateCustomizer restTemplateCustomizer( final LoadBalancerInterceptor loadBalancerInterceptor) { return restTemplate -> { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); }; } } // ....... }
其中關於 RibbonAutoConfiguration 初始化注入的一些類中比較重要的部分就都提及了。對於這塊的流程相對比較簡單,流程圖如下:
RestTemplate 的調用過程(負載選擇階段):
以 restTemplate.getForObject 為例,我們直接進入到RestTemplate這個類的doExecute方法,因為前面部分的代碼都比較簡單沒有太多邏輯。這段代碼中有一個很重要的邏輯,就是createRequest,這個是構建客戶端請求的一個方法。
@Nullable protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback, @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException { Assert.notNull(url, "URI is required"); Assert.notNull(method, "HttpMethod is required"); ClientHttpResponse response = null; try { ClientHttpRequest request = createRequest(url, method); if (requestCallback != null) { requestCallback.doWithRequest(request); } response = request.execute(); handleResponse(url, method, response); return (responseExtractor != null ? responseExtractor.extractData(response) : null); } catch (IOException ex) { String resource = url.toString(); String query = url.getRawQuery(); resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource); throw new ResourceAccessException("I/O error on " + method.name() + " request for \"" + resource + "\": " + ex.getMessage(), ex); } finally { if (response != null) { response.close(); } } }
createRequest 這里個方法是用來創建一個請求對象,其中getRequestFactory(),調用的是 InterceptingHttpAccessor 中的getRequestFactory方法,因為InterceptingHttpAccessor繼承了HttpAccessor這個類,重寫了 getRequestFactory方法。
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException { ClientHttpRequest request = getRequestFactory().createRequest(url, method); if (logger.isDebugEnabled()) { logger.debug("HTTP " + method.name() + " " + url); } return request; } //其中,getRequestFactory方法代碼如下,其中getInterceptors是獲得當前客戶端請求的所有攔截器,需要注意的是,這里的攔截器,就包含LoadBalancerInterceptor. public ClientHttpRequestFactory getRequestFactory() { List<ClientHttpRequestInterceptor> interceptors = getInterceptors(); if (!CollectionUtils.isEmpty(interceptors)) { ClientHttpRequestFactory factory = this.interceptingRequestFactory; if (factory == null) {//構建一個InterceptingClientHttpRequestFactory工廠,並且將所有的攔截器作為參數傳入 factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors); this.interceptingRequestFactory = factory; } return factory; } else { return super.getRequestFactory(); } }
getInterceptors 這個方法中返回的攔截器列表,是從InterceptingHttpAccessor.setInterceptors()方法來設置的,而這個setInterceptors()調用的地方正好是 LoadBalancerAutoConfiguration.LoadBalancerInterceptorConfig.restTemplateCustomizer。不明白的同學可以打個斷點跟一下代碼就清晰了。
LoadBalancerAutoConfiguration.LoadBalancerInterceptorConfig.restTemplateCustomizer 這里面調用了restTemplate.setInterceptors這個方法設置攔截器,其中RestTemplate又集成了 InterceptingHttpAccessor。所以,你懂的。
org.springframework.http.client.InterceptingClientHttpRequestFactory.createRequest 方法如下:
@Override protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) { return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod); }
所以,再回到createRequest方法中,getRequestFactory()方法返回的是 InterceptingClientHttpRequestFactory,而createRequest方法,最終返回的是 InterceptingClientHttpRequest這個類。
繼續跳回到RestTemplate.doExecute方法,最終會調用request.execute。那么這個時候,request.execute調用誰呢?於是我們看一下InterceptingClientHttpRequest的類關系圖,我們發現它有兩個父類。這是一種模版方法的設計。
所以這里會走到 AbstractClientHttpRequest.execute 方法:
@Override public final ClientHttpResponse execute() throws IOException { assertNotExecuted(); ClientHttpResponse result = executeInternal(this.headers); this.executed = true; return result; }
最終,我們進入到InterceptingClientHttpRequest.executeInternal方法
@Override protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException { InterceptingRequestExecution requestExecution = new InterceptingRequestExecution(); return requestExecution.execute(this, bufferedOutput); }
然后調用 InterceptingClientHttpRequest.execute方法。在InterceptingClientHttpRequest.execute方法中,有兩個處理邏輯
- 如果有配置多個客戶端攔截器,則調用攔截器方法,對請求進行攔截
- 否則,按照正常的處理邏輯進行遠程調用。
@Override public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException { if (this.iterator.hasNext()) { ClientHttpRequestInterceptor nextInterceptor = this.iterator.next(); return nextInterceptor.intercept(request, body, this); } else { HttpMethod method = request.getMethod(); Assert.state(method != null, "No standard HTTP method"); ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method); request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value)); if (body.length > 0) { if (delegate instanceof StreamingHttpOutputMessage) { StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate; streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream)); } else { StreamUtils.copy(body, delegate.getBody()); } } return delegate.execute(); } }
而在當前的場景中,自然是調用LoadBalancerInterceptor.intercept方法。
LoadBalancerInterceptor.intercept :
RestTemplate發起請求的源碼就不分析了,默認采用的是 SimpleClientHttpRequestFactory,內部是調用 jdk 的 HttpConnection來實現http請求的調用,我們主要關心請求的過程中,LoadBalancerInterceptor是如何發揮作用的。通過一些列的方法調用,最終還是回到了LoadBalancerInterceptor.intercept()這個方法上來,它主要實現了對於請求的攔截。
- 獲得一個服務名稱。
- 調用loadBalancer.execute。
@Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI();//獲得請求的URI String serviceName = originalUri.getHost();//獲得請求的名稱 Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));//通過RibbonLoadBalancerClient 執行請求 }
LoadBalancerClient其實是一個接口,我們看一下它的類圖,它有兩個具體的實現。
此時,LoadBalancerClient的具體實例應該是RibbonLoadBalancerClient,這個對象實例是在RibbonAutoConfiguration這個類中進行注入的。
進入 execute 方法:
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException { // 獲取當前配置的負載均衡器 ILoadBalancer loadBalancer = getLoadBalancer(serviceId); // 通過負載均衡器及負載均衡算法選擇一個對應的服務 Server server = getServer(loadBalancer, hint); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } // 構造 RibbonServer RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); // 執行調用 return execute(serviceId, ribbonServer, request); }
以上主要做了以下工作:
- 根據serviceId獲得一個ILoadBalancer
- 調用getServer方法去獲取一個服務實例
- 判斷Server的值是否為空。這里的Server實際上就是傳統的一個服務節點,這個對象存儲了服務節點的一些元數據,比如host、port等
getServer是用來獲得一個具體的服務節點,它的實現如下
protected Server getServer(ILoadBalancer loadBalancer, Object hint) { if (loadBalancer == null) { return null; } // Use 'default' on a null hint, or just pass it on? return loadBalancer.chooseServer(hint != null ? hint : "default"); }
通過代碼可以看到,getServer實際調用了IloadBalancer.chooseServer這個方法,ILoadBalancer這個是一個負載均衡器接口。
public interface ILoadBalancer { public void addServers(List<Server> newServers); public Server chooseServer(Object key); public void markServerDown(Server server); public List<Server> getReachableServers(); public List<Server> getAllServers(); }
還有一個過時的方法就不貼出來了,這五個方法作用如下:
-
addServers表示向負載均衡器中維護的實例列表增加服務實例
-
chooseServer表示通過某種策略,從負載均衡服務器中挑選出一個具體的服務實例
-
markServerDown表示用來通知和標識負載均衡器中某個具體實例已經停止服務,否則負載均衡器在下一次獲取服務實例清單前都會認為這個服務實例是正常工作的
-
getReachableServers表示獲取當前正常工作的服務實例列表
-
getAllServers表示獲取所有的服務實例列表,包括正常的服務和停止工作的服務
我們看一下ILoadBalancer的類關系圖
從整個類的關系圖來看,BaseLoadBalancer類實現了基礎的負載均衡,而 DynamicServerListLoadBalancer和 ZoneAwareLoadBalancer則是在負載均衡策略的基礎上做了一些功能擴展。
-
AbstractLoadBalancer實現了ILoadBalancer接口,它定義了服務分組的枚舉類/chooseServer(用來選取一個服務實例)/getServerList(獲取某一個分組中的所有服務實例)/getLoadBalancerStats用來獲得一個LoadBalancerStats對象,這個對象保存了每一個服務的狀態信息。
-
BaseLoadBalancer,它實現了作為負載均衡器的基本功能,比如服務列表維護、服務存活狀態監測、負載均衡算法選擇Server等。但是它只是完成基本功能,在有些復雜場景中還無法實現,比如動態服務列表、Server過濾、Zone區域意識(服務之間的調用希望盡可能是在同一個區域內進行,減少延遲)。
-
DynamicServerListLoadBalancer是BaseLoadbalancer的一個子類,它對基礎負載均衡提供了擴展,從名字上可以看出,它提供了動態服務列表的特性
-
ZoneAwareLoadBalancer 它是在DynamicServerListLoadBalancer的基礎上,增加了以Zone的形式來配置多個LoadBalancer的功能。
那在getServer方法中, loadBalancer.chooseServer 具體的實現類是哪一個呢?我們找到這個類:
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties // Order is important here, last should be the default, first should be optional // see // https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653 @Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class }) public class RibbonClientConfiguration { /** * Ribbon client default connect timeout. */ public static final int DEFAULT_CONNECT_TIMEOUT = 1000; /** * Ribbon client default read timeout. */ public static final int DEFAULT_READ_TIMEOUT = 1000; /** * Ribbon client default Gzip Payload flag. */ public static final boolean DEFAULT_GZIP_PAYLOAD = true; @RibbonClientName private String name = "client"; // TODO: maybe re-instate autowired load balancers: identified by name they could be // associated with ribbon clients @Autowired private PropertiesFactory propertiesFactory; @Bean @ConditionalOnMissingBean public IClientConfig ribbonClientConfig() { DefaultClientConfigImpl config = new DefaultClientConfigImpl(); config.loadProperties(this.name); config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT); config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT); config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD); return config; } // 默認的負載均衡規則 @Bean @ConditionalOnMissingBean public IRule ribbonRule(IClientConfig config) { if (this.propertiesFactory.isSet(IRule.class, name)) { return this.propertiesFactory.get(IRule.class, config, name); } ZoneAvoidanceRule rule = new ZoneAvoidanceRule(); rule.initWithNiwsConfig(config); return rule; } // 默認的維持心跳的IPing 實現類 @Bean @ConditionalOnMissingBean public IPing ribbonPing(IClientConfig config) { if (this.propertiesFactory.isSet(IPing.class, name)) { return this.propertiesFactory.get(IPing.class, config, name); } return new DummyPing(); } // 默認的服務列表配置類 @Bean @ConditionalOnMissingBean @SuppressWarnings("unchecked") public ServerList<Server> ribbonServerList(IClientConfig config) { if (this.propertiesFactory.isSet(ServerList.class, name)) { return this.propertiesFactory.get(ServerList.class, config, name); } ConfigurationBasedServerList serverList = new ConfigurationBasedServerList(); serverList.initWithNiwsConfig(config); return serverList; } // 這個類會定時獲取服務實例列表更新 @Bean @ConditionalOnMissingBean public ServerListUpdater ribbonServerListUpdater(IClientConfig config) { return new PollingServerListUpdater(config); } // 默認的負載均衡實例 @Bean @ConditionalOnMissingBean public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) { if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) { return this.propertiesFactory.get(ILoadBalancer.class, config, name); } return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList, serverListFilter, serverListUpdater); } // ..... }
在這個bean的聲明中可以看到,默認情況下采用的是ZoneAwareLoadBalancer。Zone表示區域的意思,區域指的就是地理區域的概念,一般較大規模的互聯網公司,都會做跨區域部署,這樣做有幾個好處,第一個是為不同地域的用戶提供最近的訪問節點減少訪問延遲,其次是為了保證高可用,做容災處理。而ZoneAwareLoadBalancer就是提供了具備區域意識的負載均衡器,它的主要作用是對Zone進行了感知,保證每個Zone里面的負載均衡策略都是隔離的,它並不保證A區域過來的請求一定會發動到A區域對應的Server內。真正實現這個需求的是ZonePreferenceServerListFilter/ZoneAffinityServerListFilter 。ZoneAwareLoadBalancer的核心功能是若開啟了區域意識,且zone的個數 > 1,就繼續區域選擇邏輯根據ZoneAvoidanceRule.getAvailableZones()方法拿到可用區(會T除掉完全不可用的區域,以及可用但是負載最高的一個區域)從可用區zone們中,通過ZoneAvoidanceRule.randomChooseZone隨機選一個zone出來 (該隨機遵從權重規則:誰的zone里面Server數量最多,被選中的概率越大)在選中的zone里面的所有Server中,采用該zone對對應的Rule,進行choose:
@Override public Server chooseServer(Object key) { //ENABLED,表示是否用區域意識的choose選擇Server,默認是true, //如果禁用了區域、或者只有一個zone,就直接按照父類的邏輯來進行處理,父類默認采用輪詢算法 if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) { logger.debug("Zone aware logic disabled or there is only one zone"); return super.chooseServer(key); } Server server = null; try { LoadBalancerStats lbStats = getLoadBalancerStats(); Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats); logger.debug("Zone snapshots: {}", zoneSnapshot); if (triggeringLoad == null) { triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty( "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d); } if (triggeringBlackoutPercentage == null) { triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty( "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d); } //根據相關閾值計算可用區域 Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()); logger.debug("Available zones: {}", availableZones); if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) { //從可用區域中隨機選擇一個區域,zone里面的服務器節點越多,被選中的概率越大 String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones); logger.debug("Zone chosen: {}", zone); if (zone != null) { //根據zone獲得該zone中的LB,然后根據該Zone的負載均衡算法選擇一個server BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone); server = zoneLoadBalancer.chooseServer(key); } } } catch (Exception e) { logger.error("Error choosing server using zone aware logic for load balancer={}", name, e); } if (server != null) { return server; } else { logger.debug("Zone avoidance logic is not invoked."); return super.chooseServer(key); } }
BaseLoadBalancer.chooseServer :根據默認的負載均衡算法來獲得指定的服務節點。默認的算法是RoundBin。
public Server chooseServer(Object key) { if (counter == null) { counter = createCounter(); } counter.increment(); if (rule == null) { return null; } else { try { return rule.choose(key); } catch (Exception e) { logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e); return null; } } }
分析到這里,我們已經搞明白了Ribbon是如何運用負載均衡算法來從服務列表中獲得一個目標服務進行訪問的。但是還有一個疑惑,就是動態服務列表的更新和獲取是在哪里實現呢?
服務列表的加載及動態感知過程:
這個時候要回到 RibbonClientConfiguration 的注入過程了,我們發現在這個類里注入了一個 PollingServerListUpdater ,從名字上看就很明顯是進行服務列表的感知工作的,那么是哪里進行的調用呢 ?
跟到 ZoneAwareLoadBalancer 的構造方法里:
// 初始化 ZoneAwareLoadBalancer public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) { super(clientConfig, rule, ping, serverList, filter, serverListUpdater); } // 初始化父類 public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) { super(clientConfig, rule, ping); this.serverListImpl = serverList; this.filter = filter; this.serverListUpdater = serverListUpdater; if (filter instanceof AbstractServerListFilter) { ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats()); } restOfInit(clientConfig); } // 初始化 void restOfInit(IClientConfig clientConfig) { boolean primeConnection = this.isEnablePrimingConnections(); // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList() this.setEnablePrimingConnections(false); //啟動定時任務,定時更新服務注冊中心實例列表 enableAndInitLearnNewServersFeature(); // 獲取服務實例列表 updateListOfServers(); if (primeConnection && this.getPrimeConnections() != null) { this.getPrimeConnections() .primeConnections(getReachableServers()); } this.setEnablePrimingConnections(primeConnection); LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString()); }
其中 enableAndInitLearnNewServersFeature() :調用了一個 serverListUpdater.start(updateAction) 就是上面提到的 PollingServerListUpdater:
public synchronized void start(final UpdateAction updateAction) { if (isActive.compareAndSet(false, true)) { final Runnable wrapperRunnable = new Runnable() { @Override public void run() { if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try {// 直接調用 updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } } }; //啟動一個定時線程去定時獲取服務列表 scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS ); } else { logger.info("Already active, no-op"); } }
updateAction.doUpdate() 這個調用我們需要明白 updateAction 是什么東西 :
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() { @Override public void doUpdate() { updateListOfServers(); } };
可以看到這里就是先調用了一次獲取服務列表請求,然后再開啟的一個定時任務,然后調用 DynamicServerListLoadBalancer.updateListOfServers 從注冊中心獲取服務列表。
@VisibleForTesting public void updateListOfServers() { List<T> servers = new ArrayList<T>(); if (serverListImpl != null) { servers = serverListImpl.getUpdatedListOfServers(); LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); if (filter != null) { servers = filter.getFilteredListOfServers(servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); } } updateAllServerList(servers); }
然后這里調用 serverListImpl.getUpdatedListOfServers() 又很多的實現:
然后底層通過對應的實現獲取到服務列表。該階段所涉及的自動配置的流程如下圖:
RestTemplate 的調用過程(實際調用階段):
再回到 RibbonLoadBalancerClient.execute方法中。通過getServer獲得一個Server對象之后,再把Server包裝成一個RibbonServer對象,這個對象保存了Server的所有信息,同時還保存了服務名、是否需要https等。
在調用另外一個execute重載方法,在這個方法中最終會調用apply方法,這個方法會向一個具體的實例發送請求。
@Override public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { Server server = null; if (serviceInstance instanceof RibbonServer) { server = ((RibbonServer) serviceInstance).getServer(); } if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server); try { T returnVal = request.apply(serviceInstance); statsRecorder.recordStats(returnVal); return returnVal; } // catch IOException and rethrow so RestTemplate behaves correctly catch (IOException ex) { statsRecorder.recordStats(ex); throw ex; } catch (Exception ex) { statsRecorder.recordStats(ex); ReflectionUtils.rethrowRuntimeException(ex); } return null; }
request.apply :request是LoadBalancerRequest接口,它里面提供了一個apply方法,但是從代碼中我們發現這個方法並沒有實現類,那么它是在哪里實現的呢?
繼續又往前分析發現,這個request對象是從LoadBalancerInterceptor的intercept方法中傳遞過來的.而request的傳遞,是通過 this.requestFactory.createRequest(request, body, execution) 創建二來,於是我們找到這個方法。
public LoadBalancerRequest<ClientHttpResponse> createRequest( final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) { return instance -> { HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer); if (this.transformers != null) { for (LoadBalancerRequestTransformer transformer : this.transformers) { serviceRequest = transformer.transformRequest(serviceRequest, instance); } } return execution.execute(serviceRequest, body); }; }
從代碼中發現,它是一個用lambda表達式實現的匿名內部類。在該內部類中,創建了一個ServiceRequestWrapper,這個ServiceRequestWrapper實際上就是HttpRequestWrapper的一個子類,ServiceRequestWrapper重寫了HttpRequestWrapper的getURI()方法,重寫的URI實際上就是通過調用LoadBalancerClient接口的reconstructURI函數來重新構建一個URI進行訪問
那么 request.apply 其實進入的就是上述方法 ,然后調用了 InterceptingRequestExecution.execute 方法,繼而調用 request.getURI() 重構URI ,這里的request 就是上面這個方法提到的 ServiceRequestWrapper
@Override public URI getURI() { URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI()); return uri; }
reconstructURI這個方法,實際上是重構URI,也就是把一個 http://服務名/轉化為 http://地址/ 的過程。
首先獲得一個serviceId根據serviceId獲得一個RibbonLoadBalancerContext對象,這個是用來存儲一些被負載均衡器使用的上下文內容。
調用reconstructURIWithServer方法來構建服務實例的URI。
@Override public URI reconstructURI(ServiceInstance instance, URI original) { Assert.notNull(instance, "instance can not be null"); String serviceId = instance.getServiceId(); RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); URI uri; Server server; if (instance instanceof RibbonServer) { RibbonServer ribbonServer = (RibbonServer) instance; server = ribbonServer.getServer(); uri = updateToSecureConnectionIfNeeded(original, ribbonServer); } else { server = new Server(instance.getScheme(), instance.getHost(), instance.getPort()); IClientConfig clientConfig = clientFactory.getClientConfig(serviceId); ServerIntrospector serverIntrospector = serverIntrospector(serviceId); uri = updateToSecureConnectionIfNeeded(original, clientConfig, serverIntrospector, server); } return context.reconstructURIWithServer(server, uri); }
最后就是通過 HttpURLConnection 使用這個uri發起遠程通信。整個調用過程的流程圖如下:
關於與服務心跳的實現:
在 RibbonClientConfiguration 自動裝配的時候,注入了一個 DummyPing 實例,然后在初始化 ZoneAwareLoadBalancer 實例的時候,會走到其父類 BaseLoadBalancer 的構造方法 :
public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) { initWithConfig(config, rule, ping); } void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping) { // ......
setPing(ping); // ......
}
這里會將 DummyPing 實例 設置進去:
public void setPing(IPing ping) { if (ping != null) { if (!ping.equals(this.ping)) { this.ping = ping; setupPingTask(); // since ping data changed
} } else { this.ping = null; // cancel the timer task
lbTimer.cancel(); } }
然后啟動一個 pingTask :
void setupPingTask() { if (canSkipPing()) { return; } if (lbTimer != null) { lbTimer.cancel(); } lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true); lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000); forceQuickPing(); }
這里是每隔 10S 去執行一次任務,然后我們看看這個任務具體是在做什么:
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy(); protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY; class PingTask extends TimerTask { public void run() { try { new Pinger(pingStrategy).runPinger(); } catch (Exception e) { logger.error("LoadBalancer [{}]: Error pinging", name, e); } } }
可以看到這里是通過策略模式去實現,默認的 pingerStrategy 是 SerialPingStrategy,但是由於本場景下實現是 DummyPing ,進入到 BaseLoadBalancer.Pinger#runPinger 的方法:
public void runPinger() throws Exception { if (!pingInProgress.compareAndSet(false, true)) { return; // Ping in progress - nothing to do
} // we are "in" - we get to Ping
Server[] allServers = null; boolean[] results = null; Lock allLock = null; Lock upLock = null; try { /* * The readLock should be free unless an addServer operation is * going on... */ allLock = allServerLock.readLock(); allLock.lock(); allServers = allServerList.toArray(new Server[allServerList.size()]); allLock.unlock(); int numCandidates = allServers.length; results = pingerStrategy.pingServers(ping, allServers); final List<Server> newUpList = new ArrayList<Server>(); final List<Server> changedServers = new ArrayList<Server>(); for (int i = 0; i < numCandidates; i++) { boolean isAlive = results[i]; Server svr = allServers[i]; boolean oldIsAlive = svr.isAlive(); svr.setAlive(isAlive); if (oldIsAlive != isAlive) { changedServers.add(svr); logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}", name, svr.getId(), (isAlive ? "ALIVE" : "DEAD")); } if (isAlive) { newUpList.add(svr); } } upLock = upServerLock.writeLock(); upLock.lock(); upServerList = newUpList; upLock.unlock(); notifyServerStatusChangeListener(changedServers); } finally { pingInProgress.set(false); } }
相信這段代碼不難看懂,這里無非就是通過 IPing 實現,遍歷緩存服務列表,進行檢查服務實力是否存活,pingerStrategy.pingServers(ping, allServers) :
public boolean[] pingServers(IPing ping, Server[] servers) { int numCandidates = servers.length; boolean[] results = new boolean[numCandidates]; logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates); for (int i = 0; i < numCandidates; i++) { results[i] = false; /* Default answer is DEAD. */
try { // NOTE: IFF we were doing a real ping // assuming we had a large set of servers (say 15) // the logic below will run them serially // hence taking 15 times the amount of time it takes // to ping each server // A better method would be to put this in an executor // pool // But, at the time of this writing, we dont REALLY // use a Real Ping (its mostly in memory eureka call) // hence we can afford to simplify this design and run // this // serially
if (ping != null) { results[i] = ping.isAlive(servers[i]); } } catch (Exception e) { logger.error("Exception while pinging Server: '{}'", servers[i], e); } } return results; }
基於DummyPing 的 isAlive 方法的實現,是直接返回true,然后這里也有一定的說明,交給注冊中心去做,減少不必要的性能消耗.
如果要實現自己的 IPing ,可以實現 IPing 接口。相關配置可以參考官網,這里復制了官網的相關拓展配置項:
-
<clientName>.ribbon.NFLoadBalancerClassName
: Should implementILoadBalancer
-
<clientName>.ribbon.NFLoadBalancerRuleClassName
: Should implementIRule
-
<clientName>.ribbon.NFLoadBalancerPingClassName
: Should implementIPing
-
<clientName>.ribbon.NIWSServerListClassName
: Should implementServerList
-
<clientName>.ribbon.NIWSServerListFilterClassName
: Should implementServerListFilter
更多細節請參考官方文檔。