深度解析Spring Cloud Ribbon的實現源碼及原理


Spring Cloud Ribbon源碼分析

Ribbon的核心作用就是進行請求的負載均衡,它的基本原理如下圖所示。就是客戶端集成Ribbon這個組件,Ribbon中會針對已經配置的服務提供者地址列表進行負載均衡的計算,得到一個目標地址之后,再發起請求。

image-20211118135001876

那么接下來,我們從兩個層面去分析Ribbon的原理

  1. @LoadBalanced 注解如何讓普通的RestTemplate具備負載均衡的能力
  2. OpenFeign集成Ribbon的實現原理

@LoadBalancer注解解析過程分析

在使用RestTemplate的時候,我們加了一個@LoadBalance注解,就能讓這個RestTemplate在請求時,就擁有客戶端負載均衡的能力。

@Bean
@LoadBalanced
RestTemplate restTemplate() {
    return new RestTemplate();
}

然后,我們打開@LoadBalanced這個注解,可以發現該注解僅僅是聲明了一個@qualifier注解。

@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {

}

@qualifier注解的作用

我們平時在使用注解去注入一個Bean時,都是采用@Autowired。並且大家應該知道@Autowired是可以注入一個List或者Map的。給大家舉個例子(在一個springboot應用中)

定義一個TestClass

@AllArgsConstructor
@Data
public class TestClass {
    private String name;
}

聲明一個配置類,並注入TestClass

@Configuration
public class TestConfig {

    @Bean("testClass1")
    TestClass testClass(){
        return new TestClass("testClass1");
    }

    @Bean("testClass2")
    TestClass testClass2(){
        return new TestClass("testClass2");
    }
}

定義一個Controller,用於測試, 注意,此時我們使用的是@Autowired來注入一個List集合

@RestController
public class TestController {

    @Autowired(required = false)
    List<TestClass> testClasses= Collections.emptyList();

    @GetMapping("/test")
    public Object test(){
        return testClasses;
    }
}

此時訪問:http://localhost:8080/test , 得到的結果是

[
    {
        name: "testClass1"
    },
    {
        name: "testClass2"
    }
]

修改TestConfigTestController

@Configuration
public class TestConfig {

    @Bean("testClass1")
    @Qualifier
    TestClass testClass(){
        return new TestClass("testClass1");
    }

    @Bean("testClass2")
    TestClass testClass2(){
        return new TestClass("testClass2");
    }
}
@RestController
public class TestController {

    @Autowired(required = false)
    @Qualifier
    List<TestClass> testClasses= Collections.emptyList();

    @GetMapping("/test")
    public Object test(){
        return testClasses;
    }
}

再次訪問:http://localhost:8080/test , 得到的結果是

[
    {
        name: "testClass1"
    }
]

@LoadBalancer注解篩選及攔截

了解了@qualifier注解的作用后,再回到@LoadBalancer注解上,就不難理解了。

因為我們需要掃描到增加了@LoadBalancer注解的RestTemplate實例,所以,@LoadBalancer可以完成這個動作,它的具體的實現代碼如下:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {

   @LoadBalanced
   @Autowired(required = false)
   private List<RestTemplate> restTemplates = Collections.emptyList();
}

從這個代碼中可以看出,在LoadBalancerAutoConfiguration這個配置類中,會使用同樣的方式,把配置了@LoadBalanced注解的RestTemplate注入到restTemplates集合中。

拿到了RestTemplate之后,在LoadBalancerInterceptorConfig配置類中,會針對這些RestTemplate進行攔截,實現代碼如下:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {

	@LoadBalanced
	@Autowired(required = false)
	private List<RestTemplate> restTemplates = Collections.emptyList();

    //省略....

	@Bean
	@ConditionalOnMissingBean
	public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
		return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
	}

	@Configuration(proxyBeanMethods = false)
	@Conditional(RetryMissingOrDisabledCondition.class)
	static class LoadBalancerInterceptorConfig {
		
        //裝載一個LoadBalancerInterceptor的實例到IOC容器。
		@Bean
		public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
				LoadBalancerRequestFactory requestFactory) {
			return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
		}
		
        //會遍歷所有加了@LoadBalanced注解的RestTemplate,在原有的攔截器之上,再增加了一個LoadBalancerInterceptor
		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
				List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
				list.add(loadBalancerInterceptor);
				restTemplate.setInterceptors(list);
			};
		}

	}
    //省略....
}

LoadBalancerInterceptor

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
      final ClientHttpRequestExecution execution) throws IOException {
   final URI originalUri = request.getURI();
   String serviceName = originalUri.getHost();
   Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
   return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}

RestTemplate調用過程

我們在程序中,使用下面的代碼發起遠程請求時

restTemplate.getForObject(url,String.class);

它的整個調用過程如下。

RestTemplate.getForObject

​ -----> AbstractClientHttpRequest.execute()

​ ----->AbstractBufferingClientHttpRequest.executeInternal()

​ -----> InterceptingClientHttpRequest.executeInternal()

​ -----> InterceptingClientHttpRequest.execute()

InterceptingClientHttpRequest.execute()方法的代碼如下。

@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    if (this.iterator.hasNext()) { //遍歷所有的攔截器,通過攔截器進行逐個處理。
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        return nextInterceptor.intercept(request, body, this);
    }
    else {
        HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
        request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
        if (body.length > 0) {
            if (delegate instanceof StreamingHttpOutputMessage) {
                StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            }
            else {
                StreamUtils.copy(body, delegate.getBody());
            }
        }
        return delegate.execute();
    }
}

LoadBalancerInterceptor

LoadBalancerInterceptor是一個攔截器,當一個被@Loadbalanced注解修飾的RestTemplate對象發起HTTP請求時,會被LoadBalancerInterceptorintercept方法攔截,

在這個方法中直接通過getHost方法就可以獲取到服務名(因為我們在使用RestTemplate調用服務的時候,使用的是服務名而不是域名,所以這里可以通過getHost直接拿到服務名然后去調用execute方法發起請求)

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
      final ClientHttpRequestExecution execution) throws IOException {
   final URI originalUri = request.getURI();
   String serviceName = originalUri.getHost();
   Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
   return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}

LoadBalancerClient其實是一個接口,我們看一下它的類圖,它有一個唯一的實現類:RibbonLoadBalancerClient

image-20211211152356718

RibbonLoadBalancerClient.execute

RibbonLoadBalancerClient這個類的代碼比較長,我們主要看一下他的核心方法execute

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
    throws IOException {
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    Server server = getServer(loadBalancer, hint);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                                                 isSecure(server, serviceId),
                                                 serverIntrospector(serviceId).getMetadata(server));

    return execute(serviceId, ribbonServer, request);
}

上述代碼的實現邏輯如下:

  • 根據serviceId獲得一個ILoadBalancer,實例為:ZoneAwareLoadBalancer
  • 調用getServer方法去獲取一個服務實例
  • 判斷Server的值是否為空。這里的Server實際上就是傳統的一個服務節點,這個對象存儲了服務節點的一些元數據,比如host、port等

getServer

getServer是用來獲得一個具體的服務節點,它的實現如下

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
    if (loadBalancer == null) {
        return null;
    }
    // Use 'default' on a null hint, or just pass it on?
    return loadBalancer.chooseServer(hint != null ? hint : "default");
}

通過代碼可以看到,getServer實際調用了IloadBalancer.chooseServer這個方法,ILoadBalancer這個是一個負載均衡器接口。

public interface ILoadBalancer {
    //addServers表示向負載均衡器中維護的實例列表增加服務實例
    public void addServers(List<Server> newServers);
    //chooseServer表示通過某種策略,從負載均衡服務器中挑選出一個具體的服務實例
    public Server chooseServer(Object key);
    //markServerDown表示用來通知和標識負載均衡器中某個具體實例已經停止服務,否則負載均衡器在下一次獲取服務實例清單前都會認為這個服務實例是正常工作的
    public void markServerDown(Server server);
    //getReachableServers表示獲取當前正常工作的服務實例列表
    public List<Server> getReachableServers();
    //getAllServers表示獲取所有的服務實例列表,包括正常的服務和停止工作的服務
    public List<Server> getAllServers();
}

ILoadBalancer的類關系圖如下:

image-20211211153617850

從整個類的關系圖來看,BaseLoadBalancer類實現了基礎的負載均衡,而DynamicServerListLoadBalancer和ZoneAwareLoadBalancer則是在負載均衡策略的基礎上做了一些功能擴展。

  • AbstractLoadBalancer實現了ILoadBalancer接口,它定義了服務分組的枚舉類/chooseServer(用來選取一個服務實例)/getServerList(獲取某一個分組中的所有服務實例)/getLoadBalancerStats用來獲得一個LoadBalancerStats對象,這個對象保存了每一個服務的狀態信息。
  • BaseLoadBalancer,它實現了作為負載均衡器的基本功能,比如服務列表維護、服務存活狀態監測、負載均衡算法選擇Server等。但是它只是完成基本功能,在有些復雜場景中還無法實現,比如動態服務列表、Server過濾、Zone區域意識(服務之間的調用希望盡可能是在同一個區域內進行,減少延遲)。
  • DynamicServerListLoadBalancer是BaseLoadbalancer的一個子類,它對基礎負載均衡提供了擴展,從名字上可以看出,它提供了動態服務列表的特性
  • ZoneAwareLoadBalancer 它是在DynamicServerListLoadBalancer的基礎上,增加了以Zone的形式來配置多個LoadBalancer的功能。

那在getServer方法中,loadBalancer.chooseServer具體的實現類是哪一個呢?我們找到RibbonClientConfiguration這個類

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                                        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                                        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                                       serverListFilter, serverListUpdater);
}

從上述聲明中,發現如果沒有自定義ILoadBalancer,則直接返回一個ZoneAwareLoadBalancer

ZoneAwareLoadBalancer

Zone表示區域的意思,區域指的就是地理區域的概念,一般較大規模的互聯網公司,都會做跨區域部署,這樣做有幾個好處,第一個是為不同地域的用戶提供最近的訪問節點減少訪問延遲,其次是為了保證高可用,做容災處理。

而ZoneAwareLoadBalancer就是提供了具備區域意識的負載均衡器,它的主要作用是對Zone進行了感知,保證每個Zone里面的負載均衡策略都是隔離的,它並不保證A區域過來的請求一定會發動到A區域對應的Server內。真正實現這個需求的是ZonePreferenceServerListFilter/ZoneAffinityServerListFilter

ZoneAwareLoadBalancer的核心功能是

  • 若開啟了區域意識,且zone的個數 > 1,就繼續區域選擇邏輯
  • 根據ZoneAvoidanceRule.getAvailableZones()方法拿到可用區們(會T除掉完全不可用的區域們,以及可用但是負載最高的一個區域)
  • 從可用區zone們中,通過ZoneAvoidanceRule.randomChooseZone隨機選一個zone出來 (該隨機遵從權重規則:誰的zone里面Server數量最多,被選中的概率越大)
  • 在選中的zone里面的所有Server中,采用該zone對對應的Rule,進行choose
@Override
public Server chooseServer(Object key) {
    //ENABLED,表示是否用區域意識的choose選擇Server,默認是true,
    //如果禁用了區域、或者只有一個zone,就直接按照父類的邏輯來進行處理,父類默認采用輪詢算法
    if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
        logger.debug("Zone aware logic disabled or there is only one zone");
        return super.chooseServer(key);
    }
    Server server = null;
    try {
        LoadBalancerStats lbStats = getLoadBalancerStats();
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        logger.debug("Zone snapshots: {}", zoneSnapshot);
        if (triggeringLoad == null) {
            triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
        }

        if (triggeringBlackoutPercentage == null) {
            triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
        }
        //根據相關閾值計算可用區域
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);
        if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
            //從可用區域中隨機選擇一個區域,zone里面的服務器節點越多,被選中的概率越大
            String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
            logger.debug("Zone chosen: {}", zone);
            if (zone != null) {
                //根據zone獲得該zone中的LB,然后根據該Zone的負載均衡算法選擇一個server
                BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                server = zoneLoadBalancer.chooseServer(key);
            }
        }
    } catch (Exception e) {
        logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
    }
    if (server != null) {
        return server;
    } else {
        logger.debug("Zone avoidance logic is not invoked.");
        return super.chooseServer(key);
    }
}

BaseLoadBalancer.chooseServer

假設我們現在沒有使用多區域部署,那么負載策略會執行到BaseLoadBalancer.chooseServer

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

根據默認的負載均衡算法來獲得指定的服務節點。默認的算法是RoundBin。

rule.choose

rule代表負載均衡算法規則,它有很多實現,IRule的實現類關系圖如下。

image-20211211155112400

默認情況下,rule的實現為ZoneAvoidanceRule,它是在RibbonClientConfiguration這個配置類中定義的,代碼如下:

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
// Order is important here, last should be the default, first should be optional
// see
// https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
		RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }
}

所以,在BaseLoadBalancer.chooseServer中調用rule.choose(key);,實際會進入到ZoneAvoidanceRulechoose方法

@Override
public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer(); //獲取負載均衡器
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); //通過該方法獲取目標服務
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }       
}

復合判斷server所在區域的性能和server的可用性選擇server

主要分析chooseRoundRobinAfterFiltering方法。

chooseRoundRobinAfterFiltering

從方法名稱可以看出來,它是通過對目標服務集群通過過濾算法過濾一遍后,再使用輪詢實現負載均衡。

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

CompositePredicate.getEligibleServers

使用主過濾條件對所有實例過濾並返回過濾后的清單,

@Override
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    //
    List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
    
    //按照fallbacks中存儲的過濾器順序進行過濾(此處就行先ZoneAvoidancePredicate然后AvailabilityPredicate)
    Iterator<AbstractServerPredicate> i = fallbacks.iterator();
    while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
           && i.hasNext()) {
        AbstractServerPredicate predicate = i.next();
        result = predicate.getEligibleServers(servers, loadBalancerKey);
    }
    return result;
}

依次使用次過濾條件對主過濾條件的結果進行過濾*

  • //不論是主過濾條件還是次過濾條件,都需要判斷下面兩個條件
  • //只要有一個條件符合,就不再過濾,將當前結果返回供線性輪詢
    • 第1個條件:過濾后的實例總數>=最小過濾實例數(默認為1)
    • 第2個條件:過濾互的實例比例>最小過濾百分比(默認為0)

getEligibleServers

這里的實現邏輯是,遍歷所有服務器列表,調用this.apply方法進行驗證,驗證通過的節點,會加入到results這個列表中返回。

public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    if (loadBalancerKey == null) {
        return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));            
    } else {
        List<Server> results = Lists.newArrayList();
        for (Server server: servers) {
            if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                results.add(server);
            }
        }
        return results;            
    }
}

this.apply,會進入到CompositePredicate.apply方法中,代碼如下。

//CompositePredicate.apply

@Override
public boolean apply(@Nullable PredicateKey input) {
    return delegate.apply(input);
}

delegate的實例是AbstractServerPredicate, 代碼如下!

public static AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) {
    return new AbstractServerPredicate() {
        @Override
        @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP")
            public boolean apply(PredicateKey input) {
            return p.apply(input);
        }            
    };        
}

也就是說,會通過AbstractServerPredicate.apply方法進行過濾,其中,input表示目標服務器集群的某一個具體節點。

其中p,表示AndPredicate實例,這里用到了組合predicate進行判斷,而這里的組合判斷是and的關系,用到了AndPredicate實現。

 private static class AndPredicate<T> implements Predicate<T>, Serializable {
        private final List<? extends Predicate<? super T>> components;
        private static final long serialVersionUID = 0L;

        private AndPredicate(List<? extends Predicate<? super T>> components) {
            this.components = components;
        }

        public boolean apply(@Nullable T t) {
            for(int i = 0; i < this.components.size(); ++i) { //遍歷多個predicate,逐一進行判斷。
                if (!((Predicate)this.components.get(i)).apply(t)) {
                    return false;
                }
            }

            return true;
        }
 }

上述代碼中,components是由兩個predicate組合而成

  1. AvailabilityPredicate,過濾熔斷狀態下的服務以及並發連接過多的服務。
  2. ZoneAvoidancePredicate,過濾掉無可用區域的節點。

所以在AndPredicateapply方法中,需要遍歷這兩個predicate逐一進行判斷。

AvailablilityPredicate

過濾熔斷狀態下的服務以及並發連接過多的服務,代碼如下:

@Override
public boolean apply(@Nullable PredicateKey input) {
    LoadBalancerStats stats = getLBStats();
    if (stats == null) {
        return true;
    }
    return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}

判斷是否要跳過這個目標節點,實現邏輯如下。

private boolean shouldSkipServer(ServerStats stats) {  
        //niws.loadbalancer.availabilityFilteringRule.filterCircuitTripped是否為true
    if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) //該Server是否為斷路狀態
        || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {//本機發往這個Server未處理完的請求個數是否大於Server實例最大的活躍連接數
        return true;
    }
    return false;
}

Server是否為斷路狀態是如何判斷的呢?

ServerStats源碼,這里詳細源碼我們不貼了,說一下機制:

斷路是通過時間判斷實現的,每次失敗記錄上次失敗時間。如果失敗了,則觸發判斷,是否大於斷路的最小失敗次數,則判斷:

計算斷路持續時間: (2^失敗次數)* 斷路時間因子,如果大於最大斷路時間,則取最大斷路時間。
判斷當前時間是否大於上次失敗時間+短路持續時間,如果小於,則是斷路狀態。
這里又涉及三個配置(這里需要將default替換成你調用的微服務名稱):

  • niws.loadbalancer.default.connectionFailureCountThreshold,默認為3, 觸發判斷是否斷路的最小失敗次數,也就是,默認如果失敗三次,就會判斷是否要斷路了。
  • niws.loadbalancer.default.circuitTripTimeoutFactorSeconds, 默認為10, 斷路時間因子,
  • niws.loadbalancer.default.circuitTripMaxTimeoutSeconds,默認為30,最大斷路時間

ZoneAvoidancePredicate

ZoneAvoidancePredicate,過濾掉不可用區域的節點,代碼如下!

@Override
public boolean apply(@Nullable PredicateKey input) {
    if (!ENABLED.get()) {//查看niws.loadbalancer.zoneAvoidanceRule.enabled配置的熟悉是否為true(默認為true)如果為false沒有開啟分片過濾 則不進行過濾
        return true;
    }
    ////獲取配置的分區字符串 默認為UNKNOWN
    String serverZone = input.getServer().getZone();
    if (serverZone == null) { //如果沒有分區,則不需要進行過濾,直接返回即可
        // there is no zone information from the server, we do not want to filter
        // out this server
        return true;
    }
    //獲取負載均衡的狀態信息
    LoadBalancerStats lbStats = getLBStats();
    if (lbStats == null) {
        // no stats available, do not filter
        return true;
    }
    //如果可用區域小於等於1,也不需要進行過濾直接返回
    if (lbStats.getAvailableZones().size() <= 1) {
        // only one zone is available, do not filter
        return true;
    }
    //針對當前負載信息,創建一個區域快照,后續會用快照數據進行計算(避免后續因為數據變更導致判斷計算不准確問題)
    Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
    if (!zoneSnapshot.keySet().contains(serverZone)) { //如果快照信息中沒有包含當前服務器所在區域,則也不需要進行判斷。
        // The server zone is unknown to the load balancer, do not filter it out 
        return true;
    }
    logger.debug("Zone snapshots: {}", zoneSnapshot);
    //獲取有效區域
    Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
    logger.debug("Available zones: {}", availableZones);
    if (availableZones != null) { //有效區域如果包含當前節點,則返回true,否則返回false, 返回false表示這個區域不可用,不需要進行目標節點分發。
        return availableZones.contains(input.getServer().getZone());
    } else {
        return false;
    }
} 

LoadBalancerStats,在每次發起通訊的時候,狀態信息會在控制台打印如下!

DynamicServerListLoadBalancer for client goods-service initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=goods-service,current list of Servers=[localhost:9091, localhost:9081],Load balancer stats=Zone stats: {unknown=[Zone:unknown;	Instance count:2;	Active connections count: 0;	Circuit breaker tripped count: 0;	Active connections per server: 0.0;]
},Server stats: [[Server:localhost:9091;	Zone:UNKNOWN;	Total Requests:0;	Successive connection failure:0;	Total blackout seconds:0;	Last connection made:Thu Jan 01 08:00:00 CST 1970;	First connection made: Thu Jan 01 08:00:00 CST 1970;	Active Connections:0;	total failure count in last (1000) msecs:0;	average resp time:0.0;	90 percentile resp time:0.0;	95 percentile resp time:0.0;	min resp time:0.0;	max resp time:0.0;	stddev resp time:0.0]
, [Server:localhost:9081;	Zone:UNKNOWN;	Total Requests:0;	Successive connection failure:0;	Total blackout seconds:0;	Last connection made:Thu Jan 01 08:00:00 CST 1970;	First connection made: Thu Jan 01 08:00:00 CST 1970;	Active Connections:0;	total failure count in last (1000) msecs:0;	average resp time:0.0;	90 percentile resp time:0.0;	95 percentile resp time:0.0;	min resp time:0.0;	max resp time:0.0;	stddev resp time:0.0]
]}ServerList:com.netflix.loadbalancer.ConfigurationBasedServerList@74ddb59a

getAvailableZones方法的代碼如下,用來計算有效可用區域。

public static Set<String> getAvailableZones(
    Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
    double triggeringBlackoutPercentage) {
    if (snapshot.isEmpty()) { //如果快照信息為空,返回空
        return null;
    }
    //定義一個集合存儲有效區域節點
    Set<String> availableZones = new HashSet<String>(snapshot.keySet());
    if (availableZones.size() == 1) { //如果有效區域的集合只有1個,直接返回
        return availableZones;
    }
    //記錄有問題的區域集合
    Set<String> worstZones = new HashSet<String>();
    double maxLoadPerServer = 0; //定義一個變量,保存所有zone中,平均負載最高值
    // true:zone有限可用
    // false:zone全部可用
    boolean limitedZoneAvailability = false; //
	
    //遍歷所有的區域信息. 對每個zone進行逐一分析
    for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
        String zone = zoneEntry.getKey();  //得到zone字符串
        ZoneSnapshot zoneSnapshot = zoneEntry.getValue(); //得到該zone的快照信息
        int instanceCount = zoneSnapshot.getInstanceCount();
        if (instanceCount == 0) { //若該zone內一個實例都木有了,那就是完全不可用,那就移除該zone,然后標記zone是有限可用的(並非全部可用)
            availableZones.remove(zone);
            limitedZoneAvailability = true;
        } else {
            double loadPerServer = zoneSnapshot.getLoadPerServer(); //獲取該區域的平均負載
            // 機器的熔斷總數 / 總實例數已經超過了閾值(默認為1,也就是全部熔斷才會認為該zone完全不可用)
            if (((double) zoneSnapshot.getCircuitTrippedCount())
                / instanceCount >= triggeringBlackoutPercentage
                || loadPerServer < 0) { //loadPerServer表示當前區域所有節點都熔斷了。
                availableZones.remove(zone); 
                limitedZoneAvailability = true;
            } else { // 進入到這個邏輯,說明並不是完全不可用,就看看區域的狀態
                // 如果當前負載和最大負載相當,那認為當前區域狀態很不好,加入到worstZones中
                if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                    // they are the same considering double calculation
                    // round error
                    worstZones.add(zone);
                   
                } else if (loadPerServer > maxLoadPerServer) {// 或者若當前負載大於最大負載了。
                    maxLoadPerServer = loadPerServer;
                    worstZones.clear();
                    worstZones.add(zone);
                }
            }
        }
    }
    // 如果最大負載小於設定的負載閾值 並且limitedZoneAvailability=false
	// 說明全部zone都可用,並且最大負載都還沒有達到閾值,那就把全部zone返回   
    if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
        // zone override is not needed here
        return availableZones;
    }
    //若最大負載超過閾值, 就不能全部返回,則直接從負載最高的區域中隨機返回一個,這么處理的目的是把負載最高的那個哥們T除掉,再返回結果。
    String zoneToAvoid = randomChooseZone(snapshot, worstZones);
    if (zoneToAvoid != null) {
        availableZones.remove(zoneToAvoid);
    }
    return availableZones;

}

上述邏輯還是比較復雜的,我們通過一個簡單的文字進行說明:

  1. 如果zone為null,那么也就是沒有可用區域,直接返回null
  2. 如果zone的可用區域為1,也沒有什么可以選擇的,直接返回這一個
  3. 使用Set<String> worstZones記錄所有zone中比較狀態不好的的zone列表,用maxLoadPerServer表示所有zone中負載最高的區域;用limitedZoneAvailability表示是否是部分zone可用(true:部分可用,false:全部可用),接着我們需要遍歷所有的zone信息,逐一進行判斷從而對有效zone的結果進行處理。
    1. 如果當前zoneinstanceCount為0,那就直接把這個區域移除就行,並且標記limitedZoneAvailability為部分可用,沒什么好說的。
    2. 獲取當前總的平均負載loadPerServer,如果zone內的熔斷實例數 / 總實例數 >= triggeringBlackoutPercentage 或者 loadPerServer < 0的話,說明當前區域有問題,直接執行remove移除當前zone,並且limitedZoneAvailability=true .
      1. (熔斷實例數 / 總實例數 >= 閾值,標記為當前zone就不可用了(移除掉),這個很好理解。這個閾值為0.99999d也就說所有的Server實例被熔斷了,該zone才算不可用了).
      2. loadPerServer = -1,也就說當所有實例都熔斷了。這兩個條件判斷都差不多,都是判斷這個區域的可用性。
    3. 如果當前zone沒有達到閾值,則判斷區域的負載情況,從所有zone中找到負載最高的區域(負載差值在0.000001d),則把這些區域加入到worstZones列表,也就是這個集合保存的是負載較高的區域。
  4. 通過上述遍歷對區域數據進行計算后,最后要設置返回的有效區域數據。
    1. 最高負載maxLoadPerServer仍舊小於提供的triggeringLoad閾值,並且並且limitedZoneAvailability=false(就是說所有zone都可用的情況下),那就返回所有的zone:availableZones。 (也就是所有區域的負載都在閾值范圍內並且每個區域內的節點都還存活狀態,就全部返回)
    2. 否則,最大負載超過閾值或者某些區域存在部分不可用的節點時,就從這些負載較高的節點worstZones中隨機移除一個

AbstractServerPredicate

在回答下面的代碼,通過getEligibleServers判斷可用服務節點后,如果可用節點不為0 ,則執行incrementAndGetModulo方法進行輪詢。

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

該方法是通過輪詢來實現,代碼如下!

private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextIndex.get();
        int next = (current + 1) % modulo;
        if (nextIndex.compareAndSet(current, next) && current < modulo)
            return current;
    }
}

服務列表的加載過程

在本實例中,我們將服務列表配置在application.properties文件中,意味着在某個時候會加載這個列表,保存在某個位置,那它是在什么時候加載的呢?

RibbonClientConfiguration這個配置類中,有下面這個Bean的聲明,(該Bean是條件觸發)它用來定義默認的負載均衡實現。

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                                        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                                        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                                       serverListFilter, serverListUpdater);
}

前面分析過,它的類關系圖如下!

image-20211211153617850

ZoneAwareLoadBalancer在初始化時,會調用父類DynamicServerListLoadBalancer的構造方法,代碼如下。

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }

restOfInit

restOfInit方法主要做兩件事情。

  1. 開啟動態更新Server的功能
  2. 更新Server列表
void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    enableAndInitLearnNewServersFeature(); //開啟動態更新Server

    updateListOfServers(); //更新Server列表
    
    
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
            .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}

updateListOfServers

全量更新一次服務列表。

public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                     getIdentifier(), servers);

        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                         getIdentifier(), servers);
        }
    }
    updateAllServerList(servers);
}

上述代碼解釋如下

  1. 由於我們是通過application.properties文件配置的靜態服務地址列表,所以此時serverListImpl的實例為:ConfigurationBasedServerList,調用getUpdatedListOfServers方法時,返回的是在application.properties文件中定義的服務列表。
  2. 判斷是否需要filter,如果有,則通過filter進行服務列表過濾。

最后調用updateAllServerList,更新所有Server到本地緩存中。

protected void updateAllServerList(List<T> ls) {
    // other threads might be doing this - in which case, we pass
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {
            for (T s : ls) {
                s.setAlive(true); // set so that clients can start using these
                // servers right away instead
                // of having to wait out the ping cycle.
            }
            setServersList(ls);
            super.forceQuickPing();
        } finally {
            serverListUpdateInProgress.set(false);
        }
    }
}

動態Ping機制

在Ribbon中,基於Ping機制,目標服務地址也會發生動態變更,具體的實現方式在DynamicServerListLoadBalancer.restOfInit方法中

void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    enableAndInitLearnNewServersFeature();  //開啟定時任務動態更新

    updateListOfServers();
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
            .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
public void enableAndInitLearnNewServersFeature() {
    LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
    serverListUpdater.start(updateAction);
}

注意,這里會啟動一個定時任務,而定時任務所執行的程序是updateAction,它是一個匿名內部類,定義如下。

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {
        updateListOfServers();
    }
};

定時任務的啟動方法如下,這個任務每隔30s執行一次。

public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();  //執行具體的任務。
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
            wrapperRunnable,
            initialDelayMs,  //1000
            refreshIntervalMs,  //30000 
            TimeUnit.MILLISECONDS 
        );
    } else {
        logger.info("Already active, no-op");
    }
}

當30s之后觸發了doUpdate方法后,最終進入到updateAllServerList方法

protected void updateAllServerList(List<T> ls) {
    // other threads might be doing this - in which case, we pass
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {
            for (T s : ls) {
                s.setAlive(true); // set so that clients can start using these
                // servers right away instead
                // of having to wait out the ping cycle.
            }
            setServersList(ls);
            super.forceQuickPing();
        } finally {
            serverListUpdateInProgress.set(false);
        }
    }
}

其中,會調用 super.forceQuickPing();進行心跳健康檢測。

public void forceQuickPing() {
    if (canSkipPing()) {
        return;
    }
    logger.debug("LoadBalancer [{}]:  forceQuickPing invoking", name);

    try {
        new Pinger(pingStrategy).runPinger();
    } catch (Exception e) {
        logger.error("LoadBalancer [{}]: Error running forceQuickPing()", name, e);
    }
}

RibbonLoadBalancerClient.execute

經過上述分析,再回到RibbonLoadBalancerClient.execute方法!

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
    throws IOException {
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    Server server = getServer(loadBalancer, hint);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                                                 isSecure(server, serviceId),
                                                 serverIntrospector(serviceId).getMetadata(server));

    return execute(serviceId, ribbonServer, request);
}

此時, Server server = getServer(loadBalancer, hint);這行代碼,會返回一個具體的目標服務器。

其中,在調用execute方法之前,會包裝一個RibbonServer對象傳遞下去,它的主要作用是用來記錄請求的負載信息。

@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
                     LoadBalancerRequest<T> request) throws IOException {
    Server server = null;
    if (serviceInstance instanceof RibbonServer) {
        server = ((RibbonServer) serviceInstance).getServer();
    }
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }

    RibbonLoadBalancerContext context = this.clientFactory
        .getLoadBalancerContext(serviceId);
    RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

    try {
        T returnVal = request.apply(serviceInstance);
        statsRecorder.recordStats(returnVal);  //記錄請求狀態
        return returnVal;
    }
    // catch IOException and rethrow so RestTemplate behaves correctly
    catch (IOException ex) {
        statsRecorder.recordStats(ex); //記錄請求狀態
        throw ex;
    }
    catch (Exception ex) {
        statsRecorder.recordStats(ex);
        ReflectionUtils.rethrowRuntimeException(ex);
    }
    return null;
}

request.apply

request是LoadBalancerRequest接口,它里面提供了一個apply方法,但是從代碼中我們發現這個方法並沒有實現類,那么它是在哪里實現的呢?

繼續又往前分析發現,這個request對象是從LoadBalancerInterceptor的intercept方法中傳遞過來的.

public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
    URI originalUri = request.getURI();
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}

而request的傳遞,是通過this.requestFactory.createRequest(request, body, execution)創建而來,於是我們找到這個方法。

public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) {
    return (instance) -> {
        HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
        LoadBalancerRequestTransformer transformer;
        if (this.transformers != null) {
            for(Iterator var6 = this.transformers.iterator(); var6.hasNext(); serviceRequest = transformer.transformRequest((HttpRequest)serviceRequest, instance)) {
                transformer = (LoadBalancerRequestTransformer)var6.next();
            }
        }

        return execution.execute((HttpRequest)serviceRequest, body);
    };
}

從代碼中發現,它是一個用lambda表達式實現的匿名內部類。在該內部類中,創建了一個ServiceRequestWrapper,這個ServiceRequestWrapper實際上就是HttpRequestWrapper的一個子類,ServiceRequestWrapper重寫了HttpRequestWrapper的getURI()方法,重寫的URI實際上就是通過調用LoadBalancerClient接口的reconstructURI函數來重新構建一個URI進行訪問。

InterceptingClientHttpRequest.execute

上述代碼執行的execution.execute,又會進入到InterceptingClientHttpRequest.execute方法中,代碼如下。

public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    if (this.iterator.hasNext()) {
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        return nextInterceptor.intercept(request, body, this);
    }
    else {
        HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method); //注意這里
        request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
        if (body.length > 0) {
            if (delegate instanceof StreamingHttpOutputMessage) {
                StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            }
            else {
                StreamUtils.copy(body, delegate.getBody());
            }
        }
        return delegate.execute();
    }
}

此時需要注意,request對象的實例是HttpRequestWrapper

request.getURI()

當調用request.getURI()獲取目標地址創建http請求時,會調用ServiceRequestWrapper中的.getURI()方法。

@Override
public URI getURI() {
    URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
    return uri;
}

在這個方法中,調用RibbonLoadBalancerClient實例中的reconstructURI方法,根據service-id生成目標服務地址。

RibbonLoadBalancerClient.reconstructURI

public URI reconstructURI(ServiceInstance instance, URI original) {
		Assert.notNull(instance, "instance can not be null");
		String serviceId = instance.getServiceId(); //獲取實例id,也就是服務名稱
		RibbonLoadBalancerContext context = this.clientFactory
				.getLoadBalancerContext(serviceId); //獲取RibbonLoadBalancerContext上下文,這個是從spring容器中獲取的對象實例。

		URI uri;
		Server server;
		if (instance instanceof RibbonServer) { //如果instance為RibbonServer
			RibbonServer ribbonServer = (RibbonServer) instance;
			server = ribbonServer.getServer();  //獲取目標服務器的Server信息
			uri = updateToSecureConnectionIfNeeded(original, ribbonServer); //判斷是否需要更新成一個安全連接。
		}
		else { //如果是一個普通的http地址
			server = new Server(instance.getScheme(), instance.getHost(),
					instance.getPort());
			IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
			ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
			uri = updateToSecureConnectionIfNeeded(original, clientConfig,
					serverIntrospector, server);
		}
		return context.reconstructURIWithServer(server, uri);  //調用這個方法拼接成一個真實的目標服務器地址。
}

版權聲明:本博客所有文章除特別聲明外,均采用 CC BY-NC-SA 4.0 許可協議。轉載請注明來自 Mic帶你學架構
如果本篇文章對您有幫助,還請幫忙點個關注和贊,您的堅持是我不斷創作的動力。歡迎關注「跟着Mic學架構」公眾號公眾號獲取更多技術干貨!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM