在閱讀此文章之前,我希望閱讀者對Spring 擴展機制的有一定的了解,比如:自定義標簽與Spring整合, InitializingBean 接口,ApplicationContextAware,BeanNameAware,
BeanFactory 接口所起到的作用 ;從來沒了解過的,請看我之前的關於Spring的博客
關於此篇文章受益於 :http://dubbo.apache.org/zh-cn/docs/source_code_guide/export-service.html dubbo官網文檔
開始正題
(一)onApplicationEvent (事件監聽)
dubbo 服務導出的方法是在 com.alibaba.dubbo.config.spring.ServiceBean 類中,dubbo 服務的導出過程始於在Spring 容器發生刷新事件,那么如何感知到Spring 容器發生刷新
事件呢? ~~ 得益於Spring提供的 ApplicationListener 接口,看如下代碼實現:
public void onApplicationEvent(ApplicationEvent event) {
//在Spring bean 刷新后進行服務的導出;
if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) { if (isDelay() && ! isExported() && ! isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } export(); } } }
實現該接口需要實現 onApplicationEvent 方法;事件的監聽,在有event 事件發生后,Spring會自動進行觸發此方法;
在Spring 容器發生刷新事件后進行導出 export(); 這一步就是最先開始的地方;
(二)檢查參數,組裝 URL
public synchronized void export() {
if (provider != null) { //是否進行導出的操作,用於在我們配置了<dubbo:provider export="false" />的時候,本地進行操作,不進行服務暴露的時候 if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && ! export.booleanValue()) { return; } //延時導出,線程睡眠,在高版本中,此方法改變成了schedule if (delay != null && delay > 0) { Thread thread = new Thread(new Runnable() { public void run() { try { Thread.sleep(delay); } catch (Throwable e) { } doExport(); } }); thread.setDaemon(true); thread.setName("DelayExportServiceThread"); thread.start(); } else { doExport(); } }
以上的方法就是進行了<dubbo:provider> 標簽屬性 export 與delay 的操作判斷;
我們繼續進行 doExport();
doExport() 又進行了一些的判斷與初始化的動作,沒什么好說的,接下來就是調用 doExportUrls();
private void doExportUrls() { List<URL> registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
loadRegistries() 會將一些必要信息進行排序封裝,生成url地址鏈接如下格式:
registry://47.95.**.138:2181/com.alibaba.dubbo.registry.RegistryService?application=hm-service-cart&check=false&dubbo=2.5.3&pid=931®istry=zookeeper×tamp=1545835371053
有多少協議,就導出多少次:
doExportUrlsFor1Protocol(protocolConfig, registryURLs);這個方法我們在第4步分析;
我們先看看Dubbo spi 機制
(三) Dubbo SPI (服務發現機制)
DUBBO SPI 的實現主要依賴於。 com.alibaba.dubbo.common.extension.ExtensionLoader 這個類,這個類是dubbo 運行的容器,維護着一些組件的實例,就像Spring 容器一樣;
SPI 服務發現機制;我給大家揭開它的面紗:
在源碼閱讀中,我們只要發現了像如下的代碼,內部就進行了spi 的機制
ExtensionLoader.getExtensionLoader(XXX.class).getExtension(type);
ExtensionLoader.getExtensionLoader(XXX.class).getAdaptiveExtension();
xxx.class 是接口名字,必須帶有@SPI的注解
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { //不為空檢測 if (type == null) throw new IllegalArgumentException("Extension type == null"); //必須的接口 if(!type.isInterface()) { throw new IllegalArgumentException("Extension type(" + type + ") is not interface!"); } //接口必須有@SPI 的注解 if(!withExtensionAnnotation(type)) { throw new IllegalArgumentException("Extension type(" + type + ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!"); } //實例化 ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); if (loader == null) { EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); } return loader; }
具體如下
1.(大家可以去看一看dubbo jar 包下 META-INF/dubbo/internal 目錄的文件):以接口作為文件名稱,內部是ke y = 實現類
private static final String SERVICES_DIRECTORY = "META-INF/services/"; private static final String DUBBO_DIRECTORY = "META-INF/dubbo/"; private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
2.在 getExtension 方法執行中,主要看 createExtension(name)方法;
public T getExtension(String name) { if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null"); if ("true".equals(name)) { return getDefaultExtension(); } Holder<Object> holder = cachedInstances.get(name); if (holder == null) { cachedInstances.putIfAbsent(name, new Holder<Object>()); holder = cachedInstances.get(name); } Object instance = holder.get(); if (instance == null) { synchronized (holder) { instance = holder.get(); if (instance == null) { instance = createExtension(name); holder.set(instance); } } } return (T) instance; }
createExtension(name)方法(ioc + aop )
private T createExtension(String name) {
//這個方法會從文件系統中獲取name 所對應的value ,大家可以看看這個方法 Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); }
//ioc 操作 injectExtension(instance); Set<Class<?>> wrapperClasses = cachedWrapperClasses;
//aop 包裝的操作,包裝類你調用我我調用你,形成了一條執行鏈;wrapperClasses 是包裝類,在loadFile() 方法中被裝配 if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class<?> wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } }
獲取所有的擴展類:
private Map<String, Class<?>> getExtensionClasses() { Map<String, Class<?>> classes = cachedClasses.get(); if (classes == null) { synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null) { classes = loadExtensionClasses(); cachedClasses.set(classes); } } } return classes; }
// 此方法已經getExtensionClasses方法同步過。 private Map<String, Class<?>> loadExtensionClasses() { final SPI defaultAnnotation = type.getAnnotation(SPI.class); if(defaultAnnotation != null) { String value = defaultAnnotation.value(); if(value != null && (value = value.trim()).length() > 0) { String[] names = NAME_SEPARATOR.split(value); if(names.length > 1) { throw new IllegalStateException("more than 1 default extension name on extension " + type.getName() + ": " + Arrays.toString(names)); } if(names.length == 1) cachedDefaultName = names[0]; } } Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
//從META-INF/services/ META-INF/dubbo/ /META-INF/dubbo/internal/ 這三個文件夾中獲取class配置 loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY); loadFile(extensionClasses, DUBBO_DIRECTORY); loadFile(extensionClasses, SERVICES_DIRECTORY); return extensionClasses; }
//dubbo 擴展檢索的文件
private static final String SERVICES_DIRECTORY = "META-INF/services/"; private static final String DUBBO_DIRECTORY = "META-INF/dubbo/"; private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
這樣對應着key 就可以實例化value,找到實現類了; 大家可以做一做小的demo進行一下測試:
/** 接口 * dubbo spi 機制,需要引用dubbo的spi 注解標簽 */ @SPI public interface DubboSPI { public void sayHello(); } //實現類 public class DuboSPIImpl implements DubboSPI { public void sayHello(){ System.out.println("dubbo spi 機制運行......cys"); }; } //測試類 public static void main(String[] args) { ExtensionLoader<DubboSPI> extensionLoader = ExtensionLoader.getExtensionLoader(DubboSPI.class); DubboSPI optimusPrime = extensionLoader.getExtension("DubboSPI"); optimusPrime.sayHello(); }
在META-INF/dubbo 下配置創建 com.baseknow.spi.dubbospi.DubboSPI (你的接口名字)文件
DubboSPI = com.baseknow.spi.dubbospi.DuboSPIImpl
關於一些常規的操作就到這里;
高級特性:(動態生成字節碼文件,加載進入內存)
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
如上:dubbo 通過 getAdaptiveExtension()進行擴展適配;通過javassist 將動態創建的類加載進內存;可以這么說,如上的protocol proxyFactory 這兩個對象是不存在的
通過dubbo 擴展 SPI Adapter 機制,將會自動生成實現該接口的一個類(這個類是自動生成的,不存在我們的文件中),通過javassiste 進行記載,然后通過創建的Adapter 類進行方法
的調度;
具體請看 ExtensionLoader 類如下兩個方法(源碼生成➕編譯):
private Class<?> createAdaptiveExtensionClass() { //生成源碼code ,建議debug ,看一看生成的源碼是什么樣子的 String code = createAdaptiveExtensionClassCode(); ClassLoader classLoader = findClassLoader(); com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); //默認會進行javassist 將源碼加載toclass() return compiler.compile(code, classLoader); } //源碼生成的方法 private String createAdaptiveExtensionClassCode() { StringBuilder codeBuidler = new StringBuilder(); Method[] methods = type.getMethods(); boolean hasAdaptiveAnnotation = false; for(Method m : methods) { if(m.isAnnotationPresent(Adaptive.class)) { hasAdaptiveAnnotation = true; break; } } //........
如下是dubbo 通過 getAdaptiveExtension() 通過一系列判斷 stringbbuilder 拼接而生成的 實現了Protocol接口的 適配器源碼,根據傳來的url來處理不同協議的調度
package com.alibaba.dubbo.rpc; import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol { public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); } public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); } }
這是實現了ProxyFactory 接口的 適配器源碼
package com.alibaba.dubbo.rpc; import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getProxy(arg0); }
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object { if (arg2 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg2; String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getInvoker(arg0, arg1, arg2); } }
以上兩個類就是生成的源碼code;
通過Javassist 進行將類進行加載成字節碼 com.alibaba.dubbo.common.compiler.support.JavassistCompiler
總的來說就二步:
1.通過Dubbo 擴展機制 ExtensionLoader 生成一個接口的自定義實現類源碼;
2.通過javassist 將生成的源碼轉換為字節碼加載進入內存
關於Protocol 與 ProxyFactory 這兩個動態的實現類,內部又是進行了spi機制的一些方法調用;大家看一看,因為下面的一些擴展都是基於這兩個的,不然很難知道它進行調用了哪個實現類;當然還有 Transporter 接口實現等等也是該機制;
如下是Transport Adapter 源碼:可以看出在沒有指定傳輸的時候,默認使用的是netty
package com.alibaba.dubbo.remoting; import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter { public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = url.getParameter("client", url.getParameter("transporter", "netty")); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([client, transporter])"); com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName); return extension.connect(arg0, arg1); } public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = url.getParameter("server", url.getParameter("transporter", "netty")); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])"); com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName); return extension.bind(arg0, arg1); } }
關於高級特性就到這里,具體的還是要看getAdaptiveExtension() 方法實現,里面是進行了 StringBuilder append 拼接源碼生成新的類,再進行JAVAssist toClass() 操作,需要對
JAVAssist 了解,不了解,大約知道生成的新類長什么樣子就好了;
(四) Netty(mina) 服務的啟動,端口綁定
緊接着第二步的步伐,我們分析 doExportUrlsFor1Protocol();我們拋開一些小細節,具體看如下Mark 紅色的部分
//如果配置不是local則暴露為遠程服務.(配置為local,則表示只暴露遠程服務) if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){ if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter);
proxyFactory 與 protocol 都是動態生成類,大家可以看第四部分我貼出來的動態生成的源碼,就可以知道真正的實現類是哪一個(默認是javassist 的實現類);
protocol 則是進行了aop 式的包裝類,會將DubboProtocol 類 ProtocolFilterWrapper 類, ProtocolListenerWrapper類,RegistryProtocol 類的export()方法執行一邊,為什么都會執行呢,請大家看 ExtensionLoader 的 createExtension 方法;如下:
private T createExtension(String name) { //這個方法會從文件系統中獲取name 所對應的value ,大家可以看看這個方法 Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } //ioc 操作 injectExtension(instance); Set<Class<?>> wrapperClasses = cachedWrapperClasses; //aop 包裝的操作,包裝類你調用我我調用你,形成了一條執行鏈; if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class<?> wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } }
在調用 DubboProtocol 的export() 方法時候,會進行 openServer(url)操作;
openServer(url);
private void openServer(URL url) { // find server. String key = url.getAddress(); //client 也可以暴露一個只有server可以調用的服務。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); } else { //server支持reset,配合override功能使用 server.reset(url); } } }
檢查是否開啟過服務,沒有的話,createServer(url);創建服務,這里大家就往下面點,會經過如下的過程:
會使用dubbo SPI 機制返回Transporter 傳輸對象,在我們不在xml 做任何配置的情況下,默認使用的是netty,我們可以看Javassist 生成的Transporter Adapter 源碼:
package com.alibaba.dubbo.remoting; import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter { public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = url.getParameter("client", url.getParameter("transporter", "netty")); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([client, transporter])"); com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName); return extension.connect(arg0, arg1); } public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = url.getParameter("server", url.getParameter("transporter", "netty")); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])"); com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName); return extension.bind(arg0, arg1); } }
如果使用mina ,我們可以在xml 做如下配置:
<!-- 用dubbo協議在20880端口暴露服務 --> <dubbo:protocol name="dubbo" port="20881" transporter="mina" />
在實例化的時候會進行doOpen() 操作了,就是正常的netty api 流程了;
public NettyServer(URL url, ChannelHandler handler) throws RemotingException{ super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } @Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }
通過分析,netty 服務創建流程就是spi 的各種調用,所以大家了解dubbo 就必須了解它的spi 機制;
這樣,一個netty 服務就啟動了..
(五) 注冊中心注冊服務
注冊中心我們以zookeeper 為例,也是Dubbo 推薦使用的注冊中心以及生產部署中常用的;
注冊中心的實現是 protocol 執行鏈中的RegistryProtocol 類中的export 方法
實現主要是CURATOR 框架進行節點的創建以及監聽
我們看節點的創建過程:
遞歸創建節點,其中url地址是臨時的節點,客戶端斷開會消失,其他的像/dubbo/接口地址 是永久的節點:如下圖