dubbo源碼分析三:consumer注冊及生成代理對象


本章我們將分析一下consumer向注冊中心注冊,並獲取服務端相應的信息,根據這些信息生產代理對象的過程和源碼。

1.類圖 

 

上圖展示了部分消費者注冊及生成代理對象過程中需要使用到的類和接口,其中:

    spring適配涉及到的類:DubboNamespaceHandler、DubboBeanDefinitionParser、ReferenceBean;

    配置信息存儲:ReferenceConfig、RegistryConfig、MonitorConfig、ProtocolConfig、ConsumerConfig等;

    應用協議:Protocol、DubboProtocol、HessianProtocol、ThriftProtocol、RmiProtocol、AbstractProxyProtocol、AbstractProtocol等;

    注冊相關:RegistryProtocol、RegistryFactory、Registry、ZookeeperRegistryFactory、ZookeeperRegistry等

    代理和集群相關:Proxy、JdcProxyFactory、AbstractProxyFactory、InvokerInvocationHandler、Cluster、FailoverCluster、FailoverClusterInvoker、AbstractClusterInvoker、Invoker等;

2.時序圖 

    我們通過時序圖來分析一下在consumer注冊及生產代理對象的過程中,上面的類是如何串聯在一起的:

    a.spring容器通過DubboBeanDefinitionParser類的對象來解析xml文件中的標簽,生成ReferenceConfig等配置對象;

    b.ReferenceConfig的init()等方法被調用;

    c.通過spi機制確定Protocol接口的實現對象為RegistryProtocol的對象,調用它的refer()方法;

    d.通過spi機制確定RegistryFactory接口的實現對象為ZookeeperRegistryFactory,調用它的getRegistry()方法,生產ZookeeperRegistry對象;

    e.調用RegistryProtocol對象的doRefer()方法后,並調用FailoverCluster的join()方法,生成FailoverClusterInvoker的對象;

    f.調用JdkProxyFactory的getProxy()方法,生成consumer使用接口的代理對象。

3.核心代碼  

  1 private void init() {
  2     if (initialized) {//判斷是否初始化完畢
  3         return;
  4     }
  5     initialized = true;
  6     if (interfaceName == null || interfaceName.length() == 0) {
  7         throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
  8     }
  9     // 獲取消費者全局配置
 10     checkDefault();//檢查
 11     appendProperties(this);
 12     if (getGeneric() == null && getConsumer() != null) {
 13         setGeneric(getConsumer().getGeneric());
 14     }
 15     if (ProtocolUtils.isGeneric(getGeneric())) {
 16         interfaceClass = GenericService.class;
 17     } else {
 18         try {
 19             interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
 20                     .getContextClassLoader());
 21         } catch (ClassNotFoundException e) {
 22             throw new IllegalStateException(e.getMessage(), e);
 23         }
 24         checkInterfaceAndMethods(interfaceClass, methods);
 25     }
 26     String resolve = System.getProperty(interfaceName);
 27     String resolveFile = null;
 28     if (resolve == null || resolve.length() == 0) {
 29         resolveFile = System.getProperty("dubbo.resolve.file");
 30         if (resolveFile == null || resolveFile.length() == 0) {
 31             File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
 32             if (userResolveFile.exists()) {
 33                 resolveFile = userResolveFile.getAbsolutePath();
 34             }
 35         }
 36         if (resolveFile != null && resolveFile.length() > 0) {
 37             Properties properties = new Properties();
 38             FileInputStream fis = null;
 39             try {
 40                 fis = new FileInputStream(new File(resolveFile));
 41                 properties.load(fis);
 42             } catch (IOException e) {
 43                 throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
 44             } finally {
 45                 try {
 46                     if(null != fis) fis.close();
 47                 } catch (IOException e) {
 48                     logger.warn(e.getMessage(), e);
 49                 }
 50             }
 51             resolve = properties.getProperty(interfaceName);
 52         }
 53     }
 54     if (resolve != null && resolve.length() > 0) {
 55         url = resolve;
 56         if (logger.isWarnEnabled()) {
 57             if (resolveFile != null && resolveFile.length() > 0) {
 58                 logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
 59             } else {
 60                 logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
 61             }
 62         }
 63     }
 64     if (consumer != null) {
 65         if (application == null) {
 66             application = consumer.getApplication();
 67         }
 68         if (module == null) {
 69             module = consumer.getModule();
 70         }
 71         if (registries == null) {
 72             registries = consumer.getRegistries();//獲取注冊信息
 73         }
 74         if (monitor == null) {
 75             monitor = consumer.getMonitor();//獲取監控信息
 76         }
 77     }
 78     if (module != null) {
 79         if (registries == null) {
 80             registries = module.getRegistries();
 81         }
 82         if (monitor == null) {
 83             monitor = module.getMonitor();
 84         }
 85     }
 86     if (application != null) {
 87         if (registries == null) {
 88             registries = application.getRegistries();
 89         }
 90         if (monitor == null) {
 91             monitor = application.getMonitor();
 92         }
 93     }
 94     checkApplication();
 95     checkStubAndMock(interfaceClass);
 96     Map<String, String> map = new HashMap<String, String>();
 97     Map<Object, Object> attributes = new HashMap<Object, Object>();
 98     map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
 99     map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
100     map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
101     if (ConfigUtils.getPid() > 0) {
102         map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
103     }
104     if (! isGeneric()) {
105         String revision = Version.getVersion(interfaceClass, version);
106         if (revision != null && revision.length() > 0) {
107             map.put("revision", revision);
108         }
109  
110         String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
111         if(methods.length == 0) {
112             logger.warn("NO method found in service interface " + interfaceClass.getName());
113             map.put("methods", Constants.ANY_VALUE);
114         }
115         else {
116             map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
117         }
118     }
119     map.put(Constants.INTERFACE_KEY, interfaceName);
120     appendParameters(map, application);
121     appendParameters(map, module);
122     appendParameters(map, consumer, Constants.DEFAULT_KEY);
123     appendParameters(map, this);
124     String prifix = StringUtils.getServiceKey(map);
125     if (methods != null && methods.size() > 0) {
126         for (MethodConfig method : methods) {
127             appendParameters(map, method, method.getName());
128             String retryKey = method.getName() + ".retry";
129             if (map.containsKey(retryKey)) {
130                 String retryValue = map.remove(retryKey);
131                 if ("false".equals(retryValue)) {
132                     map.put(method.getName() + ".retries", "0");
133                 }
134             }
135             appendAttributes(attributes, method, prifix + "." + method.getName());
136             checkAndConvertImplicitConfig(method, map, attributes);
137         }
138     }
139     //attributes通過系統context進行存儲.
140     StaticContext.getSystemContext().putAll(attributes);
141     ref = createProxy(map);//獲取代理對象
142 }
143  
144 private T createProxy(Map<String, String> map) {
145     URL tmpUrl = new URL("temp", "localhost", 0, map);
146     final boolean isJvmRefer;
147     if (isInjvm() == null) {
148         if (url != null && url.length() > 0) { //指定URL的情況下,不做本地引用
149             isJvmRefer = false;
150         } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
151             //默認情況下如果本地有服務暴露,則引用本地服務.
152             isJvmRefer = true;
153         } else {
154             isJvmRefer = false;
155         }
156     } else {
157         isJvmRefer = isInjvm().booleanValue();
158     }
159      
160     if (isJvmRefer) {
161         URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
162         invoker = refprotocol.refer(interfaceClass, url);//獲取invoker對象
163         if (logger.isInfoEnabled()) {
164             logger.info("Using injvm service " + interfaceClass.getName());
165         }
166     } else {
167         if (url != null && url.length() > 0) { // 用戶指定URL,指定的URL可能是對點對直連地址,也可能是注冊中心URL
168             String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
169             if (us != null && us.length > 0) {
170                 for (String u : us) {
171                     URL url = URL.valueOf(u);
172                     if (url.getPath() == null || url.getPath().length() == 0) {
173                         url = url.setPath(interfaceName);
174                     }
175                     if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
176                         urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
177                     } else {
178                         urls.add(ClusterUtils.mergeUrl(url, map));
179                     }
180                 }
181             }
182         } else { // 通過注冊中心配置拼裝URL
183             List<URL> us = loadRegistries(false);
184             if (us != null && us.size() > 0) {
185                 for (URL u : us) {
186                     URL monitorUrl = loadMonitor(u);
187                     if (monitorUrl != null) {
188                         map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
189                     }
190                     urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
191                 }
192             }
193             if (urls == null || urls.size() == 0) {
194                 throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", 
195                 please config <dubbo:registry address=\"...\" /> to your spring config.");
196             }
197         }
198         for(URL invo : urls){
199             System.out.println("invoker's url : "+invo.toFullString());
200         }
201         if (urls.size() == 1) {
202             invoker = refprotocol.refer(interfaceClass, urls.get(0));//獲取invoker對象
203         } else {
204             List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
205             URL registryURL = null;
206             for (URL url : urls) {
207                 invokers.add(refprotocol.refer(interfaceClass, url));
208                 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
209                     registryURL = url; // 用了最后一個registry url
210                 }
211             }
212             if (registryURL != null) { // 有 注冊中心協議的URL
213                 // 對有注冊中心的Cluster 只用 AvailableCluster
214                 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
215                 invoker = cluster.join(new StaticDirectory(u, invokers));
216             }  else { // 不是 注冊中心的URL
217                 invoker = cluster.join(new StaticDirectory(invokers));
218             }
219              
220         }
221     }
222     System.out.println("real invoker's url :"+invoker.getUrl().toFullString());
223     Boolean c = check;
224     if (c == null && consumer != null) {
225         c = consumer.isCheck();
226     }
227     if (c == null) {
228         c = true; // default true
229     }
230     if (c && ! invoker.isAvailable()) {
231         throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " +
232                                         (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() 
233                                          + 
234                                          " use dubbo version " + Version.getVersion());
235     }
236     if (logger.isInfoEnabled()) {
237         logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
238     }
239     // 創建服務代理
240     return (T) proxyFactory.getProxy(invoker);
241 }
ReferenceConfig

創建代理的過程:

1.      獲取消費者配置

2.      獲取配置的注冊中心,通過配置中心配置拼裝URL,線上應該是個配置中心集群

3.      遍歷注冊中心List<URL>集合

加載監控中心URL,如果配置了監控中心在注冊中心url加上MONITOR_KEY

根據配置的引用服務參數給注冊中URL上加上REFER_KEY

4.      遍歷注冊中心List<URL>集合,這里注冊中心url包含了monitorUrl和referUrl

protocol.refer(interface,url)調用protocol引用服務返回invoker可執行對象(這個invoker並不是簡單的DubboInvoker, 而是由RegistryProtocol構建基於目錄服務的集群策略Invoker, 這個invoker可以通過目錄服務list出真正可調用的遠程服務invoker)

對於注冊中心Url設置集群策略為AvailableCluster, 由AvailableCluster將所有對象注冊中調用的invoker偽裝成一個invoker

5.      通過代理工廠創建遠程服務代理返回給使用着proxyFactory.getProxy(invoker);

 

procotol.refer(interface, url) 引用服務的過程

1.      經過ProtocolListenerWrapper, ProtocolFilterWrapper由於是注冊中心url調用RegistryProtocol.refer

2.      獲取注冊中心協議zookeeper, Redis, 還是dubbo, 並根據注冊中心協議通過注冊器工廠RegistryFactory.getRegistry(url)獲取注冊器Registry用來跟注冊中心交互

3.      根據配置的group分組

4.      創建注冊服務目錄RegistryDirectory並設置注冊器

5.      構建訂閱服務的subscribeUrl

6.      通過注冊器registry向注冊中心注冊subscribeUrl消費端url

7.      目錄服務registryDirectory.subscribe(subscribeUrl)訂閱服務(這里我們以開源版本zookeeper為注冊中心為例來講解, dubbo協議的注冊中心有點不一樣)

其實內部也是通過注冊器registry.subscribe(url,this) 這里this就是registryDirectory它實現了NotifyListener。

服務提供者在向zookeeper注冊服務/dubbo/com.alibaba.dubbo.demo.DemoService/providers/節點下寫下自己的URL地址

服務消費者向zookeepr注冊服務/dubbo/com.alibaba.dubbo.demo.DemoService/consumers/節點下寫下自己的URL地址

服務消費者向zookeeper 訂閱服務/dubbo/com.alibaba.dubbo.demo.DemoService/ providers /節點下所有服務提供者URL地址

Zookeeper通過watcher機制實現對節點的監聽,節點數據變化通過節點上的watcher回調客戶端, 重新生成對服務的refer

在訂閱的過程中通過獲取/dubbo/com.alibaba.dubbo.demo.DemoService/providers /下的所有服務提供者的urls(類似dubbo://10.33. 37.8:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&owner=william&pid=7356&side=consumer&timestamp=1416971340626),主動回調NotifyListener來根據urls生成對服務提供者的引用生成可執行invokers,供目錄服務持有着,

看下如下RegistryDirectory.notify(urls)方法中的代碼實現

 

8.      通過cluster.join(directory) 合並invoker並提供集群調用策略

 

DubboProtocol.refer過程:

1.      經過ProtocolListenerWrapper, ProtocolFilterWrapper構建監聽器鏈和過濾器鏈。

2.      DubboProtocol根據url獲取ExchangeClient對象,如果是share存在就返回不存在創建新對象不是share直接創建。ExchangeClient是底層通信的客戶端,對於通信層的創建功能不在這里講解。

3.      創建DubboInvoker, 這個invoker對象包含對遠程服務提供者的長鏈接,是真正執行遠程服務調用的可執行對象

4.      將創建的invoker返回給目錄服務

 

 

官方文檔的應用服務的序列圖

 

引用服務活動圖:

 

 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM