客戶端負載均衡Ribbon之源碼解析


什么是負載均衡器?

假設有一個分布式系統,該系統由在不同計算機上運行的許多服務組成。但是,當用戶數量很大時,通常會為服務創建多個副本。每個副本都在另一台計算機上運行。此時,出現 “Load Balancer(負載均衡器)”。它有助於在服務器之間平均分配傳入流量。

服務器端負載均衡器

傳統上,Load Balancers(例如Nginx、F5)是放置在服務器端的組件。當請求來自 客戶端 時,它們將轉到負載均衡器,負載均衡器將為請求指定 服務器。負載均衡器使用的最簡單的算法是隨機指定。在這種情況下,大多數負載平衡器是用於控制負載平衡的硬件集成軟件。

重點:

  • 對客戶端不透明,客戶端不知道服務器端的服務列表,甚至不知道自己發送請求的目標地址存在負載均衡器。
  • 服務器端維護負載均衡服務器,控制負載均衡策略和算法。

客戶端負載均衡器

當負載均衡器位於 客戶端 時,客戶端得到可用的服務器列表然后按照特定的負載均衡策略,分發請求到不同的 服務器

重點:

  • 對客戶端透明,客戶端需要知道服務器端的服務列表,需要自行決定請求要發送的目標地址。
  • 客戶端維護負載均衡服務器,控制負載均衡策略和算法。
  • 目前單獨提供的客戶端實現比較少( 我用過的只有Ribbon),大部分都是在框架內部自行實現。

Ribbon

簡介

Ribbon是Netflix公司開源的一個客戶單負載均衡的項目,可以自動與 Eureka 進行交互。它提供下列特性:

  • 負載均衡
  • 容錯
  • 以異步和反應式模型執行多協議 (HTTP, TCP, UDP)
  • 緩存和批量

Ribbon中的關鍵組件

  • ServerList:可以響應客戶端的特定服務的服務器列表。
  • ServerListFilter:可以動態獲得的具有所需特征的候選服務器列表的過濾器。
  • ServerListUpdater:用於執行動態服務器列表更新。
  • Rule:負載均衡策略,用於確定從服務器列表返回哪個服務器。
  • Ping:客戶端用於快速檢查服務器當時是否處於活動狀態。
  • LoadBalancer:負載均衡器,負責負載均衡調度的管理。

源碼分析

LoadBalancerClient

實際應用中,通常將 RestTemplate 和 Ribbon 結合使用,例如:

@Configuration
public class RibbonConfig {
    @Bean
    @LoadBalanced
    RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

消費者調用服務接口:

@Service
public class RibbonService {
    @Autowired
    private RestTemplate restTemplate;
    public String hi(String name) {
        return restTemplate.getForObject("http://service-hi/hi?name="+name,String.class);
    }
}

@LoadBalanced,通過源碼可以發現這是一個標記注解:

/**
 * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
 * @author Spencer Gibb
 */
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}

通過注釋可以知道@LoadBalanced注解是用來給RestTemplate做標記,方便我們對RestTemplate添加一個LoadBalancerClient,以實現客戶端負載均衡。

根據spring boot的自動配置原理,可以知道同包下的LoadBalancerAutoConfiguration,應該是實現客戶端負載均衡器的自動化配置類。代碼如下:

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

	@LoadBalanced
	@Autowired(required = false)
	private List<RestTemplate> restTemplates = Collections.emptyList();

	@Bean
	public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
			final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
		return () -> restTemplateCustomizers.ifAvailable(customizers -> {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    customizer.customize(restTemplate);
                }
            }
        });
	}

	@Autowired(required = false)
	private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

	@Bean
	@ConditionalOnMissingBean
	public LoadBalancerRequestFactory loadBalancerRequestFactory(
			LoadBalancerClient loadBalancerClient) {
		return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
	}

	@Configuration
	@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
	static class LoadBalancerInterceptorConfig {
		@Bean
		public LoadBalancerInterceptor ribbonInterceptor(
				LoadBalancerClient loadBalancerClient,
				LoadBalancerRequestFactory requestFactory) {
			return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
		}
	
		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final LoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
		}
	}
	
	@Configuration
	@ConditionalOnClass(RetryTemplate.class)
	public static class RetryAutoConfiguration {

		@Bean
		@ConditionalOnMissingBean
		public LoadBalancedRetryFactory loadBalancedRetryFactory() {
			return new LoadBalancedRetryFactory() {};
		}
	}
	
	@Configuration
	@ConditionalOnClass(RetryTemplate.class)
	public static class RetryInterceptorAutoConfiguration {
		@Bean
		@ConditionalOnMissingBean
		public RetryLoadBalancerInterceptor ribbonInterceptor(
				LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties,
				LoadBalancerRequestFactory requestFactory,
				LoadBalancedRetryFactory loadBalancedRetryFactory) {
			return new RetryLoadBalancerInterceptor(loadBalancerClient, properties,
					requestFactory, loadBalancedRetryFactory);
		}

		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
		}
	}
}

從代碼可以看出,這個類作用主要是使用RestTemplateCustomizer對所有標注了@LoadBalanced的RestTemplate Bean添加了一個LoadBalancerInterceptor攔截器,而這個攔截器的作用就是對請求的URI進行轉換獲取到具體應該請求哪個服務實例。

那再看看添加的攔截器LoadBalancerInterceptor的代碼,如下:

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

	private LoadBalancerClient loadBalancer;
	private LoadBalancerRequestFactory requestFactory;

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
		this.loadBalancer = loadBalancer;
		this.requestFactory = requestFactory;
	}

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
		// for backwards compatibility
		this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
	}

	@Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
		final URI originalUri = request.getURI();
		String serviceName = originalUri.getHost();
		Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
		return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
	}
}

從代碼可以看出 LoadBalancerInterceptor 攔截了請求后,通過LoadBalancerClient執行具體的請求發送。

打開LoadBalancerClient,發現它是一個接口:

public interface LoadBalancerClient {

	ServiceInstance choose(String serviceId);

	<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

	<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

	URI reconstructURI(ServiceInstance instance, URI original);
}

接口說明:

  • ServiceInstance choose(String serviceId):根據傳入的服務id,從負載均衡器中為指定的服務選擇一個服務實例。
  • T execute(String serviceId, LoadBalancerRequest request):根據傳入的服務id,指定的負載均衡器中的服務實例執行請求。
  • T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request):根據傳入的服務實例,執行請求。

LoadBalancerClient 有一個唯一的實現類 RibbonLoadBalancerClient,關鍵代碼如下:

public class RibbonLoadBalancerClient implements LoadBalancerClient {

	public ServiceInstance choose(String serviceId) {
	    Server server = this.getServer(serviceId);
	    return server == null ? null : new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
	}
	
	public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
	    ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
	    Server server = this.getServer(loadBalancer);
	    if (server == null) {
	        throw new IllegalStateException("No instances available for " + serviceId);
	    } else {
	        RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
	        return this.execute(serviceId, ribbonServer, request);
	    }
	}
	
	protected Server getServer(String serviceId) {
	    return this.getServer(this.getLoadBalancer(serviceId));
	}
	
	protected Server getServer(ILoadBalancer loadBalancer) {
	    return loadBalancer == null ? null : loadBalancer.chooseServer("default");
	}
	
	protected ILoadBalancer getLoadBalancer(String serviceId) {
	    return this.clientFactory.getLoadBalancer(serviceId);
	}
	
	//省略...

}

負載均衡器

從 RibbonLoadBalancerClient 代碼可以看出,實際負載均衡的是通過 ILoadBalancer 來實現的。

ILoadBalancer 接口代碼如下:

public interface ILoadBalancer {

    public void addServers(List<Server> newServers);

    public Server chooseServer(Object key);

    public void markServerDown(Server server);

    public List<Server> getReachableServers();

    public List<Server> getAllServers();
}

接口說明:

  • addServers:向負載均衡器中添加一個服務實例集合。
  • chooseServer:跟據key,從負載均衡器獲取服務實例。
  • markServerDown:用來標記某個服務實例下線。
  • getReachableServers:獲取可用的服務實例集合。
  • getAllServers():獲取所有服務實例集合,包括下線的服務實例。

ILoadBalancer 的實現 依賴關系示意圖如下:

  • NoOpLoadBalancer:啥都不做
  • BaseLoadBalancer:
  • 一個負載均衡器的基本實現,其中有一個任意列表,可以將服務器設置為服務器池。
  • 可以設置一個ping來確定服務器的活力。
  • 在內部,該類維護一個“all”服務器列表,以及一個“up”服務器列表,並根據調用者的要求使用它們。
  • DynamicServerListLoadBalancer:
  • 通過動態的獲取服務器的候選列表的負載平衡器。
  • 可以通過篩選標准來傳遞服務器列表,以過濾不符合所需條件的服務器。
  • ZoneAwareLoadBalancer:
  • 用於測量區域條件的關鍵指標是平均活動請求,它根據每個rest客戶機和每個區域聚合。這是區域內未完成的請求總數除以可用目標實例的數量(不包括斷路器跳閘實例)。當在壞區上緩慢發生超時時,此度量非常有效。
  • 該負載均衡器將計算並檢查所有可用區域的區域狀態。如果任何區域的平均活動請求已達到配置的閾值,則該區域將從活動服務器列表中刪除。如果超過一個區域達到閾值,則將刪除每個服務器上活動請求最多的區域。一旦去掉最壞的區域,將在其余區域中選擇一個區域,其概率與其實例數成正比。服務器將使用給定的規則從所選區域返回。對於每個請求,將重復上述步驟。也就是說,每個與區域相關的負載平衡決策都是實時做出的,最新的統計數據可以幫助進行選擇。

那么在整合Ribbon的時候Spring Cloud默認采用了哪個具體實現呢?我們通過RibbonClientConfiguration配置類,可以知道在整合時默認采用了ZoneAwareLoadBalancer來實現負載均衡器。

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    return (ILoadBalancer)(this.propertiesFactory
    .isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory
    .get(ILoadBalancer.class, config, this.name) : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));
}

從這段代碼 ,也可以看出,負載均衡器所需的主要配置項是IClientConfig, ServerList, ServerListFilter, IRule, IPing, ServerListUpdater。下面逐一分析他們。

IClientConfig

IClientConfig 用於對客戶端或者負載均衡的配置,它的默認實現類為 DefaultClientConfigImpl。

IRule

為LoadBalancer定義“負載均衡策略”的接口。

public interface IRule{

    public Server choose(Object key);
    
    public void setLoadBalancer(ILoadBalancer lb);
    
    public ILoadBalancer getLoadBalancer();    
}

IRule 的實現 依賴關系示意圖如下:

  • BestAvailableRule:選擇具有最低並發請求的服務器。
  • ClientConfigEnabledRoundRobinRule:輪詢。
  • RandomRule:隨機選擇一個服務器。
  • RoundRobinRule:輪詢選擇服務器。
  • RetryRule:具備重試機制的輪詢。
  • WeightedResponseTimeRule:根據使用平均響應時間去分配一個weight(權重) ,weight越低,被選擇的可能性就越低。
  • ZoneAvoidanceRule:根據區域和可用性篩選,再輪詢選擇服務器。

IPing

定義如何 “ping” 服務器以檢查其是否存活。

public interface IPing {
    public boolean isAlive(Server server);
}

IPing 的實現 依賴關系示意圖如下:

  • PingUrl:真實的去ping 某個url,判斷其是否alive。
  • PingConstant:固定返回某服務是否可用,默認返回true,即可用
  • NoOpPing:不去ping,直接返回true,即可用。
  • DummyPing:繼承抽象類AbstractLoadBalancerPing,認為所以服務都是存活狀態,返回true,即可用。
  • NIWSDiscoveryPing:結合eureka使用時,如果Discovery Client在線,則認為心跳檢測通過。

ServerList

定義獲取所有的服務實例清單。

public interface ServerList<T extends Server> {
    public List<T> getInitialListOfServers();
    public List<T> getUpdatedListOfServers();   
}

ServerList 的實現 依賴關系示意圖如下:

  • DomainExtractingServerList:代理類,根據傳入的ServerList的值,實現具體的邏輯。
  • ConfigurationBasedServerList:從配置文件中加載服務器列表。
  • DiscoveryEnabledNIWSServerList:從Eureka注冊中心中獲取服務器列表。
  • StaticServerList:通過靜態配置來維護服務器列表。

ServerListFilter

允許根據過濾配置動態獲得的具有所需特性的候選服務器列表。

public interface ServerListFilter<T extends Server> {
    public List<T> getFilteredListOfServers(List<T> servers);
}

ServerListFilter 的實現 依賴關系示意圖如下:

  • DefaultNIWSServerListFilter:完全繼承自ZoneAffinityServerListFilter。
  • ZonePreferenceServerListFilter:EnableZoneAffinity 或 EnableZoneExclusivity 開啟狀態使用,默認關閉。處理基於區域感知的過濾服務器,過濾掉不和客戶端在相同zone的服務,若不存在相同zone,則不進行過濾。
  • ServerListSubsetFilter:服務器列表篩選器,它將負載平衡器使用的服務器數量限制為所有服務器的子集。如果服務器機群很大(例如數百個),並且不需要使用每一個機群並將連接保存在http客戶機的連接池中,那么這是非常有用的。它還可以通過比較總的網絡故障和並發連接來驅逐相對不健康的服務器。

ServerListUpdater

用於執行動態服務器列表更新。

public interface ServerListUpdater {

    public interface UpdateAction {
        void doUpdate();
    }

    void start(UpdateAction updateAction);

    void stop();

    String getLastUpdate();

    long getDurationSinceLastUpdateMs();

    int getNumberMissedCycles();

    int getCoreThreads();
}

ServerListUpdater 的實現 依賴關系示意圖如下:

  • PollingServerListUpdater:默認的實現策略,會啟動一個定時線程池,定時執行更新策略。
  • EurekaNotificationServerListUpdater:利用Eureka的事件監聽器來驅動服務列表的更新操作。

參考資料

https://github.com/Netflix/ribbon/wiki

http://tech.lede.com/2018/01/11/rd/server/NetflixRibbon/

http://blog.didispace.com/springcloud-sourcecode-ribbon/

https://www.fangzhipeng.com/springcloud/2017/08/11/Ribbon-resources.html

https://blog.csdn.net/Tincox/article/details/79210309




歡迎掃碼或微信搜索公眾號《程序員果果》關注我,關注有驚喜~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM