Dubbo源碼學習筆記 之 Consumer 啟動&配置變更監聽


  在前面的文章中,分享記錄 ExtensionLoader擴展機制、服務的發布過程、Netty 啟動監聽服務 等內容,相比今天要寫的客戶端, 服務端的發布、啟動還是比較清晰,好理解的。 

      客戶端的ref生成,個人也是梳理好久,相對服務端來說,主要是有幾個地方比較麻煩:

     1. 是在客戶端的啟動過程中,涉及到幾個的zk path、data變更訂閱,多個listener,經常會搞混。

         2. 監聽器很多都是 java8 里面的lamada表達式,寫起來快,理解調試起來就不輕松

         3. 客戶端,會涉及到 配置 參數重載與覆蓋,以 provider -> consumer ->配置中心動態configs 的順序進行取值覆蓋,取這個里面最后的設置為准。 

 

   一、Client 調用簡圖

                      

            在ReferenceConfig 下面的Protocol 流程里面,先是有個RegisterProtocol 注冊協議,這個也是非常重要,先在注冊協議里面做了很多工作,然后在在里面執行Protocol,執行具體的交互協議。

 

二、 RegistryProtocol 的doRefer

     Client啟動的主流程在RegistryProtocol 類的doRefer方法里面。

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);  //1. Directory是客戶端啟動的一個關鍵類,負責Invoker生成,配置信息變更監聽等操作,
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
            directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
            registry.register(directory.getRegisteredConsumerUrl()); // 2. zk上面生成路徑,注冊Consumer 
        }
        directory.buildRouterChain(subscribeUrl); // 3. 實際構建 RouterChain ,默認為4個 
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));  //4. directory執行訂閱, 啟動很多重要邏輯都在此方法里面

        Invoker invoker = cluster.join(directory); // 5. 對directory 進行包裝,加上mock、failover功能 
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);  //6. 對invoker、directory信息簡單聚合,存入本地map
        return invoker;
    }

 在客戶端里面,RegistoryDirectory 是一個比較重要的類, 封裝了節點、配置變更處理,實際調用Invoker的啟動等邏輯。

 1. 注釋1. new 一個RegistoryDirectory實例出來,內部邏輯比較少,主要是url轉換為基礎數據

 2. 注釋2. 將conumser的路徑,在zk上面注冊。

 3.  注釋3.  調用ExtensionLoader獲取RouterFactory的ActiveExtension(),然后調用工廠方法,獲取實際的Router,排序生成router鏈。依次是:MockInvokersSelector,TagRouter,AppRouter,ServiceRouter,功能也很好理解,Router的作用,是獲取可以被調用的provider列表。 Tag、App、Service 3個Router都會監控ZK的data數據變化。但這里沒有啟動監控, 這個后面再講。

4. 注釋4. 這一行代碼,后面的動作可就多了,大概包括: a) 注冊zk上各種data、path監聽 b)實際交互協議初始化, c) 網絡 client 啟動,連接server

5. 注釋5. 對directory進行包裝,加上mock、failover功能。 這行的cluster是個adaptive ,默認會進入 MockClusterWrapper 里面的join方法。 

 

三、 RegisterDirectory類

     RegisterDirectory 這個類,主要就是zk的監聽器,然后執行相應的動作。 當然底層的dobboprotocol 也是在這里啟動,netty client 也是由監聽里面的動作方法啟動的。

    RegisterDirectory類,有幾個主要方法:1、subscribe(url)  2、notify(List urls)  3、refreshOverrideAndInvoker(List urls)  4.refreshInvoker(List urls) ,調用關系是:1->2 ->3 ->4 這樣逐級調用的,下面逐個講講。

 

 1. subscribe 方法

接上面的方法:  directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,  PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); 

public void subscribe(URL url) {
        setConsumerUrl(url); // 注釋1.設置consumer 的url
        CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); // 注釋2. 將本directory 放入cunsumer config的監聽器,
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url); //注釋3. 監聽demoservice的config
        registry.subscribe(url, this); //注釋4. 監聽config、provider、router 3個path ,並調用notify方法
    }

 注釋1. 將consumer的url保存下來。

 注釋2,與注釋3 ,2個都是將本directory 作為listener,去監聽zk上面data的變化。 

   注釋2,監聽的是:/dubbo/config/demo-consumer.configurators 的data變化, demo-consumer是 dubbo client 應用的注冊名稱

  注釋3 ,監聽的是  /dubbo/config/org.apache.dubbo.demo.DemoService.configurators  的data變化。這里demoService是 接口的名稱

  對於多個服務來說,consumer的應用名 是不變的,接口是變化 的,所以注釋2 是將本directory 加入listener數組,多個directory監聽同一個path data數據變化, 注釋3 是又新建了一個ReferenceConfigureListener,各個Service接口的數據變化,各自direcotry 去監聽。  數據變化后,都是調用 RegistryDirectory的 refreshInvoker 方法。 當配置變更后,看看是否重新刷新Invoker

 注釋4, 這里的registry 是ZookeeperRegistry ,會去監聽並獲取路徑下面的節點。監聽的路徑是: /dubbo/org.apache.dubbo.demo.DemoService/providers 、/dubbo/org.apache.dubbo.demo.DemoService/configurators、/dubbo/org.apache.dubbo.demo.DemoService/routers  ,即監聽provider、configurators、routers 3個節點下面的子節點變動。

監聽執行的方法是ZookeeperRegistry.this.notify。  在注釋4里面,最終會執行RegisterDirectory 里面的notify(List urls) 方法。

 

  2. notify(List urls)

    notify方法, 最開始的Invoker啟動 與監聽到配置變更,都會復用這個方法。會進行協議轉換,配置的覆蓋,Invoker的啟動。

   notify 方法的入參urls ,不管是首次手動調用(第一次從注冊中心獲取后調用),還是后續因為zk的path節點變更,listener調用,urls 源頭都是 zk的path 數據。

 public synchronized void notify(List<URL> urls) {
        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(url -> {
                    if (UrlUtils.isConfigurator(url)) {
                        return CONFIGURATORS_CATEGORY;
                    } else if (UrlUtils.isRoute(url)) {
                        return ROUTERS_CATEGORY;
                    } else if (UrlUtils.isProvider(url)) {
                        return PROVIDERS_CATEGORY;
                    }
                    return "";
                }));  // 1. 一系列 stream 操作,對url 過濾校驗,分組
        
        // 下面幾行代碼對 configs、routers、providers 的數據分別處理
        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        toRouters(routerURLs).ifPresent(this::addRouters); //2. routers 節點變更,會重新加入新的 router 路由
        
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());//3. providers變更會繼續下傳
        refreshOverrideAndInvoker(providerURLs); //4. 重新刷新數據配置,並重啟動Invoker(如有需要)
    }

RegisterDirectory類的 notify 方法,對變更后的path 節點進行處理,  因為節點path 已經在ZookeeperRegister 的notify方法里面進行了一定的轉換處理,所以不是原始的path節點。

入參urls , config、router 的協議為empty(無數據), provider的協議是具體的發布協議,例如范例為dubbo。   empty的后續會被識別,特殊處理。

以上代碼: 

   注釋1.  對url列表進行校驗、過濾,然后分成 config、router、provider 3個分組map

   注釋2. 如果router 路由節點有變化,則從新將router 下的數據生成router,加入之前生成的4個 active 路由鏈里面去。

   注釋3. 如果provider 節點有變化,則獲取url 

   注釋4. 調用下層方法,對配置重新刷新,如有需要重新啟動Invoker 。

 

 3. refreshOverrideAndInvoker

    這個方法里面就2個方法調用。先是調用overrideDirectoryUrl(),  就是依次用 /dubbo/org.apache.dubbo.demo.DemoService/configurators 下節點path配置、 /dubbo/config/consumer.configurators 的data配置,/dubbo/config/org.apache.dubbo.demo.DemoService.configurators 下data 配置 來調整 原始consumer的url。   

private void refreshOverrideAndInvoker(List<URL> urls) {
        // mock zookeeper://xxx?mock=return null
        overrideDirectoryUrl();  // 1. 逐個調用注冊中心里面的配置,覆蓋原來的url,組成最新的url 放入overrideDirectoryUrl 存儲
refreshInvoker(urls); //2.根據 provider urls,重新刷新Invoker
}

 

4.  refreshInvoker(urls); 

    refreshInvoker 代碼以及子代碼比較長,就不貼了, 主要做3個操作。

   1.   Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);   

          這行代碼,就是根據providerUrls ,在toInvoker方法內部,幾乎再次調用了一遍配置覆蓋邏輯。最后如果配置有修改,就重新生成Invoker,沒有就用舊的Invoker。同時返回新的url <->Invoker 的映射map

   2.  this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;   

      這行 代碼,就是如果服務有分組,則將分組下的 provider,包裝成StaticDirectory,組成1個Invoker ,返回。 

       例如,服務Demo ,分為 Group A,GroupB, GroupA下有1、2、3 ,GroupB下有2、4、5, 那么groupA,GroupB分別包裝為一個Invoker,就是2個Invoker。

     實際調用的時候,先用GroupA,GroupB,去路由、負載均衡一次,選到Group A, 然后再在Group A下,路由,負載一次,選擇1、2、3下面的一個節點提供服務。

  3.  destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);  

        這行代碼,就是看看舊的url 是否在新map里面存在,不存在,就是銷毀url對應的Invoker 。  這個好理解,比如 有個provider下線了,就要消減。 另外,同1provider配置變更,新的url與舊的不一樣,舊的也會銷毀,因為新的        url會生成一個Invoker。

 

三、 Consumer 端的 配置監聽路徑 與 執行方法

      分別在 AbstractZookeeperClient類的 addChildListener() , addDataListener() 方法,ZookeeperDynamicConfiguration 類的 addListener() 方法 打印 監控的key,group,listener 。

      監控path變更:

        /dubbo/org.apache.dubbo.demo.DemoService/providers

   /dubbo/org.apache.dubbo.demo.DemoService/configurators    

       /dubbo/org.apache.dubbo.demo.DemoService/routers

     以上3個path 下面節點如果有變更,都會調用注冊時的lamda表達式 org.apache.dubbo.registry.zookeeper.ZookeeperRegistry$$Lambda$ ,執行方法是ZookeeperRegistry.this.notify,最終會調 RegisterDirectory.notify(urls),將path轉換成對應的url,更新路由,url配置覆蓋更新,重新生成新配置的Invoker,配置分組,刪除舊Invoker等 上面講的邏輯。  節點path變更,執行了2次配置信息覆蓋。

    

     監控data變更: data path = /dubbo/config 

    data變更,是一個TreeCache Listener,然后注冊了子監聽,各自的listener 監聽自己感興趣的path。

   有以下2類:

   1.config data監聽:

      demo-consumer.configurators                                         ->listener = RegistryDirectory$ConsumerConfigurationListener

     org.apache.dubbo.demo.DemoService.configurators             ->listener = RegistryDirectory$ReferenceConfigurationListener 

  2. router data 監聽:

     demo-consumer.condition-router                                      -->listener = AppRouter

    org.apache.dubbo.demo.DemoService.condition-router          -->listener = ServiceRouter

    demo-provider.tag-router                                                  -->listener = TagRouter

   

    configData 變更,最終都是調用  RegisterDirectory.refreshInvoker ,但傳入的url是Collections.emptyList . 根據代碼推斷,就是將zk的data保存到本地,url采用cacheUrls, 調用toInvoker 方法,內部會用最新的zk data,覆蓋cacheUrl里面的參數配置,然后重新生成Invoker。 這個是之前跳過代碼,沒搞懂的邏輯,

   router data 變更,會將zk 的最新data取回,放入 本地保存, 后續 服務調用時就會生效。

  

 providers、configurators、routers 路徑下節點的變更, 比demo-consumer.configurators 的data config變更 listener會多一些處理邏輯。 但都會執行配置覆蓋,invoker重新發布。 

   

四、總結

    Consumer端啟動,主要是記錄了個人之前理解比較困難的地方,好多地方都略過去了。

    Dubbo Client Invoker 的啟動沒有記錄,這個更Server端啟動幾乎是一樣的 ,有過Server的經驗,理解起來比較容易,Channel、ChannelHandler的包裝,幾乎一模一樣。 netty Handler 的處理都是Dubbo 內部的requestHandler ,不同的是,client 最終會調到received方法,server會調reply方法。

   在consumer 比provider啟動多的就是 幾個配置變更訂閱,路由、分組包裝。 配置變更,listener 傳遞的層次比較深,不記錄下,開始比較容易搞混。

      

  寫的比較粗糙,如有問題,歡迎各位提出來,一起討論!

   

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM