9.1 Source code: how the client issues a request


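Let's look at the client request code:

        DemoService demoService = (DemoService) context.getBean("demoService"); // obtain the remote service proxy
        String hello = demoService.sayHello("world"); // invoke the remote method

In 8.2 (source analysis of building the client) we saw that the demoService we end up with is a proxy0 proxy object. Now let's analyze the second line of code.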

I. Overall flow of a client request

// The proxy issues the request
proxy0.sayHello(String paramString)
-->InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)
  -->new RpcInvocation(method, args)
  -->MockClusterInvoker.invoke(Invocation invocation) // where service degradation happens
    // ClusterInvoker disguises multiple Invokers as a single cluster-level Invoker
    -->AbstractClusterInvoker.invoke(final Invocation invocation)
      // get the Invokers
      -->list(Invocation invocation)
        -->AbstractDirectory.list(Invocation invocation)
          -->RegistryDirectory.doList(Invocation invocation) // from Map<String, List<Invoker<T>>> methodInvokerMap, get the List<Invoker<T>> whose key is sayHello
          -->MockInvokersSelector.getNormalInvokers(final List<Invoker<T>> invokers) // filters that List<Invoker<T>> once more (here it removes every Invoker whose protocol is mock; if there are none, the whole list is returned); this is the router's job
      // get the load balancer
      -->loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
             .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)) // default is random
      -->RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation) // attach an invocation ID for asynchronous calls
      -->FailoverClusterInvoker.doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)
        // use the load balancer to pick one Invoker: a RegistryDirectory$InvokerDelegete instance
        -->AbstractClusterInvoker.select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
          -->doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
            -->AbstractLoadBalance.select(List<Invoker<T>> invokers, URL url, Invocation invocation)
              -->RandomLoadBalance.doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation)
        // run the listener and filter chain
        -->ListenerInvokerWrapper.invoke
          -->ConsumerContextFilter.invoke(Invoker<?> invoker, Invocation invocation) // sets some RpcContext properties and sets the invoker property of the invocation
            -->FutureFilter.invoke(Invocation invocation)
              -->MonitorFilter.invoke(Invocation invocation) // the monitor collects data here
                -->AbstractInvoker.invoke(Invocation inv) // resets the invoker and attachment properties of the invocation
                  -->DubboInvoker.doInvoke(final Invocation invocation)
                    // get an ExchangeClient to send the message
                    -->ReferenceCountExchangeClient.request(Object request, int timeout)
                      -->HeaderExchangeClient.request(Object request, int timeout)
                        -->HeaderExchangeChannel.request(Object request, int timeout)
                          -->AbstractClient.send(Object message, boolean sent) // parent class of NettyClient
                            -->getChannel() // NettyChannel instance, whose internal channel is a NioClientSocketChannel instance
                            -->NettyChannel.send(Object message, boolean sent)
                              -->NioClientSocketChannel.write(Object message) // this is already Netty code; message is a Request instance whose most important part is RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]

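Overall flow:

  • Wrap the request parameters (method name, parameter types, argument values, service name, attachments) in an Invocation.
    • The path attachment is the interface name; once the server receives the request, it uses path to pick the Exporter instance from exporterMap.
    • The method name, parameter types, and argument values are used by JavassistProxyFactory$AbstractProxyInvoker to execute the corresponding method.
  • Use the Directory to get, from Map<String, List<Invoker<T>>> methodInvokerMap, the List<Invoker<T>> whose key is sayHello (the method being called).
  • Use the Router to filter that List<Invoker<T>> once more, producing a subList.
  • Use the LoadBalance to pick one Invoker from the subList; in practice this is an InvokerDelegete instance.
  • Use the InvokerDelegete instance to run the listener and filter chain that wraps the real DubboInvoker, and finally reach the DubboInvoker itself.
  • The DubboInvoker uses NettyClient to send the request to the server.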
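The steps above can be sketched as a small pipeline. This is only a toy model under stated assumptions: ToyInvoker, list, route, and call are hypothetical names invented for illustration, not Dubbo classes, and the "invoker" returns a string instead of sending anything over the network.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy model of the consumer-side pipeline: Directory -> Router -> LoadBalance -> Invoker.
// Every type here is a hypothetical stand-in, not a Dubbo class.
class CallPipelineSketch {

    /** A toy invoker: protocol plus address; it returns a string instead of sending a request. */
    static final class ToyInvoker {
        final String protocol;
        final String address;

        ToyInvoker(String protocol, String address) {
            this.protocol = protocol;
            this.address = address;
        }

        String invoke(String methodName, Object[] args) {
            return address + " handled " + methodName + "(" + args[0] + ")";
        }
    }

    /** Directory step: look up the candidates registered under a method name. */
    static List<ToyInvoker> list(Map<String, List<ToyInvoker>> methodInvokerMap, String methodName) {
        List<ToyInvoker> found = methodInvokerMap.get(methodName);
        return found == null ? new ArrayList<>() : found;
    }

    /** Router step: drop invokers whose protocol is "mock". */
    static List<ToyInvoker> route(List<ToyInvoker> invokers) {
        List<ToyInvoker> normal = new ArrayList<>();
        for (ToyInvoker invoker : invokers) {
            if (!"mock".equals(invoker.protocol)) {
                normal.add(invoker);
            }
        }
        return normal;
    }

    /** Full pipeline; the load balancer is passed in as a function so it can be swapped. */
    static String call(Map<String, List<ToyInvoker>> methodInvokerMap,
                       Function<List<ToyInvoker>, ToyInvoker> loadBalance,
                       String methodName, Object[] args) {
        List<ToyInvoker> candidates = route(list(methodInvokerMap, methodName));
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no provider available for " + methodName);
        }
        return loadBalance.apply(candidates).invoke(methodName, args);
    }

    public static void main(String[] args) {
        Map<String, List<ToyInvoker>> map = new HashMap<>();
        List<ToyInvoker> invokers = new ArrayList<>();
        invokers.add(new ToyInvoker("mock", "mock-host:0"));
        invokers.add(new ToyInvoker("dubbo", "10.0.0.1:20880"));
        map.put("sayHello", invokers);
        // Always pick the first candidate so the result is deterministic.
        System.out.println(call(map, c -> c.get(0), "sayHello", new Object[] {"world"}));
        // prints: 10.0.0.1:20880 handled sayHello(world)
    }
}
```

Passing the load balancer in as a function mirrors the idea that the selection strategy is pluggable while the surrounding steps stay fixed.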

II. Source code analysis

First, look at proxy0.sayHello:

    public String sayHello(String paramString) {
        Object[] arrayOfObject = new Object[1];
        arrayOfObject[0] = paramString;
        Object localObject = null;
        try {
            // sayHello takes a String, so the parameter type must be passed to getMethod
            localObject = this.handler.invoke(this, DemoService.class.getMethod("sayHello", String.class), arrayOfObject);
        } catch (Throwable e) {
            e.printStackTrace();
        }
        return (String) localObject;
    }

The handler here is an InvokerInvocationHandler:

public class InvokerInvocationHandler implements InvocationHandler {
    private final Invoker<?> invoker; // MockClusterInvoker instance

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

First the request parameters are wrapped in an RpcInvocation instance, as follows:

-->String methodName=sayHello
-->Class<?>[] parameterTypes=[class java.lang.String]
-->Object[] arguments=[world]
-->Map<String, String> attachments={}
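To see how a method call on a proxy turns into "method name + parameter types + arguments", here is a minimal sketch using the JDK's java.lang.reflect.Proxy. It is an illustration only: GreetingService and RecordingHandler are hypothetical names, the handler answers locally instead of calling a remote Invoker, and it says nothing about how proxy0 itself is generated.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

class ProxySketch {

    /** Hypothetical service interface, standing in for DemoService. */
    interface GreetingService {
        String sayHello(String name);
    }

    /** Records what an invocation object would carry and answers locally. */
    static final class RecordingHandler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            String methodName = method.getName();
            // Methods declared on Object are answered locally, never treated as remote calls.
            if (method.getDeclaringClass() == Object.class) {
                switch (methodName) {
                    case "toString":
                        return "proxy for " + GreetingService.class.getSimpleName();
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default: // equals
                        return proxy == args[0];
                }
            }
            // A real handler would wrap these three values in an invocation and send it.
            return methodName + " " + Arrays.toString(method.getParameterTypes())
                    + " " + Arrays.toString(args);
        }
    }

    static GreetingService newProxy() {
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                new RecordingHandler());
    }

    public static void main(String[] args) {
        GreetingService service = newProxy();
        System.out.println(service.sayHello("world"));
        // prints: sayHello [class java.lang.String] [world]
    }
}
```

The printed values line up with the methodName, parameterTypes, and arguments fields listed above.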

Then MockClusterInvoker.invoke(Invocation invocation) is used to make the remote call:

    private final Directory<T> directory; // RegistryDirectory
    private final Invoker<T> invoker; // FailoverClusterInvoker

    /**
     * Service degradation is driven by the configured mock parameter:
     * 1. If mock is not configured or mock=false, make the remote call.
     * 2. If mock=force:return null, return null directly without making the remote call.
     * 3. If mock=fail:return null, make the remote call first and fall back to the mock call if it fails.
     */
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        // lookup order: sayHello.mock -> mock -> default.mock
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            //no mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    if (logger.isWarnEnabled()) {
                        logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                    }
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }

Note: this is where service degradation can be implemented; it is covered later.
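The three branches can be isolated in a small sketch. It is a simplification under stated assumptions: the remote call and the mock call are plain Supplier values, MockDecisionSketch is a hypothetical name, and every RuntimeException is treated as a non-business failure (the real method rethrows business exceptions instead of falling back).

```java
import java.util.function.Supplier;

class MockDecisionSketch {

    /**
     * Decides between the remote call and the mock call based on the mock value.
     * Assumption of this sketch: any RuntimeException counts as a non-business failure.
     */
    static String invoke(String mockValue, Supplier<String> remoteCall, Supplier<String> mockCall) {
        String value = mockValue == null ? "false" : mockValue.trim();
        if (value.isEmpty() || value.equalsIgnoreCase("false")) {
            // no mock: always go remote
            return remoteCall.get();
        } else if (value.startsWith("force")) {
            // force: skip the remote call entirely
            return mockCall.get();
        } else {
            // fail: try remote first, fall back to the mock on failure
            try {
                return remoteCall.get();
            } catch (RuntimeException e) {
                return mockCall.get();
            }
        }
    }

    public static void main(String[] args) {
        Supplier<String> broken = () -> {
            throw new IllegalStateException("provider is down");
        };
        System.out.println(invoke("false", () -> "remote", () -> "mock"));             // prints remote
        System.out.println(invoke("force:return null", () -> "remote", () -> "mock")); // prints mock
        System.out.println(invoke("fail:return null", broken, () -> "mock"));          // prints mock
    }
}
```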

Next, FailoverClusterInvoker.invoke is called; the method is defined in its parent class AbstractClusterInvoker:

    protected final Directory<T> directory; // RegistryDirectory

    public Result invoke(final Invocation invocation) throws RpcException {
        ...
        LoadBalance loadbalance;

        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && invokers.size() > 0) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); // attach an invocation ID for asynchronous calls
        return doInvoke(invocation, invokers, loadbalance);
    }

    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);
        return invokers;
    }

It first obtains a List<Invoker<T>>, then obtains a LoadBalance, and finally calls doInvoke to make the call.

Start with RegistryDirectory.list(Invocation invocation); the method is defined in AbstractDirectory, the parent class of RegistryDirectory:

    private volatile List<Router> routers;

    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        ...
        List<Invoker<T>> invokers = doList(invocation);
        List<Router> localRouters = this.routers; // local reference
        if (localRouters != null && localRouters.size() > 0) {
            for (Router router : localRouters) {
                try {
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
        }
        return invokers;
    }

It first calls doList(invocation) to get the List<Invoker<T>>, then passes the list through each router in turn, and finally returns the filtered List<Invoker<T>>.
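The router loop has one property worth spelling out: a router that throws is logged and skipped, and the list produced by the previous routers is kept. A minimal sketch of just that behavior, assuming each router is modeled as a UnaryOperator over the list (RouterChainSketch and route are hypothetical names; the runtime-flag check on the router URL is left out):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

class RouterChainSketch {

    /** Applies each router in order; a failing router is reported and skipped. */
    static <T> List<T> route(List<T> invokers, List<UnaryOperator<List<T>>> routers) {
        List<T> current = invokers;
        for (UnaryOperator<List<T>> router : routers) {
            try {
                current = router.apply(current);
            } catch (RuntimeException e) {
                // Keep the list from the previous step and carry on with the next router.
                System.err.println("Failed to execute router: " + e.getMessage());
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<List<String>>> routers = new ArrayList<>();
        routers.add(list -> list.subList(0, 2));
        routers.add(list -> {
            throw new IllegalStateException("broken rule");
        });
        System.out.println(route(Arrays.asList("p1", "p2", "p3"), routers)); // prints [p1, p2]
    }
}
```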

RegistryDirectory.doList(invocation)

    public List<Invoker<T>> doList(Invocation invocation) {
        ...
        List<Invoker<T>> invokers = null;
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
            String methodName = RpcUtils.getMethodName(invocation);
            Object[] args = RpcUtils.getArguments(invocation);
            if (args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // routing on the first argument is possible, e.g. key sayHello.world
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(methodName);
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
            }
            if (invokers == null) {
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                if (iterator.hasNext()) {
                    invokers = iterator.next();
                }
            }
        }
        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }

The Map<String, List<Invoker<T>>> methodInvokerMap was already initialized in 8.2 (source analysis of building the client):

Map<String, List<Invoker<T>>> methodInvokerMap={
sayHello=[RegistryDirectory$InvokerDelegete instance for provider1, RegistryDirectory$InvokerDelegete instance for provider2], *=[RegistryDirectory$InvokerDelegete instance for provider1, RegistryDirectory$InvokerDelegete instance for provider2]}

Here the method name sayHello yields the two RegistryDirectory$InvokerDelegete instances. They are then filtered by the Router; there is only one Router here, MockInvokersSelector:
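The lookup order in doList can be tried out against a plain map. This is a sketch under stated assumptions: invokers are replaced by provider names (strings), "*" stands in for Constants.ANY_VALUE, and MethodLookupSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MethodLookupSketch {

    /** Lookup order: "method.firstArg", then "method", then "*", then the first entry in the map. */
    static List<String> lookup(Map<String, List<String>> methodInvokerMap, String methodName, Object[] args) {
        List<String> invokers = null;
        if (methodInvokerMap != null && !methodInvokerMap.isEmpty()) {
            if (args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = methodInvokerMap.get(methodName + "." + args[0]);
            }
            if (invokers == null) {
                invokers = methodInvokerMap.get(methodName);
            }
            if (invokers == null) {
                invokers = methodInvokerMap.get("*");
            }
            if (invokers == null) {
                invokers = methodInvokerMap.values().iterator().next();
            }
        }
        return invokers == null ? new ArrayList<>() : invokers;
    }

    public static void main(String[] args) {
        Map<String, List<String>> map = new LinkedHashMap<>();
        map.put("sayHello", Arrays.asList("provider1", "provider2"));
        map.put("*", Arrays.asList("provider1", "provider2", "provider3"));
        // No "sayHello.world" key exists, so the lookup falls through to "sayHello".
        System.out.println(lookup(map, "sayHello", new Object[] {"world"})); // prints [provider1, provider2]
        // An unknown method falls through to the "*" entry.
        System.out.println(lookup(map, "sayBye", new Object[] {"world"}));   // prints [provider1, provider2, provider3]
    }
}
```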

    public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
                                      URL url, final Invocation invocation) throws RpcException {
        if (invocation.getAttachments() == null) {
            return getNormalInvokers(invokers);
        } else {
            String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
            if (value == null)
                return getNormalInvokers(invokers);
            else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
                return getMockedInvokers(invokers);
            }
        }
        return invokers;
    }

    private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) {
        if (!hasMockProviders(invokers)) {
            return invokers;
        } else {
            List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size());
            for (Invoker<T> invoker : invokers) {
                if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
                    sInvokers.add(invoker);
                }
            }
            return sInvokers;
        }
    }

In this example the list is returned unchanged. At this point the subset of RegistryDirectory$InvokerDelegete instances that may be called has been selected. Next the load balancer is obtained; the default is RandomLoadBalance. Finally, FailoverClusterInvoker.doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) is executed:

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        checkInvokers(copyinvokers, invocation);
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; // default: 2 retries + 1 = 3 attempts
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            // On a retry, list the invokers again so that changes to the invoker list are picked up.
            // Note: if the list has changed, the invoked check no longer works, because the invoker instances have changed.
            if (i > 0) {
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                // check again
                checkInvokers(copyinvokers, invocation);
            }
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                ...
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le ...);
    }

It first uses the load balancer to obtain a RegistryDirectory$InvokerDelegete instance, then sends the request through that instance's invoke method.
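The retry loop boils down to "try up to retries + 1 times, preferring providers that have not been tried yet". A minimal sketch of that idea follows, with several assumptions: providers are plain values, the remote call is a Function, the load balancer is replaced by "first provider not tried yet", and every RuntimeException is retried (the real loop rethrows business exceptions and re-lists the invokers on each retry). FailoverSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class FailoverSketch {

    /** Tries up to retries + 1 providers and returns the first successful result. */
    static <T> String invoke(List<T> providers, int retries, Function<T, String> call) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        int attempts = Math.max(retries + 1, 1);
        List<T> invoked = new ArrayList<>(); // providers already tried
        RuntimeException last = null;        // last failure, reported if every attempt fails
        for (int i = 0; i < attempts; i++) {
            T chosen = select(providers, invoked);
            invoked.add(chosen);
            try {
                return call.apply(chosen);
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw new IllegalStateException("failed after " + attempts + " attempts on " + invoked, last);
    }

    /** Stand-in for the load balancer: the first provider not tried yet, else the first one. */
    static <T> T select(List<T> providers, List<T> invoked) {
        for (T provider : providers) {
            if (!invoked.contains(provider)) {
                return provider;
            }
        }
        return providers.get(0);
    }

    public static void main(String[] args) {
        String result = invoke(Arrays.asList("p1", "p2", "p3"), 2, provider -> {
            if ("p1".equals(provider)) {
                throw new IllegalStateException("p1 is down");
            }
            return "ok from " + provider;
        });
        System.out.println(result); // prints ok from p2
    }
}
```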

    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        ...
        Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
        ...
        return invoker;
    }

    private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.size() == 0)
            return null;
        if (invokers.size() == 1)
            return invokers.get(0);
        // If there are only two invokers and at least one has already been selected, degrade to round-robin
        if (invokers.size() == 2 && selected != null && selected.size() > 0) {
            return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
        }
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

        // Reselect if the invoker is already in selected (checked first), or if it is unavailable and availablecheck=true.
        if ((selected != null && selected.contains(invoker))
                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
            try {
                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                ...
            } catch (Throwable t) {
               ...
            }
        }
        return invoker;
    }

RandomLoadBalance.doSelect

    private final Random random = new Random();

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // total number of invokers
        ... // weight calculation
        // if all weights are equal or the total weight is 0, pick uniformly at random
        return invokers.get(random.nextInt(length));
    }

Finally, look at RegistryDirectory$InvokerDelegete.invoke; the method is actually defined in its parent class InvokerWrapper:
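The weight calculation is elided in the listing above. As a generic illustration of weighted random selection (a common technique, not a reproduction of the elided code), the sketch below draws an offset in [0, totalWeight) and walks the weights until the offset is used up; when all weights are equal or the total is 0 it falls back to a uniform pick, as the comment in the listing describes. WeightedRandomSketch, pickIndex, and select are hypothetical names.

```java
import java.util.Random;

class WeightedRandomSketch {

    /** Maps an offset in [0, totalWeight) to the index whose weight range contains it. */
    static int pickIndex(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset must be smaller than the total weight");
    }

    /** Weighted pick when weights differ; uniform pick when they are all equal or sum to 0. */
    static int select(int[] weights, Random random) {
        int total = 0;
        boolean sameWeight = true;
        for (int i = 0; i < weights.length; i++) {
            total += weights[i];
            if (i > 0 && weights[i] != weights[i - 1]) {
                sameWeight = false;
            }
        }
        if (total > 0 && !sameWeight) {
            return pickIndex(weights, random.nextInt(total));
        }
        return random.nextInt(weights.length);
    }

    public static void main(String[] args) {
        int[] weights = {1, 2, 3};
        // Offsets 0 | 1..2 | 3..5 map to indexes 0 | 1 | 2.
        System.out.println(pickIndex(weights, 0)); // prints 0
        System.out.println(pickIndex(weights, 2)); // prints 1
        System.out.println(pickIndex(weights, 3)); // prints 2
        System.out.println(select(weights, new Random()) < weights.length); // prints true
    }
}
```

Splitting the offset-to-index step from the random draw keeps the mapping deterministic and easy to check by hand.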

    private final Invoker<T> invoker; // ListenerInvokerWrapper

    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

ListenerInvokerWrapper.invoke

    private final Invoker<T> invoker; // ProtocolFilterWrapper$Invoker

    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

After that a series of filters is executed; the filters are covered later. For now, go straight to DubboInvoker.invoke. That method is actually defined in the parent class AbstractInvoker, which in turn calls DubboInvoker.doInvoke:

    private final ExchangeClient[] clients;

    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0]; // single long-lived connection (the default)
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // asynchronous call?
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // one-way call (no return value)?
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

The ExchangeClient[] clients array was already initialized in 8.2 (source analysis of building the client):

ExchangeClient[] clients = [ReferenceCountExchangeClient instance] // if multiple connections are configured, there are multiple clients here
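The three call modes in doInvoke differ only in what happens to the response future. A rough sketch of that difference, assuming the response is modeled as a CompletableFuture and the per-call context as a simple holder (CallModeSketch, Mode, and CallContext are hypothetical names; the real method returns an empty RpcResult rather than null for the one-way and asynchronous cases):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CallModeSketch {

    enum Mode { ONEWAY, ASYNC, SYNC }

    /** Hypothetical stand-in for the future slot of the per-call context. */
    static final class CallContext {
        CompletableFuture<String> future;
    }

    static String invoke(Mode mode, CompletableFuture<String> response,
                         CallContext context, long timeoutMillis) {
        if (mode == Mode.ONEWAY) {
            // Fire and forget: nobody waits for or keeps the response.
            context.future = null;
            return null;
        }
        if (mode == Mode.ASYNC) {
            // Return immediately; the caller fetches the result from the context later.
            context.future = response;
            return null;
        }
        // Synchronous: block on the response, up to the timeout.
        context.future = null;
        try {
            return response.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("Invoke remote method timeout", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the response", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to invoke remote method", e.getCause());
        }
    }

    public static void main(String[] args) {
        CallContext context = new CallContext();
        CompletableFuture<String> response = CompletableFuture.completedFuture("Hello world");
        System.out.println(invoke(Mode.SYNC, response, context, 1000));  // prints Hello world
        System.out.println(invoke(Mode.ASYNC, response, context, 1000)); // prints null
        System.out.println(context.future.join());                       // prints Hello world
    }
}
```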
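When several connections are configured, doInvoke rotates through the clients array with an incrementing counter. A minimal sketch of that rotation, with clients modeled as strings and ClientRoundRobinSketch as a hypothetical name. One deliberate difference from the listing: this sketch uses Math.floorMod instead of %, so the index stays non-negative even if the counter wraps around; that is a choice made here, not something the quoted code does.

```java
import java.util.concurrent.atomic.AtomicInteger;

class ClientRoundRobinSketch {

    private final String[] clients;
    private final AtomicInteger index = new AtomicInteger();

    ClientRoundRobinSketch(String[] clients) {
        if (clients.length == 0) {
            throw new IllegalArgumentException("at least one client is required");
        }
        this.clients = clients.clone();
    }

    /** Returns the single client directly, or rotates through the array. */
    String next() {
        if (clients.length == 1) {
            return clients[0];
        }
        // floorMod keeps the result in [0, length) even after the counter overflows.
        return clients[Math.floorMod(index.getAndIncrement(), clients.length)];
    }

    public static void main(String[] args) {
        ClientRoundRobinSketch rr = new ClientRoundRobinSketch(new String[] {"c0", "c1", "c2"});
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            order.append(rr.next()).append(' ');
        }
        System.out.println(order.toString().trim()); // prints c0 c1 c2 c0
    }
}
```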

ReferenceCountExchangeClient.request

    private ExchangeClient client; // HeaderExchangeClient

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        return client.request(request, timeout);
    }

HeaderExchangeClient.request

    private final ExchangeChannel channel; // HeaderExchangeChannel

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        return channel.request(request, timeout);
    }

HeaderExchangeChannel.request

    private final Channel channel; // NettyClient

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

The channel above is the NettyClient instance. The send call here actually resolves to AbstractPeer, the parent of its parent class AbstractClient, and AbstractPeer in turn calls AbstractClient.send:
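The listing creates a future before sending and returns it to the caller, but does not show how that future is completed later. One common way to do this, shown here purely as an assumption-laden sketch and not as a description of DefaultFuture, is to keep a map from request id to pending future and complete the entry when the matching response arrives. PendingRequestsSketch and all of its members are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequestsSketch {

    /** A request id paired with the future the caller waits on. */
    static final class Pending {
        final long id;
        final CompletableFuture<Object> future;

        Pending(long id, CompletableFuture<Object> future) {
            this.id = id;
            this.future = future;
        }
    }

    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    /** Registers a new outgoing request; the id would travel with the request. */
    Pending request() {
        long id = nextId.incrementAndGet();
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(id, future);
        return new Pending(id, future);
    }

    /** Called when a response arrives; returns false if nobody is waiting for that id. */
    boolean received(long id, Object result) {
        CompletableFuture<Object> future = pending.remove(id);
        if (future == null) {
            return false;
        }
        future.complete(result);
        return true;
    }

    /** Called when sending fails, so the entry does not linger in the map. */
    void cancel(long id) {
        CompletableFuture<Object> future = pending.remove(id);
        if (future != null) {
            future.cancel(false);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingRequestsSketch requests = new PendingRequestsSketch();
        Pending p = requests.request();
        requests.received(p.id, "Hello world");
        System.out.println(p.future.join()); // prints Hello world
    }
}
```

The cancel method corresponds to the future.cancel() call in the catch block of the listing: if the send fails, nothing will ever answer, so the pending entry should be dropped.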

    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
            connect();
        }
        Channel channel = getChannel(); // NettyChannel
        // TODO: whether the state returned by getChannel can include null needs to be improved
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
        }
        channel.send(message, sent);
    }

NettyChannel.send

    private final org.jboss.netty.channel.Channel channel; // NioClientSocketChannel

    public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            ChannelFuture future = channel.write(message);
            if (sent) {
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }

        if (!success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }

At this point execution enters Netty itself, which sends the message to the server through its own NioClientSocketChannel. (The message is encoded before it is sent; that is covered later.)

