執行過程如下圖所示
代理bean方法調用,即代理bean方法調用
我們知道demoService的bean是一個代理類,並且這個代理類繼承com.alibaba.dubbo.common.bytecode.Proxy這個類,代理類中sayHello方法內部代碼如下:
(來源於Dubbo官網)
/** * Arthas 反編譯步驟: * 1. 啟動 Arthas * java -jar arthas-boot.jar * * 2. 輸入編號選擇進程 * Arthas 啟動后,會打印 Java 應用進程列表,如下: * [1]: 11232 org.jetbrains.jps.cmdline.Launcher * [2]: 22370 org.jetbrains.jps.cmdline.Launcher * [3]: 22371 com.alibaba.dubbo.demo.consumer.Consumer * [4]: 22362 com.alibaba.dubbo.demo.provider.Provider * [5]: 2074 org.apache.zookeeper.server.quorum.QuorumPeerMain * 這里輸入編號 3,讓 Arthas 關聯到啟動類為 com.....Consumer 的 Java 進程上 * * 3. 由於 Demo 項目中只有一個服務接口,因此此接口的代理類類名為 proxy0,此時使用 sc 命令搜索這個類名。 * $ sc *.proxy0 * com.alibaba.dubbo.common.bytecode.proxy0 * * 4. 使用 jad 命令反編譯 com.alibaba.dubbo.common.bytecode.proxy0 * $ jad com.alibaba.dubbo.common.bytecode.proxy0 * * 更多使用方法請參考 Arthas 官方文檔: * https://alibaba.github.io/arthas/quick-start.html */ public class proxy0 implements ClassGenerator.DC, EchoService, DemoService { // 方法數組 public static Method[] methods; private InvocationHandler handler; public proxy0(InvocationHandler invocationHandler) { this.handler = invocationHandler; } public proxy0() { } public String sayHello(String string) { // 將參數存儲到 Object 數組中 Object[] arrobject = new Object[]{string}; // 調用 InvocationHandler 實現類的 invoke 方法得到調用結果 Object object = this.handler.invoke(this, methods[0], arrobject); // 返回調用結果 return (String)object; } /** 回聲測試方法 */ public Object $echo(Object object) { Object[] arrobject = new Object[]{object}; Object object2 = this.handler.invoke(this, methods[1], arrobject); return object2; } }
上述代碼中,有一個handler屬性,這個handler是什么呢?
dubbo-demo-consumer項目示例截圖
通過調試得出handler類型為InvokerInvocationHandler,下面就是它真正調用的方法。
代理bean(proxy0)調用sayHello方法流程
(1)執行InvokerInvocationHandler中的invoke方法
當demoService調用sayHello方法時,即調用的是代理類proxy0中的sayHello方法,當執行到handler.invoke方法時,則進入到InvokerInvocationHandler類中的invoke方法
// method為sayHello,參數為,方法中的參數值 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); // 攔截定義在 Object 類中的方法(未被子類重寫),比如 wait/notify if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } // 當調用的是toString方法時 if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } // 當調用的是hashCode方法時 if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } // 當調用的是equals方法時 if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } // 核心代碼。。。 return invoker.invoke(new RpcInvocation(method, args)).recreate(); }

通過new RpcInvocation(method, args) 將一些參數、方法、屬性封裝起來
public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String, String> attachments, Invoker<?> invoker) { this.methodName = methodName; this.parameterTypes = parameterTypes == null ? new Class<?>[0] : parameterTypes; this.arguments = arguments == null ? new Object[0] : arguments; this.attachments = attachments == null ? new HashMap<String, String>() : attachments; this.invoker = invoker; }
recreate()方法直接返回一個result
// exception public Object recreate() throws Throwable { if (exception != null) { throw exception; } return result; }
(2)執行MockClusterInvoker類中的
invoke方法
MockClusterInvoker類是一個與服務降級相關的類
public Result invoke(Invocation invocation) throws RpcException { Result result = null; String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")) { //no mock // 關鍵部分.... result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { if (logger.isWarnEnabled()) { logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); } //force:direct mock result = doMockInvoke(invocation, null); } else { //fail-mock try { result = this.invoker.invoke(invocation); } catch (RpcException e) { if (e.isBiz()) { throw e; } else { if (logger.isWarnEnabled()) { logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e); } result = doMockInvoke(invocation, e); } } } return result; }
關鍵部分result = this.invoker.invoke(invocation);
(3)執行FailoverClusterInvoker中的invoke方法
this.invoker是一個FailoverClusterInvoker類型的對象,這個類繼承 AbstractClusterInvoker 類,先調用AbstractClusterInvoker
類的invoke方法
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation); // 獲取負載均衡,默認RandomLoadBalance if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
當調用時doInvoke,則執行 FailoverClusterInvoker 類的doInvoke方法
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); // check again checkInvokers(copyinvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { // 關鍵代碼... Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { // 日志打印... } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } // 拋出異常... }

接下來調用RegistryDirectory$InvokerDelegate 類的invoke方法,這個類繼承InvokerWrapper,因此我們再來看一下InvokerWrapper這個類的invoke方法
ProtocolFilterWrapper$1這個類
ListenerInvokerWrapper 這個類的invoke方法
DubboInvoke中的doInvoke
Dubbo 支持同步和異步兩種調用方式,其中異步調用還可細分為“有返回值”的異步調用和“無返回值”的異步調用。所謂“無返回值”異步調用是指服務消費方只管調用,但不關心調用結果,此時 Dubbo 會直接返回一個空的 RpcResult。若要使用異步特性,需要服務消費方手動進行配置。默認情況下,Dubbo 使用同步調用方式。
protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); // 設置 path 和 version 到 attachment 中 inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { // 從 clients 數組中獲取 ExchangeClient currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 獲取異步配置 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // isOneway 為 true,表示“單向”通信 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 異步無返回值 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); // 異步有返回值 } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } // 同步調用 else { RpcContext.getContext().setFuture(null); // 核心方法,currentClient為ReferenceCountExchangeClient類型 // 發送請求,得到一個 ResponseFuture 實例,並調用該實例的 get 方法進行等待 return (Result) currentClient.request(inv, timeout).get(); } } // 拋出異常...省略 }
下面開始發送請求了
ReferenceCountExchangeClient 中的request方法
ReferenceCountExchangeClient 內部定義了一個引用計數變量 referenceCount,每當該對象被引用一次 referenceCount 都會進行自增。每當 close 方法被調用時,referenceCount 進行自減。
ReferenceCountExchangeClient 內部僅實現了一個引用計數的功能,其他方法並無復雜邏輯,均是直接調用被裝飾對象的相關方法。所以這里就不多說了,繼續向下分析,這次是 HeaderExchangeClient。
HeaderExchangeClient中的request方法
HeaderExchangeClient封裝了一些關於心跳檢測的邏輯
HeaderExchangeChannel中的request方法
public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // 創建 Request 對象 Request req = new Request(); req.setVersion("2.0.0"); // 設置雙向通信標志為 true req.setTwoWay(true); // 這里的 request 變量類型為 RpcInvocation req.setData(request); // 創建 DefaultFuture 對象 DefaultFuture future = new DefaultFuture(channel, req, timeout); try { // 調用 NettyClient 的 send 方法發送請求 channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } // 返回 DefaultFuture 對象 return future; }
上面的方法首先定義了一個 Request 對象,然后再將該對象傳給 NettyClient 的 send 方法,進行后續的調用。需要說明的是,NettyClient 中並未實現 send 方法,該方法繼承自父類 AbstractPeer,下面直接分析 AbstractPeer 的代碼。
public void send(Object message) throws RemotingException { // 該方法由 AbstractClient 類實現 send(message, url.getParameter(Constants.SENT_KEY, false)); }
AbstractClient 中的send方法
public void send(Object message, boolean sent) throws RemotingException { if (send_reconnect && !isConnected()) { connect(); } // 獲取 Channel,getChannel 是一個抽象方法,具體由子類實現 Channel channel = getChannel(); //TODO Can the value returned by getChannel() be null? need improvement. if (channel == null || !channel.isConnected()) { throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl()); } // 繼續向下調用 channel.send(message, sent); }
默認情況下,Dubbo 使用 Netty 作為底層的通信框架,因此下面我們到 NettyClient 類中看一下 getChannel 方法的實現邏輯。
// 這里的 Channel 全限定名稱為 org.jboss.netty.channel.Channel private volatile Channel channel; @Override protected com.alibaba.dubbo.remoting.Channel getChannel() { Channel c = channel; if (c == null || !c.isConnected()) return null; // 獲取一個 NettyChannel 類型對象 return NettyChannel.getOrAddChannel(c, getUrl(), this); }
NettyChannel 中getOrAddChannel方法
static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) { if (ch == null) { return null; } // 嘗試從集合中獲取 NettyChannel 實例 NettyChannel ret = channelMap.get(ch); if (ret == null) { // 如果 ret = null,則創建一個新的 NettyChannel 實例 NettyChannel nc = new NettyChannel(ch, url, handler); if (ch.isConnected()) { // 將 <Channel, NettyChannel> 鍵值對存入 channelMap 集合中 ret = channelMap.putIfAbsent(ch, nc); } if (ret == null) { ret = nc; } } return ret; }
獲取到 NettyChannel 實例后,即可進行后續的調用。下面看一下 NettyChannel 的 send 方法。
public void send(Object message, boolean sent) throws RemotingException { super.send(message, sent); boolean success = true; int timeout = 0; try { // 發送消息(包含請求和響應消息) ChannelFuture future = channel.write(message); // sent 的值源於 <dubbo:method sent="true/false" /> 中 sent 的配置值,有兩種配置值: // 1. true: 等待消息發出,消息發送失敗將拋出異常 // 2. false: 不等待消息發出,將消息放入 IO 隊列,即刻返回 // 默認情況下 sent = false; if (sent) { timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 等待消息發出,若在規定時間沒能發出,success 會被置為 false success = future.await(timeout); } Throwable cause = future.getCause(); if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message ..."); } // 若 success 為 false,這里拋出異常 if (!success) { throw new RemotingException(this, "Failed to send message ..."); } }
線程棧
