一、前言
前面講了服務是如何導出到注冊中心的。其實Dubbo做的一件事就是將服務的URL發布到注冊中心上。那現在我們聊一聊消費者一方如何從注冊中心訂閱服務並進行遠程調用的。
二、引用服務時序圖
首先總的來用文字說一遍內部的大致機制

Actor:可以當做我們的消費者。當我們使用@Reference注解將對應服務注入到其他類中這時候Spring會第一時間調用getObject方法,而getObject中只有一個方法就是get()。這里可以理解為消費者開始引入服務了。
餓漢式:在 Spring 容器調用 ReferenceBean 的 afterPropertiesSet 方法時引用服務。
懶漢式:在 ReferenceBean 對應的服務被注入到其他類中時引用。Dubbo默認使用懶漢式。
ReferenceConfig:通過get方法其實是進入到ReferenceConfig類中執行init()方法。在這個方法里主要做了下面幾件事情:
1,、對@Reference標注的接口查看是否合法,檢查該接口是不是存在泛型
2、在系統中拿到dubbo.resolve.file這個文件,這個文件是進行配置consumer的接口的。將配置好的consumer信息存到URL中
3、將配置好的ApplicationConfig、ConsumerConfig、ReferenceConfig、MethodConfig,以及消費者的IP地址存到系統的上下文中
4、接下來開始創建代理對象進入到ReferenceConfig的createProxy 。這里還是在ReferenceConfig類中。上面的那些配置統統傳入該方法中。上面有提到resolve解析consumer為URL,現在就根據這個URL首先判斷是否遠程調用還是本地調用。
4.1若是本地調用,則調用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 實例
4.2若是遠程調用,則讀取直連配置項,或注冊中心 url,並將讀取到的 url 存儲到 urls 中。然后根據 urls 元素數量進行后續操作。若 urls 元素數量為1,則直接通過 Protocol 自適應拓展類即RegistryProtocol類或者DubboProtocol構建 Invoker 實例接口,這得看URL前面的是registry://開頭還是以dubbo://。若 urls 元素數量大於1,即存在多個注冊中心或服務直連 url,此時先根據 url 構建 Invoker。然后再通過 Cluster 合並即merge多個 Invoker,最后調用 ProxyFactory 生成代理類。
RegistryProtocol:在refer方法中首先為 url 設置協議頭,然后根據 url 參數加載注冊中心實例。然后獲取 group 配置,根據 group 配置決定 doRefer 第一個參數的類型。doRefer 方法創建一個 RegistryDirectory 實例,然后生成服務消費者鏈接,通過registry.register方法向注冊中心注冊消費者的鏈接,然后通過directory.subscribe向注冊中心訂閱 providers、configurators、routers 等節點下的數據。完成訂閱后,RegistryDirectory 會收到這幾個節點下的子節點信息。由於一個服務可能部署在多台服務器上,這樣就會在 providers 產生多個節點,這個時候就需要 Cluster 將多個服務節點合並為一個,並生成一個 Invoker。同樣Invoker創建過程先不分析,后面會拿一章專門介紹。
ProxyFactory:Invoker 創建完畢后,接下來要做的事情是為服務接口生成代理對象。有了代理對象,即可進行遠程調用。代理對象生成的入口方法為的getProxy。獲取需要創建的接口列表,組合成數組。而后將該接口數組傳入 Proxy 的 getProxy 方法獲取 Proxy 子類,然后創建 InvokerInvocationHandler 對象,並將該對象傳給 newInstance 生成 Proxy 實例。InvokerInvocationHandler 實現 JDK 的 InvocationHandler 接口,具體的用途是攔截接口類調用。可以理解為AOP或攔截器。也就是在獲取該對象之前會調用到Proxy實例而不會調用到服務提供者對應的類。至於如何創建proxy實例,請看后面源碼的注釋。
三、Dubbo源碼
服務引用入口源碼ReferenceBean的getObject方法:
1 public Object getObject() throws Exception { 2 return get(); 3 } 4 5 public synchronized T get() { 6 if (destroyed) { 7 throw new IllegalStateException("Already destroyed!"); 8 } 9 // 檢測 ref 是否為空,為空則通過 init 方法創建 10 if (ref == null) { 11 // init 方法主要用於處理配置,以及調用 createProxy 生成代理類 12 init(); 13 } 14 return ref; 15 }
ReferenceConfig 的 init 進行消費者一方的配置:
對源碼進行了分割,方便理清邏輯
1 private void init() { 2 // 避免重復初始化 3 if (initialized) { 4 return; 5 } 6 initialized = true; 7 // 檢測接口名合法性 8 if (interfaceName == null || interfaceName.length() == 0) { 9 throw new IllegalStateException("interface not allow null!"); 10 } 11 12 // 檢測 consumer 變量是否為空,為空則創建 13 checkDefault(); 14 appendProperties(this); 15 if (getGeneric() == null && getConsumer() != null) { 16 // 設置 generic 17 setGeneric(getConsumer().getGeneric()); 18 } 19 20 // 檢測是否為泛化接口 21 if (ProtocolUtils.isGeneric(getGeneric())) { 22 interfaceClass = GenericService.class; 23 } else { 24 try { 25 // 加載類 26 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() 27 .getContextClassLoader()); 28 } catch (ClassNotFoundException e) { 29 throw new IllegalStateException(e.getMessage(), e); 30 } 31 checkInterfaceAndMethods(interfaceClass, methods); 32 } 33 34 // -------------------------------分割線1------------------------------ 35 36 // 從系統變量中獲取與接口名對應的屬性值 37 String resolve = System.getProperty(interfaceName); 38 String resolveFile = null; 39 if (resolve == null || resolve.length() == 0) { 40 // 從系統屬性中獲取解析文件路徑 41 resolveFile = System.getProperty("dubbo.resolve.file"); 42 if (resolveFile == null || resolveFile.length() == 0) { 43 // 從指定位置加載配置文件 44 File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); 45 if (userResolveFile.exists()) { 46 // 獲取文件絕對路徑 47 resolveFile = userResolveFile.getAbsolutePath(); 48 } 49 } 50 if (resolveFile != null && resolveFile.length() > 0) { 51 Properties properties = new Properties(); 52 FileInputStream fis = null; 53 try { 54 fis = new FileInputStream(new File(resolveFile)); 55 // 從文件中加載配置 56 properties.load(fis); 57 } catch (IOException e) { 58 throw new IllegalStateException("Unload ..., cause:..."); 59 } finally { 60 try { 61 if (null != fis) fis.close(); 62 } catch (IOException e) { 63 logger.warn(e.getMessage(), e); 64 } 65 } 66 // 獲取與接口名對應的配置 67 resolve = properties.getProperty(interfaceName); 68 } 69 } 70 if (resolve != null && resolve.length() > 0) { 71 // 將 resolve 賦值給 url 72 url = resolve; 73 } 74 75 // -------------------------------分割線2------------------------------ 76 if (consumer != null) { 77 if (application == null) { 78 // 從 consumer 中獲取 Application 實例,下同 79 application = consumer.getApplication(); 80 } 81 if (module == null) { 82 module = consumer.getModule(); 83 } 84 if (registries == null) { 85 registries = consumer.getRegistries(); 86 } 87 if (monitor == null) { 88 monitor = consumer.getMonitor(); 89 } 90 } 91 if (module != null) { 92 if (registries == null) { 93 registries = module.getRegistries(); 94 } 95 if (monitor == null) { 96 monitor = module.getMonitor(); 97 } 98 } 99 if (application != null) { 100 if (registries == null) { 101 registries = application.getRegistries(); 102 } 103 if (monitor == null) { 104 monitor = application.getMonitor(); 105 } 106 } 107 108 // 檢測 Application 合法性 109 checkApplication(); 110 // 檢測本地存根配置合法性 111 checkStubAndMock(interfaceClass); 112 113 // -------------------------------分割線3------------------------------ 114 115 Map<String, String> map = new HashMap<String, String>(); 116 Map<Object, Object> attributes = new HashMap<Object, Object>(); 117 118 // 添加 side、協議版本信息、時間戳和進程號等信息到 map 中 119 map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE); 120 map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); 121 map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); 122 if (ConfigUtils.getPid() > 0) { 123 map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); 124 } 125 126 // 非泛化服務 127 if (!isGeneric()) { 128 // 獲取版本 129 String revision = Version.getVersion(interfaceClass, version); 130 if (revision != null && revision.length() > 0) { 131 map.put("revision", revision); 132 } 133 134 // 獲取接口方法列表,並添加到 map 中 135 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); 136 if (methods.length == 0) { 137 map.put("methods", Constants.ANY_VALUE); 138 } else { 139 map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); 140 } 141 } 142 map.put(Constants.INTERFACE_KEY, interfaceName); 143 // 將 ApplicationConfig、ConsumerConfig、ReferenceConfig 等對象的字段信息添加到 map 中 144 appendParameters(map, application); 145 appendParameters(map, module); 146 appendParameters(map, consumer, Constants.DEFAULT_KEY); 147 appendParameters(map, this); 148 149 // -------------------------------分割線4------------------------------ 150 151 String prefix = StringUtils.getServiceKey(map); 152 if (methods != null && !methods.isEmpty()) { 153 // 遍歷 MethodConfig 列表 154 for (MethodConfig method : methods) { 155 appendParameters(map, method, method.getName()); 156 String retryKey = method.getName() + ".retry"; 157 // 檢測 map 是否包含 methodName.retry 158 if (map.containsKey(retryKey)) { 159 String retryValue = map.remove(retryKey); 160 if ("false".equals(retryValue)) { 161 // 添加重試次數配置 methodName.retries 162 map.put(method.getName() + ".retries", "0"); 163 } 164 } 165 166 // 添加 MethodConfig 中的“屬性”字段到 attributes 167 // 比如 onreturn、onthrow、oninvoke 等 168 appendAttributes(attributes, method, prefix + "." + method.getName()); 169 checkAndConvertImplicitConfig(method, map, attributes); 170 } 171 } 172 173 // -------------------------------✨ 分割線5 ✨------------------------------ 174 175 // 獲取服務消費者 ip 地址 176 String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY); 177 if (hostToRegistry == null || hostToRegistry.length() == 0) { 178 hostToRegistry = NetUtils.getLocalHost(); 179 } else if (isInvalidLocalHost(hostToRegistry)) { 180 throw new IllegalArgumentException("Specified invalid registry ip from property..." ); 181 } 182 map.put(Constants.REGISTER_IP_KEY, hostToRegistry); 183 184 // 存儲 attributes 到系統上下文中 185 StaticContext.getSystemContext().putAll(attributes); 186 187 // 創建代理類 188 ref = createProxy(map); 189 190 // 根據服務名,ReferenceConfig,代理類構建 ConsumerModel, 191 // 並將 ConsumerModel 存入到 ApplicationModel 中 192 ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods()); 193 ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel); 194 }
ReferenceConfig 的 createProxy 創建代理對象:
但是不是在這個方法內創建proxy實例,而是對URL進行解析后分三種創建Invoker線路,包括InjvmProtocol中的refer、DubboProtocol的refer與RegistryProtocol中的refer,最后再調用ProxyFactory來對proxy實例進行創建:
1 private T createProxy(Map<String, String> map) { 2 URL tmpUrl = new URL("temp", "localhost", 0, map); 3 final boolean isJvmRefer; 4 if (isInjvm() == null) { 5 // url 配置被指定,則不做本地引用 6 if (url != null && url.length() > 0) { 7 isJvmRefer = false; 8 // 根據 url 的協議、scope 以及 injvm 等參數檢測是否需要本地引用 9 // 比如如果用戶顯式配置了 scope=local,此時 isInjvmRefer 返回 true 10 } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { 11 isJvmRefer = true; 12 } else { 13 isJvmRefer = false; 14 } 15 } else { 16 // 獲取 injvm 配置值 17 isJvmRefer = isInjvm().booleanValue(); 18 } 19 20 // 本地引用 21 if (isJvmRefer) { 22 // 生成本地引用 URL,協議為 injvm 23 URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); 24 // 調用 refer 方法構建 InjvmInvoker 實例 25 invoker = refprotocol.refer(interfaceClass, url); 26 27 // 遠程引用 28 } else { 29 // url 不為空,表明用戶可能想進行點對點調用 30 if (url != null && url.length() > 0) { 31 // 當需要配置多個 url 時,可用分號進行分割,這里會進行切分 32 String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); 33 if (us != null && us.length > 0) { 34 for (String u : us) { 35 URL url = URL.valueOf(u); 36 if (url.getPath() == null || url.getPath().length() == 0) { 37 // 設置接口全限定名為 url 路徑 38 url = url.setPath(interfaceName); 39 } 40 41 // 檢測 url 協議是否為 registry,若是,表明用戶想使用指定的注冊中心 42 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 43 // 將 map 轉換為查詢字符串,並作為 refer 參數的值添加到 url 中 44 urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); 45 } else { 46 // 合並 url,移除服務提供者的一些配置(這些配置來源於用戶配置的 url 屬性), 47 // 比如線程池相關配置。並保留服務提供者的部分配置,比如版本,group,時間戳等 48 // 最后將合並后的配置設置為 url 查詢字符串中。 49 urls.add(ClusterUtils.mergeUrl(url, map)); 50 } 51 } 52 } 53 } else { 54 // 加載注冊中心 url 55 List<URL> us = loadRegistries(false); 56 if (us != null && !us.isEmpty()) { 57 for (URL u : us) { 58 URL monitorUrl = loadMonitor(u); 59 if (monitorUrl != null) { 60 map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); 61 } 62 // 添加 refer 參數到 url 中,並將 url 添加到 urls 中 63 urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); 64 } 65 } 66 67 // 未配置注冊中心,拋出異常 68 if (urls.isEmpty()) { 69 throw new IllegalStateException("No such any registry to reference..."); 70 } 71 } 72 73 // 單個注冊中心或服務提供者(服務直連,下同) 74 if (urls.size() == 1) { 75 // 調用 RegistryProtocol 的 refer 構建 Invoker 實例 76 invoker = refprotocol.refer(interfaceClass, urls.get(0)); 77 78 // 多個注冊中心或多個服務提供者,或者兩者混合 79 } else { 80 List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); 81 URL registryURL = null; 82 83 // 獲取所有的 Invoker 84 for (URL url : urls) { 85 // 通過 refprotocol 調用 refer 構建 Invoker,refprotocol 會在運行時 86 // 根據 url 協議頭加載指定的 Protocol 實例,並調用實例的 refer 方法 87 invokers.add(refprotocol.refer(interfaceClass, url)); 88 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 89 registryURL = url; 90 } 91 } 92 if (registryURL != null) { 93 // 如果注冊中心鏈接不為空,則將使用 AvailableCluster 94 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 95 // 創建 StaticDirectory 實例,並由 Cluster 對多個 Invoker 進行合並 96 invoker = cluster.join(new StaticDirectory(u, invokers)); 97 } else { 98 invoker = cluster.join(new StaticDirectory(invokers)); 99 } 100 } 101 } 102 103 Boolean c = check; 104 if (c == null && consumer != null) { 105 c = consumer.isCheck(); 106 } 107 if (c == null) { 108 c = true; 109 } 110 111 // invoker 可用性檢查 112 if (c && !invoker.isAvailable()) { 113 throw new IllegalStateException("No provider available for the service..."); 114 } 115 116 // 生成代理類 117 return (T) proxyFactory.getProxy(invoker); 118 }
同樣Invoker的創建后面會專門拿一篇來講。暫時先把Invoker創建看成一個黑盒,只要我們調用即可。
RegistryProtocol中的refer:
1 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { 2 // 取 registry 參數值,並將其設置為協議頭 3 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); 4 // 獲取注冊中心實例 5 Registry registry = registryFactory.getRegistry(url); 6 if (RegistryService.class.equals(type)) { 7 return proxyFactory.getInvoker((T) registry, type, url); 8 } 9 10 // 將 url 查詢字符串轉為 Map 11 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); 12 // 獲取 group 配置 13 String group = qs.get(Constants.GROUP_KEY); 14 if (group != null && group.length() > 0) { 15 if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 16 || "*".equals(group)) { 17 // 通過 SPI 加載 MergeableCluster 實例,並調用 doRefer 繼續執行服務引用邏輯 18 return doRefer(getMergeableCluster(), registry, type, url); 19 } 20 } 21 22 // 調用 doRefer 繼續執行服務引用邏輯 23 return doRefer(cluster, registry, type, url); 24 } 25 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { 26 // 創建 RegistryDirectory 實例 27 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); 28 // 設置注冊中心和協議 29 directory.setRegistry(registry); 30 directory.setProtocol(protocol); 31 Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); 32 // 生成服務消費者鏈接 33 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); 34 35 // 注冊服務消費者,在 consumers 目錄下新節點 36 if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) 37 && url.getParameter(Constants.REGISTER_KEY, true)) { 38 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, 39 Constants.CHECK_KEY, String.valueOf(false))); 40 } 41 42 // 訂閱 providers、configurators、routers 等節點數據 43 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 44 Constants.PROVIDERS_CATEGORY 45 + "," + Constants.CONFIGURATORS_CATEGORY 46 + "," + Constants.ROUTERS_CATEGORY)); 47 48 // 一個注冊中心可能有多個服務提供者,因此這里需要將多個服務提供者合並為一個 49 Invoker invoker = cluster.join(directory); 50 ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); 51 return invoker; 52 }
在Invoker創建完后會返回到ReferenceConfig中,然后進入ProxyFactory中的getProxy方法。
ProxyFactory中的getProxy方法:
1 public <T> T getProxy(Invoker<T> invoker) throws RpcException { 2 // 調用重載方法 3 return getProxy(invoker, false); 4 } 5 6 public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException { 7 Class<?>[] interfaces = null; 8 // 獲取接口列表 9 String config = invoker.getUrl().getParameter("interfaces"); 10 if (config != null && config.length() > 0) { 11 // 切分接口列表 12 String[] types = Constants.COMMA_SPLIT_PATTERN.split(config); 13 if (types != null && types.length > 0) { 14 interfaces = new Class<?>[types.length + 2]; 15 // 設置服務接口類和 EchoService.class 到 interfaces 中 16 interfaces[0] = invoker.getInterface(); 17 interfaces[1] = EchoService.class; 18 for (int i = 0; i < types.length; i++) { 19 // 加載接口類 20 interfaces[i + 1] = ReflectUtils.forName(types[i]); 21 } 22 } 23 } 24 if (interfaces == null) { 25 interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class}; 26 } 27 28 // 為 http 和 hessian 協議提供泛化調用支持,參考 pull request #1827 29 if (!invoker.getInterface().equals(GenericService.class) && generic) { 30 int len = interfaces.length; 31 Class<?>[] temp = interfaces; 32 // 創建新的 interfaces 數組 33 interfaces = new Class<?>[len + 1]; 34 System.arraycopy(temp, 0, interfaces, 0, len); 35 // 設置 GenericService.class 到數組中 36 interfaces[len] = GenericService.class; 37 } 38 39 // 調用重載方法 40 return getProxy(invoker, interfaces); 41 } 42 43 public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
在上面的代碼主要是獲取接口數組再通過抽象的getProxy進入到Proxy中的getProxy。
1 public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { 2 // 生成 Proxy 子類(Proxy 是抽象類)。並調用 Proxy 子類的 newInstance 方法創建 Proxy 實例 3 return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); 4 }
代碼具體分析上面有寫。接下來進入到Proxy子類的getProxy中
1 public static Proxy getProxy(Class<?>... ics) { 2 // 調用重載方法 3 return getProxy(ClassHelper.getClassLoader(Proxy.class), ics); 4 } 5 6 public static Proxy getProxy(ClassLoader cl, Class<?>... ics) { 7 if (ics.length > 65535) 8 throw new IllegalArgumentException("interface limit exceeded"); 9 10 StringBuilder sb = new StringBuilder(); 11 // 遍歷接口列表 12 for (int i = 0; i < ics.length; i++) { 13 String itf = ics[i].getName(); 14 // 檢測類型是否為接口 15 if (!ics[i].isInterface()) 16 throw new RuntimeException(itf + " is not a interface."); 17 18 Class<?> tmp = null; 19 try { 20 // 重新加載接口類 21 tmp = Class.forName(itf, false, cl); 22 } catch (ClassNotFoundException e) { 23 } 24 25 // 檢測接口是否相同,這里 tmp 有可能為空 26 if (tmp != ics[i]) 27 throw new IllegalArgumentException(ics[i] + " is not visible from class loader"); 28 29 // 拼接接口全限定名,分隔符為 ; 30 sb.append(itf).append(';'); 31 } 32 33 // 使用拼接后的接口名作為 key 34 String key = sb.toString(); 35 36 Map<String, Object> cache; 37 synchronized (ProxyCacheMap) { 38 cache = ProxyCacheMap.get(cl); 39 if (cache == null) { 40 cache = new HashMap<String, Object>(); 41 ProxyCacheMap.put(cl, cache); 42 } 43 } 44 45 Proxy proxy = null; 46 synchronized (cache) { 47 do { 48 // 從緩存中獲取 Reference<Proxy> 實例 49 Object value = cache.get(key); 50 if (value instanceof Reference<?>) { 51 proxy = (Proxy) ((Reference<?>) value).get(); 52 if (proxy != null) { 53 return proxy; 54 } 55 } 56 57 // 並發控制,保證只有一個線程可以進行后續操作 58 if (value == PendingGenerationMarker) { 59 try { 60 // 其他線程在此處進行等待 61 cache.wait(); 62 } catch (InterruptedException e) { 63 } 64 } else { 65 // 放置標志位到緩存中,並跳出 while 循環進行后續操作 66 cache.put(key, PendingGenerationMarker); 67 break; 68 } 69 } 70 while (true); 71 } 72 73 long id = PROXY_CLASS_COUNTER.getAndIncrement(); 74 String pkg = null; 75 ClassGenerator ccp = null, ccm = null; 76 try { 77 // 創建 ClassGenerator 對象 78 ccp = ClassGenerator.newInstance(cl); 79 80 Set<String> worked = new HashSet<String>(); 81 List<Method> methods = new ArrayList<Method>(); 82 83 for (int i = 0; i < ics.length; i++) { 84 // 檢測接口訪問級別是否為 protected 或 privete 85 if (!Modifier.isPublic(ics[i].getModifiers())) { 86 // 獲取接口包名 87 String npkg = ics[i].getPackage().getName(); 88 if (pkg == null) { 89 pkg = npkg; 90 } else { 91 if (!pkg.equals(npkg)) 92 // 非 public 級別的接口必須在同一個包下,否者拋出異常 93 throw new IllegalArgumentException("non-public interfaces from different packages"); 94 } 95 } 96 97 // 添加接口到 ClassGenerator 中 98 ccp.addInterface(ics[i]); 99 100 // 遍歷接口方法 101 for (Method method : ics[i].getMethods()) { 102 // 獲取方法描述,可理解為方法簽名 103 String desc = ReflectUtils.getDesc(method); 104 // 如果方法描述字符串已在 worked 中,則忽略。考慮這種情況, 105 // A 接口和 B 接口中包含一個完全相同的方法 106 if (worked.contains(desc)) 107 continue; 108 worked.add(desc); 109 110 int ix = methods.size(); 111 // 獲取方法返回值類型 112 Class<?> rt = method.getReturnType(); 113 // 獲取參數列表 114 Class<?>[] pts = method.getParameterTypes(); 115 116 // 生成 Object[] args = new Object[1...N] 117 StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];"); 118 for (int j = 0; j < pts.length; j++) 119 // 生成 args[1...N] = ($w)$1...N; 120 code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";"); 121 // 生成 InvokerHandler 接口的 invoker 方法調用語句,如下: 122 // Object ret = handler.invoke(this, methods[1...N], args); 123 code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);"); 124 125 // 返回值不為 void 126 if (!Void.TYPE.equals(rt)) 127 // 生成返回語句,形如 return (java.lang.String) ret; 128 code.append(" return ").append(asArgument(rt, "ret")).append(";"); 129 130 methods.add(method); 131 // 添加方法名、訪問控制符、參數列表、方法代碼等信息到 ClassGenerator 中 132 ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString()); 133 } 134 } 135 136 if (pkg == null) 137 pkg = PACKAGE_NAME; 138 139 // 構建接口代理類名稱:pkg + ".proxy" + id,比如 org.apache.dubbo.proxy0 140 String pcn = pkg + ".proxy" + id; 141 ccp.setClassName(pcn); 142 ccp.addField("public static java.lang.reflect.Method[] methods;"); 143 // 生成 private java.lang.reflect.InvocationHandler handler; 144 ccp.addField("private " + InvocationHandler.class.getName() + " handler;"); 145 146 // 為接口代理類添加帶有 InvocationHandler 參數的構造方法,比如: 147 // porxy0(java.lang.reflect.InvocationHandler arg0) { 148 // handler=$1; 149 // } 150 ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;"); 151 // 為接口代理類添加默認構造方法 152 ccp.addDefaultConstructor(); 153 154 // 生成接口代理類 155 Class<?> clazz = ccp.toClass(); 156 clazz.getField("methods").set(null, methods.toArray(new Method[0])); 157 158 // 構建 Proxy 子類名稱,比如 Proxy1,Proxy2 等 159 String fcn = Proxy.class.getName() + id; 160 ccm = ClassGenerator.newInstance(cl); 161 ccm.setClassName(fcn); 162 ccm.addDefaultConstructor(); 163 ccm.setSuperClass(Proxy.class); 164 // 為 Proxy 的抽象方法 newInstance 生成實現代碼,形如: 165 // public Object newInstance(java.lang.reflect.InvocationHandler h) { 166 // return new org.apache.dubbo.proxy0($1); 167 // } 168 ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }"); 169 // 生成 Proxy 實現類 170 Class<?> pc = ccm.toClass(); 171 // 通過反射創建 Proxy 實例 172 proxy = (Proxy) pc.newInstance(); 173 } catch (RuntimeException e) { 174 throw e; 175 } catch (Exception e) { 176 throw new RuntimeException(e.getMessage(), e); 177 } finally { 178 if (ccp != null) 179 // 釋放資源 180 ccp.release(); 181 if (ccm != null) 182 ccm.release(); 183 synchronized (cache) { 184 if (proxy == null) 185 cache.remove(key); 186 else 187 // 寫緩存 188 cache.put(key, new WeakReference<Proxy>(proxy)); 189 // 喚醒其他等待線程 190 cache.notifyAll(); 191 } 192 } 193 return proxy; 194 }
ccp 用於為服務接口生成代理類,比如我們有一個 DemoService 接口,這個接口代理類就是由 ccp 生成的。ccm 則是用於為 org.apache.dubbo.common.bytecode.Proxy 抽象類生成子類,主要是實現 Proxy 類的抽象方法。
這里需要重點講一下,因為用到了並發控制。機制是這樣的,synchronized中首先獲取緩存中 Reference<Proxy> 實例。因為緩存是HashMap結構來存取。key是Reference<Proxy> 實例對應的接口名稱,value就是Reference<Proxy> 實例,注意的是接口列表進行拼接了。當第一個線程進入時,key對應的是實例而不是PendingGenerationMarker。所以會進入到else中,else中則設置key的對應的value為標志位PendingGenerationMarker。這樣其他線程只能等待,而后對服務接口生產代理類和抽象類的子類。在最后釋放資源時,會喚醒其他線程,並且把已經生成過的Reference實例標志成弱引用對象,代表可以回收了。(注:弱引用,比軟引用更弱一點,被弱引用關聯的對象只能生存到下一次垃圾收集發生之前。當垃圾收集發生時無論內存是否足夠,都會回收弱引用對象。具體可以看JVM垃圾回收算法)
四、總結:
到這里應該算是講完了Dubbo內的服務引用機制。對於Invoker后面會再單獨講。這里還要補充一句如果是集群的話會啟動服務降級以及負載均衡每次只選擇一個Invoker調用,同樣這個后面會再做單獨介紹。
