在前面的文章中,分享記錄 ExtensionLoader擴展機制、服務的發布過程、Netty 啟動監聽服務 等內容,相比今天要寫的客戶端, 服務端的發布、啟動還是比較清晰,好理解的。
客戶端的ref生成,個人也是梳理好久,相對服務端來說,主要是有幾個地方比較麻煩:
1. 是在客戶端的啟動過程中,涉及到幾個的zk path、data變更訂閱,多個listener,經常會搞混。
2. 監聽器很多都是 java8 里面的lamada表達式,寫起來快,理解調試起來就不輕松
3. 客戶端,會涉及到 配置 參數重載與覆蓋,以 provider -> consumer ->配置中心動態configs 的順序進行取值覆蓋,取這個里面最后的設置為准。
一、Client 調用簡圖
在ReferenceConfig 下面的Protocol 流程里面,先是有個RegisterProtocol 注冊協議,這個也是非常重要,先在注冊協議里面做了很多工作,然后在在里面執行Protocol,執行具體的交互協議。
二、 RegistryProtocol 的doRefer
Client啟動的主流程在RegistryProtocol 類的doRefer方法里面。
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); //1. Directory是客戶端啟動的一個關鍵類,負責Invoker生成,配置信息變更監聽等操作, directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); registry.register(directory.getRegisteredConsumerUrl()); // 2. zk上面生成路徑,注冊Consumer } directory.buildRouterChain(subscribeUrl); // 3. 實際構建 RouterChain ,默認為4個 directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); //4. directory執行訂閱, 啟動很多重要邏輯都在此方法里面 Invoker invoker = cluster.join(directory); // 5. 對directory 進行包裝,加上mock、failover功能 ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); //6. 對invoker、directory信息簡單聚合,存入本地map return invoker; }
在客戶端里面,RegistoryDirectory 是一個比較重要的類, 封裝了節點、配置變更處理,實際調用Invoker的啟動等邏輯。
1. 注釋1. new 一個RegistoryDirectory實例出來,內部邏輯比較少,主要是url轉換為基礎數據
2. 注釋2. 將conumser的路徑,在zk上面注冊。
3. 注釋3. 調用ExtensionLoader獲取RouterFactory的ActiveExtension(),然后調用工廠方法,獲取實際的Router,排序生成router鏈。依次是:MockInvokersSelector,TagRouter,AppRouter,ServiceRouter,功能也很好理解,Router的作用,是獲取可以被調用的provider列表。 Tag、App、Service 3個Router都會監控ZK的data數據變化。但這里沒有啟動監控, 這個后面再講。
4. 注釋4. 這一行代碼,后面的動作可就多了,大概包括: a) 注冊zk上各種data、path監聽 b)實際交互協議初始化, c) 網絡 client 啟動,連接server
5. 注釋5. 對directory進行包裝,加上mock、failover功能。 這行的cluster是個adaptive ,默認會進入 MockClusterWrapper 里面的join方法。
三、 RegisterDirectory類
RegisterDirectory 這個類,主要就是zk的監聽器,然后執行相應的動作。 當然底層的dobboprotocol 也是在這里啟動,netty client 也是由監聽里面的動作方法啟動的。
RegisterDirectory類,有幾個主要方法:1、subscribe(url) 2、notify(List urls) 3、refreshOverrideAndInvoker(List urls) 4.refreshInvoker(List urls) ,調用關系是:1->2 ->3 ->4 這樣逐級調用的,下面逐個講講。
1. subscribe 方法
接上面的方法: directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
public void subscribe(URL url) { setConsumerUrl(url); // 注釋1.設置consumer 的url CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); // 注釋2. 將本directory 放入cunsumer config的監聽器, serviceConfigurationListener = new ReferenceConfigurationListener(this, url); //注釋3. 監聽demoservice的config registry.subscribe(url, this); //注釋4. 監聽config、provider、router 3個path ,並調用notify方法 }
注釋1. 將consumer的url保存下來。
注釋2,與注釋3 ,2個都是將本directory 作為listener,去監聽zk上面data的變化。
注釋2,監聽的是:/dubbo/config/demo-consumer.configurators 的data變化, demo-consumer是 dubbo client 應用的注冊名稱
注釋3 ,監聽的是 /dubbo/config/org.apache.dubbo.demo.DemoService.configurators 的data變化。這里demoService是 接口的名稱
對於多個服務來說,consumer的應用名 是不變的,接口是變化 的,所以注釋2 是將本directory 加入listener數組,多個directory監聽同一個path data數據變化, 注釋3 是又新建了一個ReferenceConfigureListener,各個Service接口的數據變化,各自direcotry 去監聽。 數據變化后,都是調用 RegistryDirectory的 refreshInvoker 方法。 當配置變更后,看看是否重新刷新Invoker
注釋4, 這里的registry 是ZookeeperRegistry ,會去監聽並獲取路徑下面的節點。監聽的路徑是: /dubbo/org.apache.dubbo.demo.DemoService/providers 、/dubbo/org.apache.dubbo.demo.DemoService/configurators、/dubbo/org.apache.dubbo.demo.DemoService/routers ,即監聽provider、configurators、routers 3個節點下面的子節點變動。
監聽執行的方法是ZookeeperRegistry.this.notify。 在注釋4里面,最終會執行RegisterDirectory 里面的notify(List urls) 方法。
2. notify(List urls)
notify方法, 最開始的Invoker啟動 與監聽到配置變更,都會復用這個方法。會進行協議轉換,配置的覆蓋,Invoker的啟動。
notify 方法的入參urls ,不管是首次手動調用(第一次從注冊中心獲取后調用),還是后續因為zk的path節點變更,listener調用,urls 源頭都是 zk的path 數據。
public synchronized void notify(List<URL> urls) { Map<String, List<URL>> categoryUrls = urls.stream() .filter(Objects::nonNull) .filter(this::isValidCategory) .filter(this::isNotCompatibleFor26x) .collect(Collectors.groupingBy(url -> { if (UrlUtils.isConfigurator(url)) { return CONFIGURATORS_CATEGORY; } else if (UrlUtils.isRoute(url)) { return ROUTERS_CATEGORY; } else if (UrlUtils.isProvider(url)) { return PROVIDERS_CATEGORY; } return ""; })); // 1. 一系列 stream 操作,對url 過濾校驗,分組 // 下面幾行代碼對 configs、routers、providers 的數據分別處理 List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()); this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators); List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); toRouters(routerURLs).ifPresent(this::addRouters); //2. routers 節點變更,會重新加入新的 router 路由 List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());//3. providers變更會繼續下傳 refreshOverrideAndInvoker(providerURLs); //4. 重新刷新數據配置,並重啟動Invoker(如有需要) }
RegisterDirectory類的 notify 方法,對變更后的path 節點進行處理, 因為節點path 已經在ZookeeperRegister 的notify方法里面進行了一定的轉換處理,所以不是原始的path節點。
入參urls , config、router 的協議為empty(無數據), provider的協議是具體的發布協議,例如范例為dubbo。 empty的后續會被識別,特殊處理。
以上代碼:
注釋1. 對url列表進行校驗、過濾,然后分成 config、router、provider 3個分組map
注釋2. 如果router 路由節點有變化,則從新將router 下的數據生成router,加入之前生成的4個 active 路由鏈里面去。
注釋3. 如果provider 節點有變化,則獲取url
注釋4. 調用下層方法,對配置重新刷新,如有需要重新啟動Invoker 。
3. refreshOverrideAndInvoker
這個方法里面就2個方法調用。先是調用overrideDirectoryUrl(), 就是依次用 /dubbo/org.apache.dubbo.demo.DemoService/configurators 下節點path配置、 /dubbo/config/consumer.configurators 的data配置,/dubbo/config/org.apache.dubbo.demo.DemoService.configurators 下data 配置 來調整 原始consumer的url。
private void refreshOverrideAndInvoker(List<URL> urls) { // mock zookeeper://xxx?mock=return null overrideDirectoryUrl(); // 1. 逐個調用注冊中心里面的配置,覆蓋原來的url,組成最新的url 放入overrideDirectoryUrl 存儲
refreshInvoker(urls); //2.根據 provider urls,重新刷新Invoker
}
4. refreshInvoker(urls);
refreshInvoker 代碼以及子代碼比較長,就不貼了, 主要做3個操作。
1. Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
這行代碼,就是根據providerUrls ,在toInvoker方法內部,幾乎再次調用了一遍配置覆蓋邏輯。最后如果配置有修改,就重新生成Invoker,沒有就用舊的Invoker。同時返回新的url <->Invoker 的映射map
2. this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
這行 代碼,就是如果服務有分組,則將分組下的 provider,包裝成StaticDirectory,組成1個Invoker ,返回。
例如,服務Demo ,分為 Group A,GroupB, GroupA下有1、2、3 ,GroupB下有2、4、5, 那么groupA,GroupB分別包裝為一個Invoker,就是2個Invoker。
實際調用的時候,先用GroupA,GroupB,去路由、負載均衡一次,選到Group A, 然后再在Group A下,路由,負載一次,選擇1、2、3下面的一個節點提供服務。
3. destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
這行代碼,就是看看舊的url 是否在新map里面存在,不存在,就是銷毀url對應的Invoker 。 這個好理解,比如 有個provider下線了,就要消減。 另外,同1provider配置變更,新的url與舊的不一樣,舊的也會銷毀,因為新的 url會生成一個Invoker。
三、 Consumer 端的 配置監聽路徑 與 執行方法
分別在 AbstractZookeeperClient類的 addChildListener() , addDataListener() 方法,ZookeeperDynamicConfiguration 類的 addListener() 方法 打印 監控的key,group,listener 。
監控path變更:
/dubbo/org.apache.dubbo.demo.DemoService/providers
/dubbo/org.apache.dubbo.demo.DemoService/configurators
/dubbo/org.apache.dubbo.demo.DemoService/routers
以上3個path 下面節點如果有變更,都會調用注冊時的lamda表達式 org.apache.dubbo.registry.zookeeper.ZookeeperRegistry$$Lambda$ ,執行方法是ZookeeperRegistry.this.notify,最終會調 RegisterDirectory.notify(urls),將path轉換成對應的url,更新路由,url配置覆蓋更新,重新生成新配置的Invoker,配置分組,刪除舊Invoker等 上面講的邏輯。 節點path變更,執行了2次配置信息覆蓋。
監控data變更: data path = /dubbo/config
data變更,是一個TreeCache Listener,然后注冊了子監聽,各自的listener 監聽自己感興趣的path。
有以下2類:
1.config data監聽:
demo-consumer.configurators ->listener = RegistryDirectory$ConsumerConfigurationListener
org.apache.dubbo.demo.DemoService.configurators ->listener = RegistryDirectory$ReferenceConfigurationListener
2. router data 監聽:
demo-consumer.condition-router -->listener = AppRouter
org.apache.dubbo.demo.DemoService.condition-router -->listener = ServiceRouter
demo-provider.tag-router -->listener = TagRouter
configData 變更,最終都是調用 RegisterDirectory.refreshInvoker ,但傳入的url是Collections.emptyList . 根據代碼推斷,就是將zk的data保存到本地,url采用cacheUrls, 調用toInvoker 方法,內部會用最新的zk data,覆蓋cacheUrl里面的參數配置,然后重新生成Invoker。 這個是之前跳過代碼,沒搞懂的邏輯,
router data 變更,會將zk 的最新data取回,放入 本地保存, 后續 服務調用時就會生效。
providers、configurators、routers 路徑下節點的變更, 比demo-consumer.configurators 的data config變更 listener會多一些處理邏輯。 但都會執行配置覆蓋,invoker重新發布。
四、總結
Consumer端啟動,主要是記錄了個人之前理解比較困難的地方,好多地方都略過去了。
Dubbo Client Invoker 的啟動沒有記錄,這個更Server端啟動幾乎是一樣的 ,有過Server的經驗,理解起來比較容易,Channel、ChannelHandler的包裝,幾乎一模一樣。 netty Handler 的處理都是Dubbo 內部的requestHandler ,不同的是,client 最終會調到received方法,server會調reply方法。
在consumer 比provider啟動多的就是 幾個配置變更訂閱,路由、分組包裝。 配置變更,listener 傳遞的層次比較深,不記錄下,開始比較容易搞混。
寫的比較粗糙,如有問題,歡迎各位提出來,一起討論!