Spring Cloud Ribbon服務列表的緩存與更新(二)


前面講到RestTemplate的應用也分析了他的實現,接着通過RestTemplate引出了負載均衡,上個篇幅分析了攔截器、負載均衡器的獲取及解析,下面我們接着上次內容講解。

首先看源碼一定要學會找入口

 

 點擊getForObject

 

 點擊doExecute后到達下圖所示位置

 

 

@Nullable
    protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
            @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

        Assert.notNull(url, "URI is required");
        Assert.notNull(method, "HttpMethod is required");
        ClientHttpResponse response = null;
        try {
            ClientHttpRequest request = createRequest(url, method);
            if (requestCallback != null) {
                requestCallback.doWithRequest(request);
            }
//發起了一次網絡通信請求,我們向上看可以發現這個發起請求的request是由上面createRequest(url,method)創建的 response
= request.execute(); handleResponse(url, method, response); return (responseExtractor != null ? responseExtractor.extractData(response) : null); } catch (IOException ex) { String resource = url.toString(); String query = url.getRawQuery(); resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource); throw new ResourceAccessException("I/O error on " + method.name() + " request for \"" + resource + "\": " + ex.getMessage(), ex); } finally { if (response != null) { response.close(); } } }

因為request.execute();的實現有很多,所以我們為了搞清楚它的實現到底是哪一個那我們就一定要進入createRequest()方法看下,發現它是調用了HttpAccessor的createRequest方法,它返回的request對像是什么對像取絕於

ClientHttpRequest request = getRequestFactory().createRequest(url, method);其中getRequestFactory()就是一個工廠模式,這個工廠調用的是什么我們可以看下

 

發現這個工廠有兩個方法,一個是本地方法一個子類方法,我們這里面調用的是它的子類的實現InterceptingHttpAccessor,點擊進入

 

 

至於為啥是調用子類不是調用本地這跟他的接口有關,如果它有子類它一定會被他的子類覆蓋掉,所以這里面看到有兩個直接找它的子類實現,下面的子類也比較簡單,它就是覆蓋父類的方法

 

 

下面不看具體業務流程,就看下關鍵的幾個點

@Override
    public ClientHttpRequestFactory getRequestFactory() {
//得到一個攔截器的集合 List
<ClientHttpRequestInterceptor> interceptors = getInterceptors();
//判空
if (!CollectionUtils.isEmpty(interceptors)) { ClientHttpRequestFactory factory = this.interceptingRequestFactory; if (factory == null) {
//有了上一篇幅的了解,應該很清楚在Bean就已經加載了一個攔截器,所以這里不可能為空 factory
= new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors); this.interceptingRequestFactory = factory; } return factory; } else { return super.getRequestFactory(); } }

都搞到了這里那就再看下getInterceptors()方法是怎么來的,點進去看下

 

 我們一路找,找到了下圖,看到了private final List<ClientHttpRequestInterceptor> interceptors = new ArrayList<>();但目前為止這個interceptors是空的,前面我們說過攔截器集合不為空,所以接下來我們要看下這個集合是在哪里被賦值的。

 

 

 要想找到List<ClientHttpRequestInterceptor> interceptors賦值的地方就要去重新看上一篇幅看過的類LoadBalancerAutoConfiguration

 

 

 所以前面說的List<ClientHttpRequestInterceptor> interceptors = getInterceptors();結果肯定不會為空,我們再回退到上一級,退到

ClientHttpRequest request = getRequestFactory().createRequest(url, method);方法

 

 我們來分析下這個ClientHttpRequest類的調用關系圖,至於為什么分析這個類關系是因為前面說過request.execute();的實現有很多,為了搞清楚它的實現到底是哪一個那我們就一定要進入createRequest()方法看下,我們一直跟進跟到了這個類方法了。在下面眾多的實現中我發現了一個攔截器的實現InterceptingClientHttpRequest

 

 那接下來我肯定是要去找到那攔截器的實現看他搞了啥,但很可惜,下面眾多的實現中沒有找到我想要的攔截器實現,這時一臉懵逼,這是不是逗我,上面關系圖跟我說你有但找你代碼的實現確找不到??這時候我們就要換個思路,找不到我們想要的實現類那去找他爹問下他兒子去哪了唄

 

 所以接下來我們選擇他爹,AbstractBufferingClientHttpRequest看下,會發現他又調用了createRequest方法,他這里面是用到了設計模式中的模板模式,如果對模板模式不太清楚的可以自己補下,他定義了一個抽象的方法,他這個抽象的方法通過我們的抽像類去調用自己的方法,因為抽像的方法必須由子類去實現所以他又會去調用子類

 

 所以我們點擊看下createRequest有沒有調用子類,點擊后一看果然和我猜想的一樣

 

點擊進入后發現他返回的是InterceptingClientHttpRequest,所以跟了這么久終於知道了ClientHttpRequest request = getRequestFactory().createRequest(url, method);得到的InterceptingClientHttpRequest

 

 

現在搞清楚了這一點我們再回退到RestTemplate方法的doExecute中去,我們跟進response = request.execute();方法,會發現它很多實現,如果他沒有真正的實現時會找到父類,所以我們進入他的父類抽像方法,

 

 走到這一步我們會發現executeInternal(this.headers);又有很多實現,老規矩找爹,如果不想這么直接想穩點那就要明白,這個executeInternal的父類方法是什么,從他的關系結構圖找

 

 

 找到他爹后進來截圖如下

 接下來我們再向下走,點擊execyteInternal又是調用模板方法,但這時他應該調用的是他真正義意上的實現類InterceptingClientHttpRequest

 

 

 所以他最終到了下面所示圖中的方法中

 

 

 點擊上圖中execute,進入下圖所示方法中,下圖中有個this.iterator,我們先不管iterator是啥,但從字面上我們可以看出ClientHttpRequestInterceptor這玩意給我們存的應該是一個客戶端請求攔截器,而且通過Bebugger可以發現這個攔截器一定會包含我上一篇中分析說到的LoadBalancerInterceptor,這樣一步步的看就跟上一篇幅簡講說的連上了。

 

 

 選擇LoadBalancerInterceptor進去

 

 

 進來后一看,和上一篇幅中的時序圖完美接上了,后面的方法其實在上一篇幅中全都說過

 

 

點擊上圖的execute進入下圖位置

 

 再點擊上圖的execute到下圖位置,熟悉的配方,這流程上一篇中都跑過,如果這里再來一次就沒意思了

 

 上次說到了已經拿到了server列表 在getServer里面

 

這里面有一個負載均衡的計算,在這段代碼最后他有一句return super.chooseServer(key);這明顯是調用了他的父類,那我們在選擇時就需要明白這父子關系圖,要不然沒辦法選呀

 

 

 所以下面在繼續走之前看下他的父子關系圖

 

 

 通過類關系圖,很清楚的知道調用的是BaseLoadBalancer父類

 

這里面rule.choose里面有很多規則的執行,上次好像我是說過其中兩個規則的執行

 

 

 進入rule.choose方法,在上一次中有說到這下圖紅框中提到的List<Server>緩存列表,接下來,這篇幅重點就是講解下這個列表是從哪里拿來,又是從哪里去更新的

 

 首先要說下這個list是從哪來的,我們點擊getReachableServers可以發現,他在ILoadBalancer中有兩個接口,竟然有接口那么就有實現,下面就要搞清楚是誰實現這玩意的

 

 要是我我就猜它是BaseLoadBalancer實現的

 

 至於是不是真的,先放着,看代碼沒有次次搞對,總要憑自己看代碼的經驗猜上一猜,我們往上翻會發現在BaseLoadBalancer中有兩個地方存了這個List<Server>,這兩個玩意就是服務器端地址列表在客戶端內存中的一個緩存,所以這個List一定存儲了服務器端提供者的一個地址列表,竟然有列表存進來那一定會有初始化及更新的過程,下面就來看下他是如何初始化,又是如何更新

 

 要想搞清楚這個接下來就需要帶着猜想去看下邏輯,上一篇幅的時序中有畫ILoadBalancer的真正實例是ZoneAwareLoadBalancer,而在我剛剛畫的ZoneAwareLoadBalancer的類關系圖中可以發現他的父類是DynamicServerListLoadBalancer(如果有忘記的可以向上翻一翻),再往上就是的父類就是BaseLoadBalancer,這樣一來初始化過程就和我猜的一樣了,有了這個概念后我們進入DynamicServerListLoadBalancer方法,在它的構造方法中可以發現一個restOfInit(clientConfig);方法,這是進入服務列表進入初始化的一個流程,觸發點就是DynamicServerListLoadBalancer被初始化和實例化以后它會在構造方法中觸發一個restOfInit(clientConfig);

restOfInit(clientConfig)最終會調用updateAllServerList去更新,然而這更新有兩個地方,一個是更新本地服務另一個是從Eureka中動態獲取服務代表,到這里初始化和更新問題就解決了

 

 

 更新下時序圖

 

 

 

 更新完時序圖后點擊restOfInit(clientConfig)往下走,里面有兩個操作的方法一個是定時更新Eureka實例列表的enableAndInitLearnNewServersFeature();另一個是獲取所有Eureka實例列表的updateListOfServers

 

 

 進入updateListOfServers,這里面的更新是去更新我們啟動的服務的列表,這里面有兩個實現,之前我說過這里面的更新一個是更新本地服務另一個是更新Eureka實例列表,那么按理說這里面也應該有Eureka的實現,這是因為我當前還沒有添加Eureka的依賴包,所以就沒有這個實現類,所以我們這里看本地配置更新ConfigurationBaseServerList

 

 

 進入ConfigurationBaseServerList后會由listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);得到一個listOfServers , 這里面的IClientConfig是從上一篇中說的RibbonClientConfiguration中完成初始化的

點擊上圖中ListOfServers可以發現是從客戶端的本地配置拿到服務的,不信可以Debug一下

 

 

 

 

 

 

 

 

 更新下時序圖

 

 

 

 

 

 這一條鏈路說完后,再返回到enableAndInitLearnNewServersFeature();這個定時更新Eureka實例列表這個方法中來看,通過看下面截圖可以很清楚的看到,他之所以能動態拿取是因為它啟動了一個定時任務,竟然是定時任務那就要看下它任務的updateAction是啥

 

 

 點擊updateAction后發現他是一個內部類,這里在面的UpdateListOfServers()又是前面說到過的獲取所有實例的方法

 

 

 

 

 

 

 回到前面的serverListUpdater.start(updateAction);看下start里面定義的是啥

 

 

 

  @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
//最終執行的任務 wrapperRunnable,
//下面的是延時時間 initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS ); }
else { logger.info("Already active, no-op"); } }

 

更新下時序圖

 

 

 

 

 

 前面通過updateListOfServers() 中的servers = serverListImpl.getUpdatedListOfServers();就已經獲取到了服務的列表,在列表獲取后要做的事就是把列表給放到緩存中去,接下來看updateListOfServers()的updateAllServerList(servers);是怎么做到的,進入后發現他有一個compareAndSet操作,如果看過我前面寫的多線程的文章的話可以很清楚的知道他這是做了一個鎖的操作,之所以這么操作是為了防止已經有其它線程在執行這個代碼

 

 

 然后進入setServersList(ls)方法,如果不知道怎么選那就又要看類關系圖了

 

 

 因為現在在DynamicServerListLoadBalancer中,所以很明顯是找父類BaseLoadBalancer

 

 

 通過這樣分析很明了的發現是在BaseLoadBalancer類中完成賦值的,和我之前的猜想完美接合上了

 

 

 更新下時序圖

 

 

 現在看來從服務的解析到服務列表的找尋及緩存都解決了,看擬問題都解決了,但是還有一個問題,那就是服務的存活問題,因為在生產環境中有服務掛機的情況,所以這里面的設計應該還有一個定時去ping下服務是否運轉正常,如果ping的結果發現服務有異常那一定會去改變我們的ILoadBalancer的服務列表,把它下線,這個代碼是寫在BaseLoadBalancer類中,它在構造方法中通過setupPingTask()方法啟動一個任務

 

 

 點擊setupPingTask(),下面代碼也很簡短

 

void setupPingTask() {
//是否可以跳過
if (canSkipPing()) { return; }
//如果不等空偏鎖
if (lbTimer != null) { lbTimer.cancel(); }
//構造任務 lbTimer
= new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true);
//schedule是任務,pingIntervalSeconds是任務間隔時間,這里面是10S lbTimer.schedule(
new PingTask(), 0, pingIntervalSeconds * 1000); forceQuickPing(); }

先進入new PingTask()看做了什么

  class PingTask extends TimerTask {
        public void run() {
            try {
//策略模式,通過傳入的值決定是用哪個ping去完成
new Pinger(pingStrategy).runPinger(); } catch (Exception e) { logger.error("LoadBalancer [{}]: Error pinging", name, e); } } }
public void runPinger() throws Exception {
            if (!pingInProgress.compareAndSet(false, true)) { 
                return; // Ping in progress - nothing to do
            }
            
            // we are "in" - we get to Ping

            Server[] allServers = null;
            boolean[] results = null;

            Lock allLock = null;
            Lock upLock = null;

            try {
                /*
                 * The readLock should be free unless an addServer operation is
                 * going on...
                 */
                allLock = allServerLock.readLock();
                allLock.lock();
                allServers = allServerList.toArray(new Server[allServerList.size()]);
                allLock.unlock();

                int numCandidates = allServers.length;
//最終ping的是allServers中的一個服務的節點,然后通過調用一個策略去ping,下面就是要看pingerStrategy調的策略是誰 results
= pingerStrategy.pingServers(ping, allServers); final List<Server> newUpList = new ArrayList<Server>(); final List<Server> changedServers = new ArrayList<Server>(); for (int i = 0; i < numCandidates; i++) { boolean isAlive = results[i]; Server svr = allServers[i]; boolean oldIsAlive = svr.isAlive(); svr.setAlive(isAlive); if (oldIsAlive != isAlive) { changedServers.add(svr); logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}", name, svr.getId(), (isAlive ? "ALIVE" : "DEAD")); } if (isAlive) { newUpList.add(svr); } } upLock = upServerLock.writeLock(); upLock.lock(); upServerList = newUpList; upLock.unlock(); notifyServerStatusChangeListener(changedServers); } finally { pingInProgress.set(false); } }

分析完上面的邏輯后發現這個策略是IPingStrategy,但是在這個類中找不到他是在哪初始化的,此時就要猜了,根據時序圖找鏈路,我猜是在RibbonClientConfiguration中完成初始化,在配置類中找,找到了如下的ping他先判斷是否設置過,如果設置過就用設置的,如果沒有就用默認的,這個就是初始化過程

 

 

 初始化完成了,那接下來要關心的就是這個IPing是在哪里被賦值給了ILoadBalancer中去,向下找會發現如下圖,由這里我就知道了這個ping的初始化、賦值及它如果沒有設置它默認返回的是DummyPing()了(這個默認的ping是不做任務處理的,如果不信可以看他的類方法)

 

 有了上面的概念后那繼續跟着results = pingerStrategy.pingServers(ping, allServers);邏輯走,點擊pingServers看它ping的邏輯

 

 進入PingUrl看下它的心跳寫法,里面就是每隔一段時間就發起一次請求,這里面發起的是HTTP請求,然后根據返回的狀態碼進行判斷

 根據官網資料https://docs.spring.io/spring-cloud-netflix/docs/2.2.5.RELEASE/reference/html/#spring-cloud-ribbon,它這個ping是可以擴展的,下面就這個擴展來寫下

 

 

 

 

 

 

 

 Debug訪問一下,可以看到自定義的已經加載進去了

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM