本章我們將分析一下consumer向注冊中心注冊,並獲取服務端相應的信息,根據這些信息生產代理對象的過程和源碼。
1.類圖
上圖展示了部分消費者注冊及生成代理對象過程中需要使用到的類和接口,其中:
spring適配涉及到的類:DubboNamespaceHandler、DubboBeanDefinitionParser、ReferenceBean;
配置信息存儲:ReferenceConfig、RegistryConfig、MonitorConfig、ProtocolConfig、ConsumerConfig等;
應用協議:Protocol、DubboProtocol、HessianProtocol、ThriftProtocol、RmiProtocol、AbstractProxyProtocol、AbstractProtocol等;
注冊相關:RegistryProtocol、RegistryFactory、Registry、ZookeeperRegistryFactory、ZookeeperRegistry等
代理和集群相關:Proxy、JdcProxyFactory、AbstractProxyFactory、InvokerInvocationHandler、Cluster、FailoverCluster、FailoverClusterInvoker、AbstractClusterInvoker、Invoker等;
2.時序圖
我們通過時序圖來分析一下在consumer注冊及生產代理對象的過程中,上面的類是如何串聯在一起的:
a.spring容器通過DubboBeanDefinitionParser類的對象來解析xml文件中的標簽,生成ReferenceConfig等配置對象;
b.ReferenceConfig的init()等方法被調用;
c.通過spi機制確定Protocol接口的實現對象為RegistryProtocol的對象,調用它的refer()方法;
d.通過spi機制確定RegistryFactory接口的實現對象為ZookeeperRegistryFactory,調用它的getRegistry()方法,生產ZookeeperRegistry對象;
e.調用RegistryProtocol對象的doRefer()方法后,並調用FailoverCluster的join()方法,生成FailoverClusterInvoker的對象;
f.調用JdkProxyFactory的getProxy()方法,生成consumer使用接口的代理對象。
3.核心代碼

1 private void init() { 2 if (initialized) {//判斷是否初始化完畢 3 return; 4 } 5 initialized = true; 6 if (interfaceName == null || interfaceName.length() == 0) { 7 throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!"); 8 } 9 // 獲取消費者全局配置 10 checkDefault();//檢查 11 appendProperties(this); 12 if (getGeneric() == null && getConsumer() != null) { 13 setGeneric(getConsumer().getGeneric()); 14 } 15 if (ProtocolUtils.isGeneric(getGeneric())) { 16 interfaceClass = GenericService.class; 17 } else { 18 try { 19 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() 20 .getContextClassLoader()); 21 } catch (ClassNotFoundException e) { 22 throw new IllegalStateException(e.getMessage(), e); 23 } 24 checkInterfaceAndMethods(interfaceClass, methods); 25 } 26 String resolve = System.getProperty(interfaceName); 27 String resolveFile = null; 28 if (resolve == null || resolve.length() == 0) { 29 resolveFile = System.getProperty("dubbo.resolve.file"); 30 if (resolveFile == null || resolveFile.length() == 0) { 31 File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); 32 if (userResolveFile.exists()) { 33 resolveFile = userResolveFile.getAbsolutePath(); 34 } 35 } 36 if (resolveFile != null && resolveFile.length() > 0) { 37 Properties properties = new Properties(); 38 FileInputStream fis = null; 39 try { 40 fis = new FileInputStream(new File(resolveFile)); 41 properties.load(fis); 42 } catch (IOException e) { 43 throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e); 44 } finally { 45 try { 46 if(null != fis) fis.close(); 47 } catch (IOException e) { 48 logger.warn(e.getMessage(), e); 49 } 50 } 51 resolve = properties.getProperty(interfaceName); 52 } 53 } 54 if (resolve != null && resolve.length() > 0) { 55 url = resolve; 56 if (logger.isWarnEnabled()) { 57 if (resolveFile != null && resolveFile.length() > 0) { 58 logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service."); 59 } else { 60 logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service."); 61 } 62 } 63 } 64 if (consumer != null) { 65 if (application == null) { 66 application = consumer.getApplication(); 67 } 68 if (module == null) { 69 module = consumer.getModule(); 70 } 71 if (registries == null) { 72 registries = consumer.getRegistries();//獲取注冊信息 73 } 74 if (monitor == null) { 75 monitor = consumer.getMonitor();//獲取監控信息 76 } 77 } 78 if (module != null) { 79 if (registries == null) { 80 registries = module.getRegistries(); 81 } 82 if (monitor == null) { 83 monitor = module.getMonitor(); 84 } 85 } 86 if (application != null) { 87 if (registries == null) { 88 registries = application.getRegistries(); 89 } 90 if (monitor == null) { 91 monitor = application.getMonitor(); 92 } 93 } 94 checkApplication(); 95 checkStubAndMock(interfaceClass); 96 Map<String, String> map = new HashMap<String, String>(); 97 Map<Object, Object> attributes = new HashMap<Object, Object>(); 98 map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE); 99 map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion()); 100 map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); 101 if (ConfigUtils.getPid() > 0) { 102 map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); 103 } 104 if (! isGeneric()) { 105 String revision = Version.getVersion(interfaceClass, version); 106 if (revision != null && revision.length() > 0) { 107 map.put("revision", revision); 108 } 109 110 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); 111 if(methods.length == 0) { 112 logger.warn("NO method found in service interface " + interfaceClass.getName()); 113 map.put("methods", Constants.ANY_VALUE); 114 } 115 else { 116 map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); 117 } 118 } 119 map.put(Constants.INTERFACE_KEY, interfaceName); 120 appendParameters(map, application); 121 appendParameters(map, module); 122 appendParameters(map, consumer, Constants.DEFAULT_KEY); 123 appendParameters(map, this); 124 String prifix = StringUtils.getServiceKey(map); 125 if (methods != null && methods.size() > 0) { 126 for (MethodConfig method : methods) { 127 appendParameters(map, method, method.getName()); 128 String retryKey = method.getName() + ".retry"; 129 if (map.containsKey(retryKey)) { 130 String retryValue = map.remove(retryKey); 131 if ("false".equals(retryValue)) { 132 map.put(method.getName() + ".retries", "0"); 133 } 134 } 135 appendAttributes(attributes, method, prifix + "." + method.getName()); 136 checkAndConvertImplicitConfig(method, map, attributes); 137 } 138 } 139 //attributes通過系統context進行存儲. 140 StaticContext.getSystemContext().putAll(attributes); 141 ref = createProxy(map);//獲取代理對象 142 } 143 144 private T createProxy(Map<String, String> map) { 145 URL tmpUrl = new URL("temp", "localhost", 0, map); 146 final boolean isJvmRefer; 147 if (isInjvm() == null) { 148 if (url != null && url.length() > 0) { //指定URL的情況下,不做本地引用 149 isJvmRefer = false; 150 } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { 151 //默認情況下如果本地有服務暴露,則引用本地服務. 152 isJvmRefer = true; 153 } else { 154 isJvmRefer = false; 155 } 156 } else { 157 isJvmRefer = isInjvm().booleanValue(); 158 } 159 160 if (isJvmRefer) { 161 URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); 162 invoker = refprotocol.refer(interfaceClass, url);//獲取invoker對象 163 if (logger.isInfoEnabled()) { 164 logger.info("Using injvm service " + interfaceClass.getName()); 165 } 166 } else { 167 if (url != null && url.length() > 0) { // 用戶指定URL,指定的URL可能是對點對直連地址,也可能是注冊中心URL 168 String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); 169 if (us != null && us.length > 0) { 170 for (String u : us) { 171 URL url = URL.valueOf(u); 172 if (url.getPath() == null || url.getPath().length() == 0) { 173 url = url.setPath(interfaceName); 174 } 175 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 176 urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); 177 } else { 178 urls.add(ClusterUtils.mergeUrl(url, map)); 179 } 180 } 181 } 182 } else { // 通過注冊中心配置拼裝URL 183 List<URL> us = loadRegistries(false); 184 if (us != null && us.size() > 0) { 185 for (URL u : us) { 186 URL monitorUrl = loadMonitor(u); 187 if (monitorUrl != null) { 188 map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); 189 } 190 urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); 191 } 192 } 193 if (urls == null || urls.size() == 0) { 194 throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", 195 please config <dubbo:registry address=\"...\" /> to your spring config."); 196 } 197 } 198 for(URL invo : urls){ 199 System.out.println("invoker's url : "+invo.toFullString()); 200 } 201 if (urls.size() == 1) { 202 invoker = refprotocol.refer(interfaceClass, urls.get(0));//獲取invoker對象 203 } else { 204 List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); 205 URL registryURL = null; 206 for (URL url : urls) { 207 invokers.add(refprotocol.refer(interfaceClass, url)); 208 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 209 registryURL = url; // 用了最后一個registry url 210 } 211 } 212 if (registryURL != null) { // 有 注冊中心協議的URL 213 // 對有注冊中心的Cluster 只用 AvailableCluster 214 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 215 invoker = cluster.join(new StaticDirectory(u, invokers)); 216 } else { // 不是 注冊中心的URL 217 invoker = cluster.join(new StaticDirectory(invokers)); 218 } 219 220 } 221 } 222 System.out.println("real invoker's url :"+invoker.getUrl().toFullString()); 223 Boolean c = check; 224 if (c == null && consumer != null) { 225 c = consumer.isCheck(); 226 } 227 if (c == null) { 228 c = true; // default true 229 } 230 if (c && ! invoker.isAvailable()) { 231 throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + 232 (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() 233 + 234 " use dubbo version " + Version.getVersion()); 235 } 236 if (logger.isInfoEnabled()) { 237 logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); 238 } 239 // 創建服務代理 240 return (T) proxyFactory.getProxy(invoker); 241 }
創建代理的過程:
1. 獲取消費者配置
2. 獲取配置的注冊中心,通過配置中心配置拼裝URL,線上應該是個配置中心集群
3. 遍歷注冊中心List<URL>集合
加載監控中心URL,如果配置了監控中心在注冊中心url加上MONITOR_KEY
根據配置的引用服務參數給注冊中URL上加上REFER_KEY
4. 遍歷注冊中心List<URL>集合,這里注冊中心url包含了monitorUrl和referUrl
protocol.refer(interface,url)調用protocol引用服務返回invoker可執行對象(這個invoker並不是簡單的DubboInvoker, 而是由RegistryProtocol構建基於目錄服務的集群策略Invoker, 這個invoker可以通過目錄服務list出真正可調用的遠程服務invoker)
對於注冊中心Url設置集群策略為AvailableCluster, 由AvailableCluster將所有對象注冊中調用的invoker偽裝成一個invoker
5. 通過代理工廠創建遠程服務代理返回給使用着proxyFactory.getProxy(invoker);
procotol.refer(interface, url) 引用服務的過程
1. 經過ProtocolListenerWrapper, ProtocolFilterWrapper由於是注冊中心url調用RegistryProtocol.refer
2. 獲取注冊中心協議zookeeper, Redis, 還是dubbo, 並根據注冊中心協議通過注冊器工廠RegistryFactory.getRegistry(url)獲取注冊器Registry用來跟注冊中心交互
3. 根據配置的group分組
4. 創建注冊服務目錄RegistryDirectory並設置注冊器
5. 構建訂閱服務的subscribeUrl
6. 通過注冊器registry向注冊中心注冊subscribeUrl消費端url
7. 目錄服務registryDirectory.subscribe(subscribeUrl)訂閱服務(這里我們以開源版本zookeeper為注冊中心為例來講解, dubbo協議的注冊中心有點不一樣)
其實內部也是通過注冊器registry.subscribe(url,this) 這里this就是registryDirectory它實現了NotifyListener。
服務提供者在向zookeeper注冊服務/dubbo/com.alibaba.dubbo.demo.DemoService/providers/節點下寫下自己的URL地址
服務消費者向zookeepr注冊服務/dubbo/com.alibaba.dubbo.demo.DemoService/consumers/節點下寫下自己的URL地址
服務消費者向zookeeper 訂閱服務/dubbo/com.alibaba.dubbo.demo.DemoService/ providers /節點下所有服務提供者URL地址
Zookeeper通過watcher機制實現對節點的監聽,節點數據變化通過節點上的watcher回調客戶端, 重新生成對服務的refer
在訂閱的過程中通過獲取/dubbo/com.alibaba.dubbo.demo.DemoService/providers /下的所有服務提供者的urls(類似dubbo://10.33. 37.8:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&owner=william&pid=7356&side=consumer×tamp=1416971340626),主動回調NotifyListener來根據urls生成對服務提供者的引用生成可執行invokers,供目錄服務持有着,
看下如下RegistryDirectory.notify(urls)方法中的代碼實現
8. 通過cluster.join(directory) 合並invoker並提供集群調用策略
DubboProtocol.refer過程:
1. 經過ProtocolListenerWrapper, ProtocolFilterWrapper構建監聽器鏈和過濾器鏈。
2. DubboProtocol根據url獲取ExchangeClient對象,如果是share存在就返回不存在創建新對象不是share直接創建。ExchangeClient是底層通信的客戶端,對於通信層的創建功能不在這里講解。
3. 創建DubboInvoker, 這個invoker對象包含對遠程服務提供者的長鏈接,是真正執行遠程服務調用的可執行對象
4. 將創建的invoker返回給目錄服務
官方文檔的應用服務的序列圖
引用服務活動圖: