Spring Cloud Ribbon源碼分析---負載均衡實現


上一篇結合 Eureka 和 Ribbon 搭建了服務注冊中心,利用Ribbon實現了可配置負載均衡的服務調用。這一篇我們來分析Ribbon實現負載均衡的過程。

從 @LoadBalanced入手

還記得前面配置 RestTemplate:

@Bean
@LoadBalanced
RestTemplate restTemplate() {
    return new RestTemplate();
}

在消費端使用Spring 提供的 RestTemplate 來發出請求,而Ribbon 在 RestTemplate 上添加了@LoadBalanced 注解就可以實現負載均衡,這是怎樣做到的呢?

首先看 LoadBalanced 注解:

/**
 * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
 * @author Spencer Gibb
 */
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}

該注解的作用是為了使用 LoadBalancerClient。

LoadBalancerClient 是一個接口,包含兩個方法:

public interface LoadBalancerClient extends ServiceInstanceChooser {

	//針對來自LoadBalancer的特定的服務使用ServiceInstance來執行請求
    //根據 傳入的 服務名 和 負載均衡請求,從 服務實例中選出一個實例發送請求
	<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
	//針對來自LoadBalancer的特定的服務使用ServiceInstance來執行請求
    //指定了服務實例,直接對該實例發送請求
	<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

    //使用真實的主機和端口創建適當的URI,以供系統使用。某些系統使用帶有邏輯服務名稱的URI作為主機
    //即將我們在  RestTemplate 中寫的 服務名換為真實的 ip + 端口 的形式
	URI reconstructURI(ServiceInstance instance, URI original);
}

LoadBalancerClient 接口只有一個實現類:RibbonLoadBalancerClient,點擊查看這個類的調用,會看到在:RibbonAutoConfiguration 類中實例化bean:

@Configuration
@ConditionalOnClass({ IClient.class, RestTemplate.class, AsyncRestTemplate.class, Ribbon.class})
@RibbonClients
@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
@AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
@EnableConfigurationProperties({RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class})
public class RibbonAutoConfiguration {

    @Bean
	@ConditionalOnMissingBean(LoadBalancerClient.class)
	public LoadBalancerClient loadBalancerClient() {
		return new RibbonLoadBalancerClient(springClientFactory());
	}
    
	@Configuration
	@ConditionalOnClass(HttpRequest.class)
	@ConditionalOnRibbonRestClient
	protected static class RibbonClientHttpRequestFactoryConfiguration {

		@Autowired
		private SpringClientFactory springClientFactory;

		@Bean
		public RestTemplateCustomizer restTemplateCustomizer(
				final RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory) {
			return restTemplate -> restTemplate.setRequestFactory(ribbonClientHttpRequestFactory);
		}

		@Bean
		public RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory() {
			return new RibbonClientHttpRequestFactory(this.springClientFactory);
		}
	}
    ......
    ......
    ......
}

在這里還實例化了RestTemplate。在類頭部的配置信息中 有個注解:@AutoConfigureBefore,即在該類初始化之前需要初始化的類有兩個:LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class。AsyncLoadBalancerAutoConfiguration LoadBalancerAutoConfiguration的異步實現版本,所以看一下 LoadBalancerAutoConfiguration 的實現即可:

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

	@LoadBalanced
	@Autowired(required = false)
	private List<RestTemplate> restTemplates = Collections.emptyList();

	@Bean
	public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
			final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
		return () -> restTemplateCustomizers.ifAvailable(customizers -> {
            for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                for (RestTemplateCustomizer customizer : customizers) {
                    customizer.customize(restTemplate);
                }
            }
        });
	}

	@Autowired(required = false)
	private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

	@Bean
	@ConditionalOnMissingBean
	public LoadBalancerRequestFactory loadBalancerRequestFactory(
			LoadBalancerClient loadBalancerClient) {
		return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
	}

	@Configuration
	@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
	static class LoadBalancerInterceptorConfig {
		@Bean
		public LoadBalancerInterceptor ribbonInterceptor(
				LoadBalancerClient loadBalancerClient,
				LoadBalancerRequestFactory requestFactory) {
			return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
		}

		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final LoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
		}
	}

	@Configuration
	@ConditionalOnClass(RetryTemplate.class)
	public static class RetryAutoConfiguration {

		@Bean
		@ConditionalOnMissingBean
		public LoadBalancedRetryFactory loadBalancedRetryFactory() {
			return new LoadBalancedRetryFactory() {};
		}
	}

	@Configuration
	@ConditionalOnClass(RetryTemplate.class)
	public static class RetryInterceptorAutoConfiguration {
		@Bean
		@ConditionalOnMissingBean
		public RetryLoadBalancerInterceptor ribbonInterceptor(
				LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties,
				LoadBalancerRequestFactory requestFactory,
				LoadBalancedRetryFactory loadBalancedRetryFactory) {
			return new RetryLoadBalancerInterceptor(loadBalancerClient, properties,
					requestFactory, loadBalancedRetryFactory);
		}

		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                        restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
		}
	}
}

在LoadBalancerAutoConfiguration類中,創建了一個LoadBalancerInterceptor攔截器,還維護了一個被@LoadBalanced修飾的RestTemplate列表,在初始化的時候,會為每個restTemplate實例添加LoadBalancerInterceptor攔截器。另外上面還能看到 Ribbon在請求失敗使用的重試框架是Spring的Retry,重試的時候也會走配置好的負載均衡攔截器。繼續跟蹤 LoadBalancerInterceptor:

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

	private LoadBalancerClient loadBalancer;
	private LoadBalancerRequestFactory requestFactory;

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
		this.loadBalancer = loadBalancer;
		this.requestFactory = requestFactory;
	}

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
		// for backwards compatibility
		this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
	}

	@Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
		final URI originalUri = request.getURI();
		String serviceName = originalUri.getHost();
		Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
		return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
	}
}

上面可以看到主要的邏輯在intercept方法中,從originalUri.getHost()中得到的就是我們在 restTemplate中填寫的服務名,然后會調用LoadBalancerClient 的execute方法,即上面 LoadBalancerClient 的實現類: RibbonLoadBalancerClient,上面我們提了一嘴然后就跑到了LoadBalancerClient 的實例化邏輯里面去了,最終跟蹤到現在,才發原來是在實例化LoadBalancerClient 的過程中塞入了一個攔截器。

public class RibbonLoadBalancerClient implements LoadBalancerClient {

	private SpringClientFactory clientFactory;

	public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
		this.clientFactory = clientFactory;
	}

	@Override
	public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
		ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
		Server server = getServer(loadBalancer);
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
				serviceId), serverIntrospector(serviceId).getMetadata(server));

		return execute(serviceId, ribbonServer, request);
	}
    
    protected ILoadBalancer getLoadBalancer(String serviceId) {
		return this.clientFactory.getLoadBalancer(serviceId);
	}
    
    ......

}

在execute方法中第一行返回了ILoadBalancer, ILoadBalancer 中定義了負載均衡器的操作的接口:

public interface ILoadBalancer {

	/**
	 *
	 * 可以用於服務器的初始列表。
	 * 同時也會將指定的服務器添加進服務器列表
	 * 同一邏輯服務器(host:port)本質上可以多次添加
	 * 
	 * @param newServers new servers to add
	 */
	public void addServers(List<Server> newServers);
	
	/**
	 * 根據指定的key選擇服務器
	 * 
	 * @return server chosen
	 */
	public Server chooseServer(Object key);
	
	/**
	 * 由負載均衡器的客戶端調用以通知服務器已關閉
	 * 否則,LB將認為它仍然有效直到下一個Ping周期
	 *(假設LB Impl執行ping操作)
	 * 
	 * @param server Server to mark as down
	 */
	public void markServerDown(Server server);
	
	/**
	 *
	 * 已過期
	 * 獲取服務器列表
	 *
	 * @param availableOnly if true, only live and available servers should be returned
	 */
	@Deprecated
	public List<Server> getServerList(boolean availableOnly);

	/**
	 * 獲取可用的服務器列表
	 * @return Only the servers that are up and reachable.
     */
    public List<Server> getReachableServers();

    /**
     * 獲取所有的服務器列表, 包含可用的和不可用的
     * @return All known servers, both reachable and unreachable.
     */
	public List<Server> getAllServers();
}

因為ILoadBalancer的實現類還是挺多的,我們先看一下完整的類圖:

從上面的類圖可以看到,ILoadBalancer 有三個實現類,他們之間是繼承關系,最終的實現類是 ZoneAwareLoadBalancer。因為很多東西都是定義在BaseLoadBalancer中的,為了能看懂,我們一起先從 BaseLoadBalancer 中的 變量開始看:

public class BaseLoadBalancer extends AbstractLoadBalancer implements
        PrimeConnections.PrimeConnectionListener, IClientConfigAware {
    
    
    private final static IRule DEFAULT_RULE = new RoundRobinRule();
    
    private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
    
    protected IRule rule = DEFAULT_RULE;

    protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;

    protected IPing ping = null;

    @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
    
    @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());
    
    private IClientConfig config; 
    
}

從上面的變量信息我們可以看出一些東西:

  1. 默認的 負載均衡策略是隨機負載均衡;
  2. 默認的Ping策略為串行化Ping;
  3. 使用了一個list來保存所有的服務列表,一個list來保存當前所有的存活狀態的服務列表;
  4. 定義了客戶端配置,用於初始化客戶端以及負載均衡配置 。

接着看 BaseLoadBalancer 默認的無參構造函數做了什么:

public BaseLoadBalancer() {
    this.name = DEFAULT_NAME;
    this.ping = null;
    setRule(DEFAULT_RULE);
    setupPingTask();
    lbStats = new LoadBalancerStats(DEFAULT_NAME);
}
  1. 將負載均衡的默認規則設置為隨機;
  2. 設置Ping 定時任務;
  3. 負載均衡的一些狀態設置,為下次執行負載均衡提供參考。

setupPingTask() 方法里面還是有一些東西的,我們一起看一下:

void setupPingTask() {
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                                       true);
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    forceQuickPing();
}

首先是設置了一個定時任務,任務的執行間隔是每 10S 執行一次。

跟進到PingTask類中:

//    protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;

class PingTask extends TimerTask {
    public void run() {
        try {
            new Pinger(pingStrategy).runPinger();
        } catch (Exception e) {
            logger.error("LoadBalancer [{}]: Error pinging", name, e);
        }
    }
}

//默認Ping 的方式

private static class SerialPingStrategy implements IPingStrategy {

    @Override
    public boolean[] pingServers(IPing ping, Server[] servers) {
        int numCandidates = servers.length;
        boolean[] results = new boolean[numCandidates];

        logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);

        for (int i = 0; i < numCandidates; i++) {
            results[i] = false; /* Default answer is DEAD. */
            try {
                if (ping != null) {
                    results[i] = ping.isAlive(servers[i]);
                }
            } catch (Exception e) {
                logger.error("Exception while pinging Server: '{}'", servers[i], e);
            }
        }
        return results;
    }
}

設置Ping的策略為:DEFAULT_PING_STRATEGY,即上面提到的串行化 Ping的方式。

在 runPinger() 方法中會拿到當前 allServerList 中 所有的節點去Ping,在allServerList 中標記其存活狀態,用當前Ping的存活狀態 和 上次的 准或狀態比對,如果 狀態不同 ,發出通知 這些節點狀態有改變;如果狀態為存活,那么將 upServerList 中的節點用本次檢測出的所有存活節點替換。

class Pinger {

        private final IPingStrategy pingerStrategy;

        public Pinger(IPingStrategy pingerStrategy) {
            this.pingerStrategy = pingerStrategy;
        }

        public void runPinger() throws Exception {
            if (!pingInProgress.compareAndSet(false, true)) { 
                return; // Ping in progress - nothing to do
            }
            
            // we are "in" - we get to Ping

            Server[] allServers = null;
            boolean[] results = null;

            Lock allLock = null;
            Lock upLock = null;

            try {
                /*
                 * The readLock should be free unless an addServer operation is
                 * going on...
                 */
                allLock = allServerLock.readLock();
                allLock.lock();
                allServers = allServerList.toArray(new Server[allServerList.size()]);
                allLock.unlock();

                int numCandidates = allServers.length;
                results = pingerStrategy.pingServers(ping, allServers);

                final List<Server> newUpList = new ArrayList<Server>();
                final List<Server> changedServers = new ArrayList<Server>();

                for (int i = 0; i < numCandidates; i++) {
                    boolean isAlive = results[i];
                    Server svr = allServers[i];
                    boolean oldIsAlive = svr.isAlive();

                    svr.setAlive(isAlive);
					//如果當前檢測狀態和之前的狀態不一致,稍后會用於發送狀態變化通知
                    if (oldIsAlive != isAlive) {
                        changedServers.add(svr);
                        logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                    		name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                    }
					//如果是存活狀態,添加進list
                    if (isAlive) {
                        newUpList.add(svr);
                    }
                }
                upLock = upServerLock.writeLock();
                upLock.lock();
                upServerList = newUpList;
                upLock.unlock();
				//發送狀態變化通知消息
                notifyServerStatusChangeListener(changedServers);
            } finally {
                pingInProgress.set(false);
            }
        }
    }

看完初始化bean過程之后,我們接着看 ILoadBalancer 接口中的幾個方法在這里的實現:

addServers 方法:

@Override
public void addServers(List<Server> newServers) {
    if (newServers != null && newServers.size() > 0) {
        try {
            ArrayList<Server> newList = new ArrayList<Server>();
            newList.addAll(allServerList);
            newList.addAll(newServers);
            setServersList(newList);
        } catch (Exception e) {
            logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
        }
    }
}


public void setServersList(List lsrv) {
    Lock writeLock = allServerLock.writeLock();
    logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);

    ArrayList<Server> newServers = new ArrayList<Server>();
    writeLock.lock();
    try {
        ArrayList<Server> allServers = new ArrayList<Server>();
        for (Object server : lsrv) {
            if (server == null) {
                continue;
            }

            if (server instanceof String) {
                server = new Server((String) server);
            }
			//這里保存當前所有的server信息
            if (server instanceof Server) {
                logger.debug("LoadBalancer [{}]:  addServer [{}]", name, ((Server) server).getId());
                allServers.add((Server) server);
            } else {
                throw new IllegalArgumentException(
                    "Type String or Server expected, instead found:"
                    + server.getClass());
            }

        }
        boolean listChanged = false;
        //如果server信息有變化,給各個監聽事件發送監聽信息
        if (!allServerList.equals(allServers)) {
            listChanged = true;
            if (changeListeners != null && changeListeners.size() > 0) {
                List<Server> oldList = ImmutableList.copyOf(allServerList);
                List<Server> newList = ImmutableList.copyOf(allServers);                   
                for (ServerListChangeListener l: changeListeners) {
                    try {
                        l.serverListChanged(oldList, newList);
                    } catch (Exception e) {
                        logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
                    }
                }
            }
        }
        if (isEnablePrimingConnections()) {
            for (Server server : allServers) {
                if (!allServerList.contains(server)) {
                    server.setReadyToServe(false);
                    newServers.add((Server) server);
                }
            }
            if (primeConnections != null) {
                primeConnections.primeConnectionsAsync(newServers, this);
            }
        }
        // This will reset readyToServe flag to true on all servers
        // regardless whether
        // previous priming connections are success or not
        //這里直接將以前的 serverList 用現在的覆蓋掉, 是沒有做唯一性校驗的 注意*************
        allServerList = allServers;
        //下面接着去執行 Ping 
        if (canSkipPing()) {
            //如果當前沒有默認的Ping策略,默認將所有的節點存活狀態設置為可用
            for (Server s : allServerList) {
                s.setAlive(true);
            }
            upServerList = allServerList;
        } else if (listChanged) {
            //執行默認的Ping 策略
            forceQuickPing();
        }
    } finally {
        writeLock.unlock();
    }
}

直接將 newServers 添加進 allServerList ,這里是沒有做唯一性過濾的。

chooseServer方法:

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

這里的代碼很簡單,最主要的代碼在:rule.choose(key)這一句,rule 為 默認的 RoundRobinRule,點進去 choose()方法,你就會進入到IRule接口,這里就是負載均衡規則的路由類, 后面我專門列一個章節講所有的負載均衡算法,這里就先跳過。

markServerDown方法:

public void markServerDown(Server server) {
    if (server == null || !server.isAlive()) {
        return;
    }

    logger.error("LoadBalancer [{}]:  markServerDown called on [{}]", name, server.getId());
    //這里將服務的狀態設置為不可用
    server.setAlive(false);
    // forceQuickPing();
	//給監聽器發送通知
    notifyServerStatusChangeListener(singleton(server));
}

getReachableServers方法:

@Override
public List<Server> getReachableServers() {
    return Collections.unmodifiableList(upServerList);
}

直接返回可用的服務list。

getAllServers方法:

@Override
public List<Server> getAllServers() {
    return Collections.unmodifiableList(allServerList);
}

直接返回所有的服務list。

分析完 BaseLoadBalancer ,DynamicServerListLoadBalancer 和 ZoneAwareLoadBalancer 基本大同小異:

  • DynamicServerListLoadBalancer :使用動態源的服務器, 即服務器列表可能是在運行時更改。 通過一些Filter函數來動態的過濾掉指定的服務器列表;
  • ZoneAwareLoadBalancer :這個負載均衡器適用於異地多機房的情況,在選擇服務器的時候可以避免整個區域。LoadBalancer將計算並檢查所有可用區域的區域統計信息。如果任何區域的“平均活動請求數”已達到配置的閾值,則該區域將從活動服務器列表中刪除。如果多個區域已達到閾值,則將刪除每台服務器上最活躍請求的區域。一旦刪除了最壞的區域,將在其余區域中選擇一個區域,其概率與其實例數成正比。服務器將從具有指定規則的選定區域返回。對於每個請求都將重復上述步驟,也就是說,每個區域相關的負載平衡決策都是在最新統計信息的幫助下實時做的。

以上,總結一下Ribbon的負載均衡如何實現:

在 LoadBalancerClient 類中 配置了LoadBalancerInterceptor,攔截器主要調用 ILoadBalancer 來進行 負載均衡邏輯處理。在 ILoadBalancer 中配置了 Ping 策略,負載均衡策略,維護當前可用的服務端列表信息。 RestTemplate 因為被 @LoadBalance 修飾以后,實例信息被添加進 LoadBalancerClient 的緩存中,然后被 RestTemplateCustomizer 包裝,里面封裝了攔截器信息所以能夠進行負載均衡配置。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM