7.4 Remote service export - creating the Exporter and starting the netty server


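To avoid exposing real addresses, the IP of every started service is written as 10.10.10.10.

Overall steps of a remote service export:

  • Wrap ref as an invoker
  • Convert the invoker into an exporter
  • Start netty
  • Register the service with zookeeper
  • Subscribe
  • Return a new exporter instance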

The code that performs the remote export:

    // Export as a remote service unless scope is "local" (scope "local" means only the local export is done).
    if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
        if (logger.isInfoEnabled()) {
            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
        }
        if (registryURLs != null && registryURLs.size() > 0
                && url.getParameter("register", true)) {
            for (URL registryURL : registryURLs) {
                url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                URL monitorUrl = loadMonitor(registryURL);
                if (monitorUrl != null) {
                    url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                }
                if (logger.isInfoEnabled()) {
                    logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                }
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                Exporter<?> exporter = protocol.export(invoker);
                exporters.add(exporter);
            }
        } else {
            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
            Exporter<?> exporter = protocol.export(invoker);
            exporters.add(exporter);
        }
    }

First the implementation ref is wrapped as an Invoker, then the invoker is converted into an exporter, and finally the exporter is added to the cache List<Exporter> exporters.

 

I. Wrapping the implementation ref as an Invoker

    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

1  Appending the export=providerUrl parameter to registryURL

The initial registryURL:

registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&pid=887&registry=zookeeper&timestamp=1507096022072

The call registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()) adds a parameter to registryURL and URL-encodes its value (shown here before encoding):

export=dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=887&side=provider&timestamp=1507096024334
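The effect of encoding the provider URL before embedding it can be sketched with the JDK alone. This is a minimal illustration, not Dubbo's URL class: ExportParamSketch, withExport and readExport are hypothetical names, and the URLs are shortened versions of the ones above. Encoding matters because the provider URL contains its own '?' and '&', which would otherwise be confused with the registry URL's parameters.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

class ExportParamSketch {
    /** Appends export=<encoded providerUrl> to a registry URL string (hypothetical helper). */
    static String withExport(String registryUrl, String providerUrl) {
        try {
            String sep = registryUrl.contains("?") ? "&" : "?";
            return registryUrl + sep + "export=" + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    /** Finds the export parameter and decodes it back into the provider URL (hypothetical helper). */
    static String readExport(String registryUrl) {
        String query = registryUrl.substring(registryUrl.indexOf('?') + 1);
        for (String pair : query.split("&")) {
            if (pair.startsWith("export=")) {
                try {
                    return URLDecoder.decode(pair.substring("export=".length()), "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String registry = "registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider";
        String provider = "dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&side=provider";
        String combined = withExport(registry, provider);
        System.out.println(combined);             // registry URL carrying the encoded provider URL
        System.out.println(readExport(combined)); // the original provider URL again
    }
}
```

The same round trip happens later in this section, when RegistryProtocol reads the export parameter back with getParameterAndDecoded.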

2  ProxyFactory$Adaptive.getInvoker(DemoServiceImpl instance, Class<DemoService>, registryURL)

    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg2 == null)
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");// resolves to javassist
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }

The call would normally go straight to JavassistProxyFactory.getInvoker, but JavassistProxyFactory is wrapped, AOP-style, by StubProxyFactoryWrapper, so the wrapper is invoked first.

3  StubProxyFactoryWrapper.getInvoker(DemoServiceImpl instance, Class<DemoService>, registryURL)

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
        return proxyFactory.getInvoker(proxy, type, url);
    }
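The wrapping described here is the decorator pattern: the wrapper implements the same interface, holds the real extension, and delegates to it. The following is a minimal sketch of that shape only. Factory, RealFactory and LoggingWrapper are hypothetical stand-ins, not Dubbo classes, and the recorded call list is an invented example of "extra behaviour".

```java
import java.util.ArrayList;
import java.util.List;

class WrapperSketch {
    /** Simplified stand-in for the ProxyFactory interface. */
    interface Factory {
        String getInvoker(String ref);
    }

    /** The "real" extension, playing the role of JavassistProxyFactory. */
    static class RealFactory implements Factory {
        public String getInvoker(String ref) {
            return "invoker(" + ref + ")";
        }
    }

    /** A wrapper receives the wrapped extension in its constructor and delegates to it. */
    static class LoggingWrapper implements Factory {
        private final Factory delegate;
        final List<String> calls = new ArrayList<String>();

        LoggingWrapper(Factory delegate) {
            this.delegate = delegate;
        }

        public String getInvoker(String ref) {
            calls.add(ref);                  // behaviour added around the call
            return delegate.getInvoker(ref); // then hand over to the wrapped extension
        }
    }

    public static void main(String[] args) {
        LoggingWrapper factory = new LoggingWrapper(new RealFactory());
        System.out.println(factory.getInvoker("DemoServiceImpl"));
        System.out.println(factory.calls);
    }
}
```

Callers only ever see the Factory interface, which is why the wrapper can be slipped in front of the real extension without the calling code changing.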

4  JavassistProxyFactory.getInvoker(DemoServiceImpl instance, Class<DemoService>, registryURL)

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO the Wrapper class cannot correctly handle class names containing $
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

First the Wrapper class is created: Wrapper.getWrapper(Class<DemoServiceImpl>). It records DemoServiceImpl's property names, method names and similar information. The key code is shown below (for the complete code see 7.2 Local service export):

    import com.alibaba.dubbo.common.bytecode.Wrapper;
    import java.util.HashMap;

    public class Wrapper1 extends Wrapper {

        public static String[] pns;// property name array
        public static java.util.Map pts = new HashMap();// <property key, property value>
        public static String[] mns;// method names
        public static String[] dmns;// declared method names
        public static Class[] mts0;

        // ... (the rest of the class is omitted here; see 7.2)

        /**
         * @param o  the implementation instance
         * @param n  method name
         * @param p  parameter types
         * @param v  argument values
         * @return
         * @throws java.lang.reflect.InvocationTargetException
         */
        public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
            com.alibaba.dubbo.demo.provider.DemoServiceImpl w;
            try {
                w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl) o);
            } catch (Throwable e) {
                throw new IllegalArgumentException(e);
            }
            try {
                if ("sayHello".equals(n) && p.length == 1) {
                    return ($w) w.sayHello((java.lang.String) v[0]);
                }
            } catch (Throwable e) {
                throw new java.lang.reflect.InvocationTargetException(e);
            }
            throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + n + "\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.");
        }
    }

Once the Wrapper class for DemoServiceImpl exists (in fact the instance was already cached during the local export, so here it is simply fetched from the cache), an AbstractProxyInvoker instance is created.

    private final T proxy;
    private final Class<T> type;
    private final URL url;

    public AbstractProxyInvoker(T proxy, Class<T> type, URL url) {
        if (proxy == null) {
            throw new IllegalArgumentException("proxy == null");
        }
        if (type == null) {
            throw new IllegalArgumentException("interface == null");
        }
        if (!type.isInstance(proxy)) {
            throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type);
        }
        this.proxy = proxy;
        this.type = type;
        this.url = url;
    }

The resulting AbstractProxyInvoker instance has the following properties:

  • proxy: the DemoServiceImpl instance
  • type:Class<com.alibaba.dubbo.demo.DemoService>
  • url:registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D993%26side%3Dprovider%26timestamp%3D1507100322516&pid=993&registry=zookeeper&timestamp=1507100319830

This converts the implementation ref into an Invoker. Later, a call to invoker.invoke(Invocation invocation) calls invoker.doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments), which calls invokeMethod(proxy, methodName, parameterTypes, arguments) on the wrapper of the implementation class, which in turn calls the real methodName method of the implementation. The source of AbstractProxyInvoker.invoke(Invocation invocation) is given here in advance:

    public Result invoke(Invocation invocation) throws RpcException {
        try {
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

The proxy here is the one assigned above: the DemoServiceImpl instance. The method information is carried in the Invocation object, which is covered in the discussion of service reference.
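The call path invoke -> doInvoke -> wrapper.invokeMethod -> implementation can be condensed into a small stand-alone sketch. Everything here is a simplified stand-in: InvokeSketch and its nested DemoServiceImpl are hypothetical, the dispatch is hand-written rather than generated by javassist, and the result is returned as a plain Object instead of an RpcResult. It shows the two points made above: dispatch by method name without reflection, and a business exception being returned as the result rather than thrown.

```java
import java.lang.reflect.InvocationTargetException;

class InvokeSketch {
    /** Stand-in for the service implementation. */
    static class DemoServiceImpl {
        public String sayHello(String name) {
            if (name == null) {
                throw new IllegalArgumentException("name == null");
            }
            return "Hello " + name;
        }
    }

    /** Hand-written equivalent of the generated wrapper: dispatch on the method name. */
    static Object invokeMethod(Object o, String n, Class<?>[] p, Object[] v) throws InvocationTargetException {
        DemoServiceImpl w = (DemoServiceImpl) o;
        try {
            if ("sayHello".equals(n) && p.length == 1) {
                return w.sayHello((String) v[0]);
            }
        } catch (Throwable e) {
            throw new InvocationTargetException(e); // exception raised by the implementation
        }
        throw new IllegalStateException("Not found method \"" + n + "\"");
    }

    /** Same shape as AbstractProxyInvoker.invoke: a business exception becomes the result. */
    static Object invoke(Object proxy, String method, Class<?>[] types, Object[] args) {
        try {
            return invokeMethod(proxy, method, types, args);
        } catch (InvocationTargetException e) {
            return e.getTargetException();
        }
    }

    public static void main(String[] args) {
        DemoServiceImpl impl = new DemoServiceImpl();
        System.out.println(invoke(impl, "sayHello", new Class<?>[]{String.class}, new Object[]{"dubbo"}));
        System.out.println(invoke(impl, "sayHello", new Class<?>[]{String.class}, new Object[]{null}));
    }
}
```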

 

II. Converting the Invoker into an Exporter

    Exporter<?> exporter = protocol.export(invoker)

1  Protocol$Adaptive.export(com.alibaba.dubbo.rpc.Invoker AbstractProxyInvoker instance)

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());// registry
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

Because of the AOP-style wrappers, ProtocolListenerWrapper.export(Invoker<T> invoker) is called first:

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return new ListenerExporterWrapper<T>(protocol.export(invoker),
                                              Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

Since the protocol is "registry", nothing extra is done and the call continues to ProtocolFilterWrapper.export(Invoker<T> invoker):

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

Likewise, since the protocol is "registry", nothing extra is done and the call continues to RegistryProtocol.export(final Invoker<T> originInvoker):

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // subscribe to override data
        // FIXME when the provider subscribes, it affects the case where the same JVM both exports and references the same service: subscribed uses the service name as the cache key, so subscription information gets overwritten.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        // make sure every export returns a new exporter instance
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }

            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }

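This method carries out the entire remote export flow:

  • Convert the invoker into an exporter
  • Start netty
  • Register the service with zookeeper
  • Subscribe
  • Return a new exporter instance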

2  Converting the invoker into an exporter and starting the netty server

    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

doLocalExport(final Invoker<T> originInvoker)

    /**
     * 1 Read the providerUrl stored under the key "export" in the Map<String, String> parameters of the invoker's URL; this address becomes the node the service is registered under in zk
     * 2 Look up the exporter keyed by that providerUrl in the cache Map<String, ExporterChangeableWrapper<?>> bounds; return it if present, otherwise create it and return it
     * @return
     */
    @SuppressWarnings("unchecked")
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);// derive the providerUrl from originInvoker
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));// stores originInvoker and providerUrl
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

2.1 Getting providerUrl from originInvoker

The method first calls getCacheKey(final Invoker<?> originInvoker) to obtain the providerUrl. The originInvoker here is the AbstractProxyInvoker instance created above. Note that its url uses the registry protocol, and the value of that url's export parameter is the providerUrl we want. The source that obtains providerUrl:

    private String getCacheKey(final Invoker<?> originInvoker) {
        URL providerUrl = getProviderUrl(originInvoker);
        String key = providerUrl.removeParameters("dynamic", "enabled").toFullString();
        return key;
    }

    private URL getProviderUrl(final Invoker<?> origininvoker) {
        String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY);
        if (export == null || export.length() == 0) {
            throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
        }

        URL providerUrl = URL.valueOf(export);
        return providerUrl;
    }

The remaining steps obtain the exporter for this providerUrl and put it into the cache Map<String, ExporterChangeableWrapper<?>> bounds, so each providerUrl maps to exactly one exporter.
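The "one providerUrl, one exporter" guarantee comes from the check, lock, check-again structure of doLocalExport. A minimal sketch of that caching pattern follows. LocalExportCacheSketch is a hypothetical class, a plain Object stands in for the exporter, and the created counter exists only to make the behaviour observable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class LocalExportCacheSketch {
    /** Plays the role of the bounds cache: providerUrl key -> exporter. */
    final Map<String, Object> bounds = new ConcurrentHashMap<String, Object>();
    /** Counts how many exporters were actually created (for illustration only). */
    final AtomicInteger created = new AtomicInteger();

    Object doLocalExport(String providerUrlKey) {
        Object exporter = bounds.get(providerUrlKey);     // first check, without locking
        if (exporter == null) {
            synchronized (bounds) {
                exporter = bounds.get(providerUrlKey);    // second check, under the lock
                if (exporter == null) {
                    created.incrementAndGet();
                    exporter = new Object();              // stand-in for the expensive export
                    bounds.put(providerUrlKey, exporter);
                }
            }
        }
        return exporter;
    }

    public static void main(String[] args) {
        LocalExportCacheSketch cache = new LocalExportCacheSketch();
        String key = "dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService";
        Object first = cache.doLocalExport(key);
        Object second = cache.doLocalExport(key);
        System.out.println(first == second);     // same exporter both times
        System.out.println(cache.created.get()); // number of exporters created
    }
}
```

The second lookup inside the synchronized block is what keeps two threads that both missed the cache from each creating an exporter for the same key.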

2.2  Creating the InvokerDelegete

    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));

InvokerDelegete is a static nested class of RegistryProtocol. It is a delegate for originInvoker: it stores originInvoker, and its parent class InvokerWrapper additionally stores providerUrl. InvokerWrapper forwards invoke to originInvoker and can also destroy it, so it manages the invoker's lifecycle.

    public static class InvokerDelegete<T> extends InvokerWrapper<T> {
        private final Invoker<T> invoker;

        /**
         * @param invoker
         * @param url     the value returned by invoker.getUrl
         */
        public InvokerDelegete(Invoker<T> invoker, URL url) {
            super(invoker, url);
            this.invoker = invoker;
        }

        public Invoker<T> getInvoker() {
            if (invoker instanceof InvokerDelegete) {
                return ((InvokerDelegete<T>) invoker).getInvoker();
            } else {
                return invoker;
            }
        }
    }

Core code of InvokerWrapper:

    public class InvokerWrapper<T> implements Invoker<T> {
        private final Invoker<T> invoker;// originInvoker
        private final URL url;// providerUrl

        public InvokerWrapper(Invoker<T> invoker, URL url) {
            this.invoker = invoker;
            this.url = url;
        }

        public boolean isAvailable() {
            return invoker.isAvailable();
        }

        public Result invoke(Invocation invocation) throws RpcException {
            return invoker.invoke(invocation);
        }

        public void destroy() {
            invoker.destroy();
        }
    }

The InvokerDelegete object is now complete, with the following properties:

  • invoker: originInvoker (the AbstractProxyInvoker object)
  • InvokerWrapper.invoker: originInvoker (the AbstractProxyInvoker object)
  • url:providerUrl(dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1035&side=provider&timestamp=1507101286063)

2.3  Using DubboProtocol to convert the InvokerDelegete into an Exporter

    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker)

2.3.1  Protocol$Adaptive.export(com.alibaba.dubbo.rpc.Invoker InvokerDelegete object)

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());// dubbo
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

This is the last time this code is pasted. Next, ProtocolListenerWrapper.export(Invoker<T> InvokerDelegete) is called, followed by ProtocolFilterWrapper.export(Invoker<T> InvokerDelegete). The latter first wraps the InvokerDelegete object recursively in 8 filters, and DubboProtocol then exports the wrapped InvokerDelegete object.

Source of the layer-by-layer wrapping:

    /**
     * 1 Use key to read the filter values from the url, then use those values and group to obtain the set of filters whose classes carry the @Activate annotation
     * 2 Wrap the given invoker recursively with these filters, layer by layer (effectively a linked list)
     */
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }

                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }

The most important part of the method above is the Invoker's Result invoke(Invocation invocation): it calls filter.invoke(next, invocation), and next may itself be an invoker that wraps another filter. Let's open one filter and look at its source:

    @Activate(group = Constants.PROVIDER, order = -110000)
    public class EchoFilter implements Filter {
        public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
            if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1)
                return new RpcResult(inv.getArguments()[0]);
            return invoker.invoke(inv);
        }
    }

As shown, the filter calls the invoke method of the next it was given.

Here is the object after the recursive wrapping (referred to below as the "filter-wrapped InvokerDelegete"):

    EchoFilter
    -->ClassLoaderFilter
       -->GenericFilter
          -->ContextFilter
             -->TraceFilter
                -->TimeoutFilter
                   -->MonitorFilter
                      -->ExceptionFilter
                         -->InvokerDelegete object
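Why iterating the filter list backwards yields a chain that runs in list order can be seen in a stripped-down sketch. The Invoker and Filter interfaces below are simplified, hypothetical versions that pass a trace list instead of an Invocation, and named(...) builds filters that only record their name before delegating.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FilterChainSketch {
    interface Invoker {
        String invoke(List<String> trace);
    }

    interface Filter {
        String invoke(Invoker next, List<String> trace);
    }

    /** A filter that records its name and then passes the call on. */
    static Filter named(final String name) {
        return new Filter() {
            public String invoke(Invoker next, List<String> trace) {
                trace.add(name);
                return next.invoke(trace);
            }
        };
    }

    /** Same loop shape as buildInvokerChain: wrap from the last filter to the first. */
    static Invoker buildChain(final Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = new Invoker() {
                public String invoke(List<String> trace) {
                    return filter.invoke(next, trace);
                }
            };
        }
        return last;
    }

    public static void main(String[] args) {
        Invoker target = new Invoker() {
            public String invoke(List<String> trace) {
                trace.add("target");
                return "done";
            }
        };
        List<String> trace = new ArrayList<String>();
        Invoker chain = buildChain(target, Arrays.asList(named("Echo"), named("ClassLoader"), named("Generic")));
        chain.invoke(trace);
        System.out.println(trace); // [Echo, ClassLoader, Generic, target]
    }
}
```

The last filter in the list wraps the target directly and the first filter ends up outermost, which matches the nesting shown in the listing above.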

2.3.2  DubboProtocol.export(Invoker<T> filter-wrapped InvokerDelegete)

    /**
     * 1 Get from the invoker's url the key of the remote service to export: com.alibaba.dubbo.demo.DemoService:20880 (in general: serviceGroup/serviceName:serviceVersion:port)
     * Note: the key used for the local export is just com.alibaba.dubbo.demo.DemoService
     * 2 Open the ExchangeServer
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispaching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);

        return exporter;
    }

First the key is obtained from the url of the filter-wrapped InvokerDelegete. The code is simple: it builds a key of the form serviceGroup/serviceName:serviceVersion:port, which here comes out as com.alibaba.dubbo.demo.DemoService:20880.
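The key format can be sketched as follows. This is an illustration of the serviceGroup/serviceName:serviceVersion:port shape only: ServiceKeySketch is a hypothetical class, and the rule that an empty group or version is simply left out is an assumption inferred from the demo key, which carries neither. Dubbo's own implementation may treat further cases specially.

```java
class ServiceKeySketch {
    /** Builds group/name:version:port, skipping group and version when they are empty (assumed rule). */
    static String serviceKey(int port, String serviceName, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && group.length() > 0) {
            buf.append(group).append('/');
        }
        buf.append(serviceName);
        if (version != null && version.length() > 0) {
            buf.append(':').append(version);
        }
        buf.append(':').append(port);
        return buf.toString();
    }

    public static void main(String[] args) {
        // No group and no version, as in the demo service
        System.out.println(serviceKey(20880, "com.alibaba.dubbo.demo.DemoService", null, null));
        // With a made-up group and version
        System.out.println(serviceKey(20880, "com.alibaba.dubbo.demo.DemoService", "1.0.0", "demoGroup"));
    }
}
```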

Then the DubboExporter is created.

2.3.2.1 DubboExporter<T>(filter-wrapped InvokerDelegete, "com.alibaba.dubbo.demo.DemoService:20880", exporterMap)

    public class DubboExporter<T> extends AbstractExporter<T> {
        // serviceGroup/serviceName:serviceVersion:port, e.g. com.alibaba.dubbo.demo.DemoService:20880
        private final String key;
        // { "com.alibaba.dubbo.demo.DemoService:20880" -> this DubboExporter instance }
        private final Map<String, Exporter<?>> exporterMap;

        public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
            super(invoker);
            this.key = key;
            this.exporterMap = exporterMap;
        }

        @Override
        public void unexport() {
            super.unexport();
            exporterMap.remove(key);
        }
    }

Note that exporterMap here is a reference to the same map that DubboProtocol holds, not a copy.

The parent class:

    public abstract class AbstractExporter<T> implements Exporter<T> {
        protected final Logger logger = LoggerFactory.getLogger(getClass());
        private final Invoker<T> invoker;
        private volatile boolean unexported = false;

        public AbstractExporter(Invoker<T> invoker) {
            if (invoker == null)
                throw new IllegalStateException("service invoker == null");
            if (invoker.getInterface() == null)
                throw new IllegalStateException("service type == null");
            if (invoker.getUrl() == null)
                throw new IllegalStateException("service url == null");
            this.invoker = invoker;
        }

        public Invoker<T> getInvoker() {
            return invoker;
        }

        public void unexport() {
            if (unexported) {
                return;
            }
            unexported = true;
            getInvoker().destroy();
        }

        public String toString() {
            return getInvoker().toString();
        }
    }

Here the filter-wrapped InvokerDelegete is assigned to AbstractExporter's Invoker reference, which means the invoker can be retrieved from the exporter. Finally, DubboProtocol.export(Invoker<T> invoker) executes exporterMap.put(key, exporter); which stores { "com.alibaba.dubbo.demo.DemoService:20880" -> this DubboExporter instance }.

The DubboExporter instance now looks like this:

  • key: com.alibaba.dubbo.demo.DemoService:20880
  • invoker: the filter-wrapped InvokerDelegete
  • exporterMap: { "com.alibaba.dubbo.demo.DemoService:20880" -> this DubboExporter instance }

2.3.2.2 Opening the ExchangeServer

    /**
     * Look up the ExchangeServer for "host:port" in the cache Map<String, ExchangeServer> serverMap; if there is none, create one and put it into the cache.
     * @param url
     */
    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();// 10.10.10.10:20880
        // a client can also export a service that only the server can invoke.
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                // the server supports reset, used together with the override feature
                server.reset(url);
            }
        }
    }

First host:port is taken from providerUrl as the key, then the ExchangeServer is looked up in the cache serverMap. If there is none, one is created and cached as follows:

Map<String, ExchangeServer> serverMap: { "10.10.10.10:20880" -> ExchangeServer instance }.
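A consequence of keying the cache by host:port is that several services exported on the same address share one server: the first export creates it and later exports only reset it. The sketch below illustrates that behaviour with hypothetical stand-ins (OpenServerSketch, Server, and the created and resets counters are not Dubbo classes or fields).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OpenServerSketch {
    /** Stand-in for ExchangeServer; only counts how often it was reset. */
    static class Server {
        final String address;
        int resets;

        Server(String address) {
            this.address = address;
        }

        void reset() {
            resets++;
        }
    }

    final Map<String, Server> serverMap = new ConcurrentHashMap<String, Server>();
    int created;

    /** Same decision as openServer: create on the first call per address, reset afterwards. */
    void openServer(String host, int port) {
        String key = host + ":" + port;
        Server server = serverMap.get(key);
        if (server == null) {
            created++;
            serverMap.put(key, new Server(key));
        } else {
            server.reset();
        }
    }

    public static void main(String[] args) {
        OpenServerSketch protocol = new OpenServerSketch();
        protocol.openServer("10.10.10.10", 20880); // first service: the server is created
        protocol.openServer("10.10.10.10", 20880); // second service on the same address: reset only
        System.out.println(protocol.created);
        System.out.println(protocol.serverMap.get("10.10.10.10:20880").resets);
    }
}
```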

Creating the ExchangeServer: createServer(URL providerUrl)

    private ExchangeServer createServer(URL url) {
        // by default, send a readonly event when the server shuts down
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // heartbeat is enabled by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

First, parameters are added to the original providerUrl: channel.readonly.sent=true&heartbeat=60000&codec=dubbo (the heartbeat parameter is used when HeaderExchangeServer starts its heartbeat timer).
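The difference between the "if absent" additions and the unconditional codec addition can be shown with a plain map standing in for the URL's parameters. ServerParamsSketch and withServerDefaults are hypothetical names, and the parameter values are the ones quoted above for this run.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ServerParamsSketch {
    /** Returns a copy of the provider parameters with the server-side additions applied. */
    static Map<String, String> withServerDefaults(Map<String, String> providerParams) {
        Map<String, String> params = new LinkedHashMap<String, String>(providerParams);
        // "if absent": a value configured by the user is kept
        if (!params.containsKey("channel.readonly.sent")) {
            params.put("channel.readonly.sent", "true");
        }
        if (!params.containsKey("heartbeat")) {
            params.put("heartbeat", "60000");
        }
        // unconditional: codec is always overwritten
        params.put("codec", "dubbo");
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> provider = new LinkedHashMap<String, String>();
        provider.put("heartbeat", "5000"); // user-configured value
        provider.put("codec", "custom");   // will be replaced
        System.out.println(withServerDefaults(provider));
    }
}
```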

Next, Exchangers.bind("providerUrl with the added parameters", requestHandler) creates the ExchangeServer. First, look at DubboProtocol#requestHandler. This object is extremely important: after several layers of wrapping it becomes the server-side logic handler that netty ultimately uses.

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // for a callback, handle the case of a newer version calling an older version
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

        @Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, Constants.ON_CONNECT_KEY);
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if (logger.isInfoEnabled()) {
                logger.info("disconected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
            }
            invoke(channel, Constants.ON_DISCONNECT_KEY);
        }

        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }

        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }
            RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
            invocation.setAttachment(Constants.PATH_KEY, url.getPath());
            invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
            invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
            invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
            if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
            }
            return invocation;
        }
    };

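As shown above, this handler defines the callbacks for a client connecting, a client disconnecting, and a client message arriving, the method that replies to a message, and the method that creates an Invocation. A simplified version of its getInvoker(Channel channel, Invocation inv) method is: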

        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
        return exporter.getInvoker();

This is exactly the DubboExporter we just put into exporterMap, and its invoker is our "InvokerDelegete object wrapped with filters".
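
To make the lookup concrete, here is a minimal sketch of building a service key and resolving an exporter from a map. The class name ExporterLookupSketch is hypothetical, a String stands in for the DubboExporter, and the key layout [group/]path[:version]:port is an assumption; the only key this article confirms is "com.alibaba.dubbo.demo.DemoService:20880". The null check is an addition that the simplified snippet above does not show.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the exporterMap lookup; not the real Dubbo classes. */
final class ExporterLookupSketch {
    // serviceKey -> exporter (a String stands in for the DubboExporter here)
    static final Map<String, String> EXPORTER_MAP = new ConcurrentHashMap<>();

    // Assumed key layout: [group/]path[:version]:port
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder sb = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            sb.append(group).append('/');
        }
        sb.append(path);
        if (version != null && !version.isEmpty() && !"0.0.0".equals(version)) {
            sb.append(':').append(version);
        }
        return sb.append(':').append(port).toString();
    }

    // Resolves the exporter for an incoming call; fails loudly if nothing was exported.
    static String getInvoker(int port, String path, String version, String group) {
        String key = serviceKey(port, path, version, group);
        String exporter = EXPORTER_MAP.get(key);
        if (exporter == null) {
            throw new IllegalStateException("Not found exported service: " + key);
        }
        return exporter;
    }

    public static void main(String[] args) {
        EXPORTER_MAP.put("com.alibaba.dubbo.demo.DemoService:20880", "demo-exporter");
        System.out.println(getInvoker(20880, "com.alibaba.dubbo.demo.DemoService", null, null));
    }
}
```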

 

Creating the ExchangeServer with Exchangers.bind(providerUrl, ExchangeHandlerAdapter object)

 1     public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
 2         if (url == null) {
 3             throw new IllegalArgumentException("url == null");
 4         }
 5         if (handler == null) {
 6             throw new IllegalArgumentException("handler == null");
 7         }
 8         url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
 9         return getExchanger(url).bind(url, handler);
10     }
11 
12     public static Exchanger getExchanger(URL url) {
13         String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);//header
14         return getExchanger(type);
15     }
16 
17     public static Exchanger getExchanger(String type) {
18         return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
19     }

getExchanger(URL url) returns a HeaderExchanger instance, so the creation of the ExchangeServer is delegated to HeaderExchanger.

 

HeaderExchanger.bind(providerUrl, ExchangeHandlerAdapter object)

    /**
     * Wraps the handler twice: the ExchangeHandlerAdapter is first assigned to the ExchangeHandler handler field of a HeaderExchangeHandler; the resulting HeaderExchangeHandler is then assigned to the ChannelHandler handler field of AbstractChannelHandlerDelegate, the parent class of DecodeHandler.
     */
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

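Notes:

  • The ExchangeHandlerAdapter passed in is first wrapped twice, which yields a DecodeHandler instance;
  • Next, Transporters.bind(providerUrl, DecodeHandler object) creates a NettyServer;
  • Finally, HeaderExchangeServer wraps that NettyServer and starts the heartbeat timer.
    • The HeaderExchangeServer instance is also the ExchangeServer that is finally returned; it ends up stored in Map<String, ExchangeServer> serverMap: { "10.10.10.10:20880" <-> HeaderExchangeServer instance }

Wrapping the ExchangeHandlerAdapter to obtain the DecodeHandler instance: the code is simple, so it is not listed here.

The hierarchy of the resulting DecodeHandler instance:

DecodeHandler instance
-->HeaderExchangeHandler instance
   -->ExchangeHandlerAdapter instance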

 

Creating a NettyServer with Transporters.bind(providerUrl, DecodeHandler object)

Transporters.bind(providerUrl, DecodeHandler object)

 1     public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
 2         if (url == null) {
 3             throw new IllegalArgumentException("url == null");
 4         }
 5         if (handlers == null || handlers.length == 0) {
 6             throw new IllegalArgumentException("handlers == null");
 7         }
 8         ChannelHandler handler;
 9         if (handlers.length == 1) {
10             handler = handlers[0];
11         } else {
12             handler = new ChannelHandlerDispatcher(handlers);
13         }
14         return getTransporter().bind(url, handler);
15     }
16 
17     public static Transporter getTransporter() {
18         return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
19     }

 

Transporter$Adaptive.bind(providerUrl, DecodeHandler object)

 1     public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException {
 2         if (arg0 == null)
 3             throw new IllegalArgumentException("url == null");
 4         com.alibaba.dubbo.common.URL url = arg0;
 5         String extName = url.getParameter("server", url.getParameter("transporter", "netty"));//netty
 6         if(extName == null)
 7             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])");
 8         com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
 9         return extension.bind(arg0, arg1);
10     }
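
The name resolution in the adaptive class above boils down to a nested lookup with a default. The following is a small sketch of just that lookup, using a plain Map in place of the URL parameters. The class AdaptiveNameSketch and its helper are hypothetical, and treating an empty value like a missing one is an assumption about how URL.getParameter(key, default) behaves.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the key fallback used by the generated adaptive class. */
final class AdaptiveNameSketch {
    // Returns the parameter value, or the default when it is missing or empty (assumed behaviour).
    static String get(Map<String, String> params, String key, String defaultValue) {
        String value = params.get(key);
        return (value == null || value.isEmpty()) ? defaultValue : value;
    }

    // Mirrors url.getParameter("server", url.getParameter("transporter", "netty")).
    static String transporterName(Map<String, String> params) {
        String fallback = get(params, "transporter", "netty");
        return get(params, "server", fallback);
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        System.out.println(transporterName(params)); // neither key set: falls back to netty
        params.put("transporter", "mina");
        System.out.println(transporterName(params)); // "transporter" is used when "server" is absent
    }
}
```

The "server" key wins over "transporter", and "netty" is only used when neither is present, which is the case for the providerUrl in this walkthrough.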

 

Finally, the NettyServer itself is created by NettyTransporter.

NettyTransporter.bind(providerUrl, DecodeHandler object)

 1 public class NettyTransporter implements Transporter {
 2     public static final String NAME = "netty";
 3 
 4     public Server bind(URL url, ChannelHandler listener) throws RemotingException {
 5         return new NettyServer(url, listener);
 6     }
 7 
 8     public Client connect(URL url, ChannelHandler listener) throws RemotingException {
 9         return new NettyClient(url, listener);
10     }
11 }

 

new NettyServer(providerUrl, DecodeHandler object)

1     public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
2         super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
3     }

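First, a parameter is added to providerUrl: threadname=DubboServerHandler-10.10.10.10:20880 (via ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME));

Next, ChannelHandlers.wrap(DecodeHandler object, providerUrl) wraps the DecodeHandler object in three more layers, which yields a MultiMessageHandler instance;

Finally, the parent-class constructors are called to initialize the NettyServer's fields, and netty is started.

First, look at

ChannelHandlers.wrap(DecodeHandler object, providerUrl)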

    /**
     * More layers of wrapping here:
     * MultiMessageHandler
     * --HeartbeatHandler
     *   --AllChannelHandler
     *     --DecodeHandler
     *       --HeaderExchangeHandler
     *         --ExchangeHandlerAdapter
     * @param handler
     * @param url
     * @return
     */
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }

ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension() returns a Dispatcher$Adaptive adapter class.

Dispatcher$Adaptive.dispatch(DecodeHandler object, providerUrl)

 1     public com.alibaba.dubbo.remoting.ChannelHandler dispatch(com.alibaba.dubbo.remoting.ChannelHandler arg0, com.alibaba.dubbo.common.URL arg1) {
 2         if (arg1 == null)
 3             throw new IllegalArgumentException("url == null");
 4         com.alibaba.dubbo.common.URL url = arg1;
 5         String extName = url.getParameter("dispatcher", url.getParameter("dispather", url.getParameter("channel.handler", "all")));//all
 6         if(extName == null)
 7             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + url.toString() + ") use keys([dispatcher, dispather, channel.handler])");
 8         com.alibaba.dubbo.remoting.Dispatcher extension = (com.alibaba.dubbo.remoting.Dispatcher)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Dispatcher.class).getExtension(extName);
 9         return extension.dispatch(arg0, arg1);
10     }

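This resolves to AllDispatcher. The Dispatcher determines dubbo's threading model: it specifies which operations run on which threads. It will be covered when we get to dubbo communication.

AllDispatcher.dispatch(DecodeHandler object, providerUrl)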

1     public ChannelHandler dispatch(ChannelHandler handler, URL url) {
2         return new AllChannelHandler(handler, url);
3     }

new AllChannelHandler(DecodeHandler object, providerUrl)

1     public AllChannelHandler(ChannelHandler handler, URL url) {
2         super(handler, url);
3     }

Now look at the constructor of its parent class, WrappedChannelHandler:

WrappedChannelHandler(DecodeHandler object, providerUrl)

 1     protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
 2     protected final ExecutorService executor;
 3     protected final ChannelHandler handler;
 4     protected final URL url;
 5 
 6     public WrappedChannelHandler(ChannelHandler handler, URL url) {
 7         this.handler = handler;
 8         this.url = url;
 9         executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
10 
11         String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
12         if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
13             componentKey = Constants.CONSUMER_SIDE;
14         }
15         DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
16         dataStore.put(componentKey, Integer.toString(url.getPort()), executor);//{"java.util.concurrent.ExecutorService":{"20880":executor}}
17     }

First, a shared thread pool is created: SHARED_EXECUTOR;

Next, handler/url/executor are assigned. Here executor is a fixed thread pool with 200 threads (queue size 0, i.e. a SynchronousQueue);

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);//defaults to dubbo, but here it is DubboServerHandler-10.10.10.10:20880 (the threadname set on the url earlier)
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);//200
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);//0
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
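
One consequence of queues == 0 is worth seeing in isolation: with a SynchronousQueue nothing is ever buffered, so once every thread is busy the next task is rejected at once. Below is a minimal sketch with plain JDK classes. The class FixedPoolSketch is hypothetical, and it uses the stock ThreadPoolExecutor.AbortPolicy instead of dubbo's AbortPolicyWithReport.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: a fixed pool with a SynchronousQueue rejects work once all threads are busy. */
final class FixedPoolSketch {
    // Returns true if a task submitted while all threads are busy is rejected.
    static boolean rejectsWhenSaturated(int threads) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy every thread with a task that blocks until we release it.
            for (int i = 0; i < threads; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            try {
                pool.execute(() -> { }); // no idle thread and no queue capacity
                return false;
            } catch (RejectedExecutionException expected) {
                return true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + rejectsWhenSaturated(2));
    }
}
```

With the default of 200 threads this means the 201st concurrent request is rejected rather than queued, unless queues is configured to a non-zero value.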

Next, a data store is obtained: SimpleDataStore;

Finally, {"java.util.concurrent.ExecutorService":{"20880": executor}} is stored in SimpleDataStore's ConcurrentMap<String, ConcurrentMap<String, Object>> data structure. In other words, each port has its own thread pool.
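
The two-level map is easy to picture with a small sketch. DataStoreSketch is a hypothetical class that only imitates the shape of the data field described above; using computeIfAbsent is my own choice and not necessarily how SimpleDataStore is written.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical two-level store: component name -> (key -> value). */
final class DataStoreSketch {
    private final ConcurrentMap<String, ConcurrentMap<String, Object>> data = new ConcurrentHashMap<>();

    // Creates the inner map for the component on first use, then stores the value under the key.
    void put(String component, String key, Object value) {
        data.computeIfAbsent(component, c -> new ConcurrentHashMap<>()).put(key, value);
    }

    // Returns null when either the component or the key is unknown.
    Object get(String component, String key) {
        ConcurrentMap<String, Object> inner = data.get(component);
        return inner == null ? null : inner.get(key);
    }

    public static void main(String[] args) {
        DataStoreSketch store = new DataStoreSketch();
        String component = "java.util.concurrent.ExecutorService";
        store.put(component, "20880", "executor-for-20880");
        store.put(component, "20881", "executor-for-20881");
        System.out.println(store.get(component, "20880"));
    }
}
```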

Note: why can SimpleDataStore be used as a cache?

 1     public T getExtension(String name) {
 2         if (name == null || name.length() == 0)
 3             throw new IllegalArgumentException("Extension name == null");
 4         if ("true".equals(name)) {
 5             return getDefaultExtension();
 6         }
 7         Holder<Object> holder = cachedInstances.get(name);
 8         if (holder == null) {
 9             cachedInstances.putIfAbsent(name, new Holder<Object>());
10             holder = cachedInstances.get(name);
11         }
12         Object instance = holder.get();
13         if (instance == null) {
14             synchronized (holder) {
15                 instance = holder.get();
16                 if (instance == null) {
17                     instance = createExtension(name);
18                     holder.set(instance);
19                 }
20             }
21         }
22         return (T) instance;
23     }

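The reason is simply that the SimpleDataStore instance is stored in the cachedInstances cache; the next call does not create a new one but returns the cached instance.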
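
The caching pattern in getExtension (a holder per name plus double-checked locking on that holder) can be reproduced in a few lines. This is a sketch under assumptions: ExtensionCacheSketch, its Holder and the factory function are hypothetical and only imitate the structure of the listing above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical per-name instance cache using a holder and double-checked locking. */
final class ExtensionCacheSketch<T> {
    static final class Holder<V> {
        private volatile V value; // volatile so a fully built instance is visible to other threads
        V get() { return value; }
        void set(V v) { value = v; }
    }

    private final ConcurrentMap<String, Holder<T>> cachedInstances = new ConcurrentHashMap<>();
    private final Function<String, T> factory;

    ExtensionCacheSketch(Function<String, T> factory) {
        this.factory = factory;
    }

    T getExtension(String name) {
        Holder<T> holder = cachedInstances.computeIfAbsent(name, n -> new Holder<>());
        T instance = holder.get();
        if (instance == null) {
            synchronized (holder) { // lock only this name's holder, not the whole cache
                instance = holder.get();
                if (instance == null) {
                    instance = factory.apply(name);
                    holder.set(instance);
                }
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        ExtensionCacheSketch<Object> loader = new ExtensionCacheSketch<>(name -> new Object());
        System.out.println(loader.getExtension("simple") == loader.getExtension("simple"));
    }
}
```

Because every caller asking for the same name gets the same object, whatever one caller stores in it (the thread pool in our case) is visible to the next caller.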

 

With that, an AllChannelHandler instance is complete. Its fields are:

  • WrappedChannelHandler.url:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&channel.readonly.sent=true&codec=dubbo&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1287&side=provider&threadname=DubboServerHandler-10.10.10.10:20880&timestamp=1507116859919
  • WrappedChannelHandler.handler: DecodeHandler object
  • WrappedChannelHandler.executor: FixedThreadPool instance

There is also the class variable WrappedChannelHandler.SHARED_EXECUTOR = CachedThreadPool instance.

 

Next, the AllChannelHandler instance is wrapped by a HeartbeatHandler, and that HeartbeatHandler instance is in turn wrapped by a MultiMessageHandler. The resulting MultiMessageHandler instance has this hierarchy:

MultiMessageHandler
-->handler: HeartbeatHandler
   -->handler: AllChannelHandler
         -->url: providerUrl
         -->executor: FixedExecutor
         -->handler: DecodeHandler
            -->handler: HeaderExchangeHandler
               -->handler: ExchangeHandlerAdapter
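
The nesting above is a plain decorator chain: each handler does its own bit and passes the message to the handler it wraps. Here is a sketch that only records the order in which the layers are visited. HandlerChainSketch, its Handler interface and Wrapper class are hypothetical, and the sketch ignores what the real layers do (thread dispatch in AllChannelHandler, heartbeat filtering, decoding).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical decorator chain that records the order in which layers see a message. */
final class HandlerChainSketch {
    interface Handler {
        void received(String message, List<String> trace);
    }

    // Records its own name, then delegates to the wrapped handler (if any).
    static final class Wrapper implements Handler {
        private final String name;
        private final Handler delegate;

        Wrapper(String name, Handler delegate) {
            this.name = name;
            this.delegate = delegate;
        }

        @Override
        public void received(String message, List<String> trace) {
            trace.add(name);
            if (delegate != null) {
                delegate.received(message, trace);
            }
        }
    }

    // Builds the chain inside-out, in the same order the article wraps the handlers.
    static Handler buildChain() {
        Handler h = new Wrapper("ExchangeHandlerAdapter", null);
        h = new Wrapper("HeaderExchangeHandler", h);
        h = new Wrapper("DecodeHandler", h);
        h = new Wrapper("AllChannelHandler", h);
        h = new Wrapper("HeartbeatHandler", h);
        return new Wrapper("MultiMessageHandler", h);
    }

    static List<String> trace(String message) {
        List<String> trace = new ArrayList<>();
        buildChain().received(message, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(trace("hello"));
    }
}
```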

 

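Once the MultiMessageHandler instance has been created, NettyServer starts calling its parent-class constructors to initialize fields. As the constructors below show, NettyServer's parent class hierarchy is NettyServer -> AbstractServer -> AbstractEndpoint -> AbstractPeer.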

AbstractServer:

 1     protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
 2     ExecutorService executor;
 3     private InetSocketAddress localAddress;
 4     private InetSocketAddress bindAddress;
 5     private int accepts;
 6     private int idleTimeout = 600;
 7 
 8     public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
 9         super(url, handler);
10         localAddress = getUrl().toInetSocketAddress();
11         String host = url.getParameter(Constants.ANYHOST_KEY, false)
12                 || NetUtils.isInvalidLocalHost(getUrl().getHost())
13                 ? NetUtils.ANYHOST : getUrl().getHost();
14         bindAddress = new InetSocketAddress(host, getUrl().getPort());
15         this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
16         this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
17         try {
18             doOpen();
19             if (logger.isInfoEnabled()) {
20                 logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
21             }
22         } catch (Throwable t) {
23             throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
24                     + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
25         }
26         //fixme replace this with better method
27         DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
28         executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
29     }

It first calls the parent-class constructor to initialize fields, and then starts the server.

AbstractEndpoint:

 1     private Codec2 codec;
 2     private int timeout;
 3     private int connectTimeout;
 4 
 5     public AbstractEndpoint(URL url, ChannelHandler handler) {
 6         super(url, handler);
 7         this.codec = getChannelCodec(url);
 8         this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);//1000
 9         this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);//3000
10     }

AbstractPeer:

 1     private final ChannelHandler handler;
 2     private volatile URL url;
 3 
 4     public AbstractPeer(URL url, ChannelHandler handler) {
 5         if (url == null) {
 6             throw new IllegalArgumentException("url == null");
 7         }
 8         if (handler == null) {
 9             throw new IllegalArgumentException("handler == null");
10         }
11         this.url = url;
12         this.handler = handler;
13     }

Now look at the fully initialized NettyServer instance:

  • url:providerUrl(dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&channel.readonly.sent=true&codec=dubbo&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1287&side=provider&timestamp=1507116859919)
  • handler: MultiMessageHandler instance
  • codec: DubboCountCodec instance
  • timeout: 1000
  • connectTimeout: 3000
  • idleTimeout: 600*1000
  • localAddress: 10.10.10.10:20880
  • bindAddress: 0.0.0.0:20880
  • accepts: 0
  • executor: null (the executor is not yet assigned at this point; only after the netty server has started is the 200-thread FixedThreadPool instance, stored earlier in the SimpleDataStore cache, fetched from that cache)
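
Why localAddress is 10.10.10.10:20880 while bindAddress is 0.0.0.0:20880 follows from the host selection in the AbstractServer constructor: anyhost=true (or an unusable local host) switches the bind host to the wildcard address. A minimal sketch of that decision follows; BindHostSketch is hypothetical, and its isInvalidLocalHost is a simplified approximation of what NetUtils.isInvalidLocalHost checks, not a copy of it.

```java
import java.util.regex.Pattern;

/** Hypothetical sketch of how the bind host is chosen from the url host and the anyhost flag. */
final class BindHostSketch {
    static final String ANYHOST = "0.0.0.0";
    private static final Pattern LOOPBACK = Pattern.compile("127(\\.\\d{1,3}){3}");

    // Simplified approximation: empty, localhost, wildcard and 127.x.x.x are not usable as export hosts.
    static boolean isInvalidLocalHost(String host) {
        return host == null || host.isEmpty() || host.equalsIgnoreCase("localhost")
                || host.equals(ANYHOST) || LOOPBACK.matcher(host).matches();
    }

    // Mirrors: anyhost || isInvalidLocalHost(host) ? ANYHOST : host
    static String bindHost(boolean anyhost, String urlHost) {
        return (anyhost || isInvalidLocalHost(urlHost)) ? ANYHOST : urlHost;
    }

    public static void main(String[] args) {
        System.out.println(bindHost(true, "10.10.10.10"));  // anyhost=true, as in our providerUrl
        System.out.println(bindHost(false, "10.10.10.10")); // without anyhost the url host is kept
    }
}
```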

 

Next, the netty server is started.

 1     /**
 2      * Start the netty server and listen for client connections
 3      */
 4     @Override
 5     protected void doOpen() throws Throwable {
 6         NettyHelper.setNettyLoggerFactory();
 7         ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
 8         ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
 9         ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
10         bootstrap = new ServerBootstrap(channelFactory);
11 
12         final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
13         channels = nettyHandler.getChannels();
14         // https://issues.jboss.org/browse/NETTY-365
15         // https://issues.jboss.org/browse/NETTY-379
16         // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
17         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
18             public ChannelPipeline getPipeline() {
19                 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
20                 ChannelPipeline pipeline = Channels.pipeline();
21                 /*int idleTimeout = getIdleTimeout();
22                 if (idleTimeout > 10000) {
23                     pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
24                 }*/
25                 pipeline.addLast("decoder", adapter.getDecoder());
26                 pipeline.addLast("encoder", adapter.getEncoder());
27                 pipeline.addLast("handler", nettyHandler);
28                 return pipeline;
29             }
30         });
31         // bind
32         channel = bootstrap.bind(getBindAddress());
33     }

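Notes:

  • boss thread count: only one by default;
  • worker thread count: Runtime.getRuntime().availableProcessors() + 1, i.e. the number of CPU cores + 1;
  • the server-side logic handler is NettyHandler;
  • encoder: an InternalEncoder instance, which internally uses the NettyServer's DubboCountCodec instance to encode;
  • decoder: an InternalDecoder instance, which internally uses the NettyServer's DubboCountCodec instance to decode.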

 NettyHandler:

 1 @Sharable
 2 public class NettyHandler extends SimpleChannelHandler {
 3     private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
 4     private final URL url;
 5     private final ChannelHandler handler;
 6 
 7     public NettyHandler(URL url, ChannelHandler handler) {
 8         if (url == null) {
 9             throw new IllegalArgumentException("url == null");
10         }
11         if (handler == null) {
12             throw new IllegalArgumentException("handler == null");
13         }
14         this.url = url;
15         this.handler = handler;
16     }
17 
18     public Map<String, Channel> getChannels() {
19         return channels;
20     }
21 
22     @Override
23     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
24         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
25         try {
26             if (channel != null) {
27                 channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel);
28             }
29             handler.connected(channel);
30         } finally {
31             NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
32         }
33     }
34 
35     @Override
36     public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
37         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
38         try {
39             channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()));
40             handler.disconnected(channel);
41         } finally {
42             NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
43         }
44     }
45 
46     @Override
47     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
48         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
49         try {
50             handler.received(channel, e.getMessage());
51         } finally {
52             NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
53         }
54     }
55 
56     @Override
57     public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
58         super.writeRequested(ctx, e);
59         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
60         try {
61             handler.sent(channel, e.getMessage());
62         } finally {
63             NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
64         }
65     }
66 
67     @Override
68     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
69         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
70         try {
71             handler.caught(channel, e.getCause());
72         } finally {
73             NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
74         }
75     }
76 }

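Notes:

Fields

  • handler: the current NettyServer instance
  • url: providerUrl
  • channels: holds the channels of incoming connections

NettyHandler listens for the connected / disconnected / message received / message sent / exception caught events and hands each one to the NettyServer instance. NettyServer in turn calls the MultiMessageHandler instance (that handler field lives in AbstractPeer, an ancestor class of NettyServer) to process it.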
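
The channels bookkeeping in channelConnected and channelDisconnected amounts to a concurrent map keyed by the remote "ip:port". A small sketch of just that part is below. ChannelRegistrySketch is hypothetical, an Object stands in for the NettyChannel, and toAddressString is my assumption of what NetUtils.toAddressString produces (host address, colon, port).

```java
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of tracking live channels by remote "ip:port". */
final class ChannelRegistrySketch {
    private final Map<String, Object> channels = new ConcurrentHashMap<>();

    // Assumed format: "<ip>:<port>"
    static String toAddressString(InetSocketAddress address) {
        return address.getAddress().getHostAddress() + ":" + address.getPort();
    }

    void connected(InetSocketAddress remote, Object channel) {
        channels.put(toAddressString(remote), channel);
    }

    void disconnected(InetSocketAddress remote) {
        channels.remove(toAddressString(remote));
    }

    boolean contains(String key) {
        return channels.containsKey(key);
    }

    int size() {
        return channels.size();
    }

    public static void main(String[] args) {
        ChannelRegistrySketch registry = new ChannelRegistrySketch();
        InetSocketAddress consumer = new InetSocketAddress("10.10.10.20", 51234);
        registry.connected(consumer, new Object());
        System.out.println(registry.contains("10.10.10.20:51234"));
        registry.disconnected(consumer);
        System.out.println(registry.size());
    }
}
```

This map is what the heartbeat task later iterates over, via getChannels(), to decide which connections need a heartbeat.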

 

Now look at the encoder and decoder:

NettyCodecAdapter(DubboCountCodec instance, providerUrl, current NettyServer instance)

 1 final class NettyCodecAdapter {
 2     private final ChannelHandler encoder = new InternalEncoder();
 3     private final ChannelHandler decoder = new InternalDecoder();
 4     private final Codec2 codec;
 5     private final URL url;
 6     private final int bufferSize;
 7     private final com.alibaba.dubbo.remoting.ChannelHandler handler;
 8 
 9     public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
10         this.codec = codec;
11         this.url = url;
12         this.handler = handler;
13         int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);//8*1024
14         this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;//8*1024
15     }
16 
17     public ChannelHandler getEncoder() {
18         return encoder;
19     }
20 
21     public ChannelHandler getDecoder() {
22         return decoder;
23     }
24 
25     @Sharable
26     private class InternalEncoder extends OneToOneEncoder {
27         @Override
28         protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
29             ...
30             codec.encode(channel, buffer, msg);
31             ...
32         }
33     }
34 
35     private class InternalDecoder extends SimpleChannelUpstreamHandler {
36         @Override
37         public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
38            ...
39             msg = codec.decode(channel, message);
40            ...
41         }
42         ...
43     }
44 }

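Only the core code is listed. As you can see, the InternalEncoder and InternalDecoder instances still use the NettyServer's DubboCountCodec instance internally for encoding and decoding. Dubbo's codec is very well done and will be covered in a later post.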

 

At this point the NettyServer has been created successfully. Execution then finally reaches:

new HeaderExchangeServer(Server NettyServer)

 1     private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, new NamedThreadFactory("dubbo-remoting-server-heartbeat",true));
 2     private final Server server;
 3     // heartbeat timer
 4     private ScheduledFuture<?> heatbeatTimer;
 5     // heartbeat timeout in milliseconds; defaults to 0, in which case no heartbeat is performed.
 6     private int heartbeat;
 7     private int heartbeatTimeout;
 8     private AtomicBoolean closed = new AtomicBoolean(false);
 9 
10     public HeaderExchangeServer(Server server) {
11         if (server == null) {
12             throw new IllegalArgumentException("server == null");
13         }
14         this.server = server;
15         this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);//60000; the heartbeat parameter was appended in createServer(URL providerUrl)
16         this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);//3*60000
17         if (heartbeatTimeout < heartbeat * 2) {
18             throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
19         }
20         startHeatbeatTimer();
21     }

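Notes:

  • Fields
    • scheduled: a scheduled thread pool with one daemon thread named dubbo-remoting-server-heartbeat;
    • server: the NettyServer instance created earlier;
    • heatbeatTimer: the heartbeat timer;
    • heartbeat: the heartbeat interval, assigned in the HeaderExchangeServer constructor; 60000 here;
    • heartbeatTimeout: the heartbeat timeout, 180000 here; once it is exceeded, the channel is reconnected if it is a Client and closed otherwise;
  • Start the heartbeat timer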
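
The relationship between the two values can be written down as a tiny function: the timeout defaults to three times the interval and must never be less than twice the interval. HeartbeatConfigSketch is a hypothetical class, and passing null for an absent URL parameter is my own convention for the sketch.

```java
/** Hypothetical sketch of how heartbeatTimeout is derived and validated. */
final class HeartbeatConfigSketch {
    // timeoutParam == null means the heartbeat.timeout parameter is absent from the URL.
    static int resolveTimeout(int heartbeat, Integer timeoutParam) {
        int heartbeatTimeout = (timeoutParam != null) ? timeoutParam : heartbeat * 3;
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        return heartbeatTimeout;
    }

    public static void main(String[] args) {
        System.out.println(resolveTimeout(60000, null));   // default: 3 * heartbeat
        System.out.println(resolveTimeout(60000, 120000)); // smallest accepted explicit value
    }
}
```

The lower bound of two intervals guarantees that at least one full heartbeat round trip can be missed before a channel is declared dead.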

startHeatbeatTimer()

 1     private void startHeatbeatTimer() {
 2         stopHeartbeatTimer();
 3         if (heartbeat > 0) {
 4             heatbeatTimer = scheduled.scheduleWithFixedDelay(
 5                     new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
 6                         public Collection<Channel> getChannels() {
 7                             return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels());
 8                         }
 9                     }, heartbeat, heartbeatTimeout),
10                     heartbeat,
11                     heartbeat,
12                     TimeUnit.MILLISECONDS);
13         }
14     }
15 
16     private void stopHeartbeatTimer() {
17         try {
18             ScheduledFuture<?> timer = heatbeatTimer;
19             if (timer != null && !timer.isCancelled()) {
20                 timer.cancel(true);
21             }
22         } catch (Throwable t) {
23             logger.warn(t.getMessage(), t);
24         } finally {
25             heatbeatTimer = null;
26         }
27     }

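The previous timer is stopped first. The first HeartBeatTask then runs heartbeat milliseconds (60s) after it is scheduled, and each later run starts heartbeat milliseconds (60s) after the previous run has finished. Now look at HeartBeatTask: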
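
For readers who have not used scheduleWithFixedDelay before, here is a minimal sketch of the same scheduling pattern with a much shorter delay. HeartbeatTimerSketch is hypothetical, and the task just counts down a latch instead of sending heartbeats.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of a periodic task started and cancelled like the heartbeat timer. */
final class HeartbeatTimerSketch {
    // Schedules a task with the given initial and repeat delay, waits for `runs` executions, then cancels.
    static boolean runPeriodically(int runs, long delayMillis) {
        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
        CountDownLatch latch = new CountDownLatch(runs);
        ScheduledFuture<?> timer = scheduled.scheduleWithFixedDelay(
                latch::countDown, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
        try {
            // Generous upper bound so the demo does not depend on precise timing.
            return latch.await(delayMillis * runs + 5000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.cancel(true);      // same call stopHeartbeatTimer() uses
            scheduled.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runPeriodically(3, 20));
    }
}
```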

HeartBeatTask

 1 final class HeartBeatTask implements Runnable {
 2     private ChannelProvider channelProvider;
 3     private int heartbeat;//60s
 4     private int heartbeatTimeout;//180s
 5 
 6     HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
 7         this.channelProvider = provider;
 8         this.heartbeat = heartbeat;
 9         this.heartbeatTimeout = heartbeatTimeout;
10     }
11 
12     public void run() {
13         try {
14             long now = System.currentTimeMillis();
15             for (Channel channel : channelProvider.getChannels()) {
16                 if (channel.isClosed()) {
17                     continue;
18                 }
19                 try {
20                     Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);//"READ_TIMESTAMP"
21                     Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);//"WRITE_TIMESTAMP"
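22                     //if both the last read and the last write happened within the heartbeat interval (60s), they count as a heartbeat themselves; otherwise a heartbeat must be sent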
23                     if ((lastRead != null && now - lastRead > heartbeat)
24                             || (lastWrite != null && now - lastWrite > heartbeat)) {
25                         Request req = new Request();
26                         req.setVersion("2.0.0");
27                         req.setTwoWay(true);
28                         req.setEvent(Request.HEARTBEAT_EVENT);
29                         channel.send(req);
30                         if (logger.isDebugEnabled()) {
31                             logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
32                                     + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
33                         }
34                     }
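35                     //if the last read is more than heartbeatTimeout ago, the channel is considered dead (roughly three heartbeats went unanswered in the meantime); a Client channel reconnects, any other channel is closed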
36                     if (lastRead != null && now - lastRead > heartbeatTimeout) {
37                         logger.warn("Close channel " + channel
38                                 + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
39                         if (channel instanceof Client) {
40                             try {
41                                 ((Client) channel).reconnect();
42                             } catch (Exception e) {
43                                 //do nothing
44                             }
45                         } else {
46                             channel.close();
47                         }
48                     }
49                 } catch (Throwable t) {
50                     logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
51                 }
52             }
53         } catch (Throwable t) {
54             logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
55         }
56     }
57 
58     interface ChannelProvider {
59         Collection<Channel> getChannels();
60     }
61 }

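Notes:

  • Fields
    • channelProvider: created in startHeatbeatTimer(); it returns all channels of the current HeaderExchangeServer
    • heartbeat: 60s
    • heartbeatTimeout: 180s
  • run()
    • If both the last read and the last write happened within the heartbeat interval (60s), they count as a heartbeat themselves; otherwise a heartbeat is sent
    • If the last read is more than heartbeatTimeout ago, the channel is considered dead (roughly three heartbeats went unanswered in the meantime); a Client channel is reconnected, and any other channel, such as those held by the server here, is closed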
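
The two conditions in run() are pure functions of the timestamps, so they can be pulled out and checked with concrete numbers. HeartbeatDecisionSketch is a hypothetical class; the two boolean expressions are copied from the HeartBeatTask listing above.

```java
/** Hypothetical extraction of the two checks HeartBeatTask.run() performs per channel. */
final class HeartbeatDecisionSketch {
    // True when either the last read or the last write is older than one heartbeat interval.
    static boolean shouldSendHeartbeat(long now, Long lastRead, Long lastWrite, int heartbeat) {
        return (lastRead != null && now - lastRead > heartbeat)
                || (lastWrite != null && now - lastWrite > heartbeat);
    }

    // True when nothing has been read for longer than heartbeatTimeout.
    static boolean isReadTimedOut(long now, Long lastRead, int heartbeatTimeout) {
        return lastRead != null && now - lastRead > heartbeatTimeout;
    }

    public static void main(String[] args) {
        long now = 200_000L;
        // Read 70s ago, write 10s ago: a heartbeat is sent, but the channel is not timed out.
        System.out.println(shouldSendHeartbeat(now, 130_000L, 190_000L, 60_000));
        System.out.println(isReadTimedOut(now, 130_000L, 180_000));
    }
}
```

Note that a channel with no recorded timestamps (both null) triggers neither branch.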

A complete ExchangeServer is now ready. The ExchangeServer instance is then stored in DubboProtocol's Map<String, ExchangeServer> serverMap field:

{ "10.10.10.10:20880" : ExchangeServer instance }

Finally, DubboProtocol.export(Invoker<T> invoker) returns the DubboExporter instance created earlier.

 

2.4  Creating RegistryProtocol.ExporterChangeableWrapper to wrap the Exporter and originInvoker

exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);

 1     private class ExporterChangeableWrapper<T> implements Exporter<T> {
 2         private final Invoker<T> originInvoker;
 3         private Exporter<T> exporter;
 4 
 5         public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
 6             this.exporter = exporter;
 7             this.originInvoker = originInvoker;
 8         }
 9 
10         public Invoker<T> getOriginInvoker() {
11             return originInvoker;
12         }
13 
14         public Invoker<T> getInvoker() {
15             return exporter.getInvoker();
16         }
17 
18         public void setExporter(Exporter<T> exporter) {
19             this.exporter = exporter;
20         }
21 
22         public void unexport() {
23             String key = getCacheKey(this.originInvoker);
24             bounds.remove(key);
25             exporter.unexport();
26         }
27     }

ExporterChangeableWrapper is a private inner class of RegistryProtocol.

Finally, <providerUrl, ExporterChangeableWrapper instance> is put into RegistryProtocol's Map<String, ExporterChangeableWrapper<?>> bounds field.

  • key:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=744&side=provider&timestamp=1507176748026
  • value: RegistryProtocol$ExporterChangeableWrapper instance
    • originInvoker: the AbstractProxyInvoker instance, with these fields:
      • proxy: DemoServiceImpl instance
      • type: Class<com.alibaba.dubbo.demo.DemoService>
      • url:registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D993%26side%3Dprovider%26timestamp%3D1507100322516&pid=993&registry=zookeeper&timestamp=1507100319830
    • exporter: DubboExporter instance
      • key: com.alibaba.dubbo.demo.DemoService:20880
      • invoker: "InvokerDelegete object wrapped with filters"
      • exporterMap: { "com.alibaba.dubbo.demo.DemoService:20880" -> the current DubboExporter instance }

 

At this point, the first line of RegistryProtocol.export(final Invoker<T> originInvoker) is complete.

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // subscribe to override data
        // FIXME: when a provider subscribes, it affects the case where the same JVM both exports and references the same service,
        // because subscribed uses the service name as its cache key, so the subscription information gets overwritten.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        // ensure that every export call returns a new exporter instance
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }

            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }
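The unexport method of the returned exporter wraps each cleanup step in its own try/catch, so a failure in one step (for example, an unreachable registry) does not prevent the remaining steps from running. A minimal sketch of that pattern follows. CleanupSketch, runAll and the step names are hypothetical, and failures are collected into a list here instead of being logged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CleanupSketch {
    // Runs every step even when earlier ones throw; returns the messages of the failures.
    static List<String> runAll(List<Runnable> steps) {
        List<String> failures = new ArrayList<String>();
        for (Runnable step : steps) {
            try {
                step.run();
            } catch (Throwable t) {
                // The original code logs a warning here; this sketch records the message instead.
                failures.add(String.valueOf(t.getMessage()));
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        final List<String> done = new ArrayList<String>();
        List<Runnable> steps = Arrays.<Runnable>asList(
                new Runnable() { public void run() { done.add("unexport"); } },
                new Runnable() { public void run() { throw new IllegalStateException("registry unreachable"); } },
                new Runnable() { public void run() { done.add("unsubscribe"); } });
        List<String> failures = runAll(steps);
        System.out.println("completed=" + done + " failures=" + failures);
    }
}
```

The trade-off is that the caller of unexport() never sees an exception, so problems during teardown surface only through whatever is logged or recorded.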

 

