Dubbo-服務消費者遠程調用


 

 

執行過程如下圖所示
代理bean方法調用,即代理bean方法調用
 
我們知道demoService的bean是一個代理類,並且這個代理類繼承com.alibaba.dubbo.common.bytecode.Proxy這個類,代理類中sayHello方法內部代碼如下:
(來源於Dubbo官網)
/**
 * Arthas 反編譯步驟:
 * 1. 啟動 Arthas
 *    java -jar arthas-boot.jar
 *
 * 2. 輸入編號選擇進程
 *    Arthas 啟動后,會打印 Java 應用進程列表,如下:
 *    [1]: 11232 org.jetbrains.jps.cmdline.Launcher
 *    [2]: 22370 org.jetbrains.jps.cmdline.Launcher
 *    [3]: 22371 com.alibaba.dubbo.demo.consumer.Consumer
 *    [4]: 22362 com.alibaba.dubbo.demo.provider.Provider
 *    [5]: 2074 org.apache.zookeeper.server.quorum.QuorumPeerMain
 * 這里輸入編號 3,讓 Arthas 關聯到啟動類為 com.....Consumer 的 Java 進程上
 *
 * 3. 由於 Demo 項目中只有一個服務接口,因此此接口的代理類類名為 proxy0,此時使用 sc 命令搜索這個類名。
 *    $ sc *.proxy0
 *    com.alibaba.dubbo.common.bytecode.proxy0
 *
 * 4. 使用 jad 命令反編譯 com.alibaba.dubbo.common.bytecode.proxy0
 *    $ jad com.alibaba.dubbo.common.bytecode.proxy0
 *
 * 更多使用方法請參考 Arthas 官方文檔:
 *   https://alibaba.github.io/arthas/quick-start.html
 */
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
    // 方法數組
    public static Method[] methods;
    private InvocationHandler handler;

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;
    }

    public proxy0() {
    }

    public String sayHello(String string) {
        // 將參數存儲到 Object 數組中
        Object[] arrobject = new Object[]{string};
        // 調用 InvocationHandler 實現類的 invoke 方法得到調用結果
        Object object = this.handler.invoke(this, methods[0], arrobject);
        // 返回調用結果
        return (String)object;
    }

    /** 回聲測試方法 */
    public Object $echo(Object object) {
        Object[] arrobject = new Object[]{object};
        Object object2 = this.handler.invoke(this, methods[1], arrobject);
        return object2;
    }
}

 


 
上述代碼中,有一個handler屬性,這個handler是什么呢?
 
dubbo-demo-consumer項目示例截圖
通過調試得出handler類型為InvokerInvocationHandler,下面就是它真正調用的方法。
 
代理bean(proxy0)調用sayHello方法流程
 
(1)執行InvokerInvocationHandler中的invoke方法
當demoService調用sayHello方法時,即調用的是代理類proxy0中的sayHello方法,當執行到handler.invoke方法時,則進入到InvokerInvocationHandler類中的invoke方法
// method為sayHello,參數為,方法中的參數值
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    // 攔截定義在 Object 類中的方法(未被子類重寫),比如 wait/notify
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    // 當調用的是toString方法時
    if ("toString".equals(methodName) && parameterTypes.length == 0) {
        return invoker.toString();
    }
    // 當調用的是hashCode方法時
    if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
        return invoker.hashCode();
    }
    // 當調用的是equals方法時
    if ("equals".equals(methodName) && parameterTypes.length == 1) {
        return invoker.equals(args[0]);
    }
    // 核心代碼。。。
    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}

通過new RpcInvocation(method, args) 將一些參數、方法、屬性封裝起來
public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String, String> attachments, Invoker<?> invoker) {
    this.methodName = methodName;
    this.parameterTypes = parameterTypes == null ? new Class<?>[0] : parameterTypes;
    this.arguments = arguments == null ? new Object[0] : arguments;
    this.attachments = attachments == null ? new HashMap<String, String>() : attachments;
    this.invoker = invoker;
}

 

recreate()方法直接返回一個result
// exception
public Object recreate() throws Throwable {
    if (exception != null) {
        throw exception;
    }
    return result;
}

 

(2)執行MockClusterInvoker類中的 invoke方法
MockClusterInvoker類是一個與服務降級相關的類
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;

    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        //no mock
        // 關鍵部分....
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        if (logger.isWarnEnabled()) {
            logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
        }
        //force:direct mock
        result = doMockInvoke(invocation, null);
    } else {
        //fail-mock
        try {
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            } else {
                if (logger.isWarnEnabled()) {
                    logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                }
                result = doMockInvoke(invocation, e);
            }
        }
    }
    return result;
}

 

關鍵部分result = this.invoker.invoke(invocation); 
 
(3)執行FailoverClusterInvoker中的invoke方法
this.invoker是一個FailoverClusterInvoker類型的對象,這個類繼承 AbstractClusterInvoker 類,先調用AbstractClusterInvoker  類的invoke方法
public Result invoke(final Invocation invocation) throws RpcException {

    checkWhetherDestroyed();

    LoadBalance loadbalance;

    List<Invoker<T>> invokers = list(invocation);
    // 獲取負載均衡,默認RandomLoadBalance
    if (invokers != null && invokers.size() > 0) {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                                                                                         .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

 

當調用時doInvoke,則執行 FailoverClusterInvoker 類的doInvoke方法
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    checkInvokers(copyinvokers, invocation);
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            checkWhetherDestroyed();
            copyinvokers = list(invocation);
            // check again
            checkInvokers(copyinvokers, invocation);
        }
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // 關鍵代碼...
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                // 日志打印...
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // 拋出異常...
}

 

接下來調用RegistryDirectory$InvokerDelegate 類的invoke方法,這個類繼承InvokerWrapper,因此我們再來看一下InvokerWrapper這個類的invoke方法
ProtocolFilterWrapper$1這個類
ListenerInvokerWrapper 這個類的invoke方法
 
DubboInvoke中的doInvoke
Dubbo 支持同步和異步兩種調用方式,其中異步調用還可細分為“有返回值”的異步調用和“無返回值”的異步調用。所謂“無返回值”異步調用是指服務消費方只管調用,但不關心調用結果,此時 Dubbo 會直接返回一個空的 RpcResult。若要使用異步特性,需要服務消費方手動進行配置。默認情況下,Dubbo 使用同步調用方式。
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    // 設置 path 和 version 到 attachment 中
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        // 從 clients 數組中獲取 ExchangeClient
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 獲取異步配置
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // isOneway 為 true,表示“單向”通信
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 異步無返回值
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        // 異步有返回值
        } else if (isAsync) {
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        }
        // 同步調用
        else {
            RpcContext.getContext().setFuture(null);
            // 核心方法,currentClient為ReferenceCountExchangeClient類型
            // 發送請求,得到一個 ResponseFuture 實例,並調用該實例的 get 方法進行等待
            return (Result) currentClient.request(inv, timeout).get();
        }
    } 
    // 拋出異常...省略
}
 
下面開始發送請求了
ReferenceCountExchangeClient 中的request方法
ReferenceCountExchangeClient 內部定義了一個引用計數變量 referenceCount,每當該對象被引用一次 referenceCount 都會進行自增。每當 close 方法被調用時,referenceCount 進行自減。
ReferenceCountExchangeClient 內部僅實現了一個引用計數的功能,其他方法並無復雜邏輯,均是直接調用被裝飾對象的相關方法。所以這里就不多說了,繼續向下分析,這次是 HeaderExchangeClient。
 
HeaderExchangeClient中的request方法
HeaderExchangeClient封裝了一些關於心跳檢測的邏輯
 
HeaderExchangeChannel中的request方法
public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " 
                                    + request + ", cause: The channel " + this + " is closed!");
    }
    // 創建 Request 對象
    Request req = new Request();
    req.setVersion("2.0.0");
    // 設置雙向通信標志為 true
    req.setTwoWay(true);
    // 這里的 request 變量類型為 RpcInvocation
    req.setData(request);
    // 創建 DefaultFuture 對象
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        // 調用 NettyClient 的 send 方法發送請求
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    // 返回 DefaultFuture 對象
    return future;
}

 

上面的方法首先定義了一個 Request 對象,然后再將該對象傳給 NettyClient 的 send 方法,進行后續的調用。需要說明的是,NettyClient 中並未實現 send 方法,該方法繼承自父類 AbstractPeer,下面直接分析 AbstractPeer 的代碼。
public void send(Object message) throws RemotingException {
    // 該方法由 AbstractClient 類實現
    send(message, url.getParameter(Constants.SENT_KEY, false));
}

 

AbstractClient 中的send方法
public void send(Object message, boolean sent) throws RemotingException {
    if (send_reconnect && !isConnected()) {
        connect();
    }
    // 獲取 Channel,getChannel 是一個抽象方法,具體由子類實現
    Channel channel = getChannel();
    //TODO Can the value returned by getChannel() be null? need improvement.
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    // 繼續向下調用
    channel.send(message, sent);
}

 

默認情況下,Dubbo 使用 Netty 作為底層的通信框架,因此下面我們到 NettyClient 類中看一下 getChannel 方法的實現邏輯。
// 這里的 Channel 全限定名稱為 org.jboss.netty.channel.Channel
private volatile Channel channel;

@Override
protected com.alibaba.dubbo.remoting.Channel getChannel() {
    Channel c = channel;
    if (c == null || !c.isConnected())
        return null;
    // 獲取一個 NettyChannel 類型對象
    return NettyChannel.getOrAddChannel(c, getUrl(), this);
}

NettyChannel 中getOrAddChannel方法

static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
    if (ch == null) {
        return null;
    }
    // 嘗試從集合中獲取 NettyChannel 實例
    NettyChannel ret = channelMap.get(ch);
    if (ret == null) {
        // 如果 ret = null,則創建一個新的 NettyChannel 實例
        NettyChannel nc = new NettyChannel(ch, url, handler);
        if (ch.isConnected()) {
            // 將 <Channel, NettyChannel> 鍵值對存入 channelMap 集合中
            ret = channelMap.putIfAbsent(ch, nc);
        }
        if (ret == null) {
            ret = nc;
        }
    }
    return ret;
}

 

獲取到 NettyChannel 實例后,即可進行后續的調用。下面看一下 NettyChannel 的 send 方法。
public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 發送消息(包含請求和響應消息)
        ChannelFuture future = channel.write(message);
        
        // sent 的值源於 <dubbo:method sent="true/false" /> 中 sent 的配置值,有兩種配置值:
        //   1. true: 等待消息發出,消息發送失敗將拋出異常
        //   2. false: 不等待消息發出,將消息放入 IO 隊列,即刻返回
        // 默認情況下 sent = false;
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 等待消息發出,若在規定時間沒能發出,success 會被置為 false
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message ...");
    }

    // 若 success 為 false,這里拋出異常
    if (!success) {
        throw new RemotingException(this, "Failed to send message ...");
    }
}

線程棧

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM