服務端發布流程:
dubbo 是基於 spring 配置來實現服務的發布的,對於dubbo 配置文件中看到的<dubbo:service>等標簽都是服務發布的重要配置 ,對於這些提供可配置化的支持,spring功不可沒,spring提供了可拓展的Schema的支持。也就是自定義標簽的使用,這樣 dubbo基於這樣的規范實現自己的拓展,以至於我們在項目中可以使用dubbo所定義的標簽。在實現這個拓展的前提是要把spring的Core包加入項目中。具體的加載在Spring源碼深度解析一書中有詳細介紹,這里簡單提一下拓展自定義標簽的大致步驟:
1. NamespaceHandler: 注冊一堆 BeanDefinitionParser,利用他們來進行解析,源碼中 com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler extends NamespaceHandlerSupport,這里的DubboNamespaceHandler就是dubbo所實現的命名空間拓展:
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}
}
可以看到在 init 里面 利用registerBeanDefinitionParser注冊了一系列的標簽,有我們很熟悉的protocol,service,registry等。這個里面主要做了一件事,把不同的配置分別轉化成spring容器中的bean對象
2. BeanDefinitionParser:用於解析每個 element 的內容,com.alibaba.dubbo.config.spring.schema.DubboBeanDefinitionParser implements BeanDefinitionParser 中DubboBeanDefinitionParser就充當了這么一個角色。
3. Spring 默認會加載 jar 包下的 META-INF/spring.handlers 文件尋找對應的 NamespaceHandler。在dubbo源碼中對應路徑我們真的發現了這個文件,以下是里面的內容:
http\://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler
還需要一個 spring.schemas 文件:
http\://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd
在服務啟動后會注冊一個 ServiceBean ,該類實現了 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware 五個接口,分別是初始化bean的時候提供一個機制,啟動后會在bean加載完調用afterPropertiesSet(),bean銷毀后的機制,上下文機制 加載AppicationContext,事件的監聽,獲取bean的當前屬性。
所以在解析 dubbo 配置文件的時候 會通過 ServiceBean 里的 afterPropertiesSet()這個方法 。 這個方法里面再次判斷 哪些標簽及屬性是否解析,判斷通過最后調用了一個:
if (! isDelay()) {
export();
}
判斷是否配置了delay屬性,延遲加載,在<dubbo:provider delay="10" /> 中配置。這個 export() 就是發布服務的入口。然后到 ServiceConfig<T> 類里面 export() 方法,繼而進入doExportUrls() 方法進行下一步發布跟注冊:
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true); // 獲取注冊中心的配置地址,可能有多個
for (ProtocolConfig protocolConfig : protocols) { // 是否支持多協議發布
doExportUrlsFor1Protocol(protocolConfig, registryURLs);// 協議 注冊中心地址
}
}
其中protocols是:<dubbo:protocol name="dubbo" port="20880" id="dubbo" /> 地址就是配置的注冊中心地址:[registry://192.168.254.135:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&owner=wuzz&pid=53004®istry=zookeeper×tamp=1543566568738]。其中我在dubbo配置文件里的配置是這樣的:
<!--提供方信息 -->
<dubbo:application name="dubbo-server" owner="wuzz" />
<!--注冊中心 --> <dubbo:registry id="zk1" address="zookeeper://192.168.254.135:2181" /> <!-- 協議 --> <dubbo:protocol port="20880" name="dubbo" />
然后進入 doExportUrlsFor1Protocol 方法:先是通過各種判斷獲取一個合法的IP地址,也就是我們上篇博文中提到的主機綁定,然后通過protocolConfig.getPort();獲取端口並進行一系列判斷,最后創建一個map:
Map<String, String> map = new HashMap<String, String>(); 用這個map來存放所有的參數,就是dubbo中配置的所有參數屬性。dubbo整個調用鏈路是基於URL驅動的,在綁定好所有的參數以后:
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
把這個map轉化成了一個URL地址:
dubbo://192.168.254.1:20880/com.gupaoedu.dubbo.IGpHello?anyhost=true&application=dubbo-server&dubbo=2.5.3&interface=com.gupaoedu.dubbo.IGpHello
&methods=sayHello&owner=wuzz&pid=50344&side=provider×tamp=1543567388046
這個地址是否似曾相識呢? 沒錯這個就是最后注冊到注冊中心節點的地址值。
接下去就是服務的核心發布過程:
//判斷注冊中心是否為空
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
//封裝了兩個URL
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// 執行遠程調用過成
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// 執行服務發布
Exporter<?> exporter = protocol.export(invoker);
// 暴露服務或取消暴露
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));這段代碼是獲得一個遠程服務的代理對象,可以發現,這個proxyFactory是一個拓展點:默認實現是@SPI("javassist")。而且這個拓展點會生成一個自適應適配器ProxyFactory$Adaptive
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
看一下 ProxyFactory$Adaptive :
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1,
com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
if (arg2 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url("
+ url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader
.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url("
+ url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader
.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
}
程序會調用ProxyFactory$Adaptive.getInvoker 方法,也是跟之前一樣 。從URL中獲取 extName,然后獲得指定的拓展點 ,默認使用 javassist實現,發現該拓展點有3個實現:
stub=com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper jdk=com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory javassist=com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory
默認使用 com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory,最終調用 JavassistProxyFactory.getInvoker:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper類不能正確處理帶$的類名 //這里主要做了一系列的封裝
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}//所以Invoker<?> invoker = proxyFactory.getInvoker();拿到的實例就是他,在處理消息的時候要調用
};
}
我們進入 Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);然后會發現一個 makeWrapper(c); 在這個方法里面是創建 Wapper的一些裝飾。里面主要是組裝一個動態的代理類,最后返回一個實例。可以看到里面有個 ClassGenerator cc = ClassGenerator.newInstance(cl); 這個是字節碼生成工具,里面組裝了很多東西,有一個最核心的是invokeMethod 方法是非常重要的,單獨拉出來看看:
public Object invokeMethod(Object o, String n, Class[] p, Object[] v)
throws java.lang.reflect.InvocationTargetException {
com.gupaoedu.dubbo.IGpHello w;
try {
w = ((com.gupaoedu.dubbo.IGpHello) $1);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
if ("sayHello".equals($2) && $3.length == 1) {
return ($w) w.sayHello((java.lang.String) $4[0]);
}
} catch (Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException(
"Not found method \"" + $2 + "\" in class com.gupaoedu.dubbo.IGpHello.");
}
這是服務端返回的動態字節碼 像$1這樣的可能是 Object對象進行對IGpHello的轉換。 最后 makeWrapper 返回 (Wrapper) cc.toClass().newInstance();
接着:
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
上上述代碼最后返回的是wrapper.invokeMethod,也就是服務端返回的動態字節碼里面的.invokeMethod 方法,最后觸發 ($w) w.sayHello((java.lang.String) $4[0]); 其實Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));這段代碼中 invoker:

服務的發布---protocol.export(invoker)
protocol 這個地方,其實並不是直接調用 DubboProtocol 協議的 export, 大家跟我看看 protocol 這個屬性是在哪里實例化的?以及實例化的代碼是什么?
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); //Protocol$Adaptive
其實這個protocol 就是上篇博客中我們所提到的SPI拓展機制中關於 getAdaptiveExtension 獲取到的一個動態自適應拓展點的類 :Protocol$Adaptive 調用的 export方法也是這個類中的方法:
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0)
throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)// 判斷參數是否為空
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) // 判斷請求地址是否為空
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
// 獲取到URL,並且從URL中獲取請求的協議,默認為dubbo
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url("
+ url.toString() + ") use keys([protocol])");
// 獲取指定的協議拓展點實現
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader
.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
//發布
return extension.export(arg0);
}
Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName); 這段代碼是獲取制動名稱的拓展點實現,而extName 在該場景下是 registry ,所以這里利用自使用的適配器去運行動態的加載指定拓展點,這也是使用適配器的目的。非常的靈活。可以從debug中看到:

所以 extension 就是等於 RegistryProtocol.最后調用了 extension.export(arg0); 其實就是 RegistryProtocol 類的export 方法:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker 本地發布服務(啟動 netty)
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
// 訂閱 override 數據
// FIXME 提供者訂閱時,會影響同一 JVM 即暴露服務,又引用同一服務的的場景,因為subscribed 以服務名為緩存的 key,導致訂閱信息覆蓋。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 保證每次 export 都返回一個新的 exporter 實例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
先來看一下這個本地服務發布的流程 : final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
// 從緩存中獲取 key
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) { // 雙重檢查鎖
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
這里檢查完exporter 判斷為 null 的時候會新建一個 exporter 並且會調用 protocol.export(invokerDelegete), originInvoker),在加載擴展點的時候,有一個 injectExtension 方法,針對已經加載的擴展點中的擴展點屬性進行依賴注入。而這個protocol是通過set方法注入來的。所以這里會產生一個自適應的適配器,就是因此我們知道 protocol 是一個自適應擴展點,Protocol$Adaptive,然后調用這個自適應擴展點中的 export 方法,這個時候傳入的協議地址應該是:

可以看到這里的協議變成了dubbo,然后在獲取Protocol 拓展點實現的時候找到了 com.alibaba.dubbo.rpc.Protocol 文件內有如下內容:
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
緊接着在動態生成的 Protocol$Adaptive的export方法中的 ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);這個方法內createExtension會對DubboProtocol進行一個包裝:
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
cachedWrapperClasses 在loadFile方法內,也就是在尋找3哥指定文件目錄下的拓展點的時候加載的。
因此在 Protocol$Adaptive.export 方 法 中 ,ExtensionLoader.getExtension(Protocol.class).getExtension。應該就是基於 DubboProtocol 協議去發布服務了嗎?如果是這樣,那你們太單純了。這里並不是獲得一個單純的 DubboProtocol 擴展點,而是會通過 Wrapper對 Protocol 進 行 裝 飾 , 裝 飾 器 分 別 為 : ProtocolFilterWrapper/ProtocolListenerWrapper; 至於 MockProtocol 為什么不在裝飾器里面呢?大家再回想一下我們在看 ExtensionLoader.loadFile 這段代碼的時候,有一個判斷,裝飾器必須要具備一個帶有 Protocol 的構造方法,如下:
public ProtocolFilterWrapper(Protocol protocol){
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
截止到這里,我們已經知道,Protocol$Adaptive 里面的 export 方法,會調用 ProtocolFilterWrapper 以及 ProtocolListenerWrapper 類的方法
看看 ProtocolFilterWrapper 和 ProtocolListenerWrapper
ProtocolFilterWrapper:
這個類非常重要,dubbo 機制里面日志記錄、超時等等功能都是在這一部分實現的這個類有 3 個特點,
第一它有一個參數為 Protocol protocol 的構造函數;
第二,它實現了 Protocol 接口;
第三,它使用責任鏈模式,對 export 和 refer 函數進行了封裝
其中export:
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
buildInvokerChain://buildInvokerChain 函數:它讀取所有的 filter 類,利用這些類封裝invoker
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 自動激活擴展點,根據條件獲取當前擴展可自動激活的實現
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i --) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
其中 fileter 的拓展點從jar包上來看 如下:
echo=com.alibaba.dubbo.rpc.filter.EchoFilter generic=com.alibaba.dubbo.rpc.filter.GenericFilter genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter token=com.alibaba.dubbo.rpc.filter.TokenFilter accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter context=com.alibaba.dubbo.rpc.filter.ContextFilter consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter monitor=com.alibaba.dubbo.monitor.support.MonitorFilter validation=com.alibaba.dubbo.validation.filter.ValidationFilter cache=com.alibaba.dubbo.cache.filter.CacheFilter trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
這其中涉及到很多功能,包括權限驗證、異常、超時等等,當然可以預計計算調用時間等等應該也是在這其中的某個類實現的;這里我們可以看到 export 和 refer 過程都會被 filter 過濾
ProtocolListenerWrapper:
在這里我們可以看到 export 和 refer 分別對應了不同的 Wrapper;export是對應的 ListenerExporterWrapper。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}
上訴代碼中的 protocol 所對應的就是 DubboProtocol,調用他的 export方法,繼而調用 openServer開啟服務:
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client 也可以暴露一個只有 server 可以調用的服務
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) { // 沒有的話就是創建服務
serverMap.put(key, createServer(url));
} else {
//server 支持 reset,配合 override 功能使用
server.reset(url);
}
}
}
createServer:創建服務,開啟心跳檢測,默認使用 netty。組裝 url
private ExchangeServer createServer(URL url) {
//默認開啟 server 關閉時發送 readonly 事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// heartbeat 心跳連接
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
然后進入server = Exchangers.bind(url, requestHandler);
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type)
}
可以看出這里又是一個獲取 Exchanger 拓展點,默認是 HeaderExchanger.NAME,調用他的 HeaderExchanger.bind方法,在調用 HeaderExchanger.bind 方 法 的 時 候 , 是 先 new 一 個HeaderExchangeServer. 這個 server 是干嘛呢? 是對當前這個連接去建立心跳機制
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
這里通過 Transporters 的bind 方法:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().bind(url, handler); } public static Transporter getTransporter() { return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); }
最后這里又是一個自適應適配器去選擇合適的 getTransporter,這里Transporter的默認實現是 netty,可以在SPI注解上看出的:@SPI("netty")。通過 NettyTranport 創建基於 Netty 的 server 服務
官網有個服務發布的流程圖

服務注冊的過程:
前面,我們已經知道,基於 spring 這個解析入口,到發布服務的過程,接着基於 DubboProtocol 去發布,最終調用 Netty 的 api 創建了一個NettyServer。服務注冊的話就需要通過剛剛發布服務的時候通過
RegistryProtocol.export方法去看看:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker 本地發布服務(啟動 netty)
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
// 訂閱 override 數據
// FIXME 提供者訂閱時,會影響同一 JVM 即暴露服務,又引用同一服務的的場景,因為subscribed 以服務名為緩存的 key,導致訂閱信息覆蓋。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 保證每次 export 都返回一個新的 exporter 實例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
getRegistry:這個方法是 invoker 的地址獲取 registry 實例
private Registry getRegistry(final Invoker<?> originInvoker){
//獲得registry://192.168.254.135:2181 的協議地址
URL registryUrl = originInvoker.getUrl();
if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
// 得到 zookeeper 的協議地址
String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
// registryUrl 就會變成了 zookeeper://192.168.11.156
registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
}
return registryFactory.getRegistry(registryUrl);
}
這段代碼很明顯了,通過前面這段代碼的分析,其實就是把 registry 的協議頭改成服務提供者配置的協議地址,也就是我們配置的<dubbo:registry address="zookeeper://192.168.254.135:2181" />然后 registryFactory.getRegistry 的目的,就是通過協議地址匹配到對應的注冊中心。
registryFactory :這個又是一個拓展點,且方法層面有@Adaptive注解,所以會創建一個自適應的適配器類 RegistryFactory$Adaptive
@SPI("dubbo")
public interface RegistryFactory {
/**
* 連接注冊中心.
*
* 連接注冊中心需處理契約:
* 1. 當設置 check=false 時表示不檢查連接,否則在連接不上時拋出異常。
* 2. 支持 URL 上的 username:password 權限認證。
* 3. 支持 backup=10.20.153.10 備選注冊中心集群地址。
* 4. 支持 file=registry.cache 本地磁盤文件緩存。
* 5. 支持 timeout=1000 請求超時設置。
* 6. 支持 session=60000 會話超時或過期設置。
* @param url 注冊中心地址,不允許為空
* @return 注冊中心引用,總不返回空
*/
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
這里生成了適配器類以后,會跟之前一樣 得到一個 extName ,這里是zookeeper,在通過這個名稱獲取指定的拓展點實現,通過那3個指定路徑可以發現一個文件 com.alibaba.dubbo.registry.redis.RegistryFactory里面有3個實現:
dubbo=com.alibaba.dubbo.registry.dubbo.DubboRegistryFactory multicast=com.alibaba.dubbo.registry.multicast.MulticastRegistryFactory zookeeper=com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistryFactory redis=com.alibaba.dubbo.registry.redis.RedisRegistryFactory
最后得到 ZookeeperRegistryFactory。看看ZookeeperRegistryFactory:
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
這個方法中並沒有 getRegistry 方法,而是在父類 AbstractRegistryFactory:
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();
//鎖定注冊中心獲取過程,保證注冊中心單一實例
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// 釋放鎖
LOCK.unlock();
}
}
createRegistry:創建一個注冊中心,這個是一個抽象方法,具體的實現在對應的子類實例中實現的,在 ZookeeperRegistryFactory 中
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
這里的 zookeeperTransporter 又是一個拓展點,默認是 ZkclientZookeeperTransporter
這里調用 ZookeeperRegistry 去調用注冊方法:
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (! group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group; //設置根節點
// 建立連接
zkClient = zookeeperTransporter.connect(url);
zkClient.addStateListener(new StateListener() {
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
我們對於 getRegistry 得出了一個結論,根據當前注冊中心的配置信息,最后new 了一個ZookeeperRegistry ,通過 zkClient 獲得一個匹配的注冊中心,繼續通過registry.register(registedProviderUrl);繼續往下分析,會調用 registry.register 去將 dubbo://的協議地址注冊到zookeeper 上這個方法會調用 FailbackRegistry 類中的 register. 為什么呢?因為ZookeeperRegistry 這個類中並沒有 register 這個方法,但是他的父類FailbackRegistry中存在register方法,而這個類又重寫了AbstractRegistry類中的 register 方法。所以我們可以直接定位到 FailbackRegistry 這個類中的 register 方法中:
public void register(URL url) {
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// 向服務器端發送注冊請求
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 如果開啟了啟動時檢測,則直接拋出異常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if(skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// 將失敗的注冊請求記錄到失敗列表,定時重試
failedRegistered.add(url);
}
}
最后看看 ZookeeperRegistry.doRegister:
protected void doRegister(URL url) {
try {//調用 zkclient.create 在 zookeeper 中創建一個節點。
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
RegistryProtocol.export 這個方法中后續的代碼就是去對服務提供端去注冊一個 zookeeper 監聽,當監聽發生變化的時候,服務端做相應的處理。
我們可以發現最核心的2個方法:
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); Exporter<?> exporter = protocol.export(invoker);
跟我們之前手寫的RPC框架整合Zookeeper的時候也有相同思想的影子。無非就是獲取到需要發布的服務代理,把他發布出去,無非這里是更加復雜,使得系統更加全面。
服務端接收到消息以后處理過程:
在服務發布的時候有一個 Exchanger 的拓展點相關的,會進入默認拓展點 HeaderExchanger 的 bind 方法:
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
這里 new DecodeHandler(new HeaderExchangeHandler(handler)) 是干嘛的呢 ? 這里的 handler 是前面可以看到 ExchangeHandlerAdapter ,然后通過調用NettyTransporter 的bind ,緊接着 new 了一個 Nettyserver:在里面又進行了一層包裝
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
再看看這個 warp 是做了什么:
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
最后又包裝了個 MultiMessageHandler,這里面又獲取了一個自適應適配器Dispatcher$Adaptive ,默認 AllDispatcher。
所以處理鏈為:MultiMessageHandler: ->HeartbeatHandler:->AllChannelHandler:->DecodeHandler:->HeaderExchangeHandler->ExchangeHandlerAdapter
Netty最后處理消息的Handler 是 NettyHandler 里面的 messageReceived(ChannelHandlerContext, MessageEvent)方法去處理:
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
然后進入我們上面提到的處理鏈:
MultiMessageHandler: 復合消息處理
HeartbeatHandler:心跳消息處理,接收心跳並發送心跳響應
AllChannelHandler:業務線程轉化處理器,把接收到的消息封裝成ChannelEventRunnable可執行任務給線程池處理
DecodeHandler:業務解碼處理器,反序列化
HeaderExchangeHandler 里面:
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {//進入這里
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {//雙向請求/進入這里
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {// 單向
handler.received(exchangeChannel, request.getData());
}
}
。。。。。。。。。
}
handleRequest(exchangeChannel, request);進行請求的驗證處理,最后進入handler.reply(channel, msg);
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) msg = null;
else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
else msg = data.toString();
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
return res;
}
// find handler by message class.
Object msg = req.getData();
try {
// handle data.
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
handler.reply(channel, msg); 所調用的handler是我們最最原始傳進來的沒有包裝過的handler:
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
就是 ExchangeHandlerAdapter 的reply 方法,最后調用了一個 invoke.invoke(),在發布服務的時候我們說過那個得到的invoke對象是JavassistProxyFactory 里面的getInvoker方法返回的 AbstractProxyInvoker,所以進入該類的invoke方法:
public Result invoke(Invocation invocation) throws RpcException {
try {
return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
} catch (InvocationTargetException e) {
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
然后就是調用服務發布的時候的代理對象的方法,即最終要調用的實現方法。這樣子消息就處理結束了。
