Dubbo源碼學習總結系列七---注冊中心


        Dubbo注冊中心是框架的核心模塊,提供了服務注冊發現(包括服務提供者、消費者、路由策略、覆蓋規則)的功能,該功能集中體現了服務治理的特性。該模塊結合Cluster模塊實現了集群服務。Dubbo管理控制台查詢注冊的數據展現服務提供者、消費者、路由策略、覆蓋規則相關信息。監控中心從注冊中心訂閱相關信息實時監控調用鏈調用情況。

        那么,Registry模塊的職責我們總結為:

        (1)注冊:包括服務提供者、路由策略、覆蓋規則信息注冊到注冊中心;

        (2)訂閱:消費端從注冊中心訂閱相關信息;

        (3)通知(push):注冊信息有變化,注冊中心向訂閱者主動通知變更信息;

        (4)獲得服務列表(pull):消費方主動從注冊中心目錄服務拉取服務提供者列表等信息。

 

        從以下幾個架構圖可以看出Registry在Dubbo框架中所扮演的角色,架構圖摘自Dubbo開發者文檔:

        (1)依賴關系圖中1-3的步驟我們看到提供者和消費者對注冊中心的依賴;

        

        (2)調用鏈圖(一部分)顯示了組件調用Registry的場景:消費端refer時,從注冊中心目錄服務獲得服務提供者列表。

        (3)暴露服務時序圖中,服務提供者向注冊中心注冊或取消注冊服務提供者信息。

        (4)引用服務時序圖中消費端配置指向RegistryProtocol,實現向注冊中心訂閱提供者信息,銷毀時取消訂閱。

 

        注冊中心的配置方法如下:        

<!-- 定義注冊中心 -->
<dubbo:registry id="xxx1" address="xxx://ip:port" />
<!-- 引用注冊中心,如果沒有配置registry屬性,將在ApplicationContext中自動掃描registry配置 -->
<dubbo:service registry="xxx1" />
<!-- 引用注冊中心缺省值,當<dubbo:service>沒有配置registry屬性時,使用此配置 -->
<dubbo:provider registry="xxx1" />

 

        定義的接口有:

        RegistryFactory.java:

 1 public interface RegistryFactory {
 2     /**
 3      * 連接注冊中心.
 4      * 
 5      * 連接注冊中心需處理契約:<br>
 6      * 1. 當設置check=false時表示不檢查連接,否則在連接不上時拋出異常。<br>
 7      * 2. 支持URL上的username:password權限認證。<br>
 8      * 3. 支持backup=10.20.153.10備選注冊中心集群地址。<br>
 9      * 4. 支持file=registry.cache本地磁盤文件緩存。<br>
10      * 5. 支持timeout=1000請求超時設置。<br>
11      * 6. 支持session=60000會話超時或過期設置。<br>
12      * 
13      * @param url 注冊中心地址,不允許為空
14      * @return 注冊中心引用,總不返回空
15      */
16     Registry getRegistry(URL url); 
17 }

 

   RegistryService.java:

 1 public interface RegistryService { // Registry extends RegistryService 
 2     /**
 3      * 注冊服務.
 4      * 
 5      * 注冊需處理契約:<br>
 6      * 1. 當URL設置了check=false時,注冊失敗后不報錯,在后台定時重試,否則拋出異常。<br>
 7      * 2. 當URL設置了dynamic=false參數,則需持久存儲,否則,當注冊者出現斷電等情況異常退出時,需自動刪除。<br>
 8      * 3. 當URL設置了category=overrides時,表示分類存儲,缺省類別為providers,可按分類部分通知數據。<br>
 9      * 4. 當注冊中心重啟,網絡抖動,不能丟失數據,包括斷線自動刪除數據。<br>
10      * 5. 允許URI相同但參數不同的URL並存,不能覆蓋。<br>
11      * 
12      * @param url 注冊信息,不允許為空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
13      */
14     void register(URL url);
15 
16     /**
17      * 取消注冊服務.
18      * 
19      * 取消注冊需處理契約:<br>
20      * 1. 如果是dynamic=false的持久存儲數據,找不到注冊數據,則拋IllegalStateException,否則忽略。<br>
21      * 2. 按全URL匹配取消注冊。<br>
22      * 
23      * @param url 注冊信息,不允許為空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
24      */
25     void unregister(URL url);
26 
27     /**
28      * 訂閱服務.
29      * 
30      * 訂閱需處理契約:<br>
31      * 1. 當URL設置了check=false時,訂閱失敗后不報錯,在后台定時重試。<br>
32      * 2. 當URL設置了category=overrides,只通知指定分類的數據,多個分類用逗號分隔,並允許星號通配,表示訂閱所有分類數據。<br>
33      * 3. 允許以interface,group,version,classifier作為條件查詢,如:interface=com.alibaba.foo.BarService&version=1.0.0<br>
34      * 4. 並且查詢條件允許星號通配,訂閱所有接口的所有分組的所有版本,或:interface=*&group=*&version=*&classifier=*<br>
35      * 5. 當注冊中心重啟,網絡抖動,需自動恢復訂閱請求。<br>
36      * 6. 允許URI相同但參數不同的URL並存,不能覆蓋。<br>
37      * 7. 必須阻塞訂閱過程,等第一次通知完后再返回。<br>
38      * 
39      * @param url 訂閱條件,不允許為空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
40      * @param listener 變更事件監聽器,不允許為空
41      */
42     void subscribe(URL url, NotifyListener listener);
43 
44     /**
45      * 取消訂閱服務.
46      * 
47      * 取消訂閱需處理契約:<br>
48      * 1. 如果沒有訂閱,直接忽略。<br>
49      * 2. 按全URL匹配取消訂閱。<br>
50      * 
51      * @param url 訂閱條件,不允許為空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
52      * @param listener 變更事件監聽器,不允許為空
53      */
54     void unsubscribe(URL url, NotifyListener listener);
55 
56     /**
57      * 查詢注冊列表,與訂閱的推模式相對應,這里為拉模式,只返回一次結果。
58      * 
59      * @see com.alibaba.dubbo.registry.NotifyListener#notify(List)
60      * @param url 查詢條件,不允許為空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
61      * @return 已注冊信息列表,可能為空,含義同{@link com.alibaba.dubbo.registry.NotifyListener#notify(List<URL>)}的參數。
62      */
63     List<URL> lookup(URL url);
64 
65 }

         NotifyListener.java:

 1 public interface NotifyListener { 
 2     /**
 3      * 當收到服務變更通知時觸發。
 4      * 
 5      * 通知需處理契約:<br>
 6      * 1. 總是以服務接口和數據類型為維度全量通知,即不會通知一個服務的同類型的部分數據,用戶不需要對比上一次通知結果。<br>
 7      * 2. 訂閱時的第一次通知,必須是一個服務的所有類型數據的全量通知。<br>
 8      * 3. 中途變更時,允許不同類型的數據分開通知,比如:providers, consumers, routes, overrides,允許只通知其中一種類型,但該類型的數據必須是全量的,不是增量的。<br>
 9      * 4. 如果一種類型的數據為空,需通知一個empty協議並帶category參數的標識性URL數據。<br>
10      * 5. 通知者(即注冊中心實現)需保證通知的順序,比如:單線程推送,隊列串行化,帶版本對比。<br>
11      * 
12      * @param urls 已注冊信息列表,總不為空,含義同{@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}的返回值。
13      */
14     void notify(List<URL> urls);
15 
16 }

          Registry.java,集成了Node和RegistryService接口,定義了一個針對特定URL,有生命周期的注冊中心接口:

1 /**
2  * Registry. (SPI, Prototype, ThreadSafe)
3  *
4  * @see com.alibaba.dubbo.registry.RegistryFactory#getRegistry(URL)
5  * @see com.alibaba.dubbo.registry.support.AbstractRegistry
6  */
7 public interface Registry extends Node, RegistryService {
8 }

         register模塊在調用鏈中的位置是:消費端通過ClusterInvoker調用RegisterDirectory的list(url)方法得到Invoker列表,即從注冊中心獲得指定url的Invoker列表。服務提供者在注冊中心注冊了自己的服務信息,被消費端訂閱獲取。見下圖紅框中的紅色箭頭,即為調用過程。

        

        RegistoryDirectory.java為注冊中心目錄服務,繼承了AbstractDirectory,實現doList(invokation)方法,它是屬於消費端類。

        主要邏輯為:

        (1)從invokation中解析出要請求的URL;

        (2)根據配置中的protocol和Url查找緩存(HashMap,本地內存緩存)中的invokers,如果找到則返回,否則調用屬性protocol(初始化時注入的實例,如DubboProtocol實例)的refer()方法得到invokers並返回;

 1     public List<Invoker<T>> doList(Invocation invocation) {
 2         if (forbidden) {
 3             // 1. No service provider 2. Service providers are disabled
 4             throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
 5                 "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()
 6                     + " use dubbo version " + Version.getVersion() + ", may be providers disabled or not registered ?");
 7         }
 8         List<Invoker<T>> invokers = null;
//invoker本地緩存,方法名作為key的invoker Map
9 Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference 10 if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) { 11 String methodName = RpcUtils.getMethodName(invocation); 12 Object[] args = RpcUtils.getArguments(invocation); 13 if (args != null && args.length > 0 && args[0] != null 14 && (args[0] instanceof String || args[0].getClass().isEnum())) { 15 invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter 16 } 17 if (invokers == null) { 18 invokers = localMethodInvokerMap.get(methodName); 19 } 20 if (invokers == null) { 21 invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); 22 } 23 if (invokers == null) { 24 Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator(); 25 if (iterator.hasNext()) { 26 invokers = iterator.next(); 27 } 28 } 29 } 30 return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers; 31 }

         以下代碼實現了根據url列表構造invoker的map:

 1 /**
 2      * Turn urls into invokers, and if url has been refer, will not re-reference.
 3      *
 4      * @param urls
 5      * @return invokers
 6      */
 7     private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
 8         Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
 9         if (urls == null || urls.size() == 0) {
10             return newUrlInvokerMap;
11         }
12         Set<String> keys = new HashSet<String>();
13         String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
14         for (URL providerUrl : urls) {
15             // If protocol is configured at the reference side, only the matching protocol is selected
16             if (queryProtocols != null && queryProtocols.length() > 0) {
17                 boolean accept = false;
18                 String[] acceptProtocols = queryProtocols.split(",");
19                 for (String acceptProtocol : acceptProtocols) {
20                     if (providerUrl.getProtocol().equals(acceptProtocol)) {
21                         accept = true;
22                         break;
23                     }
24                 }
25                 if (!accept) {
26                     continue;
27                 }
28             }
29             if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
30                 continue;
31             }
32             if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
33                 logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
34                         + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
35                 continue;
36             }
37             URL url = mergeUrl(providerUrl);
38 
39             String key = url.toFullString(); // The parameter urls are sorted
40             if (keys.contains(key)) { // Repeated url
41                 continue;
42             }
43             keys.add(key);
44             // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
45             Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
46             Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
47             if (invoker == null) { // Not in the cache, refer again
48                 try {
49                     boolean enabled = true;
50                     if (url.hasParameter(Constants.DISABLED_KEY)) {
51                         enabled = !url.getParameter(Constants.DISABLED_KEY, false);
52                     } else {
53                         enabled = url.getParameter(Constants.ENABLED_KEY, true);
54                     }
55                     if (enabled) {
56                         invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
57                     }
58                 } catch (Throwable t) {
59                     logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
60                 }
61                 if (invoker != null) { // Put new invoker in cache
62                     newUrlInvokerMap.put(key, invoker);
63                 }
64             } else {
65                 newUrlInvokerMap.put(key, invoker);
66             }
67         }
68         keys.clear();
69         return newUrlInvokerMap;
70     }

         根據URL中的method參數構造一個以method為key的invoker的map,這是為了分組合並多個service的調用結果功能做准備。

 1     /**
 2      * Transform the invokers list into a mapping relationship with a method
 3      *
 4      * @param invokersMap Invoker Map
 5      * @return Mapping relation between Invoker and method
 6      */
 7     private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
 8         Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>();
 9         // According to the methods classification declared by the provider URL, the methods is compatible with the registry to execute the filtered methods
10         List<Invoker<T>> invokersList = new ArrayList<Invoker<T>>();
11         if (invokersMap != null && invokersMap.size() > 0) {
12             for (Invoker<T> invoker : invokersMap.values()) {
13                 String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY);
14                 if (parameter != null && parameter.length() > 0) {
15                     String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);
16                     if (methods != null && methods.length > 0) {
17                         for (String method : methods) {
18                             if (method != null && method.length() > 0
19                                     && !Constants.ANY_VALUE.equals(method)) {
20                                 List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
21                                 if (methodInvokers == null) {
22                                     methodInvokers = new ArrayList<Invoker<T>>();
23                                     newMethodInvokerMap.put(method, methodInvokers);
24                                 }
25                                 methodInvokers.add(invoker);
26                             }
27                         }
28                     }
29                 }
30                 invokersList.add(invoker);
31             }
32         }
33         List<Invoker<T>> newInvokersList = route(invokersList, null);
34         newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);
35         if (serviceMethods != null && serviceMethods.length > 0) {
36             for (String method : serviceMethods) {
37                 List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
38                 if (methodInvokers == null || methodInvokers.size() == 0) {
39                     methodInvokers = newInvokersList;
40                 }
41                 newMethodInvokerMap.put(method, route(methodInvokers, method));
42             }
43         }
44         // sort and unmodifiable
45         for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {
46             List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
47             Collections.sort(methodInvokers, InvokerComparator.getComparator());
48             newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));
49         }
50         return Collections.unmodifiableMap(newMethodInvokerMap);
51     }

 

        以上邏輯實現了Directory接口,構造了注冊中心目錄服務的功能。RegistoryDirectory類本身還實現了NotifyListener接口,實現了notify()方法。在注冊中心發布訂閱模式中,RegistoryDirectory可作為訂閱者訂閱一個事件,如提供者、路由策略、配置規則的變動事件,事件發生后接受通知,並觸發notify(urls)方法,接受新的提供者、路由策略、配置規則的url,並刷新本地url緩存。

 1     public synchronized void notify(List<URL> urls) {
 2         List<URL> invokerUrls = new ArrayList<URL>();
 3         List<URL> routerUrls = new ArrayList<URL>();
 4         List<URL> configuratorUrls = new ArrayList<URL>();
 5         for (URL url : urls) {
 6             String protocol = url.getProtocol();
 7             String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
 8             if (Constants.ROUTERS_CATEGORY.equals(category)
 9                     || Constants.ROUTE_PROTOCOL.equals(protocol)) {
10                 routerUrls.add(url);
11             } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
12                     || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
13                 configuratorUrls.add(url);
14             } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
15                 invokerUrls.add(url);
16             } else {
17                 logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
18             }
19         }
20         // 刷新配置規則configurators
21         if (configuratorUrls != null && configuratorUrls.size() > 0) {
22             this.configurators = toConfigurators(configuratorUrls);
23         }
24         // 刷新路由規則routers
25         if (routerUrls != null && routerUrls.size() > 0) {
26             List<Router> routers = toRouters(routerUrls);
27             if (routers != null) { // null - do nothing
28                 setRouters(routers);
29             }
30         }
31         List<Configurator> localConfigurators = this.configurators; // local reference
32         // 合並以覆蓋方式更新的參數merge override parameters
33         this.overrideDirectoryUrl = directoryUrl;
34         if (localConfigurators != null && localConfigurators.size() > 0) {
35             for (Configurator configurator : localConfigurators) {
36                 this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
37             }
38         }
39         // 刷新提供者invokers providers
40         refreshInvoker(invokerUrls);
41     }

 

        

        下面我們再討論一下RegistoryProtocol類在注冊中心所起的作用。如下圖所示,RegistoryProtocol類是消費端和提供者端共用的類,藍色虛線表示類初始化過程,即初始化時的組裝鏈。

        首先看看export()實現邏輯:

        (1)調用doLocalExport(originInvoker)方法得到本地緩存的exporter(如果緩存找不到則用protocol.export()創建並放入緩存);

        (2)調用getRegistry(originInvoker)方法得到當前originInvoker的注冊中心實例registry

 1     public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
 2         //從緩存中取得export,或者通過protocol.export()方法創建exporter export invoker
 3         final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
 4 
 5         URL registryUrl = getRegistryUrl(originInvoker);
 6 
 7         //registry provider
 8         final Registry registry = getRegistry(originInvoker);
 9         final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
10 
11         //取得是否立即注冊的配置,默認為true to judge to delay publish whether or not
12         boolean register = registedProviderUrl.getParameter("register", true);
13         //將originInvoker加入本地緩存
14         ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
15         //立即注冊
16         if (register) {
//根據registryUrl取得相應的registry實例,調用registry.register()方法注冊提供者url
17 register(registryUrl, registedProviderUrl); 18 ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); 19 } 20 21 //訂閱配置策略更新事件 Subscribe the override data 22 // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover. 23 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); 24 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); 25 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 26 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 27 //Ensure that a new exporter instance is returned every time export 28 return new Exporter<T>() { 29 public Invoker<T> getInvoker() { 30 return exporter.getInvoker(); 31 } 32 33 public void unexport() { 34 try { 35 exporter.unexport(); 36 } catch (Throwable t) { 37 logger.warn(t.getMessage(), t); 38 } 39 try { 40 registry.unregister(registedProviderUrl); 41 } catch (Throwable t) { 42 logger.warn(t.getMessage(), t); 43 } 44 try { 45 overrideListeners.remove(overrideSubscribeUrl); 46 registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); 47 } catch (Throwable t) { 48 logger.warn(t.getMessage(), t); 49 } 50 } 51 }; 52 }

         接下來我們分析一下refer()方法的實現邏輯。refer()方法是客戶端指向一個服務接口,用來獲取一個服務接口的invoker(調用對象)遠程請求服務方法的,顧名思義,RegisterProtocol協議類當然是從注冊中心訂閱獲取一個invoker對象,這就涉及到對注冊中心的操作。

實現邏輯大概是:

(1)通過URL得到相應的注冊中心對象Registry(通常為zookeeper注冊中心);

(2)如果請求的服務類別是RegistryService注冊服務(猜測可能是管理控制台如dubbo-admin或監控台monitor用到的),則直接返回注冊中心代理對象;

(3)請求的服務類別是普通服務接口,判斷url中的group屬性,如果是group="a,b" 或 group="*"的配置,則考慮分組聚合結果;

(4)調用doRefer()實現指向邏輯;

 1 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
 2         url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
//通過URL得到相應的注冊中心對象Registry(通常為zookeeper注冊中心)
3 Registry registry = registryFactory.getRegistry(url);
//如果請求的服務類別是RegistryService注冊服務(猜測可能是管理控制台如dubbo-admin或監控台monitor用到的),則直接返回注冊中心代理對象
4 if (RegistryService.class.equals(type)) { 5 return proxyFactory.getInvoker((T) registry, type, url); 6 } 7 8 // group="a,b" or group="*" 請求的服務類別是普通服務接口,判斷url中的group屬性,如果是group="a,b" 或 group="*"的配置,則考慮分組聚合結果 9 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); 10 String group = qs.get(Constants.GROUP_KEY); 11 if (group != null && group.length() > 0) { 12 if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 13 || "*".equals(group)) { 14 return doRefer(getMergeableCluster(), registry, type, url); 15 } 16 } 17 return doRefer(cluster, registry, type, url); 18 }

 

doRefer()方法實現指向普通遠程服務的邏輯:

(1)創建注冊目錄服務對象,並設置注冊中心對象和協議;

(2)向注冊中心注冊客戶端的服務指向事件,為管理控制台和監控台提供指向信息;

(3)從注冊中心訂閱所指向的服務接口提供列表;

(4)根據指定的集群策略,得到一個集群服務調用代理,該代理可根據指定的集群策略調用遠程服務接口,該接口是從注冊中心獲得的可用服務(所指定的某個服務接口)列表,具體原理參見集群邏輯;

(5)將Consumer包裝類對象存入本地緩存;

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//創建注冊目錄服務對象,並設置注冊中心對象和協議 RegistryDirectory
<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) {
//向注冊中心注冊客戶端的服務指向事件,為管理控制台和監控台提供指向信息 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(
false))); }
//從注冊中心訂閱所指向的服務接口提供列表 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); //根據指定的集群策略,得到一個集群服務調用代理,該代理可根據指定的集群策略調用遠程服務接口,該接口是從注冊中心獲得的可用服務(所指定的某個服務接口)列表,具體原理參見集群邏輯 Invoker invoker = cluster.join(directory);
//將Consumer包裝類對象存入本地緩存 ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
return invoker; }

 

        在介紹Zookeeper注冊中心之前,我們討論一下ZookeeperRegister的爺爺類AbstractRegistry類和父類FailbackRegistry類的實現原理(顧名思義,它實現了失敗自動重試的注冊服務。它本身繼承了AbstractRegistry類)。

AbstractRegistry類實現了Registry接口,將注冊、訂閱的url寫入HashMap結構的內存緩存,並在訂閱接受通知時將訂閱的url信息寫入本地文件(用戶文件夾.dubbo內)作為本地緩存。

        AbstractRegistry類的構造函數將本地文件url注冊信息緩存載入內存,並觸發通知notify()將新的url的集群備用地址寫入文件緩存和notifys內存緩存。

 1     public AbstractRegistry(URL url) {
 2         setUrl(url);
 3         // Start file save timer
 4         syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
 5         String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
 6         File file = null;
 7         if (ConfigUtils.isNotEmpty(filename)) {
 8             file = new File(filename);
 9             if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
10                 if (!file.getParentFile().mkdirs()) {
11                     throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
12                 }
13             }
14         }
15         this.file = file;
//將本地文件url緩存載入內存
16 loadProperties();
//將url的集群備用地址寫入本地文件和notifys內存緩存
17 notify(url.getBackupUrls()); 18 }

 

         以下是將本地properties內存緩存刷新到文件的代碼,為了解決同步寫入問題,增加了一個.lock文件作為鎖。同時通過版本號樂觀鎖控制讀取到最新的文件內容后再追加新的內容。

 1     public void doSaveProperties(long version) {
 2         if (version < lastCacheChanged.get()) {
 3             return;
 4         }
 5         if (file == null) {
 6             return;
 7         }
 8         // Save
 9         try {
//創建加鎖文件
10 File lockfile = new File(file.getAbsolutePath() + ".lock"); 11 if (!lockfile.exists()) { 12 lockfile.createNewFile(); 13 } 14 RandomAccessFile raf = new RandomAccessFile(lockfile, "rw"); 15 try { 16 FileChannel channel = raf.getChannel(); 17 try { 18 FileLock lock = channel.tryLock(); 19 if (lock == null) { 20 throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties"); 21 } 22 // Save 23 try { 24 if (!file.exists()) { 25 file.createNewFile(); 26 } 27 FileOutputStream outputFile = new FileOutputStream(file); 28 try {
//寫入內容
29 properties.store(outputFile, "Dubbo Registry Cache"); 30 } finally { 31 outputFile.close(); 32 } 33 } finally { 34 lock.release(); 35 } 36 } finally { 37 channel.close(); 38 } 39 } finally { 40 raf.close(); 41 } 42 } catch (Throwable e) { 43 if (version < lastCacheChanged.get()) { 44 return; 45 } else { 46 registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet())); 47 } 48 logger.warn("Failed to save registry store file, cause: " + e.getMessage(), e); 49 } 50 }

         register(url)方法將url寫入registered內存緩存,subscribe(URL url, NotifyListener listener)方法將url和listener鍵值對寫入listeners內存緩存,在zookeeperRegistry類的doSubscribe中觸發notify,觸發listener的notify方法。notify(urls)將urls列表寫入notified內存緩存中,並觸發對應url的listener.notify()方法。lookup(url)方法從notified本地內存緩存找到對應的url地址列表,如果不存在,則觸發subscribe()方法訂閱url。

         FailbackRegistry類是失敗自動嘗試恢復的注冊類,繼承了AbstractRegistry類。以下是構造方法,初始化了一個定時任務管理器,定時掃描注冊/取消注冊、訂閱/取消訂閱、通知的失敗列表,重試注冊/取消注冊、訂閱/取消訂閱、通知任務,成功后從失敗列表移除。在注冊/取消注冊、訂閱/取消訂閱、通知方法拋出異常時,自動將對應的url加入到失敗列表等待重試。

 1     public FailbackRegistry(URL url) {
 2         super(url);
 3         int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
 4         this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
 5             public void run() {
 6                 // Check and connect to the registry
 7                 try {
 8                     retry();
 9                 } catch (Throwable t) { // Defensive fault tolerance
10                     logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
11                 }
12             }
13         }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
14     }

 

         ZookeeperRegistry類基於Zookeeper目錄服務,將注冊的url信息寫入zk,從zk中訂閱url,從zk查找url對應的服務信息。

        該類主要實現了doRegister()、doUnRegister()、doSubscribe()、doUnSubscribe()、lookup()方法。

1     protected void doRegister(URL url) {
2         try {
//將url信息存入zk
3 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); 4 } catch (Throwable e) { 5 throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 6 } 7 }

 

 

1     protected void doUnregister(URL url) {
2         try {
//從zk刪除url信息
3 zkClient.delete(toUrlPath(url)); 4 } catch (Throwable e) { 5 throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 6 } 7 }

 

         doSubscribe()方法將訂閱的url存入zk,觸發notify()方法,調用listener.notify方法。      

 1     protected void doSubscribe(final URL url, final NotifyListener listener) {
 2         try {
 3             if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
 4                 String root = toRootPath();
 5                 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
 6                 if (listeners == null) {
 7                     zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
 8                     listeners = zkListeners.get(url);
 9                 }
10                 ChildListener zkListener = listeners.get(listener);
11                 if (zkListener == null) {
12                     listeners.putIfAbsent(listener, new ChildListener() {
13                         public void childChanged(String parentPath, List<String> currentChilds) {
14                             for (String child : currentChilds) {
15                                 child = URL.decode(child);
16                                 if (!anyServices.contains(child)) {
17                                     anyServices.add(child);
18                                     subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
19                                             Constants.CHECK_KEY, String.valueOf(false)), listener);
20                                 }
21                             }
22                         }
23                     });
24                     zkListener = listeners.get(listener);
25                 }
26                 zkClient.create(root, false);
27                 List<String> services = zkClient.addChildListener(root, zkListener);
28                 if (services != null && services.size() > 0) {
29                     for (String service : services) {
30                         service = URL.decode(service);
31                         anyServices.add(service);
32                         subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
33                                 Constants.CHECK_KEY, String.valueOf(false)), listener);
34                     }
35                 }
36             } else {
37                 List<URL> urls = new ArrayList<URL>();
38                 for (String path : toCategoriesPath(url)) {
39                     ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
40                     if (listeners == null) {
41                         zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
42                         listeners = zkListeners.get(url);
43                     }
44                     ChildListener zkListener = listeners.get(listener);
45                     if (zkListener == null) {
46                         listeners.putIfAbsent(listener, new ChildListener() {
47                             public void childChanged(String parentPath, List<String> currentChilds) {
48                                 ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
49                             }
50                         });
51                         zkListener = listeners.get(listener);
52                     }
//path信息加入zk
53 zkClient.create(path, false); 54 List<String> children = zkClient.addChildListener(path, zkListener); 55 if (children != null) { 56 urls.addAll(toUrlsWithEmpty(url, path, children)); 57 } 58 } 59 notify(url, listener, urls); 60 } 61 } catch (Throwable e) { 62 throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 63 } 64 }

         lookup(url)方法從zk中查詢所有相關的url服務列表:

 1     public List<URL> lookup(URL url) {
 2         if (url == null) {
 3             throw new IllegalArgumentException("lookup url == null");
 4         }
 5         try {
 6             List<String> providers = new ArrayList<String>();
 7             for (String path : toCategoriesPath(url)) {
//查詢所有相關的url列表
8 List<String> children = zkClient.getChildren(path); 9 if (children != null) { 10 providers.addAll(children); 11 } 12 } 13 return toUrlsWithoutEmpty(url, providers); 14 } catch (Throwable e) { 15 throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 16 } 17 }

 

 

         Dubbo出了實現基於zk的注冊中心,還實現了基於redis的注冊中心,multicast基於網絡廣播的注冊中心,基本原理類似,在此不再詳述。        

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM