為了安全:服務啟動的ip全部使用10.10.10.10
遠程服務的暴露總體步驟:
- 將ref封裝為invoker
- 將invoker轉換為exporter
- 啟動netty
- 注冊服務到zookeeper
- 訂閱
- 返回新的exporter實例
服務遠程暴露的代碼:
1 //如果配置不是local則暴露為遠程服務.(配置為local,則表示只暴露本地服務) 2 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { 3 if (logger.isInfoEnabled()) { 4 logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); 5 } 6 if (registryURLs != null && registryURLs.size() > 0 7 && url.getParameter("register", true)) { 8 for (URL registryURL : registryURLs) { 9 url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); 10 URL monitorUrl = loadMonitor(registryURL); 11 if (monitorUrl != null) { 12 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); 13 } 14 if (logger.isInfoEnabled()) { 15 logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); 16 } 17 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); 18 Exporter<?> exporter = protocol.export(invoker); 19 exporters.add(exporter); 20 } 21 } else { 22 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); 23 Exporter<?> exporter = protocol.export(invoker); 24 exporters.add(exporter); 25 } 26 }
首先將實現類ref封裝為Invoker,之后將invoker轉換為exporter,最后將exporter放入緩存List<Exporter> exporters中。
一 將實現類ref封裝為Invoker
1 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
1 為registryURL拼接export=providerUrl參數
一開始的registryURL:
registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&pid=887®istry=zookeeper×tamp=1507096022072
registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())這句代碼為registryURL添加了參數並編碼:(這里給出沒有編碼的樣子)
1 export=dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=887&side=provider×tamp=1507096024334
2 ProxyFactory$Adaptive.getInvoker(DemoServiceImpl實例, Class<DemoService>, registryURL)
1 public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException { 2 if (arg2 == null) 3 throw new IllegalArgumentException("url == null"); 4 com.alibaba.dubbo.common.URL url = arg2; 5 String extName = url.getParameter("proxy", "javassist");//結果是javassist 6 if(extName == null) 7 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); 8 com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); 9 return extension.getInvoker(arg0, arg1, arg2); 10 }
這里,本來是調用JavassistProxyFactory的getInvoker方法,但是JavassistProxyFactory被StubProxyFactoryWrapper給aop了。
3 StubProxyFactoryWrapper.getInvoker(DemoServiceImpl實例, Class<DemoService>, registryURL)
1 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException { 2 return proxyFactory.getInvoker(proxy, type, url); 3 }
4 JavassistProxyFactory.getInvoker(DemoServiceImpl實例, Class<DemoService>, registryURL)
1 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { 2 // TODO Wrapper類不能正確處理帶$的類名 3 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); 4 return new AbstractProxyInvoker<T>(proxy, type, url) { 5 @Override 6 protected Object doInvoke(T proxy, String methodName, 7 Class<?>[] parameterTypes, 8 Object[] arguments) throws Throwable { 9 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); 10 } 11 }; 12 }
首先是創建Wrapper類:Wrapper.getWrapper(Class<DemoServiceImpl>)。該類記錄了DemoServiceImpl的屬性名稱,方法名稱等信息。關鍵代碼如下:(完整代碼見:7.2 服務本地暴露)
1 import com.alibaba.dubbo.common.bytecode.Wrapper; 2 import java.util.HashMap; 3 4 public class Wrapper1 extends Wrapper { 5 6 public static String[] pns;//property name array 7 public static java.util.Map pts = new HashMap();//<property key, property value> 8 public static String[] mns;//method names 9 public static String[] dmns;// 10 public static Class[] mts0; 55 /** 56 * @param o 實現類 57 * @param n 方法名稱 58 * @param p 參數類型 59 * @param v 參數名稱 60 * @return 61 * @throws java.lang.reflect.InvocationTargetException 62 */ 63 public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException { 64 com.alibaba.dubbo.demo.provider.DemoServiceImpl w; 65 try { 66 w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl) o); 67 } catch (Throwable e) { 68 throw new IllegalArgumentException(e); 69 } 70 try { 71 if ("sayHello".equals(n) && p.length == 1) { 72 return ($w) w.sayHello((java.lang.String) v[0]); 73 } 74 } catch (Throwable e) { 75 throw new java.lang.reflect.InvocationTargetException(e); 76 } 77 throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + n + "\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl."); 78 } 79 }
創建完DemoServiceImpl的Wrapper類之后(實際上該實例在本地暴露的時候已經存入緩存了,這里只是從緩存中拿出來而已),創建一個AbstractProxyInvoker實例。
1 private final T proxy; 2 private final Class<T> type; 3 private final URL url; 4 5 public AbstractProxyInvoker(T proxy, Class<T> type, URL url) { 6 if (proxy == null) { 7 throw new IllegalArgumentException("proxy == null"); 8 } 9 if (type == null) { 10 throw new IllegalArgumentException("interface == null"); 11 } 12 if (!type.isInstance(proxy)) { 13 throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type); 14 } 15 this.proxy = proxy; 16 this.type = type; 17 this.url = url; 18 }
最后創建完成的AbstractProxyInvoker實例屬性如下:
- proxy:DemoServiceImpl實例
- type:Class<com.alibaba.dubbo.demo.DemoService>
- url:registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D993%26side%3Dprovider%26timestamp%3D1507100322516&pid=993®istry=zookeeper×tamp=1507100319830
這樣我們就將ref實現類轉換成了Invoker,之后在調用該invoker.invoke(Invocation invocation)的時候,會調用invoker.doInvoke(T proxy, String methodName,Class<?>[] parameterTypes, Object[] arguments)的時候,就會調用相應的實現類proxy的wrapper類的invokeMethod(proxy, methodName, parameterTypes, arguments),該方法又會調用真實的實現類methodName方法。這里可以先給出AbstractProxyInvoker.invoke(Invocation invocation)源碼:
1 public Result invoke(Invocation invocation) throws RpcException { 2 try { 3 return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments())); 4 } catch (InvocationTargetException e) { 5 return new RpcResult(e.getTargetException()); 6 } catch (Throwable e) { 7 throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); 8 } 9 }
這里的proxy就是上邊賦好值的proxy:DemoServiceImpl實例。而方法信息會封裝在Invocation對象中,該對象在服務引用時介紹。
二 將Invoker轉換為Exporter
1 Exporter<?> exporter = protocol.export(invoker)
1 Protocol$Adaptive.export(com.alibaba.dubbo.rpc.Invoker AbstractProxyInvoker實例)
1 public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { 2 if (arg0 == null) 3 throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); 4 if (arg0.getUrl() == null) 5 throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); 6 com.alibaba.dubbo.common.URL url = arg0.getUrl(); 7 String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());//registry 8 if(extName == null) 9 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); 10 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); 11 return extension.export(arg0); 12 }
這里,由於aop的原因,首先調用了ProtocolListenerWrapper的export(Invoker<T> invoker),如下:
1 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 2 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { 3 return protocol.export(invoker); 4 } 5 return new ListenerExporterWrapper<T>(protocol.export(invoker), 6 Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); 7 }
由於協議是“registry”,所以不做任何處理,繼續調用ProtocolFilterWrapper的export(Invoker<T> invoker),如下:
1 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { 2 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { 3 return protocol.export(invoker); 4 } 5 return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); 6 }
同理,由於協議是“registry”,所以不做任何處理,繼續調用RegistryProtocol.export(final Invoker<T> originInvoker),如下:
1 public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { 2 //export invoker 3 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); 4 //registry provider 5 final Registry registry = getRegistry(originInvoker); 6 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); 7 registry.register(registedProviderUrl); 8 // 訂閱override數據 9 // FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,因為subscribed以服務名為緩存的key,導致訂閱信息覆蓋。 10 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); 11 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); 12 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 13 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 14 //保證每次export都返回一個新的exporter實例 15 return new Exporter<T>() { 16 public Invoker<T> getInvoker() { 17 return exporter.getInvoker(); 18 } 19 20 public void unexport() { 21 try { 22 exporter.unexport(); 23 } catch (Throwable t) { 24 logger.warn(t.getMessage(), t); 25 } 26 try { 27 registry.unregister(registedProviderUrl); 28 } catch (Throwable t) { 29 logger.warn(t.getMessage(), t); 30 } 31 try { 32 overrideListeners.remove(overrideSubscribeUrl); 33 registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); 34 } catch (Throwable t) { 35 logger.warn(t.getMessage(), t); 36 } 37 } 38 }; 39 }
該方法完成了遠程暴露的全部流程。
- 將invoker轉換為exporter
- 啟動netty
- 注冊服務到zookeeper
- 訂閱
- 返回新的exporter實例
2 將invoker轉換為exporter並啟動netty服務
1 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
doLocalExport(final Invoker<T> originInvoker)
1 /** 2 * 1 從invoker的URL中的Map<String, String> parameters中獲取key為export的地址providerUrl,該地址將是服務注冊在zk上的節點 3 * 2 從 Map<String, ExporterChangeableWrapper<?>> bounds 緩存中獲取key為上述providerUrl的exporter,如果有,直接返回,如果沒有,創建並返回 4 * @return 5 */ 6 @SuppressWarnings("unchecked") 7 private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { 8 String key = getCacheKey(originInvoker);//根據originInvoker獲取providerUrl 9 ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); 10 if (exporter == null) { 11 synchronized (bounds) { 12 exporter = (ExporterChangeableWrapper<T>) bounds.get(key); 13 if (exporter == null) { 14 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));//存儲originInvoker和providerUrl 15 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); 16 bounds.put(key, exporter); 17 } 18 } 19 } 20 return exporter; 21 }
2.1 從originInvoker中獲取providerUrl
該方法直接首先調用getCacheKey(final Invoker<?> originInvoker)中獲取providerUrl,這里的originInvoker就是上述創建出來的AbstractProxyInvoker實例,注意他的url是registry協議的,該url的export參數的value就是我們要獲取的providerUrl。獲取providerUrl的源碼如下:
1 private String getCacheKey(final Invoker<?> originInvoker) { 2 URL providerUrl = getProviderUrl(originInvoker); 3 String key = providerUrl.removeParameters("dynamic", "enabled").toFullString(); 4 return key; 5 } 6 7 private URL getProviderUrl(final Invoker<?> origininvoker) { 8 String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY); 9 if (export == null || export.length() == 0) { 10 throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl()); 11 } 12 13 URL providerUrl = URL.valueOf(export); 14 return providerUrl; 15 }
之后一系列的操作,就是獲取該providerUrl對應的exporter,之后放入緩存Map<String, ExporterChangeableWrapper<?>> bounds中,所以一個providerUrl只會對應一個exporter。
2.2 創建InvokerDelegete
1 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
InvokerDelegete是RegistryProtocol的一個靜態內部類,該類是一個originInvoker的委托類,該類存儲了originInvoker,其父類InvokerWrapper還會存儲providerUrl,InvokerWrapper會調用originInvoker的invoke方法,也會銷毀invoker。可以管理invoker的生命周期。
1 public static class InvokerDelegete<T> extends InvokerWrapper<T> { 2 private final Invoker<T> invoker; 3 4 /** 5 * @param invoker 6 * @param url invoker.getUrl返回此值 7 */ 8 public InvokerDelegete(Invoker<T> invoker, URL url) { 9 super(invoker, url); 10 this.invoker = invoker; 11 } 12 13 public Invoker<T> getInvoker() { 14 if (invoker instanceof InvokerDelegete) { 15 return ((InvokerDelegete<T>) invoker).getInvoker(); 16 } else { 17 return invoker; 18 } 19 } 20 }
InvokerWrapper的核心代碼:
1 public class InvokerWrapper<T> implements Invoker<T> { 2 private final Invoker<T> invoker;//originInvoker 3 private final URL url;//providerUrl 4 5 public InvokerWrapper(Invoker<T> invoker, URL url) { 6 this.invoker = invoker; 7 this.url = url; 8 } 9 10 public boolean isAvailable() { 11 return invoker.isAvailable(); 12 } 13 14 public Result invoke(Invocation invocation) throws RpcException { 15 return invoker.invoke(invocation); 16 } 17 18 public void destroy() { 19 invoker.destroy(); 20 } 21 }
這樣一個InvokerDelegete對象就創建好了,屬性如下:
- invoker:originInvoker(AbstractProxyInvoker對象)
- InvokerWrapper.invoker:originInvoker(AbstractProxyInvoker對象)
- url:providerUrl(dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1035&side=provider×tamp=1507101286063)
2.3 使用DubboProtocol將InvokerDelegete轉換為Exporter
1 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker)
2.3.1 Protocol$Adaptive.export(com.alibaba.dubbo.rpc.Invoker InvokerDelegete對象)
1 public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { 2 if (arg0 == null) 3 throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); 4 if (arg0.getUrl() == null) 5 throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); 6 com.alibaba.dubbo.common.URL url = arg0.getUrl(); 7 String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());//dubbo 8 if(extName == null) 9 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); 10 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); 11 return extension.export(arg0); 12 }
該代碼再貼最后一遍了。之后調用ProtocolListenerWrapper的ProtocolListenerWrapper.export(Invoker<T> InvokerDelegete),之后調用ProtocolFilterWrapper.export(Invoker<T> InvokerDelegete):首先對InvokerDelegete對象進行8個filter的遞歸包裝,之后使用DubboProtocol對包裝后的InvokerDelegete對象進行export。
層層包裝的源碼:
1 /** 2 * 1 根據key從url中獲取相應的filter的values,再根據這個values和group去獲取類上帶有@Active注解的filter集合 3 * 2 之后將這些filter對傳入的invoker進行遞歸包裝層invoker(就是一個鏈表) 4 */ 5 private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { 6 Invoker<T> last = invoker; 7 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); 8 if (filters.size() > 0) { 9 for (int i = filters.size() - 1; i >= 0; i--) { 10 final Filter filter = filters.get(i); 11 final Invoker<T> next = last; 12 last = new Invoker<T>() { 13 14 public Class<T> getInterface() { 15 return invoker.getInterface(); 16 } 17 18 public URL getUrl() { 19 return invoker.getUrl(); 20 } 21 22 public boolean isAvailable() { 23 return invoker.isAvailable(); 24 } 25 26 public Result invoke(Invocation invocation) throws RpcException { 27 return filter.invoke(next, invocation); 28 } 29 30 public void destroy() { 31 invoker.destroy(); 32 } 33 34 @Override 35 public String toString() { 36 return invoker.toString(); 37 } 38 }; 39 } 40 } 41 return last; 42 }
上述方法中最重要的就是Invoker的Result invoke(Invocation invocation),在該方法中,是使用了filter.invoke(next, invocation),而這里的next又可能是另一個filter。這里我們打開一個filter來看一下源碼:
1 @Activate(group = Constants.PROVIDER, order = -110000) 2 public class EchoFilter implements Filter { 3 public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException { 4 if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) 5 return new RpcResult(inv.getArguments()[0]); 6 return invoker.invoke(inv); 7 } 8 }
可以看到,該filter會調用傳入的next的invoke方法。
這里給出被遞歸包裝后的對象:(命名為InvokerDelegete的filter對象)
1 EchoFilter 2 -->ClassLoaderFilter 3 -->GenericFilter 4 -->ContextFilter 5 -->TraceFilter 6 -->TimeoutFilter 7 -->MonitorFilter 8 -->ExceptionFilter 9 -->InvokerDelegete對象
2.3.2 DubboProtocol.export(Invoker<T> InvokerDelegete的filter對象)
/** * 1 從invoker的url中獲取將要暴露的遠程服務的key:com.alibaba.dubbo.demo.DemoService:20880(實際上是:serviceGroup/serviceName:serviceVersion:port) * 注意:本地暴露的key就是:com.alibaba.dubbo.demo.DemoService * 2 打開ExchangeServer */ public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispaching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } openServer(url); return exporter; }
首先從“InvokerDelegete的filter對象”中的url獲取key,這段代碼很簡單,就是獲取serviceGroup/serviceName:serviceVersion:port這樣形式的一個key,這里最后獲取到的是com.alibaba.dubbo.demo.DemoService:20880。
之后創建DubboExporter。
2.3.2.1 DubboExporter<T>(InvokerDelegete的filter對象, "com.alibaba.dubbo.demo.DemoService:20880", exporterMap)
1 public class DubboExporter<T> extends AbstractExporter<T> { 2 //serviceGroup/serviceName:serviceVersion:port, 例如:com.alibaba.dubbo.demo.DemoService:20880 3 private final String key;// 4 //{ "com.alibaba.dubbo.demo.DemoService:20880" -> 當前的DubboExporter實例 } 5 private final Map<String, Exporter<?>> exporterMap; 6 7 public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) { 8 super(invoker); 9 this.key = key; 10 this.exporterMap = exporterMap; 11 } 12 13 @Override 14 public void unexport() { 15 super.unexport(); 16 exporterMap.remove(key); 17 } 18 }
注意這里的exporterMap是引用傳遞。
父類:
1 public abstract class AbstractExporter<T> implements Exporter<T> { 2 protected final Logger logger = LoggerFactory.getLogger(getClass()); 3 private final Invoker<T> invoker; 4 private volatile boolean unexported = false; 5 6 public AbstractExporter(Invoker<T> invoker) { 7 if (invoker == null) 8 throw new IllegalStateException("service invoker == null"); 9 if (invoker.getInterface() == null) 10 throw new IllegalStateException("service type == null"); 11 if (invoker.getUrl() == null) 12 throw new IllegalStateException("service url == null"); 13 this.invoker = invoker; 14 } 15 16 public Invoker<T> getInvoker() { 17 return invoker; 18 } 19 20 public void unexport() { 21 if (unexported) { 22 return; 23 } 24 unexported = true; 25 getInvoker().destroy(); 26 } 27 28 public String toString() { 29 return getInvoker().toString(); 30 } 31 }
這里,我們把一個“InvokerDelegete的filter對象”賦給了AbstractExporter的Invoker引用,也就是說從exporter中可以獲取到invoker。最后在DubboProtocol.export(Invoker<T> invoker)中執行:exporterMap.put(key, exporter); 這樣就將{ "com.alibaba.dubbo.demo.DemoService:20880" -> 當前的DubboExporter實例 }存儲起來了。
來看一下現在的DubboExporter實例:
- key:com.alibaba.dubbo.demo.DemoService:20880
- invoker:“InvokerDelegete的filter對象”
- exporterMap:{ "com.alibaba.dubbo.demo.DemoService:20880" -> 當前的DubboExporter實例 }
2.3.2.2 開啟ExchangeServer
1 /** 2 * 從緩存Map<String, ExchangeServer> serverMap中根據"host:port"獲取ExchangeServer,如果沒有,創建ExchangeServer,之后放入緩存。 3 * @param url 4 */ 5 private void openServer(URL url) { 6 // find server. 7 String key = url.getAddress();//10.10.10.10:20880 8 //client 也可以暴露一個只有server可以調用的服務。 9 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); 10 if (isServer) { 11 ExchangeServer server = serverMap.get(key); 12 if (server == null) { 13 serverMap.put(key, createServer(url)); 14 } else { 15 //server支持reset,配合override功能使用 16 server.reset(url); 17 } 18 } 19 }
首先從provderUrl中獲取host:port作為key,之后從緩存serverMap中獲取ExchangeServer,如果沒有,創建ExchangeServer,最后以如下方式放入緩存:
Map<String, ExchangeServer> serverMap:{ "10.10.10.10:20880"<->ExchangeServer實例 }。
創建ExchangeServer:createServer(URL providerUrl)
1 private ExchangeServer createServer(URL url) { 2 //默認開啟server關閉時發送readonly事件 3 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); 4 //默認開啟heartbeat 5 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); 6 String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); 7 8 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) 9 throw new RpcException("Unsupported server type: " + str + ", url: " + url); 10 11 url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); 12 ExchangeServer server; 13 try { 14 server = Exchangers.bind(url, requestHandler); 15 } catch (RemotingException e) { 16 throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); 17 } 18 str = url.getParameter(Constants.CLIENT_KEY); 19 if (str != null && str.length() > 0) { 20 Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); 21 if (!supportedTypes.contains(str)) { 22 throw new RpcException("Unsupported client type: " + str); 23 } 24 } 25 return server; 26 }
首先是在原本providerUrl上添加參數:channel.readonly.sent=true&heartbeat=60000&codec=dubbo(其中的heartbeat參數會在HeaderExchangeServer啟動心跳計時器時使用)
之后使用Exchangers.bind("添加參數后的providerUrl", requestHandler)創建ExchangeServer。首先來看一下DubboProtocol#requestHandler。這個類極其重要,后續經過層層包裝后,會成為最終netty的服務端邏輯處理器。
1 private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { 2 public Object reply(ExchangeChannel channel, Object message) throws RemotingException { 3 if (message instanceof Invocation) { 4 Invocation inv = (Invocation) message; 5 Invoker<?> invoker = getInvoker(channel, inv); 6 //如果是callback 需要處理高版本調用低版本的問題 7 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { 8 String methodsStr = invoker.getUrl().getParameters().get("methods"); 9 boolean hasMethod = false; 10 if (methodsStr == null || methodsStr.indexOf(",") == -1) { 11 hasMethod = inv.getMethodName().equals(methodsStr); 12 } else { 13 String[] methods = methodsStr.split(","); 14 for (String method : methods) { 15 if (inv.getMethodName().equals(method)) { 16 hasMethod = true; 17 break; 18 } 19 } 20 } 21 if (!hasMethod) { 22 logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); 23 return null; 24 } 25 } 26 RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); 27 return invoker.invoke(inv); 28 } 29 throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); 30 } 31 32 @Override 33 public void received(Channel channel, Object message) throws RemotingException { 34 if (message instanceof Invocation) { 35 reply((ExchangeChannel) channel, message); 36 } else { 37 super.received(channel, message); 38 } 39 } 40 41 @Override 42 public void connected(Channel channel) throws RemotingException { 43 invoke(channel, Constants.ON_CONNECT_KEY); 44 } 45 46 @Override 47 public void disconnected(Channel channel) throws RemotingException { 48 if (logger.isInfoEnabled()) { 49 logger.info("disconected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); 50 } 51 invoke(channel, Constants.ON_DISCONNECT_KEY); 52 } 53 54 private void invoke(Channel channel, String methodKey) { 55 Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); 56 if (invocation != null) { 57 try { 58 received(channel, invocation); 59 } catch (Throwable t) { 60 logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); 61 } 62 } 63 } 64 65 private Invocation createInvocation(Channel channel, URL url, String methodKey) { 66 String method = url.getParameter(methodKey); 67 if (method == null || method.length() == 0) { 68 return null; 69 } 70 RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]); 71 invocation.setAttachment(Constants.PATH_KEY, url.getPath()); 72 invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY)); 73 invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY)); 74 invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY)); 75 if (url.getParameter(Constants.STUB_EVENT_KEY, false)) { 76 invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()); 77 } 78 return invocation; 79 } 80 };
從上可以看出在該handler中,定義了與客戶端連接成功/斷開連接/接受到客戶端消息/相應消息,以及創造Invocation的方法。其中的getInvoker(Channel channel, Invocation inv)方法簡碼如下:
1 String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); 2 DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); 3 return exporter.getInvoker();
這不就是我們剛剛放置到exporterMap中的DubboExporter,而其中的invoker不就是我們的“filter的invokerdelegete對象”。
使用Exchangers.bind(providerUrl, ExchangeHandlerAdapter對象)創建ExchangeServer
1 public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { 2 if (url == null) { 3 throw new IllegalArgumentException("url == null"); 4 } 5 if (handler == null) { 6 throw new IllegalArgumentException("handler == null"); 7 } 8 url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); 9 return getExchanger(url).bind(url, handler); 10 } 11 12 public static Exchanger getExchanger(URL url) { 13 String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);//header 14 return getExchanger(type); 15 } 16 17 public static Exchanger getExchanger(String type) { 18 return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); 19 }
getExchanger(URL url)返回一個HeaderExchanger實例。所以ExchangeServer的創建交由HeaderExchanger來實現。
HeaderExchanger.bind(providerUrl, ExchangeHandlerAdapter對象)
1 /** 2 * 1 對handler進行三次包裝:首先將ExchangeHandlerAdapter賦給HeaderExchangeHandler中的ExchangeHandler handler屬性;然后將創建出來的HeaderExchangeHandler賦給DecodeHandler的父類AbstractChannelHandlerDelegate的ChannelHandler handler屬性 3 */ 4 public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { 5 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); 6 }
說明:
- 這里首先對傳入的ExchangeHandlerAdapter進行了兩次包裝,最終得到DecodeHandler實例;
- 之后,使用Transporters.bind(providerUrl, DecodeHandler對象)創建了一個NettyServer;
- 最后使用HeaderExchangeServer包裝了上邊的NettyServer,並啟動了心跳計數器。
- HeaderExchangeServer實例也是最終返回的ExchangeServer實例,將最終被存儲在Map<String, ExchangeServer> serverMap:{ "10.10.10.10:20880"<->HeaderExchangeServer實例 }
包裝ExchangeHandlerAdapter,獲取DecodeHandler實例。代碼比較簡單,不列出來了。
最終獲取到的DecodeHandler實例的層級關系:
1 DecodeHandler實例 2 -->HeaderExchangeHandler實例 3 -->ExchangeHandlerAdapter實例
使用Transporters.bind(providerUrl, DecodeHandler對象)創建了一個NettyServer
Transporters.bind(providerUrl, DecodeHandler對象)
1 public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { 2 if (url == null) { 3 throw new IllegalArgumentException("url == null"); 4 } 5 if (handlers == null || handlers.length == 0) { 6 throw new IllegalArgumentException("handlers == null"); 7 } 8 ChannelHandler handler; 9 if (handlers.length == 1) { 10 handler = handlers[0]; 11 } else { 12 handler = new ChannelHandlerDispatcher(handlers); 13 } 14 return getTransporter().bind(url, handler); 15 } 16 17 public static Transporter getTransporter() { 18 return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); 19 }
Transporter$Adaptive.bind(providerUrl, DecodeHandler對象)
1 public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException { 2 if (arg0 == null) 3 throw new IllegalArgumentException("url == null"); 4 com.alibaba.dubbo.common.URL url = arg0; 5 String extName = url.getParameter("server", url.getParameter("transporter", "netty"));//netty 6 if(extName == null) 7 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])"); 8 com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName); 9 return extension.bind(arg0, arg1); 10 }
最后NettyServer的創建由NettyTransporter來創建。
NettyTransporter.bind(providerUrl, DecodeHandler對象)
1 public class NettyTransporter implements Transporter { 2 public static final String NAME = "netty"; 3 4 public Server bind(URL url, ChannelHandler listener) throws RemotingException { 5 return new NettyServer(url, listener); 6 } 7 8 public Client connect(URL url, ChannelHandler listener) throws RemotingException { 9 return new NettyClient(url, listener); 10 } 11 }
new NettyServer(providerUrl, DecodeHandler對象)
1 public NettyServer(URL url, ChannelHandler handler) throws RemotingException { 2 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); 3 }
這里首先為providerUrl添加參數:threadname=DubboServerHandler-10.10.10.10:20880(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME));
之后,使用ChannelHandlers.wrap(DecodeHandler對象, providerUrl)對DecodeHandler對象進行了三層包裝,最終得到MultiMessageHandler實例;
最后調用父類的構造器初始化NettyServer的各個屬性,最后啟動netty。
先看一下
ChannelHandlers.wrap(DecodeHandler對象, providerUrl)
1 /** 2 * 這里又是層層包裹: 3 * MultiMessageHandler 4 * --HeartbeatHandler 5 * --AllChannelHandler 6 * --DecodeHandler 7 * --HeaderExchangeHandler 8 * --ExchangeHandlerAdapter 9 * @param handler 10 * @param url 11 * @return 12 */ 13 protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { 14 return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) 15 .getAdaptiveExtension().dispatch(handler, url))); 16 }
ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()獲取到一個Dispatcher$Adaptive適配類。
Dispatcher$Adaptive.dispatch(DecodeHandler對象, providerUrl)
1 public com.alibaba.dubbo.remoting.ChannelHandler dispatch(com.alibaba.dubbo.remoting.ChannelHandler arg0, com.alibaba.dubbo.common.URL arg1) { 2 if (arg1 == null) 3 throw new IllegalArgumentException("url == null"); 4 com.alibaba.dubbo.common.URL url = arg1; 5 String extName = url.getParameter("dispatcher", url.getParameter("dispather", url.getParameter("channel.handler", "all")));//all 6 if(extName == null) 7 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + url.toString() + ") use keys([dispatcher, dispather, channel.handler])"); 8 com.alibaba.dubbo.remoting.Dispatcher extension = (com.alibaba.dubbo.remoting.Dispatcher)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Dispatcher.class).getExtension(extName); 9 return extension.dispatch(arg0, arg1); 10 }
這里獲取到AllDispatcher,Dispatcher決定了dubbo的線程模型,指定了哪些做什么,哪些線程做什么。講到dubbo通信的時候再寫。
AllDispatcher.dispatch(DecodeHandler對象, providerUrl)
1 public ChannelHandler dispatch(ChannelHandler handler, URL url) { 2 return new AllChannelHandler(handler, url); 3 }
new AllChannelHandler(DecodeHandler對象, providerUrl)
1 public AllChannelHandler(ChannelHandler handler, URL url) { 2 super(handler, url); 3 }
來看其父類的WrappedChannelHandler的構造器:
WrappedChannelHandler(DecodeHandler對象, providerUrl)
1 protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true)); 2 protected final ExecutorService executor; 3 protected final ChannelHandler handler; 4 protected final URL url; 5 6 public WrappedChannelHandler(ChannelHandler handler, URL url) { 7 this.handler = handler; 8 this.url = url; 9 executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); 10 11 String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; 12 if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) { 13 componentKey = Constants.CONSUMER_SIDE; 14 } 15 DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); 16 dataStore.put(componentKey, Integer.toString(url.getPort()), executor);//{"java.util.concurrent.ExecutorService":{"20880":executor}} 17 }
首先創建了一個共享線程池:SHARED_EXECUTOR;
之后為handler/url/executor賦值,其中executor是一個200個線程數的fixed線程池(隊列為0,即同步隊列);
1 public Executor getExecutor(URL url) { 2 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);//默認為dubbo,但是我們這里是DubboServerHandler-10.10.10.10:20880(就是之前設置到url上的threadname) 3 int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);//200 4 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);//0 5 return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 6 queues == 0 ? new SynchronousQueue<Runnable>() : 7 (queues < 0 ? new LinkedBlockingQueue<Runnable>() 8 : new LinkedBlockingQueue<Runnable>(queues)), 9 new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); 10 }
之后獲取了一個數據存儲器:SimpleDataStore;
最后將{"java.util.concurrent.ExecutorService":{"20880": executor}}數據存儲在SimpleDataStore的ConcurrentMap<String, ConcurrentMap<String, Object>> data數據結構中。也就是說:每一個端口,有一個線程池。
注意:為什么SimpleDataSource可以做緩存來使用?
1 public T getExtension(String name) { 2 if (name == null || name.length() == 0) 3 throw new IllegalArgumentException("Extension name == null"); 4 if ("true".equals(name)) { 5 return getDefaultExtension(); 6 } 7 Holder<Object> holder = cachedInstances.get(name); 8 if (holder == null) { 9 cachedInstances.putIfAbsent(name, new Holder<Object>()); 10 holder = cachedInstances.get(name); 11 } 12 Object instance = holder.get(); 13 if (instance == null) { 14 synchronized (holder) { 15 instance = holder.get(); 16 if (instance == null) { 17 instance = createExtension(name); 18 holder.set(instance); 19 } 20 } 21 } 22 return (T) instance; 23 }
其實,就是這樣SimpleDataStore實例會存儲在cachedInstances緩存中,下一次不會再創建,而是直接獲取該緩存。
這樣之后,一個AllChannelHandler實例就完成了,該實例屬性如下:
- WrappedChannelHandler.url:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&channel.readonly.sent=true&codec=dubbo&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1287&side=provider&threadname=DubboServerHandler-10.10.10.10:20880×tamp=1507116859919
- WrappedChannelHandler.handler:DecodeHandler對象
- WrappedChannelHandler.executor:FixedThreadPool實例
當然還有一個類變量WrappedChannelHandler.SHARED_EXECUTOR=CachedThreadPool實例。
之后AllChannelHandler實例會被HeartbeatHandler進行包裹,之后HeartbeatHandler實例又會被MultiMessageHandler所包裹,最后得到的MultiMessageHandler實例的層級結構如下:
1 MultiMessageHandler 2 -->handler: HeartbeatHandler 3 -->handler: AllChannelHandler 4 -->url: providerUrl 5 -->executor: FixedExecutor 6 -->handler: DecodeHandler 7 -->handler: HeaderExchangeHandler 8 -->handler: ExchangeHandlerAdapter
MultiMessageHandler實例創建出來之后,NettyServer就開始調用其各個父類進行屬性的初始化了。首先來看一下NettyServer的父類層級圖:
AbstractServer:
1 protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler"; 2 ExecutorService executor; 3 private InetSocketAddress localAddress; 4 private InetSocketAddress bindAddress; 5 private int accepts; 6 private int idleTimeout = 600; 7 8 public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { 9 super(url, handler); 10 localAddress = getUrl().toInetSocketAddress(); 11 String host = url.getParameter(Constants.ANYHOST_KEY, false) 12 || NetUtils.isInvalidLocalHost(getUrl().getHost()) 13 ? NetUtils.ANYHOST : getUrl().getHost(); 14 bindAddress = new InetSocketAddress(host, getUrl().getPort()); 15 this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); 16 this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); 17 try { 18 doOpen(); 19 if (logger.isInfoEnabled()) { 20 logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); 21 } 22 } catch (Throwable t) { 23 throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() 24 + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); 25 } 26 //fixme replace this with better method 27 DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); 28 executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); 29 }
首先調用父類初始化屬性,之后啟動服務。
AbstractEndpoint:
1 private Codec2 codec; 2 private int timeout; 3 private int connectTimeout; 4 5 public AbstractEndpoint(URL url, ChannelHandler handler) { 6 super(url, handler); 7 this.codec = getChannelCodec(url); 8 this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);//1000 9 this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);//3000 10 }
AbstractPeer:
1 private final ChannelHandler handler; 2 private volatile URL url; 3 4 public AbstractPeer(URL url, ChannelHandler handler) { 5 if (url == null) { 6 throw new IllegalArgumentException("url == null"); 7 } 8 if (handler == null) { 9 throw new IllegalArgumentException("handler == null"); 10 } 11 this.url = url; 12 this.handler = handler; 13 }
來看一下最后初始化好的NettyServer實例:
- url:providerUrl(dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&channel.readonly.sent=true&codec=dubbo&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1287&side=provider×tamp=1507116859919)
- handler:MultiMessageHandler實例
- codec:DubboCountCodec實例
- timeout:1000
- connectTimeout:3000
- idleTime:600*1000
- localAddress:10.10.10.10:20880
- bindAddress:0.0.0.0:20880
- accepts:0
- executor:null(此時的executor還沒實例話,要等netty服務起來之后才會從緩存中獲取之前存儲在SimpleDataStore緩存中的那個200個線程數的FixedThreadPool實例)
之后,就要啟動netty服務了。
1 /** 2 * 啟動netty服務,監聽客戶端連接 3 */ 4 @Override 5 protected void doOpen() throws Throwable { 6 NettyHelper.setNettyLoggerFactory(); 7 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); 8 ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); 9 ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); 10 bootstrap = new ServerBootstrap(channelFactory); 11 12 final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); 13 channels = nettyHandler.getChannels(); 14 // https://issues.jboss.org/browse/NETTY-365 15 // https://issues.jboss.org/browse/NETTY-379 16 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); 17 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 18 public ChannelPipeline getPipeline() { 19 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); 20 ChannelPipeline pipeline = Channels.pipeline(); 21 /*int idleTimeout = getIdleTimeout(); 22 if (idleTimeout > 10000) { 23 pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); 24 }*/ 25 pipeline.addLast("decoder", adapter.getDecoder()); 26 pipeline.addLast("encoder", adapter.getEncoder()); 27 pipeline.addLast("handler", nettyHandler); 28 return pipeline; 29 } 30 }); 31 // bind 32 channel = bootstrap.bind(getBindAddress()); 33 }
說明:
- boss線程數默認只有一個;
- worker線程數:Runtime.getRuntime().availableProcessors() + 1,為計算機核數+1;
- 服務端邏輯處理器為NettyHandler:
- 編碼器為:InternalEncoder實例,內部使用NettyServer的DubboCountCodec實例來編碼
- 解碼器為:InternalDecoder實例,內部使用NettyServer的DubboCountCodec實例來解碼
NettyHandler:
1 @Sharable 2 public class NettyHandler extends SimpleChannelHandler { 3 private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel> 4 private final URL url; 5 private final ChannelHandler handler; 6 7 public NettyHandler(URL url, ChannelHandler handler) { 8 if (url == null) { 9 throw new IllegalArgumentException("url == null"); 10 } 11 if (handler == null) { 12 throw new IllegalArgumentException("handler == null"); 13 } 14 this.url = url; 15 this.handler = handler; 16 } 17 18 public Map<String, Channel> getChannels() { 19 return channels; 20 } 21 22 @Override 23 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 24 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); 25 try { 26 if (channel != null) { 27 channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel); 28 } 29 handler.connected(channel); 30 } finally { 31 NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); 32 } 33 } 34 35 @Override 36 public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 37 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); 38 try { 39 channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress())); 40 handler.disconnected(channel); 41 } finally { 42 NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); 43 } 44 } 45 46 @Override 47 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 48 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); 49 try { 50 handler.received(channel, e.getMessage()); 51 } finally { 52 NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); 53 } 54 } 55 56 @Override 57 public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 58 super.writeRequested(ctx, e); 59 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); 60 try { 61 handler.sent(channel, e.getMessage()); 62 } finally { 63 NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); 64 } 65 } 66 67 @Override 68 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 69 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); 70 try { 71 handler.caught(channel, e.getCause()); 72 } finally { 73 NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); 74 } 75 } 76 }
說明:
屬性
- handler:就是當前的NettyServer實例
- url:providerUrl
- channels:存放連接到來的channel
監聽連接完成/連接斷開/接收到消息/發送完消息/異常捕捉事件,之后使用NettyServer實例進行相應的處理,NettyServer又會調用MultiMessageHandler實例(該handler屬性位於NettyServer的父類AbstractPeer中)進行處理。
在來看編碼器和解碼器:
NettyCodecAdapter(DubboCountCodec實例, providerUrl, 當前的NettyServer實例)
1 final class NettyCodecAdapter { 2 private final ChannelHandler encoder = new InternalEncoder(); 3 private final ChannelHandler decoder = new InternalDecoder(); 4 private final Codec2 codec; 5 private final URL url; 6 private final int bufferSize; 7 private final com.alibaba.dubbo.remoting.ChannelHandler handler; 8 9 public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) { 10 this.codec = codec; 11 this.url = url; 12 this.handler = handler; 13 int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);//8*1024 14 this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;//8*1024 15 } 16 17 public ChannelHandler getEncoder() { 18 return encoder; 19 } 20 21 public ChannelHandler getDecoder() { 22 return decoder; 23 } 24 25 @Sharable 26 private class InternalEncoder extends OneToOneEncoder { 27 @Override 28 protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception { 29 ... 30 codec.encode(channel, buffer, msg); 31 ... 32 } 33 } 34 35 private class InternalDecoder extends SimpleChannelUpstreamHandler { 36 @Override 37 public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { 38 ... 39 msg = codec.decode(channel, message); 40 ... 41 } 42 ... 43 } 44 }
只列出核心代碼:可以看到,InternalEncoder實例和InternalDecoder實例內部還是使用NettyServer的DubboCountCodec實例來編解碼的。dubbo的編解碼做的非常好,后續會寫。
到此為止,NettyServer就創建成功了。 之后,終於執行到了:
new HeaderExchangeServer(Server NettyServer)。
1 private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, new NamedThreadFactory("dubbo-remoting-server-heartbeat",true)); 2 private final Server server; 3 // 心跳定時器 4 private ScheduledFuture<?> heatbeatTimer; 5 // 心跳超時,毫秒。缺省0,不會執行心跳。 6 private int heartbeat; 7 private int heartbeatTimeout; 8 private AtomicBoolean closed = new AtomicBoolean(false); 9 10 public HeaderExchangeServer(Server server) { 11 if (server == null) { 12 throw new IllegalArgumentException("server == null"); 13 } 14 this.server = server; 15 this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);//60000 在createServer(URL providerUrl)中拼接了heartbeat參數 16 this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);//3*60000 17 if (heartbeatTimeout < heartbeat * 2) { 18 throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); 19 } 20 startHeatbeatTimer(); 21 }
說明:
- 屬性
- scheduled:是一個有1個名字為dubbo-remoting-server-heartbeat的后台線程的定時線程池;
- server:之前創建出來的NettyServer實例;
- heartbeatTimer:心跳計時器
- heartbeat:心跳時間,該參數會在HeaderExchangeServer的構造器中進行賦值,60000
- heartbeatTimeout:心跳超時時間,超過該時間,會進行channel重連,180000
- 啟動心跳計時器
startHeatbeatTimer()
1 private void startHeatbeatTimer() { 2 stopHeartbeatTimer(); 3 if (heartbeat > 0) { 4 heatbeatTimer = scheduled.scheduleWithFixedDelay( 5 new HeartBeatTask(new HeartBeatTask.ChannelProvider() { 6 public Collection<Channel> getChannels() { 7 return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels()); 8 } 9 }, heartbeat, heartbeatTimeout), 10 heartbeat, 11 heartbeat, 12 TimeUnit.MILLISECONDS); 13 } 14 } 15 16 private void stopHeartbeatTimer() { 17 try { 18 ScheduledFuture<?> timer = heatbeatTimer; 19 if (timer != null && !timer.isCancelled()) { 20 timer.cancel(true); 21 } 22 } catch (Throwable t) { 23 logger.warn(t.getMessage(), t); 24 } finally { 25 heatbeatTimer = null; 26 } 27 }
首先停掉之前的計時器,之后在線程創建開始heartbeat毫秒(60s)后執行第一次HeartBeatTask任務,之后每隔heartbeat毫秒(60s)執行一次HeartBeatTask任務。來看一下HeartBeatTask:
HeartBeatTask
1 final class HeartBeatTask implements Runnable { 2 private ChannelProvider channelProvider; 3 private int heartbeat;//60s 4 private int heartbeatTimeout;//180s 5 6 HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) { 7 this.channelProvider = provider; 8 this.heartbeat = heartbeat; 9 this.heartbeatTimeout = heartbeatTimeout; 10 } 11 12 public void run() { 13 try { 14 long now = System.currentTimeMillis(); 15 for (Channel channel : channelProvider.getChannels()) { 16 if (channel.isClosed()) { 17 continue; 18 } 19 try { 20 Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);//"READ_TIMESTAMP" 21 Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);//"WRITE_TIMESTAMP" 22 //如果最后一次讀和寫在heartbeat時間(60s)內,則最后一次的讀和寫本身可以看作心跳;否則,需要程序發送心跳 23 if ((lastRead != null && now - lastRead > heartbeat) 24 || (lastWrite != null && now - lastWrite > heartbeat)) { 25 Request req = new Request(); 26 req.setVersion("2.0.0"); 27 req.setTwoWay(true); 28 req.setEvent(Request.HEARTBEAT_EVENT); 29 channel.send(req); 30 if (logger.isDebugEnabled()) { 31 logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() 32 + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms"); 33 } 34 } 35 //如果最后一次讀的時間距離現在已經超過heartbeatTimeout了,我們認為channel已經斷了(因為在這個過程中,發送了三次心跳都沒反應),此時channel進行重連 36 if (lastRead != null && now - lastRead > heartbeatTimeout) { 37 logger.warn("Close channel " + channel 38 + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms"); 39 if (channel instanceof Client) { 40 try { 41 ((Client) channel).reconnect(); 42 } catch (Exception e) { 43 //do nothing 44 } 45 } else { 46 channel.close(); 47 } 48 } 49 } catch (Throwable t) { 50 logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t); 51 } 52 } 53 } catch (Throwable t) { 54 logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t); 55 } 56 } 57 58 interface ChannelProvider { 59 Collection<Channel> getChannels(); 60 } 61 }
說明:
- 屬性
- channelProvider在startHeatbeatTimer()中創建,並且獲取了當前的HeaderExchangeServer的所有channels
- heartbeat:60s
- heartbeatTimeout:180s
- run()
- 如果最后一次讀和寫的時間距離現在在heartbeat時間(60s)內,則最后一次的讀和寫本身可以看作心跳;否則,發送心跳
- 如果最后一次讀的時間距離現在已經超過heartbeatTimeout了,認為channel已經斷了(因為在這個過程中,發送了三次心跳都沒反應),此時channel進行重連
到現在一個完整的ExchangeServer就OK了。之后我們將創建出來的ExchangeServer實例存放在DubboProtocol的Map<String, ExchangeServer> serverMap屬性中:
{ "10.10.10.10:20880" : ExchangeServer實例 }
最后,DubboProtocol.export(Invoker<T> invoker)將之前創建的DubboExporter實例返回。
2.4 創建RegistryProtocol.ExporterChangeableWrapper來封裝Exporter和originInvoker
1 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker)
1 private class ExporterChangeableWrapper<T> implements Exporter<T> { 2 private final Invoker<T> originInvoker; 3 private Exporter<T> exporter; 4 5 public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) { 6 this.exporter = exporter; 7 this.originInvoker = originInvoker; 8 } 9 10 public Invoker<T> getOriginInvoker() { 11 return originInvoker; 12 } 13 14 public Invoker<T> getInvoker() { 15 return exporter.getInvoker(); 16 } 17 18 public void setExporter(Exporter<T> exporter) { 19 this.exporter = exporter; 20 } 21 22 public void unexport() { 23 String key = getCacheKey(this.originInvoker); 24 bounds.remove(key); 25 exporter.unexport(); 26 } 27 }
ExporterChangeableWrapper類是RegistryProtocol的私有內部類。
最后,將<providerUrl, ExporterChangeableWrapper實例>放入RegistryProtocol的屬性Map<String, ExporterChangeableWrapper<?>> bounds中。
- key:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=744&side=provider×tamp=1507176748026
- value:RegistryProtocol$ExporterChangeableWrapper實例
- originInvoker:即AbstractProxyInvoker實例屬性如下:
- proxy:DemoServiceImpl實例
- type:Class<com.alibaba.dubbo.demo.DemoService>
- url:registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D993%26side%3Dprovider%26timestamp%3D1507100322516&pid=993®istry=zookeeper×tamp=1507100319830
- DubboExporter實例
- key:com.alibaba.dubbo.demo.DemoService:20880
- invoker:"InvokerDelegete的filter對象"
- exporterMap:{ "com.alibaba.dubbo.demo.DemoService:20880" -> 當前的DubboExporter實例 }
- originInvoker:即AbstractProxyInvoker實例屬性如下:
到此為止,RegistryProtocol.export(final Invoker<T> originInvoker)的第一行代碼就完成了。
1 public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { 2 //export invoker 3 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); 4 //registry provider 5 final Registry registry = getRegistry(originInvoker); 6 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); 7 registry.register(registedProviderUrl); 8 // 訂閱override數據 9 // FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,因為subscribed以服務名為緩存的key,導致訂閱信息覆蓋。 10 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); 11 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); 12 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 13 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 14 //保證每次export都返回一個新的exporter實例 15 return new Exporter<T>() { 16 public Invoker<T> getInvoker() { 17 return exporter.getInvoker(); 18 } 19 20 public void unexport() { 21 try { 22 exporter.unexport(); 23 } catch (Throwable t) { 24 logger.warn(t.getMessage(), t); 25 } 26 try { 27 registry.unregister(registedProviderUrl); 28 } catch (Throwable t) { 29 logger.warn(t.getMessage(), t); 30 } 31 try { 32 overrideListeners.remove(overrideSubscribeUrl); 33 registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); 34 } catch (Throwable t) { 35 logger.warn(t.getMessage(), t); 36 } 37 } 38 }; 39 }