【Dubbo源碼閱讀系列】服務暴露之本地暴露


在上一篇文章中我們介紹 Dubbo 自定義標簽解析相關內容,其中我們自定義的 XML 標簽 <dubbo:service /> 會被解析為 ServiceBean 對象(傳送門:Dubbo XML 配置加載)。今天我們講述的內容和 ServiceBean 密切相關!
細心的讀者在閱讀 ServiceBean 類時會發現 onApplicationEvent() 方法和 afterPropertiesSet() 方法調用了一個共同的方法 export()。直覺告訴我們這個方法應該和服務的暴露有關,我們接下來就
從 export() 方法入手分析。

export()方法調用時機

為了解答 export() 調用時機問題,我們需要關注 ServiceBean 類中的三個方法

  1. setApplicationContext(ApplicationContext applicationContext)
    ServiceBean 實現了 ApplicationContextAware 接口,在 ServiceBean 初始化后,會調用 setApplicationContext 注入 Spring 上下文;
  2. afterPropertiesSet()
    注入 ApplicationConfig、registries、protocols 等屬性;
  3. onApplicationEvent(ContextRefreshedEvent event)
    這里接受的 event 事件類型為 ContextRefreshedEvent。當 applicationContext 被初始化或者刷新時,會調用該方法。
    這三個方法在 Spring 生命周期中被調用的順序大致如下圖所示
    setApplicationContext()——> afterPropertiesSet() ——> onApplicationEvent()
    我們結合代碼繼續看
public void setApplicationContext(ApplicationContext applicationContext) {
    this.applicationContext = applicationContext;
    SpringExtensionFactory.addApplicationContext(applicationContext);
    supportedApplicationListener = addApplicationListener(applicationContext, this);
}

public void onApplicationEvent(ContextRefreshedEvent event) {
    if (!isExported() && !isUnexported()) {
    	if (logger.isInfoEnabled()) {
    		logger.info("The service ready on spring started. service: " + getInterface());
    	}
    	export();
    }
}

public void afterPropertiesSet() throws Exception {
    // 省略...
    if (!supportedApplicationListener) {
    	export();
    }
}

代碼執行邏輯大致如下:

  1. 首先執行 setApplicationContext() 方法,注入上下文。這里的 supportedApplicationListener 用於判斷 Spring 是否支持 Spring 監聽機制。
  2. 執行 afterPropertiesSet() 方法。如果 supportedApplicationListener 值為 false,調用 export() 方法。
  3. 執行 onApplicationEvent() 方法。如果沒有執行過 export() 以及 unexport() 方法,調用 export() 方法。
    通過上面簡單的分析我們可以看到 export() 方法只會在 onApplicationEvent() 和 export() 方法中調用一次。

export() 方法解析

public synchronized void export() {
	if (provider != null) {
		if (export == null) {
			export = provider.getExport();
		}
		if (delay == null) {
			delay = provider.getDelay();
		}
	}
	if (export != null && !export) {
		return;
	}

	if (delay != null && delay > 0) {
		delayExportExecutor.schedule(new Runnable() {
			@Override
			public void run() {
				doExport();
			}
		}, delay, TimeUnit.MILLISECONDS);
	} else {
		doExport();
	}
}

export()方法比較簡單。注意這里有個 delay 變量,我們可以使用該變量延遲執行 export() 方法。
繼續看 doExport() 方法

protected synchronized void doExport() {
	// 省略...
	doExportUrls();
	ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), ref, interfaceClass);
	ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}

private void doExportUrls() {
	List<URL> registryURLs = loadRegistries(true);
	for (ProtocolConfig protocolConfig : protocols) {
		doExportUrlsFor1Protocol(protocolConfig, registryURLs);
	}
}

doExport()方法省略了很多 ServiceBean 配置校驗和初始化代碼。大家有興趣可以自行閱覽。這里直接划重點!!!分析 doExportUrls() 方法!!!
先看 loadRegistries() 方法:

loadRegistries()

protected List<URL> loadRegistries(boolean provider) {
	checkRegistry();
	List<URL> registryList = new ArrayList<URL>();
	// registries 在 afterPropertiesSet() 方法中初始化
	if (registries != null && !registries.isEmpty()) {
		for (RegistryConfig config : registries) {
			String address = config.getAddress();
			if (address == null || address.length() == 0) {
				address = Constants.ANYHOST_VALUE;
			}
			String sysaddress = System.getProperty("dubbo.registry.address");
			if (sysaddress != null && sysaddress.length() > 0) {
				address = sysaddress;
			}
			if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
				Map<String, String> map = new HashMap<String, String>();
				// 將 application/config 部分屬性整合到 map 中,詳細見:
				appendParameters(map, application);
				appendParameters(map, config);
				map.put("path", RegistryService.class.getName());
				map.put("dubbo", Version.getProtocolVersion());
				map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
				if (ConfigUtils.getPid() > 0) {
					map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
				}
				if (!map.containsKey("protocol")) {
					if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
						map.put("protocol", "remote");
					} else {
						map.put("protocol", "dubbo");
					}
				}
				// 構建 url ,返回結果類似 zookeeper://192.168.0.100:2181/org.apache.dubbo.registry.RegistryService?
				// application=demo-provider&dubbo=2.0.2&pid=22705&qos.port=22222&timestamp=1549005672530
				List<URL> urls = UrlUtils.parseURLs(address, map);
				for (URL url : urls) {
				    // 將此時 url 的 protocol 保存到 registry 參數中
					url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
					// 設置 url protcol 屬性為 registry
					url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
					if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
							|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
						registryList.add(url);
					}
				}
			}
		}
	}
	return registryList;
}

loadRegistries() 用於加載注冊中心。概括來說就是用於解析我們在配置文件中定義的 <dubbo:registry /> 標簽。
checkRegistry() 方法用於校驗注冊中心配置校驗,里面有一些版本兼容的代碼。appendParameters() 方法詳見 appendParameters() 小節。

本地暴露

介紹完 loadRegistries() 方法,我們接着看 doExportUrlsFor1Protocol()。doExportUrlsFor1Protocol() 方法比較長,這里我們挑出和本地暴露相關的內容進行分析。

if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
    // export to local if the config is not remote (export to remote only when config is remote)
    if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
        exportLocal(url);
    }
    if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
        // 遠程暴露相關內容,省略...
    }
}
private void exportLocal(URL url) {
    if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
        URL local = URL.valueOf(url.toFullString())
                .setProtocol(Constants.LOCAL_PROTOCOL)
                .setHost(LOCALHOST)
                .setPort(0);
        Exporter<?> exporter = protocol.export(
                proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
    }
}

看到 exportLocal() 方法,意味着我們已經快要直達本地服務暴露的核心了!更令人按捺不住的是!這里又用到了 Dubbo 中的 SPI 機制(詳見系列第一篇Dubbo SPI)。讓我們看看這里到底做了什么?

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

熟悉的配方熟悉的料,在這里我們獲取了 Protocol 和 ProxyFactory 對應的自適應擴展類。根據方法調用的嵌套邏輯,先來看 ProxyFactory 自適應擴展類 ProxyFactory$Adaptive 的 getInvoker() 方法。

核心方法 proxyFactory.getInvoker()

public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
    public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
        if (arg2 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = null;
        try {
            extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        }catch(Exception e){
            if (count.incrementAndGet() == 1) {
                logger.warn("Failed to find extension named " + extName + " for type org.apache.dubbo.rpc.ProxyFactory, will use default extension javassist instead.", e);
            }
            extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension("javassist");
        }
        return extension.getInvoker(arg0, arg1, arg2);
    }
}

這里我們實際會去調用 StubProxyFactoryWrapper 包裝類的 getInvoker() 方法,如果不明白可以先看下 【Dubbo源碼閱讀系列】之 Dubbo SPI 機制

public class StubProxyFactoryWrapper implements ProxyFactory {
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
        return proxyFactory.getInvoker(proxy, type, url);
    }
}
public class JavassistProxyFactory extends AbstractProxyFactory {
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

結合上面的代碼我們發現,發現最后調用的是 JavassistProxyFactory 類的 getInvoker() 方法。其中 wrapper 是動態生成的代理對象。最后返回一個 AbstractProxyInvoker 對象,doInvoke() 方法會調用 wrapper 代理類的 invokeMethod() 方法,其中 invokeMethod() 方法大概如下所示:

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
    org.apache.dubbo.demo.provider.DemoServiceImpl w;
    try {
        w = ((org.apache.dubbo.demo.provider.DemoServiceImpl) $1);
    } catch (Throwable e) {
        throw new IllegalArgumentException(e);
    }
    try {
        if ("sayHello".equals($2) && $3.length == 1) {
            return ($w) w.sayHello((java.lang.String) $4[0]);
        }
    } catch (Throwable e) {
        throw new java.lang.reflect.InvocationTargetException(e);
    }
    throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.");
}

稍微有一點繞,至少我們已經看完了 proxyFactory.getInvoker() 方法了,我們獲取到了一個包裝了動態代理類的 AbstractProxyInvoker 對象。接下來繼續看 protocol.export() 方法。

核心方法 protocol.export()

public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
    if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");org.apache.dubbo.common.URL url = arg0.getUrl();
    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    if(extName == null) throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    org.apache.dubbo.rpc.Protocol extension = null;
    try {
        extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
    }catch(Exception e){
        if (count.incrementAndGet() == 1) {
            logger.warn("Failed to find extension named " + extName + " for type org.apache.dubbo.rpc.Protocol, will use default extension dubbo instead.", e);
        }
        extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension("dubbo");
    }
    return extension.export(arg0);
}

由於此時的 url 中 protocol 值為 injvm(url 經過 setProtocol(LOCAL_PROTOCOL) 操作后 protocol 已經更新為 injvm),因此我們這里獲得的擴展類實際為包裝了 InjvmProtocol 的包裝類對象,對 wrapper 類有疑問的可以看下【Dubbo源碼閱讀系列】之 Dubbo SPI 機制
這里會涉及到一個方法 buildInvokerChain() 方,道它用於構建一個調用鏈。
整體調用時序簡圖如下所示:


最后 exportLocal() 方法中獲取到的是一個 InjvmExporter 對象,並將其添加到 ServiceConfig 類的 exporters 集合中。

buildInvokerChain()

ProtocolFilterWrapper.java
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
	Invoker<T> last = invoker;
	List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
	if (!filters.isEmpty()) {
		for (int i = filters.size() - 1; i >= 0; i--) {
			final Filter filter = filters.get(i);
			final Invoker<T> next = last;
			last = new Invoker<T>() {
				// 省略 Invoker 構建代碼...
				@Override
				public Result invoke(Invocation invocation) throws RpcException {
					return filter.invoke(next, invocation);
				}
				// 省略 Invoker 構建代碼...
			};
		}
	}
	return last;
}

buildInvokerChain() 方法用於構建調用鏈,初步瀏覽下來發現調用鏈應該是由 Filter 擴展類構成。那么這些 Filter 擴展類又從何而來呢?這行代碼很關鍵!!!

List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

對於這段代碼我們應該有很強的親切感,但仔細看又稍稍有所不同。實際上被 @Activate 注解標記的擴展類會被加載到 ExtensionLoader 類的 cachedActivates 集合中。
我們在調用 ExtensionLoader 類的 getActivateExtension() 時,會根據我們傳入的 key 和 group 值從 cachedActivates 集合中獲取滿足當前條件的 filter 對象。
拿到 filters 集合后,會用鏈表的形式拼接 filter 調用鏈,舉個例子:
假設當前獲取到的 filters 集合中保存的 filter 對象為 filter0、filter1、filter2。我們對 filters 集合進行倒序遍歷。最后獲得的 last 其實為新建的 ivk2 對象。如果我們調用 last 的 invoke 方法,調用鏈如下圖所示:

End

本文介紹了 Export() 方法被調用的時機以及基本流程。並且花了一定篇幅對 Dubbo 服務本地暴露進行了分析。其中摻雜了不少代碼的分析,可能沒有面面俱到吧。還是建議大家自己自己 Debug 一下,很多東西瞬間秒懂,有助於源碼理解。下一篇文章我們介紹 Dubbo 服務遠程暴露。

appendProperties()

protected static void appendProperties(AbstractConfig config) {
	if (config == null) {
		return;
	}
	// getTagName:獲取去除了 Bean/Config 結尾的小寫類名(ApplicationConfig->application)
	String prefix = "dubbo." + getTagName(config.getClass()) + ".";
	Method[] methods = config.getClass().getMethods();
	for (Method method : methods) {
		try {
			String name = method.getName();
			// 1、方法長度大於3;2、方法以 set 開頭;3、方法修飾符類型為 public;4、形參個數為 1;5、形參類型為基本類型
			if (name.length() > 3 && name.startsWith("set") && Modifier.isPublic(method.getModifiers())
					&& method.getParameterTypes().length == 1 && isPrimitive(method.getParameterTypes()[0])) {
				// camelToSplitName: 舉個例子 ApplicationConfig——>application.config
				String property = StringUtils.camelToSplitName(name.substring(3, 4).toLowerCase() + name.substring(4), ".");

				String value = null;
				if (config.getId() != null && config.getId().length() > 0) {
					// 拼接屬性名稱,並嘗試獲取對應屬性
					String pn = prefix + config.getId() + "." + property;
					value = System.getProperty(pn);
					if (!StringUtils.isBlank(value)) {
						logger.info("Use System Property " + pn + " to config dubbo");
					}
				}
				if (value == null || value.length() == 0) {
					// 比如當前 config 為 ApplicationConfig,pn = dubbo.application.xxx
					String pn = prefix + property;
					value = System.getProperty(pn);
					if (!StringUtils.isBlank(value)) {
						logger.info("Use System Property " + pn + " to config dubbo");
					}
				}
				if (value == null || value.length() == 0) {
					Method getter;
					try {
						getter = config.getClass().getMethod("get" + name.substring(3));
					} catch (NoSuchMethodException e) {
						try {
							getter = config.getClass().getMethod("is" + name.substring(3));
						} catch (NoSuchMethodException e2) {
							getter = null;
						}
					}
					if (getter != null) {
						if (getter.invoke(config) == null) {
							// 嘗試使用 ConfigUtils.getProperty() 方法獲取屬性值
							// 嘗試從 dubbo.properties.file 文件或 dubbo.properties 文件中讀取屬性
							if (config.getId() != null && config.getId().length() > 0) {
								value = ConfigUtils.getProperty(prefix + config.getId() + "." + property);
							}
							if (value == null || value.length() == 0) {
								value = ConfigUtils.getProperty(prefix + property);
							}
							if (value == null || value.length() == 0) {
								String legacyKey = legacyProperties.get(prefix + property);
								if (legacyKey != null && legacyKey.length() > 0) {
									value = convertLegacyValue(legacyKey, ConfigUtils.getProperty(legacyKey));
								}
							}

						}
					}
				}
				if (value != null && value.length() > 0) {
					method.invoke(config, convertPrimitive(method.getParameterTypes()[0], value));
				}
			}
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
		}
	}
}

appendParameters()

protected static void appendParameters(Map<String, String> parameters, Object config) {
	appendParameters(parameters, config, null);
}
protected static void appendParameters(Map<String, String> parameters, Object config, String prefix) {
	if (config == null) {
		return;
	}
	Method[] methods = config.getClass().getMethods();
	// 遍歷 config 類方法集合
	for (Method method : methods) {
		try {
			String name = method.getName();
			// 找到滿足以下的方法:以set/is 開頭,非 getClass;方法修飾符為 public;方法參數個數為 0;返回類型為基本類型
			if ((name.startsWith("get") || name.startsWith("is"))
					&& !"getClass".equals(name)
					&& Modifier.isPublic(method.getModifiers())
					&& method.getParameterTypes().length == 0
					&& isPrimitive(method.getReturnType())) {
				// 獲取 parameter 注解
				Parameter parameter = method.getAnnotation(Parameter.class);
				// @Parameter(excluded = true),直接跳過
				if (method.getReturnType() == Object.class || parameter != null && parameter.excluded()) {
					continue;
				}
				int i = name.startsWith("get") ? 3 : 2;
				String prop = StringUtils.camelToSplitName(name.substring(i, i + 1).toLowerCase() + name.substring(i + 1), ".");
				String key;
				if (parameter != null && parameter.key().length() > 0) {
					key = parameter.key();
				} else {
					key = prop;
				}
				// 利用反射調用 config 類中的 get/is 方法
				Object value = method.invoke(config);
				String str = String.valueOf(value).trim();
				if (value != null && str.length() > 0) {
					// 是否需要轉義,UTF-8
					if (parameter != null && parameter.escaped()) {
						str = URL.encode(str);
					}
					if (parameter != null && parameter.append()) {
						String pre = parameters.get(Constants.DEFAULT_KEY + "." + key);
						if (pre != null && pre.length() > 0) {
							str = pre + "," + str;
						}
						pre = parameters.get(key);
						if (pre != null && pre.length() > 0) {
							str = pre + "," + str;
						}
					}
					if (prefix != null && prefix.length() > 0) {
						key = prefix + "." + key;
					}
					// key/value 添加到 parameters 集合
					parameters.put(key, str);
				} else if (parameter != null && parameter.required()) {
					throw new IllegalStateException(config.getClass().getSimpleName() + "." + key + " == null");
				}
				// 方法名為 getParameters();方法修飾符為 public;方法形參個數為0;返回類型為 Map
			} else if ("getParameters".equals(name)
					&& Modifier.isPublic(method.getModifiers())
					&& method.getParameterTypes().length == 0
					&& method.getReturnType() == Map.class) {
				Map<String, String> map = (Map<String, String>) method.invoke(config, new Object[0]);
				if (map != null && map.size() > 0) {
					String pre = (prefix != null && prefix.length() > 0 ? prefix + "." : "");
					for (Map.Entry<String, String> entry : map.entrySet()) {
						parameters.put(pre + entry.getKey().replace('-', '.'), entry.getValue());
					}
				}
			}
		} catch (Exception e) {
			throw new IllegalStateException(e.getMessage(), e);
		}
	}
}

該方法會調用當前類對象的 isXXX/getXXX 方法(非 getClass 方法;方法修飾符為 public;形參個數為 0;返回類型為基本類型),獲取其返回值構造鍵值對添加到指定 map 集合中;同時也會解析 getParameters() 返回的結果,構造鍵值對注入到 map 集合中。

本BLOG上原創文章未經本人許可,不得用於商業用途及傳統媒體。網絡媒體轉載請注明出處,否則屬於侵權行為。https://juejin.im/post/5c2b7ab46fb9a049d236273b


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM