DUBBO 服務導出實現


在閱讀此文章之前,我希望閱讀者對Spring 擴展機制的有一定的了解,比如:自定義標簽與Spring整合, InitializingBean 接口,ApplicationContextAware,BeanNameAware,

BeanFactory 接口所起到的作用 ;從來沒了解過的,請看我之前的關於Spring的博客 

 

關於此篇文章受益於 :http://dubbo.apache.org/zh-cn/docs/source_code_guide/export-service.html   dubbo官網文檔

 

開始正題

(一)onApplicationEvent (事件監聽)

dubbo 服務導出的方法是在 com.alibaba.dubbo.config.spring.ServiceBean 類中,dubbo 服務的導出過程始於在Spring 容器發生刷新事件,那么如何感知到Spring 容器發生刷新

事件呢? ~~ 得益於Spring提供的 ApplicationListener 接口,看如下代碼實現:

public void onApplicationEvent(ApplicationEvent event) {
    //在Spring bean 刷新后進行服務的導出;
        if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) { if (isDelay() && ! isExported() && ! isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } export(); } } }

實現該接口需要實現 onApplicationEvent 方法;事件的監聽,在有event 事件發生后,Spring會自動進行觸發此方法;

在Spring 容器發生刷新事件后進行導出 export();   這一步就是最先開始的地方;

 

(二)檢查參數,組裝 URL

public synchronized void export() {
        if (provider != null) { //是否進行導出的操作,用於在我們配置了<dubbo:provider export="false" />的時候,本地進行操作,不進行服務暴露的時候 if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && ! export.booleanValue()) { return; } //延時導出,線程睡眠,在高版本中,此方法改變成了schedule if (delay != null && delay > 0) { Thread thread = new Thread(new Runnable() { public void run() { try { Thread.sleep(delay); } catch (Throwable e) { } doExport(); } }); thread.setDaemon(true); thread.setName("DelayExportServiceThread"); thread.start(); } else { doExport(); } } 

以上的方法就是進行了<dubbo:provider> 標簽屬性 export 與delay 的操作判斷;

我們繼續進行  doExport();

 doExport() 又進行了一些的判斷與初始化的動作,沒什么好說的,接下來就是調用 doExportUrls();

private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
loadRegistries() 會將一些必要信息進行排序封裝,生成url地址鏈接如下格式:

registry://47.95.**.138:2181/com.alibaba.dubbo.registry.RegistryService?application=hm-service-cart&check=false&dubbo=2.5.3&pid=931&registry=zookeeper&timestamp=1545835371053
有多少協議,就導出多少次:

doExportUrlsFor1Protocol(protocolConfig, registryURLs);這個方法我們在第4步分析;

我們先看看Dubbo spi 機制
  

 (三) Dubbo SPI (服務發現機制)

DUBBO SPI 的實現主要依賴於。 com.alibaba.dubbo.common.extension.ExtensionLoader 這個類,這個類是dubbo 運行的容器,維護着一些組件的實例,就像Spring 容器一樣;

SPI 服務發現機制;我給大家揭開它的面紗:

 

在源碼閱讀中,我們只要發現了像如下的代碼,內部就進行了spi 的機制

ExtensionLoader.getExtensionLoader(XXX.class).getExtension(type);
ExtensionLoader.getExtensionLoader(XXX.class).getAdaptiveExtension();

xxx.class 是接口名字,必須帶有@SPI的注解

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    //不為空檢測
        if (type == null)
            throw new IllegalArgumentException("Extension type == null");
//必須的接口
        if(!type.isInterface()) {
            throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
        }
//接口必須有@SPI 的注解
        if(!withExtensionAnnotation(type)) {
            throw new IllegalArgumentException("Extension type(" + type + 
                    ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
        }
        //實例化
        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        if (loader == null) {
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }

 

具體如下

1.(大家可以去看一看dubbo jar 包下 META-INF/dubbo/internal 目錄的文件):以接口作為文件名稱,內部是ke y = 實現類

 private static final String SERVICES_DIRECTORY = "META-INF/services/";

 private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
    
 private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";

2.在 getExtension  方法執行中,主要看  createExtension(name)方法;

public T getExtension(String name) {
        if (name == null || name.length() == 0)
            throw new IllegalArgumentException("Extension name == null");
        if ("true".equals(name)) {
            return getDefaultExtension();
        }
        Holder<Object> holder = cachedInstances.get(name);
        if (holder == null) {
            cachedInstances.putIfAbsent(name, new Holder<Object>());
            holder = cachedInstances.get(name);
        }
        Object instance = holder.get();
        if (instance == null) {
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    instance = createExtension(name);
                    holder.set(instance);
                }
            }
        }
        return (T) instance;
    }
createExtension(name)方法(ioc + aop )
private T createExtension(String name) {
      //這個方法會從文件系統中獲取name 所對應的value ,大家可以看看這個方法 Class
<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); }
        //ioc 操作 injectExtension(instance); Set
<Class<?>> wrapperClasses = cachedWrapperClasses;
       //aop 包裝的操作,包裝類你調用我我調用你,形成了一條執行鏈;wrapperClasses 是包裝類,在loadFile() 方法中被裝配
if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class<?> wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } }

獲取所有的擴展類:

    private Map<String, Class<?>> getExtensionClasses() {
        Map<String, Class<?>> classes = cachedClasses.get();
        if (classes == null) {
            synchronized (cachedClasses) {
                classes = cachedClasses.get();
                if (classes == null) {
                    classes = loadExtensionClasses();
                    cachedClasses.set(classes);
                }
            }
        }
        return classes;
    }

 

 // 此方法已經getExtensionClasses方法同步過。
    private Map<String, Class<?>> loadExtensionClasses() {
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        if(defaultAnnotation != null) {
            String value = defaultAnnotation.value();
            if(value != null && (value = value.trim()).length() > 0) {
                String[] names = NAME_SEPARATOR.split(value);
                if(names.length > 1) {
                    throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                            + ": " + Arrays.toString(names));
                }
                if(names.length == 1) cachedDefaultName = names[0];
            }
        }
        
        Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
    //從META-INF/services/ META-INF/dubbo/ /META-INF/dubbo/internal/ 這三個文件夾中獲取class配置 loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY); loadFile(extensionClasses, DUBBO_DIRECTORY); loadFile(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses; }
  //dubbo 擴展檢索的文件

private static final String SERVICES_DIRECTORY = "META-INF/services/"; private static final String DUBBO_DIRECTORY = "META-INF/dubbo/"; private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";

 

這樣對應着key 就可以實例化value,找到實現類了; 大家可以做一做小的demo進行一下測試:

/**
接口
 * dubbo spi 機制,需要引用dubbo的spi 注解標簽
 */
@SPI
public interface DubboSPI {

    public void sayHello();
}

//實現類
public class DuboSPIImpl implements DubboSPI {

    public void sayHello(){
        System.out.println("dubbo spi 機制運行......cys");
    };

}



//測試類
public static void main(String[] args) {
        ExtensionLoader<DubboSPI> extensionLoader =
                ExtensionLoader.getExtensionLoader(DubboSPI.class);
        DubboSPI optimusPrime = extensionLoader.getExtension("DubboSPI");
        optimusPrime.sayHello();

    }

在META-INF/dubbo 下配置創建 com.baseknow.spi.dubbospi.DubboSPI (你的接口名字)文件 

DubboSPI = com.baseknow.spi.dubbospi.DuboSPIImpl

關於一些常規的操作就到這里;

 

高級特性(動態生成字節碼文件,加載進入內存)

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

 如上:dubbo 通過 getAdaptiveExtension()進行擴展適配;通過javassist 將動態創建的類加載進內存;可以這么說,如上的protocol  proxyFactory 這兩個對象是不存在的

通過dubbo 擴展 SPI Adapter 機制,將會自動生成實現該接口的一個類(這個類是自動生成的,不存在我們的文件中),通過javassiste 進行記載,然后通過創建的Adapter 類進行方法

的調度;

 

 具體請看 ExtensionLoader 類如下兩個方法(源碼生成➕編譯):

private Class<?> createAdaptiveExtensionClass() {
        //生成源碼code ,建議debug ,看一看生成的源碼是什么樣子的
        String code = createAdaptiveExtensionClassCode();
        ClassLoader classLoader = findClassLoader();
        com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
    //默認會進行javassist 將源碼加載toclass()
        return compiler.compile(code, classLoader);
    }

    //源碼生成的方法
    private String createAdaptiveExtensionClassCode() {
        StringBuilder codeBuidler = new StringBuilder();
        Method[] methods = type.getMethods();
        boolean hasAdaptiveAnnotation = false;
        for(Method m : methods) {
            if(m.isAnnotationPresent(Adaptive.class)) {
                hasAdaptiveAnnotation = true;
                break;
            }
        }        
//........ 

 如下是dubbo   通過 getAdaptiveExtension() 通過一系列判斷 stringbbuilder 拼接而生成的 實現了Protocol接口的 適配器源碼,根據傳來的url來處理不同協議的調度

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}

public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}

public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}

 這是實現了ProxyFactory  接口的 適配器源碼

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getProxy(arg0); }
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object { if (arg2 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg2; String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getInvoker(arg0, arg1, arg2); } }

 以上兩個類就是生成的源碼code;

通過Javassist 進行將類進行加載成字節碼 com.alibaba.dubbo.common.compiler.support.JavassistCompiler    

總的來說就二步:

1.通過Dubbo 擴展機制 ExtensionLoader 生成一個接口的自定義實現類源碼;
2.通過javassist 將生成的源碼轉換為字節碼加載進入內存

 

關於Protocol 與 ProxyFactory 這兩個動態的實現類,內部又是進行了spi機制的一些方法調用;大家看一看,因為下面的一些擴展都是基於這兩個的,不然很難知道它進行調用了哪個實現類;當然還有 Transporter 接口實現等等也是該機制;

如下是Transport Adapter 源碼:可以看出在沒有指定傳輸的時候,默認使用的是netty

package com.alibaba.dubbo.remoting;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter {
public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = url.getParameter("client", url.getParameter("transporter", "netty"));
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([client, transporter])");
com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
return extension.connect(arg0, arg1);
}
public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])");
com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
return extension.bind(arg0, arg1);
}
}

 

 關於高級特性就到這里,具體的還是要看getAdaptiveExtension() 方法實現,里面是進行了 StringBuilder  append 拼接源碼生成新的類,再進行JAVAssist toClass() 操作,需要對

JAVAssist 了解,不了解,大約知道生成的新類長什么樣子就好了;

 

  (四) Netty(mina) 服務的啟動,端口綁定

緊接着第二步的步伐,我們分析 doExportUrlsFor1Protocol();我們拋開一些小細節,具體看如下Mark 紅色的部分

 //如果配置不是local則暴露為遠程服務.(配置為local,則表示只暴露遠程服務)
            if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                      Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

                       Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
proxyFactory 與 protocol 都是動態生成類,大家可以看第四部分我貼出來的動態生成的源碼,就可以知道真正的實現類是哪一個(默認是javassist 的實現類);

protocol 則是進行了aop 式的包裝類,會將DubboProtocol 類 ProtocolFilterWrapper 類, ProtocolListenerWrapper類,RegistryProtocol 類的export()方法執行一邊,為什么都會執行呢,請大家看 ExtensionLoader 的 createExtension 方法;如下:
private T createExtension(String name) {
      //這個方法會從文件系統中獲取name 所對應的value ,大家可以看看這個方法
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
        //ioc 操作
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
       //aop 包裝的操作,包裝類你調用我我調用你,形成了一條執行鏈;
            if (wrapperClasses != null && wrapperClasses.size() > 0) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                    type + ")  could not be instantiated: " + t.getMessage(), t);
        }
    }

 

在調用 DubboProtocol 的export() 方法時候,會進行 openServer(url)操作;

 openServer(url);
private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client 也可以暴露一個只有server可以調用的服務。
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                //server支持reset,配合override功能使用
                server.reset(url);
            }
        }
    }

檢查是否開啟過服務,沒有的話,createServer(url);創建服務,這里大家就往下面點,會經過如下的過程:

 

會使用dubbo SPI 機制返回Transporter 傳輸對象,在我們不在xml 做任何配置的情況下,默認使用的是netty,我們可以看Javassist 生成的Transporter Adapter 源碼:

package com.alibaba.dubbo.remoting;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Transporter$Adpative implements com.alibaba.dubbo.remoting.Transporter {
public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = url.getParameter("client", url.getParameter("transporter", "netty"));
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([client, transporter])");
com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
return extension.connect(arg0, arg1);
}
public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.common.URL {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = url.getParameter("server", url.getParameter("transporter", "netty")); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])");
com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
return extension.bind(arg0, arg1);
}
}

如果使用mina ,我們可以在xml 做如下配置:

    <!-- 用dubbo協議在20880端口暴露服務 -->
    <dubbo:protocol name="dubbo" port="20881" transporter="mina" />

 

在實例化的時候會進行doOpen() 操作了,就是正常的netty api 流程了; 

public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);
        
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }
通過分析,netty 服務創建流程就是spi 的各種調用,所以大家了解dubbo 就必須了解它的spi 機制;
這樣,一個netty 服務就啟動了.. 

 (五) 注冊中心注冊服務

注冊中心我們以zookeeper 為例,也是Dubbo 推薦使用的注冊中心以及生產部署中常用的;

注冊中心的實現是 protocol 執行鏈中的RegistryProtocol 類中的export 方法

實現主要是CURATOR 框架進行節點的創建以及監聽

我們看節點的創建過程:

 遞歸創建節點,其中url地址是臨時的節點,客戶端斷開會消失,其他的像/dubbo/接口地址 是永久的節點:如下圖

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM