上篇dubbo源碼解析(二)中說到創建代理時會通過refprotocol.refer(interfaceClass, urls.get(0))先創建一個invoker對象出來
以DubboProtocol為例
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { // create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
執行refer時,會創建一個DubboInvoker對象返回
我們再看DubboInvoker這個類
這個類繼承自AbstractInvoker,而AbstractInvoker這個類實現了Invoker接口,也就實現了,invoke方法
方法如下:
public Result invoke(Invocation inv) throws RpcException { if(destroyed) { throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + " is DESTROYED, can not be invoked any more!"); } RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker(this); if (attachment != null && attachment.size() > 0) { invocation.addAttachmentsIfAbsent(attachment); } Map<String, String> context = RpcContext.getContext().getAttachments(); if (context != null) { invocation.addAttachmentsIfAbsent(context); } if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){ invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString()); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { return doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception Throwable te = e.getTargetException(); if (te == null) { return new RpcResult(e); } else { if (te instanceof RpcException) { ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION); } return new RpcResult(te); } } catch (RpcException e) { if (e.isBiz()) { return new RpcResult(e); } else { throw e; } } catch (Throwable e) { return new RpcResult(e); } }
主要做了三件事
1. 做了一些初始化操作
2. 調用了抽象方法 doInvoke(由子類實現)
3. 捕獲了 doInvoke方法的異常,將結果返回
我們接着看一下DubboInvoker的doInvoke方法
protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
判斷與遠程交互的客戶端,如果是一個,直接用,如果是多個則輪訓
判斷接口是單方向的,直接向遠端發送請求send(),如果不是,則判斷是不是異步的,如果是異步調用將future放到context里,如果是同步則等接口執行完之后get出結果
