【一起學源碼-微服務】Ribbon 源碼三:Ribbon與Eureka整合原理分析


前言

前情回顧

上一篇講了Ribbon的初始化過程,從LoadBalancerAutoConfigurationRibbonAutoConfiguration 再到RibbonClientConfiguration,我們找到了ILoadBalancer默認初始化的對象等。

本講目錄

這一講我們會進一步往下探究Ribbon和Eureka是如何結合的。

通過上一講ILoadBalancer 我們已經可以拿到一個服務所有的服務節點信息了,這里面是怎么把服務的名稱轉化為對應的具體host請求信息的呢?

通過這一講 我們來一探究竟

目錄如下:

  1. EurekaClientAutoConfiguration.getLoadBalancer()回顧
  2. 再次梳理Ribbon初始化過程
  3. ServerList實現類初始化過程
  4. getUpdatedListOfServers()獲取注冊表列表分析
  5. ribbon如何更新自己保存的注冊表信息?

說明

原創不易,如若轉載 請標明來源!

博客地址:一枝花算不算浪漫
微信公眾號:壹枝花算不算浪漫

源碼閱讀

EurekaClientAutoConfiguration.getLoadBalancer()回顧

上一講我們已經深入的講解過getLoadBalancer() 方法的實現,每個serviceName都對應一個自己的SpringContext上下文信息,然后通過ILoadBalancer.class從上下文信息中獲取默認的LoadBalancer:ZoneAwareLoadBalancer, 我們看下這個類的構造函數:

public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                             IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
                             ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}

繼續跟父類DynamicServerListLoadBalancer的初始化方法:

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
	volatile ServerList<T> serverListImpl;

	volatile ServerListFilter<T> filter;

	public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
	                                     ServerList<T> serverList, ServerListFilter<T> filter,
	                                     ServerListUpdater serverListUpdater) {
	    super(clientConfig, rule, ping);
	    this.serverListImpl = serverList;
	    this.filter = filter;
	    this.serverListUpdater = serverListUpdater;
	    if (filter instanceof AbstractServerListFilter) {
	        ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
	    }
	    restOfInit(clientConfig);
	}

	void restOfInit(IClientConfig clientConfig) {
	    boolean primeConnection = this.isEnablePrimingConnections();
	    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
	    this.setEnablePrimingConnections(false);
	    enableAndInitLearnNewServersFeature();

	    updateListOfServers();
	    if (primeConnection && this.getPrimeConnections() != null) {
	        this.getPrimeConnections()
	                .primeConnections(getReachableServers());
	    }
	    this.setEnablePrimingConnections(primeConnection);
	    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
	}

	@VisibleForTesting
	public void updateListOfServers() {
	    List<T> servers = new ArrayList<T>();
	    if (serverListImpl != null) {
	        servers = serverListImpl.getUpdatedListOfServers();
	        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
	                getIdentifier(), servers);

	        if (filter != null) {
	            servers = filter.getFilteredListOfServers(servers);
	            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
	                    getIdentifier(), servers);
	        }
	    }
	    updateAllServerList(servers);
	}
}

構造方法中有個restOfInit()方法,進去后又會有updateListOfServers() 方法,看方法名就知道這個肯定是和server注冊表相關的,繼續往后看,servers = serverListImpl.getUpdatedListOfServers();,這里直接調用getUpdatedListOfServers()就獲取到了所有的注冊表信息。

0.jpeg

可以看到ServerList有四個實現類,這個到底是該調用哪個實現類的getUpdatedListOfServers()方法呢?接着往下看。

再次梳理Ribbon初始化過程

第二講我們已經見過Ribbon的初始化過程,並畫了圖整理,這里針對於之前的圖再更新一下:

這里主要是增加了RibbonEurekaAutoConfigurationEurekaRibbonClientConfiguration兩個配置類的初始化。

ServerList實現類初始化過程

上面已經梳理過 Ribbon初始化的過程,其中在EurekaRibbonClientConfiguration 會初始化RibbonServerList,代碼如下:

@Configuration
	public class EurekaRibbonClientConfiguration {
	@Bean
	@ConditionalOnMissingBean
	public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
		if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
			return this.propertiesFactory.get(ServerList.class, config, serviceId);
		}
		DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
				config, eurekaClientProvider);
		DomainExtractingServerList serverList = new DomainExtractingServerList(
				discoveryServerList, config, this.approximateZoneFromHostname);
		return serverList;
	}
}

這里實際的ServerList實際就是DiscoveryEnabledNIWSServerList,我們看下這個類:

public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{

}

public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {

}

所以可以看出來ServerList 實際就是在這里進行初始化的,上面那個serverListImpl.getUpdatedListOfServers();即為調用DiscoveryEnabledNIWSServerList.getUpdatedListOfServers() 方法了,繼續往下看。

getUpdatedListOfServers()獲取注冊表分析

直接看DiscoveryEnabledNIWSServerList.getUpdatedListOfServers()源代碼:

@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
    return obtainServersViaDiscovery();
}

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
    List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

    if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
        logger.warn("EurekaClient has not been initialized yet, returning an empty list");
        return new ArrayList<DiscoveryEnabledServer>();
    }

    EurekaClient eurekaClient = eurekaClientProvider.get();
    if (vipAddresses!=null){
        for (String vipAddress : vipAddresses.split(",")) {
            // if targetRegion is null, it will be interpreted as the same region of client
            List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
            for (InstanceInfo ii : listOfInstanceInfo) {
                if (ii.getStatus().equals(InstanceStatus.UP)) {

                    if(shouldUseOverridePort){
                        if(logger.isDebugEnabled()){
                            logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                        }

                        // copy is necessary since the InstanceInfo builder just uses the original reference,
                        // and we don't want to corrupt the global eureka copy of the object which may be
                        // used by other clients in our system
                        InstanceInfo copy = new InstanceInfo(ii);

                        if(isSecure){
                            ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                        }else{
                            ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                        }
                    }

                    DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                    des.setZone(DiscoveryClient.getZone(ii));
                    serverList.add(des);
                }
            }
            if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
            }
        }
    }
    return serverList;
}

看到這里代碼就已經很明顯了,我們來解讀下這段代碼:

  1. 通過eurekaClientProvider獲取對應EurekaClient
  2. 通過vipAdress(實際就是serviceName)獲取對應注冊表集合信息
  3. 將注冊信息組裝成DiscoveryEnabledServer列表

再回到DynamicServerListLoadBalancer.updateListOfServers() 中,這里獲取到對應的DiscoveryEnabledServer list后調用updateAllServerList()方法,一路跟蹤這里最終會調用BaseLoadBalancer.setServersList()

public class BaseLoadBalancer extends AbstractLoadBalancer implements
        PrimeConnections.PrimeConnectionListener, IClientConfigAware {

	@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections
            .synchronizedList(new ArrayList<Server>());
            
	public void setServersList(List lsrv) {
	    Lock writeLock = allServerLock.writeLock();
	    logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);
	    
	    ArrayList<Server> newServers = new ArrayList<Server>();
	    writeLock.lock();
	    try {
	        ArrayList<Server> allServers = new ArrayList<Server>();
	        for (Object server : lsrv) {
	            if (server == null) {
	                continue;
	            }

	            if (server instanceof String) {
	                server = new Server((String) server);
	            }

	            if (server instanceof Server) {
	                logger.debug("LoadBalancer [{}]:  addServer [{}]", name, ((Server) server).getId());
	                allServers.add((Server) server);
	            } else {
	                throw new IllegalArgumentException(
	                        "Type String or Server expected, instead found:"
	                                + server.getClass());
	            }

	        }
	        boolean listChanged = false;
	        if (!allServerList.equals(allServers)) {
	            listChanged = true;
	            if (changeListeners != null && changeListeners.size() > 0) {
	               List<Server> oldList = ImmutableList.copyOf(allServerList);
	               List<Server> newList = ImmutableList.copyOf(allServers);                   
	               for (ServerListChangeListener l: changeListeners) {
	                   try {
	                       l.serverListChanged(oldList, newList);
	                   } catch (Exception e) {
	                       logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
	                   }
	               }
	            }
	        }
	        if (isEnablePrimingConnections()) {
	            for (Server server : allServers) {
	                if (!allServerList.contains(server)) {
	                    server.setReadyToServe(false);
	                    newServers.add((Server) server);
	                }
	            }
	            if (primeConnections != null) {
	                primeConnections.primeConnectionsAsync(newServers, this);
	            }
	        }
	        // This will reset readyToServe flag to true on all servers
	        // regardless whether
	        // previous priming connections are success or not
	        allServerList = allServers;
	        if (canSkipPing()) {
	            for (Server s : allServerList) {
	                s.setAlive(true);
	            }
	            upServerList = allServerList;
	        } else if (listChanged) {
	            forceQuickPing();
	        }
	    } finally {
	        writeLock.unlock();
	    }
	}
}

這個過程最后用一張圖總結為:

ribbon如何更新自己保存的注冊表信息?

上面已經講了 Ribbon是如何通過serviceName拉取到注冊表的,我們知道EurekaClient默認是30s拉取一次注冊表信息的,因為Ribbon要關聯注冊表信息,那么Ribbon該如何更新自己存儲的注冊表信息呢?

繼續回到DynamicSeverListLoadBalancer.restOfInit()方法中:

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {

	protected volatile ServerListUpdater serverListUpdater;

	void restOfInit(IClientConfig clientConfig) {
	    boolean primeConnection = this.isEnablePrimingConnections();
	    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
	    this.setEnablePrimingConnections(false);
	    enableAndInitLearnNewServersFeature();

	    updateListOfServers();
	    if (primeConnection && this.getPrimeConnections() != null) {
	        this.getPrimeConnections()
	                .primeConnections(getReachableServers());
	    }
	    this.setEnablePrimingConnections(primeConnection);
	    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
	}

	public void enableAndInitLearnNewServersFeature() {
	    LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
	    serverListUpdater.start(updateAction);
	}
}

重點查看enableAndInitLearnNewServersFeature()方法,從名字我們就可以看出來這意思為激活和初始化學習新服務的功能,這里實際上就啟動serverListUpdater中的一個線程。

在最上面Ribbon初始化的過程中我們知道,在RibbonClientConfiguration中默認初始化的ServerListUpdaterPollingServreListUpdater,我們繼續跟這個類的start方法:

@Override
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}

這里只要是執行updateAction.doUpdate();,然后后面啟動了一個調度任務,默認30s執行一次。

繼續往后跟doUpdate()方法:

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
	protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
	    @Override
	    public void doUpdate() {
	        updateListOfServers();
	    }
	};
}

這里又調用了之前通過serviceName獲取對應注冊服務列表的方法了。

總結到一張圖如下:

注冊表服務

總結

本文主要是重新梳理了Ribbon的初始化過程,主要是幾個Configure初始化的過程,然后是Ribbon與Eureka的整合,這里也涉及到了注冊表的更新邏輯。

看到這里真是被Spring的各種AutoConfigure繞暈了,哈哈,但是最后分析完 還是覺得挺清晰的,對於復雜的業務畫張流程圖還挺容易理解的。

申明

本文章首發自本人博客:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!

感興趣的小伙伴可關注個人公眾號:壹枝花算不算浪漫

22.jpg


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM