dubbo系列四、dubbo服務暴露過程源碼解析


一、代碼准備

1、示例代碼

參考dubbo系列二、dubbo+zookeeper+dubboadmin分布式服務框架搭建(windows平台)

2、簡單了解下spring自定義標簽

https://www.jianshu.com/p/16b72c10fca8

Spring自定義標簽總共可以分為以下幾個步驟
定義Bean 標簽解析生成接收配置的POJO。
定義schema文件,定義自定義標簽的attr屬性
定義解析類parser,遇到自定義標簽如何解析。
定義命名空間處理類namespaceSupport,遇到自定義的命名標簽,能夠路由到對應的解析類。
聲明schema,寫入spring.schema文件中
聲明自定義標簽的命名處理類namespaceHandler,寫入spring.handlers文件中

例如dubbo標簽:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://code.alibabatech.com/schema/dubbo
        http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!--dubbo應用程序命名-->
<dubbo:application name="dubbo-demo-provider"/>
<!--dubbo注冊地址-->
<dubbo:registry address="zookeeper://192.168.1.100:2181"/>
<!--dubbo協議地址-->
<dubbo:protocol name="dubbo" port="20880"/>
<!--接口聲明-->
<dubbo:service interface="com.dubbo.demo.api.DemoRpcService" ref="demoRpcService"/>
    <bean id="demoRpcService" class="com.dubbo.demo.DemoRpcServiceImpl"/>
</beans>

3、官網說明

官網:https://dubbo.incubator.apache.org/zh-cn/docs/dev/implementation.html

初始化過程細節

解析服務

基於 dubbo.jar 內的 META-INF/spring.handlers 配置,Spring 在遇到 dubbo 名稱空間時,會回調 DubboNamespaceHandler

所有 dubbo 的標簽,都統一用 DubboBeanDefinitionParser 進行解析,基於一對一屬性映射,將 XML 標簽解析為 Bean 對象。

在 ServiceConfig.export() 或 ReferenceConfig.get() 初始化時,將 Bean 對象轉換 URL 格式,所有 Bean 屬性轉成 URL 的參數。

然后將 URL 傳給 協議擴展點,基於擴展點的 擴展點自適應機制,根據 URL 的協議頭,進行不同協議的服務暴露或引用。

 

二、dubbo標簽解析

1、啟動服務,斷點跟蹤

public static void main(String[] args) throws IOException {
        ClassPathXmlApplicationContext context
                = new ClassPathXmlApplicationContext("classpath:dubbo-provider.xml");
        context.start();
        // 阻塞當前進程,否則程序會直接停止
        System.in.read();
    }

 

spring啟動過程中,隨着Spring在初始化過程中,碰到dubbo命名的標簽,如(<dubbo:service>,<dubbo:registry>)等標簽,會由DubboNamespaceHandler類處理,具體原理見鏈接Spring自定義標簽

DubboNamespaceHandler類源碼:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package com.alibaba.dubbo.config.spring.schema;
。。。import org.springframework.beans.factory.xml.NamespaceHandlerSupport;

public class DubboNamespaceHandler extends NamespaceHandlerSupport {
    public DubboNamespaceHandler() {
    }

    public void init() {
     // application標簽解析 <dubbo:application name="dubbo-demo-provider"/>
this.registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); // module標簽解析
this.registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));      // module標簽解析
     this.registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));

     this
.registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
this.registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); this.registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
     // <dubbo:protocol name="dubbo" port="20880"/>
this.registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
     // service標簽
     // <dubbo:service interface="com.dubbo.demo.api.DemoRpcService" ref="demoRpcService"/>
    // <bean id="demoRpcService" class="com.dubbo.demo.DemoRpcServiceImpl"/>
// </beans>

this.registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); this.registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); this.registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true)); } static { Version.checkDuplicate(DubboNamespaceHandler.class); } }

遇到不同的標簽,會由不同的Parser處理,這里重點看服務發布,這行代碼:

registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));

也就是說,當Spring容器處理完<dubbo:service>標簽后,會在Spring容器中生成一個ServiceBean ,服務的發布也會在ServiceBean中完成。不妨看一下ServiceBean的定義:

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
}

2、啟動入口類

ServiceBean 同時也是service標簽解析之后的bean之一,繼承ServiceConfig

該Bean實現了很多接口,關於InitializingBeanDisposableBeanApplicationContextAwareBeanNameAware,這些接口的使用介紹如下鏈接: 而在Spring初始化完成Bean的組裝,會調用InitializingBeanafterPropertiesSet方法,在Spring容器加載完成,會接收到事件ContextRefreshedEvent,調用ApplicationListeneronApplicationEvent方法。
afterPropertiesSet中,和onApplicationEvent中,會調用export(),在export()中,會暴露dubbo服務,具體區別在於是否配置了delay屬性,是否延遲暴露,如果delay不為null,或者不為-1時,會在afterPropertiesSet中調用export()暴露dubbo服務,如果為null,或者為-1時,會在Spring容器初始化完成,接收到ContextRefreshedEvent事件,調用onApplicationEvent,暴露dubbo服務。
部分ServiceBean的代碼如下:
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
 //Spring容器初始化完成,調用
 public void onApplicationEvent(ContextRefreshedEvent event) {
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            //暴露服務
            export();
        }
    }
}

export(),暴露服務過程中,如果發現有delay屬性,則延遲delay時間,暴露服務,如果沒有,則直接暴露服務。

public synchronized void export() {
        //忽略若干行代碼
        if (delay != null && delay > 0) {
            //當delay不為null,且大於0時,延遲delay時間,暴露服務
            delayExportExecutor.schedule(new Runnable() {
                public void run() {
                    //暴露服務
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            //直接暴露服務
            doExport();
        }
    }

而在doExport()中,驗證參數,按照不同的Protocol,比如(dubbo,injvm)暴露服務,在不同的zookeeper集群節點上注冊自己的服務。

protected synchronized void doExport() {
         //忽略10000行代碼
        doExportUrls();
        //忽略10000行代碼
    }

 private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            //按照不同的Protocal暴露服務
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

    // 獲取注冊中心地址
    protected List<URL> loadRegistries(boolean provider) {
        checkRegistry();
        List<URL> registryList = new ArrayList<URL>();
        // protected List<RegistryConfig> registries;解析后的RegistryConfig中獲取地址列表
        if (registries != null && !registries.isEmpty()) {
            for (RegistryConfig config : registries) {
                String address = config.getAddress();
                if (address == null || address.length() == 0) {
                    address = Constants.ANYHOST_VALUE;
                }
                // 如果地址為空,再次從配置文件中取
                String sysaddress = System.getProperty("dubbo.registry.address");
                if (sysaddress != null && sysaddress.length() > 0) {
                    address = sysaddress;
                }
                // 如果地址不為空,拼接協議類型、版本信息
                if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                    Map<String, String> map = new HashMap<String, String>();
                    appendParameters(map, application);
                    appendParameters(map, config);
                    map.put("path", RegistryService.class.getName());
                    map.put("dubbo", Version.getProtocolVersion());
                    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
                    if (ConfigUtils.getPid() > 0) {
                        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
                    }
                    // 默認dubbo協議
                    if (!map.containsKey("protocol")) {
                        if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
                            map.put("protocol", "remote");
                        } else {
                            map.put("protocol", "dubbo");
                        }
                    }
                    // 如果同一個標簽配置多個地址,則拆分
                    List<URL> urls = UrlUtils.parseURLs(address, map);
                    for (URL url : urls) {
                        url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
                        url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
                        if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
                                || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
                            registryList.add(url);
                        }
                    }
                }
            }
        }
        // 返回格式化后的注冊地址
        return registryList;
    }

 3、服務暴露過程

這里以dubbo協議為例,看一下發布的過程,在發布過程中,會用一個變量map保存URL的所有變量和value值,然后調用代理工程proxyFactory,獲取代理類,然后將invoker轉換成exporter,暴露服務,具體如下:

 protocol://host:port/path?key=value&key=value
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        //如果協議類型為null,則默認為dubbo協議
        String name = protocolConfig.getName();
        if (name == null || name.length() == 0) {
            name = "dubbo";
        }
        //map是保存url中key-Value的值
        Map<String, String> map = new HashMap<String, String>();
        //URL中的side屬性,有兩個值,一個provider,一個consumer,暴露服務的時候為provider
        map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
        //dubbo的版本號  url中的dubbo
        map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
       //url中的timestamp
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        //url中的pid
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        //從其他參數中獲取參數
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, provider, Constants.DEFAULT_KEY);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);
        //忽略若干代碼
        
        if (ProtocolUtils.isGeneric(generic)) {
            map.put("generic", generic);
            map.put("methods", Constants.ANY_VALUE);
        } else {
            //url中的revesion字段
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }
            //拼接URL中的methods
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put("methods", Constants.ANY_VALUE);
            } else {
                map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        //token 臨牌校驗
        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put("token", UUID.randomUUID().toString());
            } else {
                map.put("token", token);
            }
        }
        //injvm協議
        if ("injvm".equals(protocolConfig.getName())) {
            protocolConfig.setRegister(false);
            map.put("notify", "false");
        }
        //獲取上下文路徑
        String contextPath = protocolConfig.getContextpath();
        if ((contextPath == null || contextPath.length() == 0) && provider != null) {
            contextPath = provider.getContextpath();
        }
        //獲取主機名
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        //獲取端口
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        //組裝URL,將map中的協議,版本號信息等
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
         //如果url使用的協議存在擴展,調用對應的擴展來修改原url
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
       
        String scope = url.getParameter(Constants.SCOPE_KEY);
         //配置為none不暴露
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                //如果不是remote,則暴露本地服務
                exportLocal(url);
            }
             //如果配置不是local則暴露為遠程服務
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
               //忽略日志 如果注冊中心地址不為null
                if (registryURLs != null && registryURLs.size() > 0) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                      //忽略不相干的代碼  
                      // 通過代理工廠將ref對象轉化成invoker對象
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        //代理invoker對象
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // 暴露服務
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                         //一個服務可能有多個提供者,保存在一起
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

doExportUrlsFor1Protocol代碼再簡化一下,如下:

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
     Map map=builderUrl();
     // 通過代理工廠將ref對象轉化成invoker對象
     Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
     //代理invoker對象
     DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
     // 暴露服務
     Exporter<?> exporter = protocol.export(wrapperInvoker);
     //一個服務可能有多個提供者,保存在一起
     exporters.add(exporter);
}

拼接后的url:

dubbo://192.168.1.100:20880/com.dubbo.demo.api.DemoRpcService?anyhost=true&application=dubbo-demo-provider&bind.ip=192.168.1.100&bi

nd.port=20880&dubbo=2.6.0&generic=false&interface=com.dubbo.demo.api.DemoRpcService&methods=getUserName&pid=18740&side=provider&timestamp=1538311737815

 

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

而在上面proxyFactory.getInvoker中,很顯然是獲取到的是接口的代理類。

而在 protocol.export(wrapperInvoker)中,將服務暴露出去。

代碼如下:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        //忽略若干代碼
        //打開服務
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }
private void openServer(URL url) {
       
        String key = url.getAddress();
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
         //是否server端
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                //如果服務不存在,創建服務
                serverMap.put(key, createServer(url));
            } else {
                server.reset(url);
            }
        }
    }
private ExchangeServer createServer(URL url) {
        //忽略若干代碼
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        return server;
    }

而在headerExchangerbind中,調用了Transporters.bind(),一直調用到NettyServer,綁定了端口和鏈接。

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

//Transporters.bind
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        //忽略很多代碼
        return getTransporter().bind(url, handler);
    }
//上段代碼的getTransporter()
 public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }

而在Transporter的定義中看到下面代碼:

@SPI("netty")
public interface Transporter {
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

所以這里調用的是NettyTransporter,這里啟動了一個新的NettyServer

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty4";

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
}

在NettyServer的構造方法中,調用了父類的構造方法,調用了doOpen()方法指定了端口

public class NettyServer extends AbstractServer implements Server {
  public NettyServer(URL url, ChannelHandler handler) throws   RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
       super(url, handler);
       //忽略很多代碼  
      doOpen();
       //忽略很多代碼
}


 @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();

        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

到這里dubbo服務就啟動了,但是有一點還是有疑惑,那么,dubbo服務什么時候注冊到注冊中心的?帶着疑惑看了一下官方文檔。

也就是說,在調用DubboProtocol暴露服務之前,回去調用攔截器,當發現是regiester,則去注冊中心注冊服務。

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    //如果是registerProtocol,則調用RegisterProtocol.export方法
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return new ListenerExporterWrapper<T>(protocol.export(invoker),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

而在RegisterProtocol.export

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        URL registryUrl = getRegistryUrl(originInvoker);

        //根據SPI機制獲取具體的Registry實例,這里獲取到的是ZookeeperRegistry
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        boolean register = registedProviderUrl.getParameter("register", true);
        if (register) {
            //在這里注冊服務
            register(registryUrl, registedProviderUrl);
           //忽略很多代碼
        }
        //忽略很多代碼
    }

    public void register(URL registryUrl, URL registedProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }

ZookeeperRegistry繼承父類FailbackRegistry,在父類的register方法中,調用了 doRegister,doRegister中,創建了ZK節點,這樣就將自己的服務暴露到注冊中心zk上:

@Override
    public void register(URL url) {
             //忽略很多代碼
            doRegister(url);
            //忽略很多代碼
    }

 protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

這樣,整個dubbo服務就啟動了。再回頭看官方文檔上的說明,就很清楚了。

 

三、類調用關系

1、provider提供方

ClassPathXmlApplicationContext <init>(構造方法)
-> ClassPathXmlApplicationContext refresh()
-> ClassPathXmlApplicationContext finishRefresh()
-> AbstractApplicationContext publishEvent()
-> ServiceBean onApplicationEvent()
-> ServiceConfig doExport()
#構造dubbo對象 application provider module protocol registry service reference consume等
 
-> ServiceConfig doExportUrls #導出URL,獲取注冊中心RegistryConfig
#注冊中心:registry://10.199.101.228:2181/com.alibaba.dubbo.registry.RegistryService?application=demo&backup=10.199.101.227:2181,10.199.101.229:2181&dubbo=2.4.9&pid=8045&registry=zookeeper&timestamp=1491546077803
 
-> ServiceConfig doExportUrlsFor1Protocol()
#需要暴露 dubbo://10.199.66.242:20880/com.unj.dubbotest.provider.DemoService?anyhost=true&application=dubbo_demo_provider&dubbo=2.4.9&interface=com.unj.dubbotest.provider.DemoService&methods=sayHello,getUsers&pid=8045&revision=0.0.1&side=provider&timestamp=1491546674441&version=0.0.1
 
-> ServiceConfig exportLocal()
-> Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
#暴露Invoker<XxxService>調用服務代理類
 
-> proxyFactory.getInvoker(ref, (Class) interfaceClass, local)
#返回 AbstractProxyInvoker代理ProxyInvoker<XxxService>
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
private final T proxy; //代理目標實例 XxxServiceImpl
private final Class<T> type;
private final URL url;
}
-> InvokerInvocationHandler.invoke()
#invoker.invoke(new RpcInvocation(method, args)).recreate();
 
-> DubboProtocol export(Invoker<T> invoker)
# 返回暴露Exporter<T>
public class DubboExporter<T> extends AbstractExporter<T> {
private final String key; //com.unj.dubbotest.provider.DemoService:0.0.1:20880
private final Map<String, Exporter<?>> exporterMap;
public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap){
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
}
 
-> DubboProtocol openServer(url)
#url dubbo://10.199.66.242:20880/com.unj.dubbotest.provider.DemoService?anyhost=true&application=dubbo_demo&dubbo=2.4.9&interface=com.unj.dubbotest.provider.DemoService&methods=sayHello,getUsers&pid=8045&revision=0.0.1&side=provider&timestamp=1491546674441&version=0.0.
#serverMap.put(key, createServer(url)); key:10.199.66.242:20880 value:ExchangeServer
 
-> DubboProtocol createServer(URL url)
#返回HeaderExchangeServer,添加參數列表 如心跳,心跳時間
-> Exchangers.bind(url, requestHandler);
#返回HeaderExchangeServer,getTransporter()獲取的實例來源於配置,默認返回一個NettyTransporter
-> HeaderExchangeServer.bind(URL url, ExchangeHandler handler);
 
-> HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
#HeaderExchangeServer包裝實例NettyServer
 
-> NettyTransporter.bind(URL url, ChannelHandler listener)
#return new NettyServer(url, listener)
 
-> NettyServer.doOpen();
#打開socket監聽端口准備接收消息
#ServerBootstrap bind(getBindAddress())綁定地址端口
#RpcInvocation 具體類名、方法名、調用參數
#DubboInvoker – 執行具體的遠程調用,包含初始化信息如client
#Protocol – 服務地址的發布和訂閱
#Exporter – 暴露服務的引用,或取消暴露

2、consume(消費方)

->ReferenceConfig.init
#consume端啟動初始化
->DubboProtocol.refer
#根據參數url,接口等構建Invoker
->JavassistProxyFactory.getProxy(Invoker<T> invoker, Class<?>[] interfaces)
#構建代理對象Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
 
->DemoService.say(String hello);#真正調用時候
->InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)
#invoker.invoke(new RpcInvocation(method, args)).recreate();RpcInvocation包裝參數方法名
->DubboInvoker.doInovke(final Invocation invocation)
->MockClusterInvoker是否Mock
->FailfastClusterInvoker.invoke
->RegistryDirectory的methodInvokerMap 獲取invock列表
->Loadbalance RegistryDirectory.doList(Invocation)負載均衡
 
#統一代理調用
->ExchangeClient.send(invocation, isSent);
->HeaderExchangeChannel.request(Object request, int timeout)
->NettyChannel.send(Object message, boolean sent) 
 
 

3、dubbo 底層通訊

NettyClient <-- 異步NIO傳輸 socket監聽-> NettyServer
 

4、consume --> provider 調用過程

 
-> NettyServer->NettyHandler.messageReceived #接收消息處理器
-> MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol$requestHandler
#NettyServer啟動時候綁定MultiMessageHandler
#DubboProtocol.getServers() 檢索serverMap獲取Exporter<?>
#DubboProtocol.getServers() 檢索serverMap獲取ExchangeServer
-> ExchangeHandlerAdapter.reply
#真正獲取Invoker,將傳入message 轉換 invocation
-> invoker.invoke(invocation)
-> JavassistProxyFactory$AbstractProxyInvoker.doInvoke
#服務端Invoker代理 AbstractProxyInvoker調用目標引用service,客戶端DubboInvoker


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM