前言
前情回顧
上一篇講了Ribbon的初始化過程,從LoadBalancerAutoConfiguration
到RibbonAutoConfiguration
再到RibbonClientConfiguration
,我們找到了ILoadBalancer
默認初始化的對象等。
本講目錄
這一講我們會進一步往下探究Ribbon和Eureka是如何結合的。
通過上一講ILoadBalancer
我們已經可以拿到一個服務所有的服務節點信息了,這里面是怎么把服務的名稱轉化為對應的具體host請求信息的呢?
通過這一講 我們來一探究竟
目錄如下:
- EurekaClientAutoConfiguration.getLoadBalancer()回顧
- 再次梳理Ribbon初始化過程
- ServerList實現類初始化過程
- getUpdatedListOfServers()獲取注冊表列表分析
- ribbon如何更新自己保存的注冊表信息?
說明
原創不易,如若轉載 請標明來源!
博客地址:一枝花算不算浪漫
微信公眾號:壹枝花算不算浪漫
源碼閱讀
EurekaClientAutoConfiguration.getLoadBalancer()回顧
上一講我們已經深入的講解過getLoadBalancer()
方法的實現,每個serviceName都對應一個自己的SpringContext上下文信息,然后通過ILoadBalancer.class
從上下文信息中獲取默認的LoadBalancer:ZoneAwareLoadBalancer
, 我們看下這個類的構造函數:
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}
繼續跟父類DynamicServerListLoadBalancer
的初始化方法:
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
volatile ServerList<T> serverListImpl;
volatile ServerListFilter<T> filter;
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
}
構造方法中有個restOfInit()
方法,進去后又會有updateListOfServers()
方法,看方法名就知道這個肯定是和server注冊表相關的,繼續往后看,servers = serverListImpl.getUpdatedListOfServers();
,這里直接調用getUpdatedListOfServers()
就獲取到了所有的注冊表信息。
可以看到ServerList
有四個實現類,這個到底是該調用哪個實現類的getUpdatedListOfServers()
方法呢?接着往下看。
再次梳理Ribbon初始化過程
第二講我們已經見過Ribbon的初始化過程,並畫了圖整理,這里針對於之前的圖再更新一下:
這里主要是增加了RibbonEurekaAutoConfiguration
和EurekaRibbonClientConfiguration
兩個配置類的初始化。
ServerList實現類初始化過程
上面已經梳理過 Ribbon
初始化的過程,其中在EurekaRibbonClientConfiguration
會初始化RibbonServerList
,代碼如下:
@Configuration
public class EurekaRibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
}
這里實際的ServerList
實際就是DiscoveryEnabledNIWSServerList
,我們看下這個類:
public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{
}
public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {
}
所以可以看出來ServerList
實際就是在這里進行初始化的,上面那個serverListImpl.getUpdatedListOfServers();
即為調用DiscoveryEnabledNIWSServerList.getUpdatedListOfServers()
方法了,繼續往下看。
getUpdatedListOfServers()獲取注冊表分析
直接看DiscoveryEnabledNIWSServerList.getUpdatedListOfServers()
源代碼:
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
des.setZone(DiscoveryClient.getZone(ii));
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
看到這里代碼就已經很明顯了,我們來解讀下這段代碼:
- 通過eurekaClientProvider獲取對應EurekaClient
- 通過vipAdress(實際就是serviceName)獲取對應注冊表集合信息
- 將注冊信息組裝成
DiscoveryEnabledServer
列表
再回到DynamicServerListLoadBalancer.updateListOfServers()
中,這里獲取到對應的DiscoveryEnabledServer list后調用updateAllServerList()
方法,一路跟蹤這里最終會調用BaseLoadBalancer.setServersList()
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
public void setServersList(List lsrv) {
Lock writeLock = allServerLock.writeLock();
logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);
ArrayList<Server> newServers = new ArrayList<Server>();
writeLock.lock();
try {
ArrayList<Server> allServers = new ArrayList<Server>();
for (Object server : lsrv) {
if (server == null) {
continue;
}
if (server instanceof String) {
server = new Server((String) server);
}
if (server instanceof Server) {
logger.debug("LoadBalancer [{}]: addServer [{}]", name, ((Server) server).getId());
allServers.add((Server) server);
} else {
throw new IllegalArgumentException(
"Type String or Server expected, instead found:"
+ server.getClass());
}
}
boolean listChanged = false;
if (!allServerList.equals(allServers)) {
listChanged = true;
if (changeListeners != null && changeListeners.size() > 0) {
List<Server> oldList = ImmutableList.copyOf(allServerList);
List<Server> newList = ImmutableList.copyOf(allServers);
for (ServerListChangeListener l: changeListeners) {
try {
l.serverListChanged(oldList, newList);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
}
}
}
}
if (isEnablePrimingConnections()) {
for (Server server : allServers) {
if (!allServerList.contains(server)) {
server.setReadyToServe(false);
newServers.add((Server) server);
}
}
if (primeConnections != null) {
primeConnections.primeConnectionsAsync(newServers, this);
}
}
// This will reset readyToServe flag to true on all servers
// regardless whether
// previous priming connections are success or not
allServerList = allServers;
if (canSkipPing()) {
for (Server s : allServerList) {
s.setAlive(true);
}
upServerList = allServerList;
} else if (listChanged) {
forceQuickPing();
}
} finally {
writeLock.unlock();
}
}
}
這個過程最后用一張圖總結為:
ribbon如何更新自己保存的注冊表信息?
上面已經講了 Ribbon是如何通過serviceName拉取到注冊表的,我們知道EurekaClient默認是30s拉取一次注冊表信息的,因為Ribbon要關聯注冊表信息,那么Ribbon該如何更新自己存儲的注冊表信息呢?
繼續回到DynamicSeverListLoadBalancer.restOfInit()
方法中:
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
protected volatile ServerListUpdater serverListUpdater;
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
}
重點查看enableAndInitLearnNewServersFeature()
方法,從名字我們就可以看出來這意思為激活和初始化學習新服務的功能,這里實際上就啟動serverListUpdater
中的一個線程。
在最上面Ribbon初始化的過程中我們知道,在RibbonClientConfiguration
中默認初始化的ServerListUpdater
為 PollingServreListUpdater
,我們繼續跟這個類的start方法:
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
這里只要是執行updateAction.doUpdate();
,然后后面啟動了一個調度任務,默認30s執行一次。
繼續往后跟doUpdate()
方法:
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
}
這里又調用了之前通過serviceName獲取對應注冊服務列表的方法了。
總結到一張圖如下:
總結
本文主要是重新梳理了Ribbon的初始化過程,主要是幾個Configure初始化的過程,然后是Ribbon與Eureka的整合,這里也涉及到了注冊表的更新邏輯。
看到這里真是被Spring的各種AutoConfigure繞暈了,哈哈,但是最后分析完 還是覺得挺清晰的,對於復雜的業務畫張流程圖還挺容易理解的。
申明
本文章首發自本人博客:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!
感興趣的小伙伴可關注個人公眾號:壹枝花算不算浪漫