9.4 dubbo異步調用原理


9.1 客戶端發起請求源碼9.2 服務端接收請求消息並發送響應消息源碼9.3 客戶端接收響應信息(異步轉同步的實現) 分析了dubbo同步調用的源碼,現在來看一下dubbo異步調用。

一、使用方式

服務提供方不變,調用方代碼如下:

1     <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
2         <dubbo:method name="sayHello" async="true" timeout="60000"/>
3         <dubbo:method name="sayBye" async="true" timeout="60000"/>
4     </dubbo:reference>

配置里添加<dubbo:method name="xxx" async="true"/>,表示單個方法xxx使用異步方式;如果demoService下的所有方法都使用異步,直接配置為<dubbo:reference async="true"/>。

 1     public static void main(String[] args) throws Exception {
 2         //Prevent to get IPV6 address,this way only work in debug mode
 3         //But you can pass use -Djava.net.preferIPv4Stack=true,then it work well whether in debug mode or not
 4         System.setProperty("java.net.preferIPv4Stack", "true");
 5 
 6         asyncFuture2();
 7     }
 8 
 9     public static void asyncFuture1() throws ExecutionException, InterruptedException {
10         ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
11         context.start();
12         DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
13 
14         long start = System.currentTimeMillis();
15 
16         demoService.sayHello("zhangsan");
17         Future<String> helloFuture = RpcContext.getContext().getFuture();
18 
19         demoService.sayBye("lisi");
20         Future<String> byeFuture = RpcContext.getContext().getFuture();
21 
22         final String helloStr = helloFuture.get();//消耗5s
23         final String byeStr = byeFuture.get();//消耗8s
24 
25         System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis()-start));//總消耗8s
26     }
27 
28     public static void asyncFuture2() throws ExecutionException, InterruptedException {
29         ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
30         context.start();
31         DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
32 
33         long start = System.currentTimeMillis();
34 
35         Future<String> helloFuture = RpcContext.getContext().asyncCall(()-> demoService.sayHello("zhangsan"));
36         Future<String> byeFuture = RpcContext.getContext().asyncCall(()->demoService.sayBye("lisi"));
37 
38         final String helloStr = helloFuture.get();//消耗5s
39         final String byeStr = byeFuture.get();//消耗8s
40 
41         System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis()-start));//總消耗8s
42     }

Consumer啟動主類。其中asyncFuture2()方法是推薦用法,注意Callable(asyncCall方法的入參)只是一個任務task,不會新建線程;所以asyncFuture2()和asyncFuture1()相似,資源占用相同,都是用一根線程進行異步操作的。

 

二、asyncFuture1()源碼解析

先來看asyncFuture1(),總體步驟:

  • demoService.sayHello("zhangsan"); 創建一個Future對象,存入當前線程的上下文中
  • Future<String> helloFuture = RpcContext.getContext().getFuture(); 從當前線程的上下文中獲取第一步存入的Future對象
  • final String helloStr = helloFuture.get(); 阻塞等待,從Future中獲取結果

代碼主要執行流(代碼詳細執行流看文章開頭的三篇博客):

1、demoService.sayHello("zhangsan"); 

-->FutureFilter.invoke(final Invoker<?> invoker, final Invocation invocation)
   -->DubboInvoker.doInvoke(final Invocation invocation)

FutureFilter:

 1     public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
 2         final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
 3 
 4         fireInvokeCallback(invoker, invocation);
 5         // need to configure if there's return value before the invocation in order to help invoker to judge if it's
 6         // necessary to return future.
 7         Result result = invoker.invoke(invocation);
 8         if (isAsync) {
 9             asyncCallback(invoker, invocation);
10         } else {
11             syncCallback(invoker, invocation, result);
12         }
13         return result;
14     }

對於如上異步操作(asyncFuture1()和asyncFuture2()),FutureFilter沒起任何作用,該Filter主要會用在事件通知中,后續再說。

DubboInvoker.doInvoke(final Invocation invocation):

 1     protected Result doInvoke(final Invocation invocation) throws Throwable {
 2         RpcInvocation inv = (RpcInvocation) invocation;
 3         final String methodName = RpcUtils.getMethodName(invocation);
 4         inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
 5         inv.setAttachment(Constants.VERSION_KEY, version);
 6 
 7         ExchangeClient currentClient;
 8         if (clients.length == 1) {
 9             currentClient = clients[0];
10         } else {
11             currentClient = clients[index.getAndIncrement() % clients.length];
12         }
13         try {
14             boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
15             boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
16             int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
17             if (isOneway) { //無返回值
18                 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
19                 currentClient.send(inv, isSent);
20                 RpcContext.getContext().setFuture(null);
21                 return new RpcResult();
22             } else if (isAsync) { //異步有返回值
23                 ResponseFuture future = currentClient.request(inv, timeout);
24                 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
25                 return new RpcResult();
26             } else { //同步有返回值
27                 RpcContext.getContext().setFuture(null);
28                 return (Result) currentClient.request(inv, timeout).get();
29             }
30         } catch (TimeoutException e) {
31             throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
32         } catch (RemotingException e) {
33             throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
34         }
35     }

模式:

  • 如果是isOneway(不需要返回值),不管同步還是異步,請求直接發出,不會創建Future,直接返回RpcResult空對象。
  • 如果是isAsync(異步),則
    • 先創建ResponseFuture對象,之后使用FutureAdapter包裝該ResponseFuture對象;(創建ResponseFuture對象與同步的代碼相同,最后得到的是一個DefaultFuture對象)
    • 然后將該FutureAdapter對象設入當前線程的上下文中RpcContext.getContext();
    • 最后返回空的RpcResult
  • 如果是同步,則先創建ResponseFuture對象,之后直接調用其get()方法進行阻塞調用(見文章開頭的三篇文章)

簡單來看一下FutureAdapter:

 1 public class FutureAdapter<V> implements Future<V> {
 2 
 3     private final ResponseFuture future;
 4 
 5     public FutureAdapter(ResponseFuture future) {
 6         this.future = future;
 7     }
 8 
 9     public ResponseFuture getFuture() {
10         return future;
11     }
12 
13     public boolean cancel(boolean mayInterruptIfRunning) {
14         return false;
15     }
16 
17     public boolean isCancelled() {
18         return false;
19     }
20 
21     public boolean isDone() {
22         return future.isDone();
23     }
24 
25     @SuppressWarnings("unchecked")
26     public V get() throws InterruptedException, ExecutionException {
27         try {
28             return (V) (((Result) future.get()).recreate());
29         } catch (RemotingException e) {
30             throw new ExecutionException(e.getMessage(), e);
31         } catch (Throwable e) {
32             throw new RpcException(e);
33         }
34     }
35 
36     @SuppressWarnings("unchecked")
37     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
38         int timeoutInMillis = (int) unit.convert(timeout, TimeUnit.MILLISECONDS);
39         try {
40             return (V) (((Result) future.get(timeoutInMillis)).recreate());
41         } catch (com.alibaba.dubbo.remoting.TimeoutException e) {
42             throw new TimeoutException(StringUtils.toString(e));
43         } catch (RemotingException e) {
44             throw new ExecutionException(e.getMessage(), e);
45         } catch (Throwable e) {
46             throw new RpcException(e);
47         }
48     }
49 }

最后,回頭看一下FutureFilter:

 1     public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
 2         final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
 3 
 4         fireInvokeCallback(invoker, invocation);
 5         // need to configure if there's return value before the invocation in order to help invoker to judge if it's
 6         // necessary to return future.
 7         Result result = invoker.invoke(invocation);
 8         if (isAsync) {
 9             asyncCallback(invoker, invocation);
10         } else {
11             syncCallback(invoker, invocation, result);
12         }
13         return result;
14     }
 1     private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
 2         Future<?> f = RpcContext.getContext().getFuture();
 3         if (f instanceof FutureAdapter) {
 4             ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
 5             future.setCallback(new ResponseCallback() {
 6                 public void done(Object rpcResult) {
 7                     if (rpcResult == null) {
 8                         logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
 9                         return;
10                     }
11                     ///must be rpcResult
12                     if (!(rpcResult instanceof Result)) {
13                         logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
14                         return;
15                     }
16                     Result result = (Result) rpcResult;
17                     if (result.hasException()) {
18                         fireThrowCallback(invoker, invocation, result.getException());
19                     } else {
20                         fireReturnCallback(invoker, invocation, result.getValue());
21                     }
22                 }
23 
24                 public void caught(Throwable exception) {
25                     fireThrowCallback(invoker, invocation, exception);
26                 }
27             });
28         }
29     }

這里的future對象時之前創建好的DefaultFuture對象。

 1     private volatile Response response;
 2     private volatile ResponseCallback callback;
 3 
 4     public boolean isDone() {
 5         return response != null;
 6     }
 7 
 8     public void setCallback(ResponseCallback callback) {
 9         if (isDone()) {
10             invokeCallback(callback);
11         } else {
12             boolean isdone = false;
13             lock.lock();
14             try {
15                 if (!isDone()) {
16                     this.callback = callback;
17                 } else {
18                     isdone = true;
19                 }
20             } finally {
21                 lock.unlock();
22             }
23             if (isdone) {
24                 invokeCallback(callback);
25             }
26         }
27     }

這里判斷響應是否已經返回了,如果返回了,直接執行invokeCallback(callback),否則將傳入的ResponseCallback對象賦值給callback對象。

 

2、Future<String> helloFuture = RpcContext.getContext().getFuture(); 

RpcContext:

 1     private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
 2         @Override
 3         protected RpcContext initialValue() {
 4             return new RpcContext();
 5         }
 6     };
 7 
 8     private Future<?> future;
 9 
10     public static RpcContext getContext() {
11         return LOCAL.get();
12     }
13 
14     public <T> Future<T> getFuture() {
15         return (Future<T>) future;
16     }

從當前線程上下文中獲取之前存進去的FutureAdapter對象。

 

3、final String helloStr = helloFuture.get(); 

helloFuture是上述的FutureAdapter對象,其get()調用的是內部的DefaultFuture的get(),該方法與同步調用時相同,源碼分析見文章開頭的三篇文章。

1     public V get() throws InterruptedException, ExecutionException {
2         try {
3             return (V) (((Result) future.get()).recreate());
4         } catch (RemotingException e) {
5             throw new ExecutionException(e.getMessage(), e);
6         } catch (Throwable e) {
7             throw new RpcException(e);
8         }
9     }

get方法的超時設置除了直接在xml中配置之外,還可以在代碼中手動執行(優先級高) 

1 final String helloStr2 = helloFuture.get(7000, TimeUnit.MILLISECONDS);

 

三、asyncFuture2()源碼解析

下面來看一下asyncFuture2()源碼:

1、Future<String> helloFuture = RpcContext.getContext().asyncCall(()-> demoService.sayHello("zhangsan"));

 1     public <T> Future<T> asyncCall(Callable<T> callable) {
 2         try {
 3             try {
 4                 setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
 5                 // 1 執行傳入的任務(此處創建FutureAdapter對象,並且設置到當前線程的RpcContext的future對象中)
 6                 final T o = callable.call();
 7                 //local invoke will return directly
 8                 if (o != null) {
 9                     FutureTask<T> f = new FutureTask<T>(new Callable<T>() {
10                         public T call() throws Exception {
11                             return o;
12                         }
13                     });
14                     f.run();
15                     return f;
16                 } else {
17 
18                 }
19             } catch (Exception e) {
20                 throw new RpcException(e);
21             } finally {
22                 removeAttachment(Constants.ASYNC_KEY);
23             }
24         } catch (final RpcException e) {
25             return new Future<T>() {
26                 public boolean cancel(boolean mayInterruptIfRunning) {
27                     return false;
28                 }
29 
30                 public boolean isCancelled() {
31                     return false;
32                 }
33 
34                 public boolean isDone() {
35                     return true;
36                 }
37 
38                 public T get() throws InterruptedException, ExecutionException {
39                     throw new ExecutionException(e.getCause());
40                 }
41 
42                 public T get(long timeout, TimeUnit unit)
43                         throws InterruptedException, ExecutionException,
44                         TimeoutException {
45                     return get();
46                 }
47             };
48         }
49         // 2 從當前線程的RpcContext中獲取future對象
50         return ((Future<T>) getContext().getFuture());
51     }

這里外層的catch的作用是什么?沒搞清楚 https://github.com/alibaba/dubbo/issues/1346

2、final String helloStr = helloFuture.get();

與同步相同。

 

總結:dubbo異步與同步的差別:

  • 同步:創建DefaultFuture之后,直接get阻塞等待;
  • 異步:創建DefaultFuture之后,使用FutureAdapter進行包裝,之后設置到當前線程的RpcContext中;后續用戶在合適的時候自己從RpcContext獲取future,之后get。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM