Source Code Analysis of the Dubbo Consumer-Side Service Invocation Process


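References:
Source code analysis of the Dubbo consumer-side service invocation process
Analysis of how Dubbo is built on Spring
Dubbo overview: the invocation process
Analysis of the Dubbo request invocation process
Code analysis of Dubbo's cluster fault-tolerance mechanism, part 1
Code analysis of Dubbo's cluster fault-tolerance strategies, part 2
Studying the source code of Dubbo's SPI mechanism
Dubbo: the service invocation process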

I. Explaining the invocation process through the call stack of a typical RPC method call

1. Define an interface
public interface DemoService {

    /**
     * class_name: sayHello
     * param: [param]
     * describe: say hello
     * creat_user: CoderZZ
     * creat_date: 2018-10-11
     * creat_time: 23:06
     **/
    String sayHello(String param);
    
    /**
     * class_name: sayGoodbye
     * param: [param]
     * describe: TODO
     * creat_user: CoderZZ
     * creat_date: 2018-10-12
     * creat_time: 0:27
     **/
    String sayGoodbye(String param);

    Person getPerson(String name);
}
2. Implement the interface
public class DemoServiceImpl implements DemoService{

    public String sayHello(String param) {
        // Whether this side is the provider; returns true here
        boolean isProviderSide = RpcContext.getContext().isProviderSide();
        System.out.println("isProviderSide:"+isProviderSide);
        // Get the caller's IP address
        String clientIP = RpcContext.getContext().getRemoteHost();
        System.out.println("clientIP:"+clientIP);
        // Get the current service configuration; all configuration is converted into URL parameters
        String application = RpcContext.getContext().getUrl().getParameter("application");
        System.out.println("application:"+application);
        String address = RpcContext.getContext().getUrl().getAddress();
        System.out.println("address:"+address);
        String index = RpcContext.getContext().getAttachment("index");
        System.out.println("getAttachment index:"+index);
        return "Hello "+param;
    }
    // remaining methods omitted
    ...............
}

 

3. The provider publishes the service through the registry, using the dubbo protocol by default (dubbo-provider.xml)
<dubbo:registry id="workpalceRegistry" address="zookeeper://192.168.33.117:2181" default="true"/>
<dubbo:protocol name="dubbo" port="20880" dispatcher="all" threadpool="fixed" threads="100" accepts="10" status="test"/>
<dubbo:service interface="com.zxd.dubbo.learning.api.DemoService" ref="demoServiceImpl" protocol="dubbo" registry="workpalceRegistry" executes="10" timeout="60000"/>
<bean id="demoServiceImpl" class="com.zxd.dubbo.learning.provider.DemoServiceImpl"/>

 

4. The client references the service through the registry; the registry is implemented with the zookeeper protocol (dubbo-consumer2.xml)
 <dubbo:registry address="zookeeper://192.168.33.117:2181"/>
 <!-- Client-side connection control -->
 <!-- connections="10" -->
 <!-- Limit the client to at most 10 connections to the service -->
 <dubbo:reference interface="com.zxd.dubbo.learning.api.DemoService" id="demoService" retries="2" loadbalance="random" actives="10" connections="10"/>
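 <!-- If production needs point-to-point access, set url on <dubbo:reference> to point at the provider; this bypasses the registry. Separate multiple addresses with semicolons. -->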
 <!--<dubbo:reference id="demoService" interface="com.zxd.dubbo.learning.api.DemoService" url="dubbo://localhost:20880"/>-->
5. Start the provider, then call the Dubbo service from the client
    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
        ClassPathXmlApplicationContext classPathXmlApplicationContext =
                new ClassPathXmlApplicationContext("classpath:dubbo-consumer2.xml");
//        classPathXmlApplicationContext.start();
        System.out.println("Consumer started!");
        DemoService demoService = classPathXmlApplicationContext.getBean("demoService",DemoService.class);
        // Implicit parameter
        // Note: the keys path, group, version, dubbo, token and timeout are reserved; use other names
        RpcContext.getContext().setAttachment("index","1");
        String rt = demoService.sayHello("world");
        System.out.println(rt);
    }
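
The implicit parameter set through RpcContext travels alongside the next call rather than through the method signature. A minimal sketch of that idea, using a thread-local map, is shown below. ContextSketch, setAttachment and drainForCall are hypothetical names for illustration only; this is not Dubbo's RpcContext implementation, and clearing the map after one call is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a per-thread call context; not Dubbo's RpcContext.
class ContextSketch {
    private static final ThreadLocal<Map<String, String>> ATTACHMENTS =
            ThreadLocal.withInitial(HashMap::new);

    // Store a key/value pair that should accompany the next outgoing call.
    static void setAttachment(String key, String value) {
        ATTACHMENTS.get().put(key, value);
    }

    // Copy the attachments for the outgoing request and clear them,
    // so they apply to one call only (an assumption of this sketch).
    static Map<String, String> drainForCall() {
        Map<String, String> copy = new HashMap<>(ATTACHMENTS.get());
        ATTACHMENTS.get().clear();
        return copy;
    }

    public static void main(String[] args) {
        setAttachment("index", "1");
        System.out.println(drainForCall()); // attachments sent with the first call
        System.out.println(drainForCall()); // nothing left for the second call
    }
}
```

Because the map is thread-local, the value set on the calling thread is visible only to calls made from that same thread.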
6. Running the main method produces the following output
log4j:WARN No appenders could be found for logger (org.springframework.core.env.StandardEnvironment).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Consumer started!
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hello world

Process finished with exit code 0

II. Tracing with breakpoints (Dubbo's default underlying transport framework is Netty)

Look at the doOpen method of the com.alibaba.dubbo.remoting.transport.netty.NettyClient class:

 /***
     * Open the connection to the remote server
     * @throws Throwable
     */
    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        bootstrap = new ClientBootstrap(channelFactory);
        // config
        // @see org.jboss.netty.channel.socket.SocketChannelConfig
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("connectTimeoutMillis", getConnectTimeout());
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                // Set the handlers for the message flow; an outgoing message passes through handler first and then encoder,
                // so the breakpoint can be set inside the NettyHandler class.
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }
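
The comment about ordering relies on how a Netty 3 pipeline routes events: outbound (downstream) events travel from the last handler added toward the first, and inbound (upstream) events travel the other way. The sketch below only models that traversal order; PipelineOrderSketch and its methods are hypothetical and are not part of Netty's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical miniature pipeline; it models traversal order only.
class PipelineOrderSketch {
    private final List<String> names = new ArrayList<>();

    // Append a stage at the tail, as pipeline.addLast(name, handler) does.
    void addLast(String name) {
        names.add(name);
    }

    // Outbound (downstream) events visit stages from the tail toward the head.
    List<String> downstreamOrder() {
        List<String> order = new ArrayList<>(names);
        Collections.reverse(order);
        return order;
    }

    // Inbound (upstream) events visit stages from the head toward the tail.
    List<String> upstreamOrder() {
        return new ArrayList<>(names);
    }

    public static void main(String[] args) {
        PipelineOrderSketch pipeline = new PipelineOrderSketch();
        pipeline.addLast("decoder");
        pipeline.addLast("encoder");
        pipeline.addLast("handler");
        System.out.println(pipeline.downstreamOrder()); // prints [handler, encoder, decoder]
    }
}
```

In the real pipeline each stage reacts only to the direction it handles, so the decoder is effectively skipped for outgoing messages.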

NettyHandler extends Netty's SimpleChannelHandler class and overrides the writeRequested method:

    @Override
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        super.writeRequested(ctx, e); // set the breakpoint here
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.sent(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

Inheritance diagram of the NettyHandler class: [figure: NettyHandler]
Set a breakpoint at super.writeRequested(ctx, e); and run the main method. At the breakpoint the thread stack looks like this:

"main@1" prio=5 tid=0x1 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(NettyHandler.java:98)
      at org.jboss.netty.channel.SimpleChannelHandler.handleDownstream(SimpleChannelHandler.java:266)
      at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
      at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
      at org.jboss.netty.channel.Channels.write(Channels.java:611)
      at org.jboss.netty.channel.Channels.write(Channels.java:578)
      at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:251)
      at com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(NettyChannel.java:100)
      at com.alibaba.dubbo.remoting.transport.AbstractClient.send(AbstractClient.java:265)
      at com.alibaba.dubbo.remoting.transport.AbstractPeer.send(AbstractPeer.java:53)
      at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.request(HeaderExchangeChannel.java:116)
      at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeClient.request(HeaderExchangeClient.java:90)
      at com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker.doInvoke(DubboInvoker.java:95)
      at com.alibaba.dubbo.rpc.protocol.AbstractInvoker.invoke(AbstractInvoker.java:155)
      at com.alibaba.dubbo.rpc.listener.ListenerInvokerWrapper.invoke(ListenerInvokerWrapper.java:77)
      at com.alibaba.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:75)
      at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:72)
      at com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:54)
      at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:72)
      at com.alibaba.dubbo.rpc.filter.ActiveLimitFilter.invoke(ActiveLimitFilter.java:70)
      at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:72)
      at com.alibaba.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:49)
      at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:72)
      at com.alibaba.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56)
      at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:78)
      at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:244)
      at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:75)
      at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52)
      at com.alibaba.dubbo.common.bytecode.proxy0.sayHello(proxy0.java:-1)
      at com.zxd.dubbo.learning.consumer.Consumer2.main(Consumer2.java:37)

Reading from bottom to top shows the classes and methods that the client-side call passes through.
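
The alternating ProtocolFilterWrapper$1.invoke and filter frames in the trace suggest that each filter wraps the next invoker. A minimal sketch of that wrapping idea follows. The Invoker and Filter interfaces, buildChain and named are simplified stand-ins introduced for illustration; they are not Dubbo's actual types or signatures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FilterChainSketch {
    // Hypothetical minimal invoker: takes a request, returns a result.
    interface Invoker {
        String invoke(String request);
    }

    // Hypothetical filter: may act before or after delegating to the next invoker.
    interface Filter {
        String invoke(Invoker next, String request);
    }

    // Wrap the last invoker with the filters, iterating from the last filter
    // to the first so that the first filter ends up outermost.
    static Invoker buildChain(Invoker last, List<Filter> filters) {
        Invoker current = last;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = current;
            current = request -> filter.invoke(next, request);
        }
        return current;
    }

    // Helper that creates a filter which records its name when entered.
    static Filter named(String name, List<String> trace) {
        return (next, request) -> {
            trace.add(name);
            return next.invoke(request);
        };
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        Invoker last = request -> {
            trace.add("DubboInvoker");
            return "Hello " + request;
        };
        Invoker chain = buildChain(last, Arrays.asList(
                named("ConsumerContextFilter", trace),
                named("ActiveLimitFilter", trace),
                named("FutureFilter", trace),
                named("MonitorFilter", trace)));
        System.out.println(chain.invoke("world"));
        System.out.println(trace);
    }
}
```

The recorded order mirrors the filter frames in the trace when read from bottom to top.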
The second stack frame from the bottom:

at com.alibaba.dubbo.common.bytecode.proxy0.sayHello(proxy0.java:-1)

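com.alibaba.dubbo.common.bytecode.proxy0 is a proxy class; it proxies every method call on the RPC service interfaces it implements.
When is an instance of this class created, and what does its code look like? See the post "Analysis of how Dubbo is built on Spring".
The overall process is: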

// Handler class that parses the Dubbo XML schema
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    @Override
    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); // handles the <dubbo:reference> tag
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }
}

Inheritance diagram of the ReferenceBean class: [figure: ReferenceBean]
Proxy creation is triggered by the getObject() method of the ReferenceBean class:

   @Override
    public Object getObject() throws Exception {
        return get();
    }

The get() method lives in the ReferenceConfig class:

    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }
    
   private void init() {
           .............
           ref = createProxy(map);
        .............
   }
   
    /***
     * Create the client-side RPC invocation proxy
     * @param map
     * @return
     */
       @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
        // ... logic that builds the invoker
        // create service proxy
       return (T) proxyFactory.getProxy(invoker);
    }

proxyFactory is declared as:

 private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); 

From the post "Studying the source code of Dubbo's SPI mechanism", the getProxy method of the adaptive class generated for the ProxyFactory interface is as follows:

public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
    public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
         // By default the getProxy method of the javassist ProxyFactory extension creates the proxy here
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0);
    }
}
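
The generated adaptive class boils down to "read a name from the URL, fall back to a default, look up the extension by that name". A rough sketch of that lookup is shown below. The map-based registry stands in for ExtensionLoader, the returned strings stand in for extension instances, and the "jdk" key is assumed here as the name of the JDK-based extension.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class AdaptiveSelectionSketch {
    // Hypothetical registry of named extensions; stands in for ExtensionLoader.
    private static final Map<String, Supplier<String>> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("javassist", () -> "JavassistProxyFactory");
        EXTENSIONS.put("jdk", () -> "JdkProxyFactory"); // key name is an assumption
    }

    // Pick the extension named by the "proxy" URL parameter, defaulting to "javassist".
    static String select(Map<String, String> urlParameters) {
        String extName = urlParameters.getOrDefault("proxy", "javassist");
        Supplier<String> extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension.get();
    }

    public static void main(String[] args) {
        Map<String, String> parameters = new HashMap<>();
        System.out.println(select(parameters)); // prints JavassistProxyFactory
        parameters.put("proxy", "jdk");
        System.out.println(select(parameters)); // prints JdkProxyFactory
    }
}
```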

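ProxyFactory is an interface. Its implementations include the abstract class AbstractProxyFactory and the class StubProxyFactoryWrapper. AbstractProxyFactory declares the abstract method public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types); which is implemented by JavassistProxyFactory and JdkProxyFactory, as shown in the diagram: [figure: AbstractProxyFactory]
The getProxy method of JavassistProxyFactory, the javassist extension of the ProxyFactory interface, is implemented as follows: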

/**
 * JavaassistRpcProxyFactory
 */
public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        // The proxy class is instantiated with new InvokerInvocationHandler(invoker) as the argument
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
}

The Proxy class that generates the dynamic proxy:

    /**
     * Get proxy.
     *
     * @param ics interface class array.
     * @return Proxy instance.
     */
    public static Proxy getProxy(Class<?>... ics) {
        return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
    }
    /**
     * Get proxy.
     *
     * @param cl  class loader.
     * @param ics interface class array; more than one interface may be implemented.
     * @return Proxy instance.
     */
    public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
        if (ics.length > 65535)
            throw new IllegalArgumentException("interface limit exceeded");

        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < ics.length; i++) {
            String itf = ics[i].getName();
            if (!ics[i].isInterface())
                throw new RuntimeException(itf + " is not a interface.");

            Class<?> tmp = null;
            try {
                tmp = Class.forName(itf, false, cl);
            } catch (ClassNotFoundException e) {
            }

            if (tmp != ics[i])
                throw new IllegalArgumentException(ics[i] + " is not visible from class loader");

            sb.append(itf).append(';');
        }

        // use interface class name list as key; multiple interfaces are separated by ';'.
        String key = sb.toString();

        // get cache by class loader.
        Map<String, Object> cache;
        synchronized (ProxyCacheMap) {
            cache = ProxyCacheMap.get(cl);
            if (cache == null) {
                cache = new HashMap<String, Object>();
                ProxyCacheMap.put(cl, cache);
            }
        }

        Proxy proxy = null;
        synchronized (cache) {
            do {
                Object value = cache.get(key);
                if (value instanceof Reference<?>) {
                    // If a referenced object exists, return the cached object.
                    proxy = (Proxy) ((Reference<?>) value).get();
                    if (proxy != null)
                        return proxy;
                }
                // The proxy is being generated by another thread; suspend and wait
                if (value == PendingGenerationMarker) {
                    try {
                        cache.wait();
                    } catch (InterruptedException e) {
                    }
                } else {
                    // Put the pending-generation marker in place
                    cache.put(key, PendingGenerationMarker);
                    break;
                }
            }
            while (true);
        }
        // A sequence number 0, 1, 2, 3... is appended to the class name
        long id = PROXY_CLASS_COUNTER.getAndIncrement();
        String pkg = null;
        // ClassGenerator is Dubbo's utility class built on javassist
        ClassGenerator ccp = null, ccm = null;
        try {
            ccp = ClassGenerator.newInstance(cl);

            Set<String> worked = new HashSet<String>();
            List<Method> methods = new ArrayList<Method>();

            for (int i = 0; i < ics.length; i++) {
                // Check the package name: non-public interfaces must all come from the same package
                if (!Modifier.isPublic(ics[i].getModifiers())) {
                    String npkg = ics[i].getPackage().getName();
                    if (pkg == null) {
                        pkg = npkg;
                    } else {
                        if (!pkg.equals(npkg))
                            throw new IllegalArgumentException("non-public interfaces from different packages");
                    }
                }
                // Add the interface Class object that the proxy class must implement
                ccp.addInterface(ics[i]);

                for (Method method : ics[i].getMethods()) {
                    // Get the method descriptor; a method with the same descriptor in different interfaces is implemented only once.
                    String desc = ReflectUtils.getDesc(method);
                    if (worked.contains(desc))
                        continue;
                    worked.add(desc);

                    int ix = methods.size();
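                    // Method return type
                    Class<?> rt = method.getReturnType();
                    // Method parameter type list
                    Class<?>[] pts = method.getParameterTypes();
                    // Generate the implementation code for the interface method; every method follows the same pattern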
                    StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                    for (int j = 0; j < pts.length; j++)
                        code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                    code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
                    if (!Void.TYPE.equals(rt))
                        code.append(" return ").append(asArgument(rt, "ret")).append(";");

                    methods.add(method);
                    ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
                }
            }

            if (pkg == null)
                pkg = PACKAGE_NAME;

            // create ProxyInstance class.
            // Name of the concrete proxy class; this is the fully qualified class name
            String pcn = pkg + ".proxy" + id;
            ccp.setClassName(pcn);
            ccp.addField("public static java.lang.reflect.Method[] methods;");
            ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
            // Create the constructor
            ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
            ccp.addDefaultConstructor();
            Class<?> clazz = ccp.toClass();
            // Use reflection to store the method array in the static field methods
            clazz.getField("methods").set(null, methods.toArray(new Method[0]));

            // create Proxy class.
            String fcn = Proxy.class.getName() + id;
            ccm = ClassGenerator.newInstance(cl);
            ccm.setClassName(fcn);
            ccm.addDefaultConstructor();
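            // Set the superclass to Proxy
            ccm.setSuperClass(Proxy.class);
            // Generate the code implementing its abstract method newInstance; the object it creates is an instance of the proxy class pcn generated above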
            ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
            Class<?> pc = ccm.toClass();
            proxy = (Proxy) pc.newInstance();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        } finally {
            // release ClassGenerator
            if (ccp != null)
                ccp.release();
            if (ccm != null)
                ccm.release();
            synchronized (cache) {
                if (proxy == null)
                    cache.remove(key);
                else
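                    // Put into the cache. Key: names of the implemented interfaces; value: the proxy object, held by a weak reference,
                    // so the cache does not keep it alive; once no strong reference remains, GC can clear the reference and reclaim the object.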
                    cache.put(key, new WeakReference<Proxy>(proxy));
                cache.notifyAll();
            }
        }
        return proxy;
    }
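
The string that getProxy assembles for each method is Javassist source text, in which $1 refers to the first parameter and ($w) boxes a primitive into its wrapper type. The sketch below reproduces just that string-building step for one method so the generated text can be inspected. ProxyBodySketch and Greeter are hypothetical names, and only reference return types are handled; the real code goes through its asArgument helper for the return cast.

```java
import java.lang.reflect.Method;

class ProxyBodySketch {
    // Sample interface used only for this illustration.
    interface Greeter {
        String sayHello(String name);
    }

    // Build the source text of one proxy method body, mirroring the loop in getProxy.
    // Simplification: the return cast assumes a reference type.
    static String buildBody(Method method, int index) {
        Class<?>[] pts = method.getParameterTypes();
        StringBuilder code = new StringBuilder("Object[] args = new Object[")
                .append(pts.length).append("];");
        for (int j = 0; j < pts.length; j++) {
            code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
        }
        code.append(" Object ret = handler.invoke(this, methods[").append(index).append("], args);");
        Class<?> rt = method.getReturnType();
        if (!Void.TYPE.equals(rt)) {
            code.append(" return (").append(rt.getName()).append(")ret;");
        }
        return code.toString();
    }

    // Convenience wrapper that builds the body for Greeter.sayHello.
    static String sayHelloBody() {
        try {
            return buildBody(Greeter.class.getMethod("sayHello", String.class), 0);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sayHelloBody());
    }
}
```

The resulting text has the same shape as the sayHello body in the decompiled proxy0 class.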

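The class files of the dynamic proxy can be dumped with a javaagent. Below is the decompiled source of the generated proxy; two classes are generated dynamically: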

package com.alibaba.dubbo.common.bytecode;

import java.lang.reflect.InvocationHandler;

public class Proxy0 extends Proxy
  implements ClassGenerator.DC
{
  public Object newInstance(InvocationHandler paramInvocationHandler)
  {
    return new proxy0(paramInvocationHandler);
  }
}

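This class extends the abstract class Proxy and implements its abstract method newInstance. The ClassGenerator.DC interface is Dubbo's internal marker interface for dynamically generated classes.
The other class, proxy0, is the proxy class seen in the call stack at the start. Its source is as follows: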

package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.rpc.service.EchoService;
import com.zxd.dubbo.learning.api.DemoService;
import com.zxd.dubbo.learning.api.Person;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class proxy0
  implements ClassGenerator.DC, EchoService, DemoService
{
  public static Method[] methods;
  private InvocationHandler handler;
  // Implements the interface method
  public String sayHello(String paramString)
  {
    Object[] arrayOfObject = new Object[1];
    arrayOfObject[0] = paramString;
    Object localObject = this.handler.invoke(this, methods[0], arrayOfObject);//實際調用邏輯
    return (String)localObject;
  }
  // Implements the interface method
  public String sayGoodbye(String paramString)
  {
    Object[] arrayOfObject = new Object[1];
    arrayOfObject[0] = paramString;
    Object localObject = this.handler.invoke(this, methods[1], arrayOfObject);
    return (String)localObject;
  }
  // Implements the interface method
  public Person getPerson(String paramString)
  {
    Object[] arrayOfObject = new Object[1];
    arrayOfObject[0] = paramString;
    Object localObject = this.handler.invoke(this, methods[2], arrayOfObject);
    return (Person)localObject;
  }
  // Echo test method
  public Object $echo(Object paramObject)
  {
    Object[] arrayOfObject = new Object[1];
    arrayOfObject[0] = paramObject;
    Object localObject = this.handler.invoke(this, methods[3], arrayOfObject);
    return (Object)localObject;
  }

  public proxy0()
  {
  }

  public proxy0(InvocationHandler paramInvocationHandler)
  {
    // Public constructor; the handler here is the InvokerInvocationHandler object
    // passed in by Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker))
    this.handler = paramInvocationHandler;
  }
}

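The proxy class implements three interfaces.
ClassGenerator.DC is Dubbo's marker interface for dynamically generated classes;
DemoService is the actual business interface, which lets the proxy call the service methods;
EchoService is the echo test interface. It adds an echo test method to every Dubbo RPC service and has only one method: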

package com.alibaba.dubbo.rpc.service;

/**
 * Echo service.
 * @export
 */
public interface EchoService {

    /**
     * echo test.
     *
     * @param message message.
     * @return message.
     */
    Object $echo(Object message);

}

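 EchoService echoService = (EchoService) demoService; // casting to EchoService makes the echo test available
Looking at the sayHello method of proxy0.class, the actual call is Object localObject = this.handler.invoke(this, methods[0], arrayOfObject);
which matches the third stack frame from the bottom in the trace above: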

 at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52) 

InvokerInvocationHandler is as follows:

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;
    // The invoker is passed in through the constructor
    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        // If the method is declared by Object
        if (method.getDeclaringClass() == Object.class) {
            // invoke it reflectively
            return method.invoke(invoker, args);
        }
        // Special handling for calls to three specific methods
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // Other business methods are called through invoker.invoke
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}
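
The same forwarding pattern can be tried without Dubbo by using the JDK's own java.lang.reflect.Proxy. The sketch below is an analogy rather than Dubbo's code: Greeter and Echo are hypothetical interfaces, and a BiFunction stands in for the Invoker. It shows one proxy object implementing two interfaces, Object methods being answered locally, and every other call being handed to the invoker.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.BiFunction;

class HandlerForwardingSketch {
    // Sample business interface and echo interface, both hypothetical.
    interface Greeter {
        String sayHello(String name);
    }

    interface Echo {
        Object $echo(Object message);
    }

    // Create a proxy whose handler invokes Object methods on the invoker itself
    // and forwards every other call as (method name, arguments).
    static Object newProxy(BiFunction<String, Object[], Object> invoker) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            return invoker.apply(method.getName(), args);
        };
        return Proxy.newProxyInstance(
                HandlerForwardingSketch.class.getClassLoader(),
                new Class<?>[]{Greeter.class, Echo.class},
                handler);
    }

    public static void main(String[] args) {
        Object proxy = newProxy((name, callArgs) -> name + ":" + callArgs[0]);
        System.out.println(((Greeter) proxy).sayHello("world"));
        System.out.println(((Echo) proxy).$echo("OK"));
    }
}
```

Casting the same proxy object to either interface works for the same reason the cast to EchoService works on the Dubbo proxy: the generated class implements both.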

The invoker object here is passed in through the InvokerInvocationHandler constructor, and the InvokerInvocationHandler object is created by the getProxy(Invoker<T> invoker, Class<?>[] interfaces) method of JavassistProxyFactory.
Go back to where proxyFactory.getProxy(invoker); is called, namely the createProxy(Map<String, String> map) method of the ReferenceConfig class:

    @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer;
        if (isInjvm() == null) {
            if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                // by default, reference local service if there is
                isJvmRefer = true;
            } else {
                isJvmRefer = false;
            }
        } else {
            isJvmRefer = isInjvm().booleanValue();
        }

        if (isJvmRefer) {
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // assemble URL from register center's configuration
                List<URL> us = loadRegistries(false);
                if (us != null && !us.isEmpty()) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }
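            // Only one direct address or one registry address is configured
            if (urls.size() == 1) {
                // The protocol of urls.get(0) may belong to a direct address (dubbo protocol by default) or to a registry address (registry protocol, with zookeeper as the concrete registry here)
                // This example configures a single registry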
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
               // Multiple direct addresses, multiple registry addresses, or a mix of both.
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    // Create an invoker and add it to invokers
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        // With multiple registries, the last registry url wins
                        registryURL = url; // use last registry url
                    }
                }
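                // At least one URL uses the registry protocol.
                // When several URLs include a registry, the AvailableCluster strategy is hard-coded;
                // this covers both a mix of direct and registry addresses and registry-only addresses.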
                if (registryURL != null) { // registry url is available
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url (multiple direct URLs)
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        if (c && !invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        // create service proxy
        return (T) proxyFactory.getProxy(invoker);
    }
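
The branching at the end of createProxy can be summarized as a small decision function. The sketch below only names which construction path would be taken for a list of URL protocols; InvokerChoiceSketch and choose are hypothetical, the returned strings are labels rather than real objects, and the injvm branch is left out.

```java
import java.util.Arrays;
import java.util.List;

class InvokerChoiceSketch {
    // Describe which construction path createProxy would take for the given
    // URL protocols ("dubbo" for a direct address, "registry" for a registry).
    static String choose(List<String> protocols) {
        if (protocols.isEmpty()) {
            throw new IllegalArgumentException("at least one URL is required");
        }
        if (protocols.size() == 1) {
            return "refprotocol.refer";
        }
        boolean hasRegistry = false;
        for (String protocol : protocols) {
            if ("registry".equals(protocol)) {
                hasRegistry = true;
            }
        }
        return hasRegistry
                ? "cluster.join(StaticDirectory) with AvailableCluster"
                : "cluster.join(StaticDirectory) with the default cluster";
    }

    public static void main(String[] args) {
        System.out.println(choose(Arrays.asList("registry")));
        System.out.println(choose(Arrays.asList("dubbo", "registry")));
        System.out.println(choose(Arrays.asList("dubbo", "dubbo")));
    }
}
```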

As shown, the invoker is built by one of three statements, chosen by condition: refprotocol.refer(interfaceClass, urls.get(0)); or cluster.join(new StaticDirectory(u, invokers)); or cluster.join(new StaticDirectory(invokers));
This post analyzes the first case. By the SPI mechanism, the refprotocol object is an instance of Protocol$Adpative, whose refer method is implemented as:

public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
    if (arg1 == null)
        throw new IllegalArgumentException("url == null");
    com.alibaba.dubbo.common.URL url = arg1;
    String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
    if (extName == null)
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.refer(arg0, arg1);
}

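As the code shows, the concrete Protocol implementation is looked up through SPI based on the URL's protocol value. For a direct address the protocol is dubbo, so the call ends up in the refer method of the DubboProtocol class.
Its implementation is: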

    @Override
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

For a registry, the protocol here is registry, so the call goes to the refer method of the RegistryProtocol class:

    @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // The concrete registry protocol, zookeeper here, is read from the registry parameter and set as the URL's protocol.
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        // Look up the registry implementation for zookeeper through ZookeeperRegistryFactory and call getRegistry
        // to obtain a zookeeper-type registry object
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        // cluster here is an instance of the Cluster$Adaptive class
        return doRefer(cluster, registry, type, url);
    }
    
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
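        // RegistryDirectory is the counterpart of StaticDirectory: the former fetches the provider url list dynamically from the registry, the latter holds a statically specified url list.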
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        // Subscribe to the registry to obtain provider addresses and related information
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
        // Call the join method of Cluster$Adaptive to obtain the Invoker (*** look here ***)
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

Here is the join method of the Cluster$Adaptive class:

public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
    if (arg0 == null)
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
    if (arg0.getUrl() == null)
        throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
    com.alibaba.dubbo.common.URL url = arg0.getUrl();
    // Read the cluster strategy from the cluster parameter; the default is failover,
    // which is also what this example uses.
    String extName = url.getParameter("cluster", "failover");
    if(extName == null)
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
    com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
    // SPI resolves this to a FailoverCluster object
    return extension.join(arg0);
}

Next, the join method of FailoverCluster:

public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        // Return a FailoverClusterInvoker object
        return new FailoverClusterInvoker<T>(directory);
    }

}

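Among the Cluster SPI implementations, MockClusterWrapper is a wrapper class. This is Dubbo's AOP-style mechanism: the extension loader wraps the real extension in it, so its join method is called first: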

public class MockClusterWrapper implements Cluster {

    private Cluster cluster;

    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }

}
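
The wrapping step can be illustrated with a small self-contained sketch. It assumes simplified, hypothetical types: `join` returns a descriptive string instead of an Invoker, and `getExtension` hard-codes the one wrapper rather than discovering it the way the real extension loader does.

```java
public class WrapperSketch {

    /** Hypothetical, simplified stand-in for the Cluster extension point. */
    interface Cluster {
        String join(String directory);
    }

    static class FailoverCluster implements Cluster {
        @Override
        public String join(String directory) {
            return "FailoverClusterInvoker(" + directory + ")";
        }
    }

    /** A wrapper: it implements the interface and takes another Cluster in its constructor. */
    static class MockClusterWrapper implements Cluster {
        private final Cluster cluster;

        MockClusterWrapper(Cluster cluster) {
            this.cluster = cluster;
        }

        @Override
        public String join(String directory) {
            // Decorate the result of the wrapped extension.
            return "MockClusterInvoker(" + cluster.join(directory) + ")";
        }
    }

    /** Simplified lookup: only "failover" is modelled, and it is always wrapped. */
    static Cluster getExtension(String name) {
        if (!"failover".equals(name)) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return new MockClusterWrapper(new FailoverCluster());
    }

    public static void main(String[] args) {
        System.out.println(getExtension("failover").join("directory"));
    }
}
```

Because the caller only ever sees the outermost object, the first invoke frame belongs to the mock invoker and the failover invoker sits underneath it.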

FailoverClusterInvoker is a subclass of AbstractClusterInvoker, and its invoke method is implemented in the parent class, which explains the following stack frames:

      at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:78)
      at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:244)
      at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:75)

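These classes all belong to Dubbo's cluster fault-tolerance layer. The post "dubbo集群容錯機制代碼分析" listed in the references introduces cluster fault tolerance.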
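
As a rough illustration of what "failover" means, the sketch below retries a call on the next provider when one fails. It is not the logic of FailoverClusterInvoker: providers are tried in plain list order instead of being chosen by a load balancer, and the `Supplier` providers, the `retries` parameter, and the `demo` method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class FailoverSketch {

    /** Tries providers in list order until one succeeds or the attempt budget is used up. */
    static <T> T invokeWithFailover(List<Supplier<T>> providers, int retries) {
        int attempts = Math.min(retries + 1, providers.size());
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return providers.get(i).get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next provider
            }
        }
        throw new IllegalStateException("Failed after " + attempts + " attempt(s)", last);
    }

    /** Small demo: provider A is down, provider B answers. */
    static String demo() {
        List<Supplier<String>> providers = new ArrayList<>();
        providers.add(() -> { throw new RuntimeException("provider A unreachable"); });
        providers.add(() -> "Hello from provider B");
        return invokeWithFailover(providers, 2);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```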

Next, the invoke method of AbstractClusterInvoker:

    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
        LoadBalance loadbalance = null;

        // binding attachments into invocation.
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }
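        // Call the directory's list method to get the invokers that may be called.
        // This is really AbstractDirectory.list, which applies the routing rules (if any)
        // to select the matching providers from all providers.
        // Cluster fault tolerance and load balancing come only after that.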
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && !invokers.isEmpty()) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }

    protected void checkWhetherDestroyed() {

        if (destroyed.get()) {
            throw new RpcException("Rpc cluster invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost()
                    + " use dubbo version " + Version.getVersion()
                    + " is now destroyed! Can not invoke any more.");
        }
    }

The list method:

    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);
        return invokers;
    }

This leads to the list method of RegistryDirectory, which is implemented in its parent class AbstractDirectory:

    @Override
    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }
        // Get all providers.
        // doList is an abstract method; RegistryDirectory supplies the implementation.
        List<Invoker<T>> invokers = doList(invocation);
        List<Router> localRouters = this.routers; // local reference
        if (localRouters != null && !localRouters.isEmpty()) {
            for (Router router : localRouters) {
                try {
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                        // Call route on the Router implementation to filter the providers
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
        }
        return invokers;
    }

RegistryDirectory provides the concrete doList method:

    @Override
    public List<Invoker<T>> doList(Invocation invocation) {
        if (forbidden) {
            // 1. No service provider 2. Service providers are disabled
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
                "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()
                        + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
        }
        List<Invoker<T>> invokers = null;
        // methodInvokerMap is assigned in the refreshInvoker method
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
            String methodName = RpcUtils.getMethodName(invocation);
            Object[] args = RpcUtils.getArguments(invocation);
            if (args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(methodName);
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
            }
            if (invokers == null) {
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                if (iterator.hasNext()) {
                    invokers = iterator.next();
                }
            }
        }
        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }
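
The fallback order used by doList can be tried out on its own. The sketch below keeps only that order (method name plus first argument, then method name, then the `*` wildcard, then any entry) and uses plain strings in place of Invoker objects; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MethodInvokerLookupSketch {

    /** Looks up invokers for a call, falling back from the most specific key to the least. */
    static List<String> lookup(Map<String, List<String>> methodInvokerMap, String method, Object firstArg) {
        List<String> invokers = null;
        // 1. methodName.firstArgument, only for String or enum first arguments
        if (firstArg != null && (firstArg instanceof String || firstArg.getClass().isEnum())) {
            invokers = methodInvokerMap.get(method + "." + firstArg);
        }
        // 2. plain method name
        if (invokers == null) {
            invokers = methodInvokerMap.get(method);
        }
        // 3. wildcard entry
        if (invokers == null) {
            invokers = methodInvokerMap.get("*");
        }
        // 4. any entry at all
        if (invokers == null && !methodInvokerMap.isEmpty()) {
            invokers = methodInvokerMap.values().iterator().next();
        }
        return invokers == null ? Collections.<String>emptyList() : invokers;
    }

    public static void main(String[] args) {
        Map<String, List<String>> map = new HashMap<>();
        map.put("sayHello", Arrays.asList("invokerA"));
        map.put("*", Arrays.asList("invokerA", "invokerB"));
        System.out.println(lookup(map, "sayHello", "world"));
        System.out.println(lookup(map, "sayGoodbye", "world"));
    }
}
```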

Below is the refreshInvoker(List<URL> invokerUrls) method:

    private void refreshInvoker(List<URL> invokerUrls) {
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            this.forbidden = true; // Forbid to access
            this.methodInvokerMap = null; // Set the method invoker map to null
            destroyAllInvokers(); // Close all invokers
        } else {
            this.forbidden = false; // Allow to access
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<URL>();
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            if (invokerUrls.isEmpty()) {
                return;
            }
            // toInvokers is the method that creates the Invokers
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
            // state change
            // If the calculation is wrong, it is not processed.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                return;
            }
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try {
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }

refreshInvoker() is called from the notify(List<URL> urls) method of RegistryDirectory, which is also the callback invoked by the registry subscription.
Below is the toInvokers method:

    /**
     * Turn urls into invokers, and if url has been refer, will not re-reference.
     * @param urls
     * @return invokers
     */
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        for (URL providerUrl : urls) {
            // If protocol is configured at the reference side, only the matching protocol is selected
            if (queryProtocols != null && queryProtocols.length() > 0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    continue;
                }
            }
            if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
                        + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            URL url = mergeUrl(providerUrl);

            String key = url.toFullString(); // The parameter urls are sorted
            if (keys.contains(key)) { // Repeated url
                continue;
            }
            keys.add(key);
            // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    boolean enabled = true;
                    if (url.hasParameter(Constants.DISABLED_KEY)) {
                        enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(Constants.ENABLED_KEY, true);
                    }
                    if (enabled) {
                        // This is where the invoker is created
                        invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }
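
The cache-reuse idea in toInvokers (only refer urls that are not already cached, and drop entries that are no longer notified) can be sketched as follows. The class is hypothetical: invokers are plain `Object` instances, urls are strings, and `referCount` exists only to make the reuse visible.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class InvokerCacheSketch {

    private Map<String, Object> urlInvokerMap = new HashMap<>();
    private int referCount = 0;

    /** Rebuilds the url-to-invoker map, creating invokers only for urls not seen before. */
    void refresh(List<String> providerUrls) {
        Map<String, Object> fresh = new HashMap<>();
        for (String url : providerUrls) {
            Object invoker = urlInvokerMap.get(url);
            if (invoker == null) {
                invoker = new Object(); // stands in for protocol.refer(...)
                referCount++;
            }
            fresh.put(url, invoker);
        }
        urlInvokerMap = fresh; // urls missing from the new list are dropped
    }

    int referCount() {
        return referCount;
    }

    Set<String> urls() {
        return new TreeSet<>(urlInvokerMap.keySet());
    }

    public static void main(String[] args) {
        InvokerCacheSketch directory = new InvokerCacheSketch();
        directory.refresh(Arrays.asList("dubbo://a:20880", "dubbo://b:20880"));
        directory.refresh(Arrays.asList("dubbo://b:20880", "dubbo://c:20880"));
        System.out.println(directory.urls() + " created=" + directory.referCount());
    }
}
```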

The statement invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl) creates the invoker.
InvokerDelegate is an inner class of RegistryDirectory:

    /**
     * The delegate class, which is mainly used to store the URL address sent by the registry,and can be reassembled on the basis of providerURL queryMap overrideMap for re-refer.
     * @param <T>
     */
    private static class InvokerDelegate<T> extends InvokerWrapper<T> {
        private URL providerUrl;

        public InvokerDelegate(Invoker<T> invoker, URL url, URL providerUrl) {
            // Call the parent class constructor
            super(invoker, url);
            this.providerUrl = providerUrl;
        }

        public URL getProviderUrl() {
            return providerUrl;
        }
    }

The invoke method is implemented in its parent class InvokerWrapper:

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // This invoker was passed in through the constructor
        return invoker.invoke(invocation);
    }

That is why the following frame appears in the call stack:

 at com.alibaba.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56) 

The InvokerDelegate constructor calls the InvokerWrapper constructor and passes the invoker along. Looking back at new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl); it is clear that this invoker is created by protocol.refer(serviceType, url).

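Debugging shows that protocol here is of type Protocol$Adaptive and that the url's protocol is dubbo, so SPI eventually routes the call to the refer method of DubboProtocol. However, the Protocol interface has two wrapper classes among its implementations: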

filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper

So the refer method of ProtocolFilterWrapper runs first, then the refer method of ProtocolListenerWrapper, and only then the refer method of DubboProtocol.

The refer method of ProtocolFilterWrapper is as follows:

    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }
    
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
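        // First get the activated filters. In this example the monitor filter (MonitorFilter) is configured manually;
        // the other two, FutureFilter and ConsumerContextFilter, are activated automatically.
        // See the getActivateExtension code of the SPI mechanism for details.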
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    // Implement the invoke method of Invoker
                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        // Nest the call into the filter chain
                        return filter.invoke(next, invocation);
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
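
Why the loop in buildInvokerChain runs backwards is easier to see in a stripped-down version. The sketch below uses hypothetical single-method `Invoker` and `Filter` interfaces with string payloads, and records the order in which each element is entered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilterChainSketch {

    /** Hypothetical, simplified versions of Invoker and Filter. */
    interface Invoker {
        String invoke(String invocation);
    }

    interface Filter {
        String invoke(Invoker next, String invocation);
    }

    /** Wraps the target from the last filter to the first, so the first filter ends up outermost. */
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = invocation -> filter.invoke(next, invocation);
        }
        return last;
    }

    /** A filter that records its name and then passes the call on. */
    static Filter named(String name, List<String> trace) {
        return (next, invocation) -> {
            trace.add(name);
            return next.invoke(invocation);
        };
    }

    /** Runs one call through three filters and returns the order in which they were entered. */
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        Invoker target = invocation -> {
            trace.add("DubboInvoker");
            return "Hello " + invocation;
        };
        List<Filter> filters = Arrays.asList(
                named("ConsumerContextFilter", trace),
                named("FutureFilter", trace),
                named("MonitorFilter", trace));
        buildChain(target, filters).invoke("world");
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each filter is entered in list order and the real invoker is reached last, which is the nesting that a stack trace of such a chain reflects when read from bottom to top.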

Hence the following stack frames:

      at com.alibaba.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:75)
      at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:72)
      at com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:54)
      at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:72)
      at com.alibaba.dubbo.rpc.filter.ActiveLimitFilter.invoke(ActiveLimitFilter.java:70)
      at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:72)
      at com.alibaba.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:49)
      at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:72)

Next comes the refer method of ProtocolListenerWrapper:

    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
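                // Get the activated listeners. Dubbo currently ships no suitable listener: the only
                // implementation is DeprecatedInvokerListener, which is itself deprecated, so the list is empty here.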
                Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class).getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
    }

This explains the following stack frame:

 at com.alibaba.dubbo.rpc.listener.ListenerInvokerWrapper.invoke(ListenerInvokerWrapper.java:77) 

Finally, the refer method of DubboProtocol, which creates the DubboInvoker object:

    @Override
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

AbstractInvoker, the parent class of DubboInvoker, implements the invoke method:

    @Override
    public Result invoke(Invocation inv) throws RpcException {
        // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
        if (destroyed.get()) {
            logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                    + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
        }

        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        if (attachment != null && attachment.size() > 0) {
            invocation.addAttachmentsIfAbsent(attachment);
        }
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            /**
             * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
             * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
             * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
             * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
             */
            invocation.addAttachments(contextAttachments);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);


        try {
            // doInvoke is implemented in the subclass
            return doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            Throwable te = e.getTargetException();
            if (te == null) {
                return new RpcResult(e);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                }
                return new RpcResult(te);
            }
        } catch (RpcException e) {
            if (e.isBiz()) {
                return new RpcResult(e);
            } else {
                throw e;
            }
        } catch (Throwable e) {
            return new RpcResult(e);
        }
    }

The doInvoke method implemented by DubboInvoker:

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                // The actual request; currentClient here is clients[0], a field of this object
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
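
The client selection at the top of doInvoke is a simple round-robin over the clients array. A minimal sketch, assuming string placeholders for ExchangeClient and a hypothetical class name; `Math.floorMod` is a choice made for this sketch so that the index stays non-negative even if the counter overflows.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ClientSelectionSketch {

    private final String[] clients;
    private final AtomicInteger index = new AtomicInteger();

    ClientSelectionSketch(String... clients) {
        this.clients = clients;
    }

    /** Returns the only client, or rotates through the clients when there are several. */
    String select() {
        if (clients.length == 1) {
            return clients[0];
        }
        return clients[Math.floorMod(index.getAndIncrement(), clients.length)];
    }

    public static void main(String[] args) {
        ClientSelectionSketch invoker = new ClientSelectionSketch("client-0", "client-1", "client-2");
        for (int i = 0; i < 4; i++) {
            System.out.println(invoker.select());
        }
    }
}
```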

Hence these two stack frames:

      at com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker.doInvoke(DubboInvoker.java:95)
      at com.alibaba.dubbo.rpc.protocol.AbstractInvoker.invoke(AbstractInvoker.java:155)

Next, consider the currentClient object that sends the request. Its creation can be traced back to the refer method of DubboProtocol:

    @Override
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker.
        // getClients(url) creates the clients field of DubboInvoker
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }
    
    
    private ExchangeClient[] getClients(URL url) {
        // whether to share connection
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            service_share_connect = true;
            connections = 1;
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect) {
                // Get the shared connection
                clients[i] = getSharedClient(url);
            } else {
                // Initialize a client; this example does not share connections, so this branch is taken
                clients[i] = initClient(url);
            }
        }
        return clients;
    }
    
    /**
     * Create new connection
     */
    private ExchangeClient initClient(URL url) {

        // client type setting.
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        // BIO is not allowed since it has severe performance issue.
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client;
        try {
            // connection should be lazy
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                // Build the client through Exchangers.connect(url, requestHandler); that method is traced next.
                // The requestHandler passed in is the callback through which the client receives the provider's response.
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }
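
The difference between the two branches of getClients can be sketched as follows. This is an assumption-laden illustration: getSharedClient is not shown in this article, so the sketch simply assumes that a shared client is cached per provider address, and it uses strings in place of ExchangeClient objects.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionSharingSketch {

    private final AtomicInteger created = new AtomicInteger();
    // Assumption of this sketch: one shared client per provider address.
    private final Map<String, String> sharedByAddress = new ConcurrentHashMap<>();

    /** Stands in for initClient: every call opens a new connection. */
    private String initClient(String address) {
        return address + "#" + created.incrementAndGet();
    }

    /** connections == 0 means "not configured": use one shared client; otherwise open that many. */
    String[] getClients(String address, int connections) {
        boolean shared = connections == 0;
        String[] clients = new String[shared ? 1 : connections];
        for (int i = 0; i < clients.length; i++) {
            clients[i] = shared
                    ? sharedByAddress.computeIfAbsent(address, this::initClient)
                    : initClient(address);
        }
        return clients;
    }

    public static void main(String[] args) {
        ConnectionSharingSketch protocol = new ConnectionSharingSketch();
        System.out.println(String.join(", ", protocol.getClients("192.168.33.117:20880", 0)));
        System.out.println(String.join(", ", protocol.getClients("192.168.33.117:20880", 0)));
        System.out.println(String.join(", ", protocol.getClients("192.168.33.117:20880", 2)));
    }
}
```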

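The facade design pattern is used here. Exchangers is a facade class that encapsulates finding the appropriate Exchanger implementation and calling its connect method to return an ExchangeClient. The relevant methods are: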

    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        // Set the codec key to exchange if it is not already present
        return getExchanger(url).connect(url, handler);
    }

    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        // Look up the Exchanger SPI implementation by the exchanger key; the default is header, i.e. the HeaderExchanger class
        return getExchanger(type);
    }

    public static Exchanger getExchanger(String type) {
        // Returns HeaderExchanger, the header extension of the Exchanger interface
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

The connect method of HeaderExchanger is as follows:

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    // Client-side connect
    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        // Return a HeaderExchangeClient object
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

Hence the stack frame:

at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeClient.request(HeaderExchangeClient.java:90)

Now the request method of HeaderExchangeClient:

    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        // channel is assigned in the constructor shown below: this.channel = new HeaderExchangeChannel(client);
        return channel.request(request);
    }
    
    
    public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        this.channel = new HeaderExchangeChannel(client); // channel is assigned here
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        if (needHeartbeat) {
            startHeartbeatTimer();
        }
    }

Hence the stack frame:

 at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.request(HeaderExchangeChannel.java:116) 

Continuing with the request method of HeaderExchangeChannel:

    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            // Send the request through the concrete channel
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
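
The excerpt above creates a DefaultFuture before sending but does not show how a later response finds its way back to that future. A common way to do this, and as far as I can tell roughly what DefaultFuture does internally, is to register each pending future under the request id and complete it when a response carrying the same id arrives. The sketch below illustrates that idea only; PendingRequests, its methods and the transport callback are hypothetical names, not Dubbo classes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

/** Simplified illustration of "create request, register future, send". Not Dubbo code. */
final class PendingRequests {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    /** Registers a future under a fresh id, then hands (id, payload) to the transport. */
    CompletableFuture<Object> request(Object payload, BiConsumer<Long, Object> transport) {
        long id = nextId.incrementAndGet();
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(id, future);
        try {
            transport.accept(id, payload);
        } catch (RuntimeException e) {
            // Plays the role of future.cancel() in the excerpt: drop the registration if sending fails
            pending.remove(id);
            future.cancel(false);
            throw e;
        }
        return future;
    }

    /** Called by the receiving side when a response with the given id arrives. */
    void received(long id, Object result) {
        CompletableFuture<Object> future = pending.remove(id);
        if (future != null) {
            future.complete(result);
        }
    }

    /** Number of requests still waiting for a response. */
    int pendingCount() {
        return pending.size();
    }
}
```

The caller gets the future back immediately and can block on it or attach a callback, which is why request returns before any response has arrived.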

The channel object used here is likewise passed in from the caller through the constructor of HeaderExchangeChannel. HeaderExchangeChannel is constructed by HeaderExchangeClient, and the HeaderExchangeClient object is created in the connect method of HeaderExchanger. So we return to the connect method of HeaderExchanger:

/**
 * DefaultMessenger
 *
 *
 */
public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}
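
Both connect and bind wrap the caller's handler as new DecodeHandler(new HeaderExchangeHandler(handler)), which is a decorator chain: the outermost wrapper sees a message first and passes it inward. The following is a minimal sketch of that nesting order, assuming each layer simply does its own step and then delegates; the Handler interface and the three factory methods are hypothetical stand-ins, not the Dubbo types.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of nested handler wrapping. Not Dubbo code. */
final class HandlerChainDemo {

    interface Handler {
        void received(Object message, List<String> trace);
    }

    /** Innermost layer: stands in for the caller-supplied handler. */
    static Handler business() {
        return (message, trace) -> trace.add("business:" + message);
    }

    /** Middle layer: stands in for HeaderExchangeHandler. */
    static Handler exchange(Handler next) {
        return (message, trace) -> {
            trace.add("exchange");
            next.received(message, trace);
        };
    }

    /** Outermost layer: stands in for DecodeHandler. */
    static Handler decode(Handler next) {
        return (message, trace) -> {
            trace.add("decode");
            next.received(message, trace);
        };
    }

    /** Builds decode(exchange(business())) and records the order in which layers run. */
    static List<String> run(Object message) {
        Handler chain = decode(exchange(business()));
        List<String> trace = new ArrayList<>();
        chain.received(message, trace);
        return trace;
    }
}
```

Calling HandlerChainDemo.run("hi") records the layers from the outside in: decode, exchange, then the business handler.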

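As shown above, the channel object that HeaderExchangeChannel uses to send messages is the one created by Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))). Transporters is also a facade class, an implementation of the facade design pattern. Its connect method is implemented as follows: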

    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        // This resolves to NettyTransporter.connect:
        //        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        //            return new NettyClient(url, listener);
        //        }
        // so a NettyClient is returned here by default
        return getTransporter().connect(url, handler);
    }
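    // Resolves the Transporter extension through SPI; the adaptive extension delegates to NettyTransporter by default
    public static Transporter getTransporter() {
        // getAdaptiveExtension() returns an instance of the generated Transporter$Adaptive class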
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
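
The generated Transporter$Adaptive class is not shown in the article. Conceptually, an adaptive extension reads an extension name from the URL, falls back to a default, and delegates to the matching implementation. The sketch below shows that lookup under stated assumptions: the parameter keys "client" and "transporter", the default name "netty", and the "mina" entry are my assumptions for illustration, and URLs are reduced to a plain parameter map.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical miniature of an adaptive extension lookup. Not the generated Dubbo class. */
final class AdaptiveTransporterSketch {

    interface Transporter {
        /** Returns the name of the client type it would create (a String keeps the sketch small). */
        String connect(Map<String, String> urlParams);
    }

    // Assumed default extension name
    private static final String DEFAULT_NAME = "netty";

    // Assumed registry of named extensions
    private static final Map<String, Transporter> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("netty", params -> "NettyClient");
        EXTENSIONS.put("mina", params -> "MinaClient");
    }

    /** Picks the extension named by "client", then "transporter", then the default. */
    static final Transporter ADAPTIVE = params -> {
        String name = params.getOrDefault("client",
                params.getOrDefault("transporter", DEFAULT_NAME));
        Transporter target = EXTENSIONS.get(name);
        if (target == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return target.connect(params);
    };
}
```

With an empty parameter map the lookup falls through to the default, which matches the article's observation that a NettyClient is returned when nothing else is configured.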

So the request is ultimately sent through the send method of a NettyClient instance. The send method of NettyClient is implemented in its ancestor class AbstractPeer:

    @Override
    public void send(Object message) throws RemotingException {
        send(message, url.getParameter(Constants.SENT_KEY, false));
    }

This implementation in turn calls the send method implemented in AbstractClient, the parent class of NettyClient:

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
            connect();
        }
        // Obtain the concrete channel instance
        Channel channel = getChannel();
        //TODO Can the value returned by getChannel() be null? need improvement.
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
        }
        channel.send(message, sent);
    }

The getChannel() method here is implemented by NettyClient itself, as follows:

    @Override
    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        if (c == null || !c.isConnected())
            return null;
        return NettyChannel.getOrAddChannel(c, getUrl(), this);
    }
    
    
    // Next, the NettyChannel.getOrAddChannel method
    static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
        if (ch == null) {
            return null;
        }
        // Returns a NettyChannel instance
        NettyChannel ret = channelMap.get(ch);
        if (ret == null) {
            NettyChannel nc = new NettyChannel(ch, url, handler);
            if (ch.isConnected()) {
                ret = channelMap.putIfAbsent(ch, nc);
            }
            if (ret == null) {
                ret = nc;
            }
        }
        return ret;
    }
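
getOrAddChannel is a get-or-create cache built on putIfAbsent: if two threads race to wrap the same underlying channel, only one wrapper is stored and both callers end up with it. A wrapper for a channel that is not connected is returned but not cached. The sketch below reproduces that control flow with hypothetical names (WrapperCache, Wrapper) and a boolean in place of ch.isConnected().

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical miniature of the get-or-add pattern. Not Dubbo code. */
final class WrapperCache {

    static final class Wrapper {
        final Object raw;

        Wrapper(Object raw) {
            this.raw = raw;
        }
    }

    private final ConcurrentMap<Object, Wrapper> cache = new ConcurrentHashMap<>();

    /** Returns the cached wrapper for raw, creating one if needed; caches only when connected. */
    Wrapper getOrAdd(Object raw, boolean connected) {
        if (raw == null) {
            return null;
        }
        Wrapper result = cache.get(raw);
        if (result == null) {
            Wrapper created = new Wrapper(raw);
            if (connected) {
                // putIfAbsent returns the previous value, or null if ours was stored
                result = cache.putIfAbsent(raw, created);
            }
            if (result == null) {
                result = created;
            }
        }
        return result;
    }

    int size() {
        return cache.size();
    }
}
```

Skipping the cache for disconnected channels keeps dead entries out of the map, at the cost of allocating a fresh wrapper on each such call.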

Hence the following stack frames:

      at com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(NettyChannel.java:100)
      at com.alibaba.dubbo.remoting.transport.AbstractClient.send(AbstractClient.java:265)
      at com.alibaba.dubbo.remoting.transport.AbstractPeer.send(AbstractPeer.java:53)
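
Read bottom-up, these three frames follow the class hierarchy: the one-argument send lives in the top-level base class, the two-argument send in the client base class, and the concrete subclass only supplies the channel. The following is a small sketch of that template-method layout; Peer, Client and SimpleClient are hypothetical stand-ins for AbstractPeer, AbstractClient and NettyClient, and the default flag value is hard-coded rather than read from a URL.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of the send delegation chain. Not Dubbo code. */
final class SendChainSketch {

    interface Channel {
        void send(Object message, boolean sent);
    }

    /** Plays the role of AbstractPeer: the one-argument send only fills in a default flag. */
    static abstract class Peer {
        final List<String> trace = new ArrayList<>();

        public void send(Object message) {
            trace.add("Peer.send");
            send(message, false);
        }

        public abstract void send(Object message, boolean sent);
    }

    /** Plays the role of AbstractClient: checks the channel, then delegates to it. */
    static abstract class Client extends Peer {
        protected abstract Channel getChannel();

        @Override
        public void send(Object message, boolean sent) {
            trace.add("Client.send");
            Channel channel = getChannel();
            if (channel == null) {
                throw new IllegalStateException("message can not send, because channel is closed");
            }
            channel.send(message, sent);
        }
    }

    /** Plays the role of NettyClient: it only supplies the concrete channel. */
    static final class SimpleClient extends Client {
        private final boolean connected;

        SimpleClient(boolean connected) {
            this.connected = connected;
        }

        @Override
        protected Channel getChannel() {
            if (!connected) {
                return null;
            }
            return (message, sent) -> trace.add("Channel.send:" + message);
        }
    }
}
```

Calling send("req") on a connected SimpleClient records Peer.send, Client.send and Channel.send:req in that order, mirroring the three frames from bottom to top.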

What follows are the internal calls and message conversion inside JBoss Netty:

      at org.jboss.netty.channel.SimpleChannelHandler.handleDownstream(SimpleChannelHandler.java:266)
      at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
      at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
      at org.jboss.netty.channel.Channels.write(Channels.java:611)
      at org.jboss.netty.channel.Channels.write(Channels.java:578)
      at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:251)

Finally we arrive at the writeRequested method of the NettyHandler class, where the breakpoint was set at the beginning:

 at com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(NettyHandler.java:98) 

