來看一下客戶端請求代碼:
1 DemoService demoService = (DemoService) context.getBean("demoService"); // 獲取遠程服務代理 2 String hello = demoService.sayHello("world"); // 執行遠程方法
在8.2 構建客戶端源碼解析中我們看到最終得到的demoService是一個proxy0代理對象。現在來分析第二行代碼。
一 客戶端請求總體流程
//代理發出請求
proxy0.sayHello(String paramString) -->InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args) -->new RpcInvocation(method, args) -->MockClusterInvoker.invoke(Invocation invocation)//服務降級的地方 //ClusterInvoker將多個Invoker偽裝成一個集群版的Invoker -->AbstractClusterInvoker.invoke(final Invocation invocation) //獲取Invokers -->list(Invocation invocation) -->AbstractDirectory.list(Invocation invocation) -->RegistryDirectory.doList(Invocation invocation)//從Map<String, List<Invoker<T>>> methodInvokerMap中獲取key為sayHello的List<Invoker<T>> -->MockInvokersSelector.getNormalInvokers(final List<Invoker<T>> invokers)//對上述的List<Invoker<T>>再進行一次過濾(這里比如說過濾出所有協議為mock的Invoker,如果一個也沒有就全部返回),這就是router的作用 //獲取負載均衡器 -->loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE))//默認為random -->RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation)//異步操作添加invocationID -->FailoverClusterInvoker.doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) //使用負載均衡器選擇一個Invoker出來:RegistryDirectory$InvokerDelegete實例 -->AbstractClusterInvoker.select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) -->doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) -->AbstractLoadBalance.select(List<Invoker<T>> invokers, URL url, Invocation invocation) -->RandomLoadBalance.doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) //執行listener和filter鏈 -->ListenerInvokerWrapper.invoke -->ConsumerContextFilter.invoke(Invoker<?> invoker, Invocation invocation)//設置一些RpcContext屬性,並且設置invocation中的invoker屬性 -->FutureFilter.invoke(Invocation invocation) -->MonitorFilter.invoke(Invocation invocation)//monitor在這里收集數據 -->AbstractInvoker.invoke(Invocation inv)//重新設置了invocation中的invoker屬性和attachment屬性 -->DubboInvoker.doInvoke(final Invocation invocation) //獲取ExchangeClient進行消息的發送 -->ReferenceCountExchangeClient.request(Object request, int timeout) -->HeaderExchangeClient.request(Object request, int timeout) -->HeaderExchangeChannel.request(Object request, int timeout) -->AbstractClient.send(Object message, boolean sent)//NettyClient的父類 -->getChannel()//NettyChannel實例,其內部channel實例=NioClientSocketChannel實例 -->NettyChannel.send(Object message, boolean sent) -->NioClientSocketChannel.write(Object message)//已經是netty的東西了,這里的message=Request實例:最重要的是RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]
總體流程:
- 將請求參數(方法名,方法參數類型,方法參數值,服務名,附加參數)封裝成一個Invocation
- 附加參數中的path:即接口名,將會用於服務端接收請求信息后從exportMap中選取Exporter實例
- 方法名,方法參數類型,方法參數值:將用於JavassistProxyFactory$AbstractProxyInvoker執行對應的方法
- 使用Directory從Map<String, List<Invoker<T>>> methodInvokerMap中獲取key為sayHello(指定方法名)的List<Invoker<T>>
- 使用Router對上述的List<Invoker<T>>再進行一次過濾,得到subList
- 使用LoadBalancer從subList中再獲取一個Invoker,實際上是InvokerDelegete實例
- 使用InvokerDelegete實例執行真正的DubboInvoker的listener和filter鏈,然后執行到真正的DubboInvoker
- DubboInvoker使用NettyClient向服務端發出了請求
二 源碼分析
首先來看proxy0.sayHello
1 public String sayHello(String paramString) { 2 Object[] arrayOfObject = new Object[1]; 3 arrayOfObject[0] = paramString; 4 Object localObject = null; 5 try { 6 localObject = this.handler.invoke(this, DemoService.class.getMethod("sayHello"), arrayOfObject); 7 } catch (Throwable e) { 8 // TODO Auto-generated catch block 9 e.printStackTrace(); 10 } 11 return (String) localObject; 12 }
這里的handler就是InvokerInvocationHandler
1 public class InvokerInvocationHandler implements InvocationHandler { 2 private final Invoker<?> invoker;//MockClusterInvoker實例 3 4 public InvokerInvocationHandler(Invoker<?> handler) { 5 this.invoker = handler; 6 } 7 8 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 9 String methodName = method.getName(); 10 Class<?>[] parameterTypes = method.getParameterTypes(); 11 if (method.getDeclaringClass() == Object.class) { 12 return method.invoke(invoker, args); 13 } 14 if ("toString".equals(methodName) && parameterTypes.length == 0) { 15 return invoker.toString(); 16 } 17 if ("hashCode".equals(methodName) && parameterTypes.length == 0) { 18 return invoker.hashCode(); 19 } 20 if ("equals".equals(methodName) && parameterTypes.length == 1) { 21 return invoker.equals(args[0]); 22 } 23 return invoker.invoke(new RpcInvocation(method, args)).recreate(); 24 } 25 }
首先將請求參數封裝成一個RpcInvocation實例,如下:
-->String methodName=sayHello -->Class<?>[] parameterTypes=[class java.lang.String] -->Object[] arguments=[world] -->Map<String, String> attachments={}
之后使用MockClusterInvoker.invoke(Invocation invocation)進行遠程調用:
1 private final Directory<T> directory;//RegistryDirectory 2 private final Invoker<T> invoker;//FailoverClusterInvoker 3 4 /** 5 * 這里實際上會根據配置的mock參數來做服務降級: 6 * 1 如果沒有配置mock參數或者mock=false,則進行遠程調用; 7 * 2 如果配置了mock=force:return null,則直接返回null,不進行遠程調用; 8 * 3 如果配置了mock=fail:return null,先進行遠程調用,失敗了在進行mock調用。 9 */ 10 public Result invoke(Invocation invocation) throws RpcException { 11 Result result = null; 12 //sayHello.mock->mock->default.mock 13 String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 14 if (value.length() == 0 || value.equalsIgnoreCase("false")) { 15 //no mock 16 result = this.invoker.invoke(invocation); 17 } else if (value.startsWith("force")) { 18 if (logger.isWarnEnabled()) { 19 logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); 20 } 21 //force:direct mock 22 result = doMockInvoke(invocation, null); 23 } else { 24 //fail-mock 25 try { 26 result = this.invoker.invoke(invocation); 27 } catch (RpcException e) { 28 if (e.isBiz()) { 29 throw e; 30 } else { 31 if (logger.isWarnEnabled()) { 32 logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e); 33 } 34 result = doMockInvoke(invocation, e); 35 } 36 } 37 } 38 return result; 39 }
注意:這里可以做服務降級,后續會說。
之后調用FailoverClusterInvoker.invoke方法,該方法在其父類AbstractClusterInvoker中,
1 protected final Directory<T> directory;//RegistryDirectory 2 3 public Result invoke(final Invocation invocation) throws RpcException { 4 ... 5 LoadBalance loadbalance; 6 7 List<Invoker<T>> invokers = list(invocation); 8 if (invokers != null && invokers.size() > 0) { 9 loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() 10 .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); 11 } else { 12 loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); 13 } 14 RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);//異步調用加調用ID 15 return doInvoke(invocation, invokers, loadbalance); 16 } 17 18 protected List<Invoker<T>> list(Invocation invocation) throws RpcException { 19 List<Invoker<T>> invokers = directory.list(invocation); 20 return invokers; 21 }
首先是獲取一個List<Invoker<T>>,之后獲取一個LoadBalance,最后調用doInvoke進行調用。
首先來看通過RegistryDirectory.list(Invocation invocation),該方法在RegistryDirectory的父類AbstractDirectory中:
1 private volatile List<Router> routers; 2 public List<Invoker<T>> list(Invocation invocation) throws RpcException { 3 ... 4 List<Invoker<T>> invokers = doList(invocation); 5 List<Router> localRouters = this.routers; // local reference 6 if (localRouters != null && localRouters.size() > 0) { 7 for (Router router : localRouters) { 8 try { 9 if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) { 10 invokers = router.route(invokers, getConsumerUrl(), invocation); 11 } 12 } catch (Throwable t) { 13 logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); 14 } 15 } 16 } 17 return invokers; 18 }
首先執行doList(invocation)方法獲取出List<Invoker<T>>,之后使用router循環過濾,最后返回過濾后的List<Invoker<T>>。
RegistryDirectory.doList(invocation)
1 public List<Invoker<T>> doList(Invocation invocation) { 2 ... 3 List<Invoker<T>> invokers = null; 4 Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference 5 if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) { 6 String methodName = RpcUtils.getMethodName(invocation); 7 Object[] args = RpcUtils.getArguments(invocation); 8 if (args != null && args.length > 0 && args[0] != null 9 && (args[0] instanceof String || args[0].getClass().isEnum())) { 10 invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // 可根據第一個參數枚舉路由 sayHello.world 11 } 12 if (invokers == null) { 13 invokers = localMethodInvokerMap.get(methodName); 14 } 15 if (invokers == null) { 16 invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); 17 } 18 if (invokers == null) { 19 Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator(); 20 if (iterator.hasNext()) { 21 invokers = iterator.next(); 22 } 23 } 24 } 25 return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers; 26 }
其中Map<String, List<Invoker<T>>> methodInvokerMap在8.2 構建客戶端源碼解析已經初始化好了:
Map<String, List<Invoker<T>>> methodInvokerMap={
sayHello=[provider1的RegistryDirectory$InvokerDelegete實例, provider2的RegistryDirectory$InvokerDelegete實例], *=[provider1的RegistryDirectory$InvokerDelegete實例, provider2的RegistryDirectory$InvokerDelegete實例]}
這里根據方法名sayHello取出兩個RegistryDirectory$InvokerDelegete實例。最后通過Router進行過濾,這里只有一個Router,就是MockInvokersSelector。
1 public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers, 2 URL url, final Invocation invocation) throws RpcException { 3 if (invocation.getAttachments() == null) { 4 return getNormalInvokers(invokers); 5 } else { 6 String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK); 7 if (value == null) 8 return getNormalInvokers(invokers); 9 else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) { 10 return getMockedInvokers(invokers); 11 } 12 } 13 return invokers; 14 } 15 16 private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) { 17 if (!hasMockProviders(invokers)) { 18 return invokers; 19 } else { 20 List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size()); 21 for (Invoker<T> invoker : invokers) { 22 if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) { 23 sInvokers.add(invoker); 24 } 25 } 26 return sInvokers; 27 } 28 }
這里直接返回了。到此就已經選出可以被調用的RegistryDirectory$InvokerDelegete實例子集了。記下來先獲取負載均衡器,默認是RandomLoadBalance。最后執行FailoverClusterInvoker.
doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance):
1 public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { 2 List<Invoker<T>> copyinvokers = invokers; 3 checkInvokers(copyinvokers, invocation); 4 int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;//默認是2+1次 5 if (len <= 0) { 6 len = 1; 7 } 8 // retry loop. 9 RpcException le = null; // last exception. 10 List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. 11 Set<String> providers = new HashSet<String>(len); 12 for (int i = 0; i < len; i++) { 13 //重試時,進行重新選擇,避免重試時invoker列表已發生變化. 14 //注意:如果列表發生了變化,那么invoked判斷會失效,因為invoker示例已經改變 15 if (i > 0) { 16 checkWhetherDestroyed(); 17 copyinvokers = list(invocation); 18 //重新檢查一下 19 checkInvokers(copyinvokers, invocation); 20 } 21 Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); 22 invoked.add(invoker); 23 RpcContext.getContext().setInvokers((List) invoked); 24 try { 25 Result result = invoker.invoke(invocation); 26 ... 27 return result; 28 } catch (RpcException e) { 29 if (e.isBiz()) { // biz exception. 30 throw e; 31 } 32 le = e; 33 } catch (Throwable e) { 34 le = new RpcException(e.getMessage(), e); 35 } finally { 36 providers.add(invoker.getUrl().getAddress()); 37 } 38 } 39 throw new RpcException(le ...); 40 }
首先使用負載均衡器獲取一個RegistryDirectory$InvokerDelegete實例,然后使用選出的RegistryDirectory$InvokerDelegete.invoke進行請求發送。
1 protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { 2 ... 3 Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected); 4 .. 5 return invoker; 6 } 7 8 private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { 9 if (invokers == null || invokers.size() == 0) 10 return null; 11 if (invokers.size() == 1) 12 return invokers.get(0); 13 // 如果只有兩個invoker,並且其中一個已經有至少一個被選過了,退化成輪循 14 if (invokers.size() == 2 && selected != null && selected.size() > 0) { 15 return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0); 16 } 17 Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); 18 19 //如果 selected中包含(優先判斷) 或者 不可用&&availablecheck=true 則重試. 20 if ((selected != null && selected.contains(invoker)) 21 || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { 22 try { 23 Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); 24 ... 25 } catch (Throwable t) { 26 ... 27 } 28 } 29 return invoker; 30 }
RandomLoadBalance.doSelect
1 private final Random random = new Random(); 2 3 protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { 4 int length = invokers.size(); // 總個數 5 ...//權重計算 6 // 如果權重相同或權重為0則均等隨機 7 return invokers.get(random.nextInt(length)); 8 }
最后來看RegistryDirectory$InvokerDelegete.invoke,該方法實際在其父類InvokerWrapper中:
1 private final Invoker<T> invoker;//ListenerInvokerWrapper 2 3 public Result invoke(Invocation invocation) throws RpcException { 4 return invoker.invoke(invocation); 5 }
ListenerInvokerWrapper.invoke
1 private final Invoker<T> invoker;//ProtocolFilterWrapper$Invoker 2 3 public Result invoke(Invocation invocation) throws RpcException { 4 return invoker.invoke(invocation); 5 }
之后就會執行一系列的filter,這些filter后續會講,現在直接執行到DubboInvoker.invoke,實際上該方法在其父類AbstractInvoker中,AbstractInvoker又調用了DubboInvoker.doInvoke:
1 private final ExchangeClient[] clients; 2 3 protected Result doInvoke(final Invocation invocation) throws Throwable { 4 RpcInvocation inv = (RpcInvocation) invocation; 5 final String methodName = RpcUtils.getMethodName(invocation); 6 inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); 7 inv.setAttachment(Constants.VERSION_KEY, version); 8 9 ExchangeClient currentClient; 10 if (clients.length == 1) { 11 currentClient = clients[0];//單一長連接。默認 12 } else { 13 currentClient = clients[index.getAndIncrement() % clients.length]; 14 } 15 try { 16 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);//是否異步 17 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);//是否沒有返回值 18 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 19 if (isOneway) { 20 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); 21 currentClient.send(inv, isSent); 22 RpcContext.getContext().setFuture(null); 23 return new RpcResult(); 24 } else if (isAsync) { 25 ResponseFuture future = currentClient.request(inv, timeout); 26 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); 27 return new RpcResult(); 28 } else { 29 RpcContext.getContext().setFuture(null); 30 return (Result) currentClient.request(inv, timeout).get(); 31 } 32 } catch (TimeoutException e) { 33 throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); 34 } catch (RemotingException e) { 35 throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); 36 } 37 }
其中ExchangeClient[] clients在8.2 構建客戶端源碼解析已經被初始化好了:
1 ExchangeClient[] clients = [ReferenceCountExchangeClient實例]//如果設置了多條連接,此處有多個client
ReferenceCountExchangeClient.request
1 private ExchangeClient client;//HeaderExchangeClient 2 3 public ResponseFuture request(Object request, int timeout) throws RemotingException { 4 return client.request(request, timeout); 5 }
HeaderExchangeClient.request
1 private final ExchangeChannel channel;//HeaderExchangeChannel 2 3 public ResponseFuture request(Object request, int timeout) throws RemotingException { 4 return channel.request(request, timeout); 5 }
HeaderExchangeChannel.request
1 private final Channel channel;//NettyClient 2 3 public ResponseFuture request(Object request, int timeout) throws RemotingException { 4 if (closed) { 5 throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); 6 } 7 // create request. 8 Request req = new Request(); 9 req.setVersion("2.0.0"); 10 req.setTwoWay(true); 11 req.setData(request); 12 DefaultFuture future = new DefaultFuture(channel, req, timeout); 13 try { 14 channel.send(req); 15 } catch (RemotingException e) { 16 future.cancel(); 17 throw e; 18 } 19 return future; 20 }
上邊的channel是NettyClient實例,這里的send實際上是調用其父類AbstractClient的父類AbstractPeer,AbstractPeer調用AbstractClient.send:
1 public void send(Object message, boolean sent) throws RemotingException { 2 if (send_reconnect && !isConnected()) { 3 connect(); 4 } 5 Channel channel = getChannel();//NettyChannel 6 //TODO getChannel返回的狀態是否包含null需要改進 7 if (channel == null || !channel.isConnected()) { 8 throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl()); 9 } 10 channel.send(message, sent); 11 }
NettyChannel.send
1 private final org.jboss.netty.channel.Channel channel;//NioClientSocketChannel 2 3 public void send(Object message, boolean sent) throws RemotingException { 4 super.send(message, sent); 5 6 boolean success = true; 7 int timeout = 0; 8 try { 9 ChannelFuture future = channel.write(message); 10 if (sent) { 11 timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 12 success = future.await(timeout); 13 } 14 Throwable cause = future.getCause(); 15 if (cause != null) { 16 throw cause; 17 } 18 } catch (Throwable e) { 19 throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); 20 } 21 22 if (!success) { 23 throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() 24 + "in timeout(" + timeout + "ms) limit"); 25 } 26 }
這里就執行到了netty內部,通過netty自己的NioClientSocketChannel將消息發送給服務端。(這里發送之前有編碼行為,后續會講)