Dubbo異步轉同步


  Dubbo是一款開源的RPC中間件框架,底層數據傳輸默認使用的Netty,那么請求的處理理論上是異步的,為什么我們在使用的時候是同步的呢?肯定是Dubbo框架,做了異步轉同步的處理。

  首先我們來梳理下,異步轉同步,我們的需求是怎樣的?

  1、調用方請求遠程服務之后,需要等待結果,此刻,請求線程應該阻塞

  2、遠程服務返回結果后,喚醒請求線程,調用方得到結果

  Dubbo異步轉同步,核心類是DefaultFuture,核心方法是get(),received(Channel channel, Response response)。

  DefaultFuture構造函數:

 1     private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
 2 
 3    // 每次請求都會生成一個DefaultFuture對象,然后保存到FUTURES中,請求返回結果時,根據id從FUTURES中找到對應的DefaultFuture對象,並刪除
 4     private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
 5 
 6     // AtomicLong從0開始遞增,創建Request對象時生成的id
 7     private final long id;
 8     private final Channel channel;
 9     // 請求對象
10     private final Request request;
11     // 超時的設置
12     private final int timeout;
13     // 這里使用Lock和Condition實現等待通知機制
14     private final Lock lock = new ReentrantLock();
15     private final Condition done = lock.newCondition();
16     private final long start = System.currentTimeMillis();
17     private volatile long sent;
18     // 請求的返回結果
19     private volatile Response response;
20     private volatile ResponseCallback callback;
21 
22     public DefaultFuture(Channel channel, Request request, int timeout) {
23         this.channel = channel;
24         this.request = request;
25         this.id = request.getId();
26         this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
27         // put into waiting map.
28         FUTURES.put(id, this);
29         CHANNELS.put(id, channel);
30     }

 

  get():

 1 public Object get(int timeout) throws RemotingException {
 2         if (timeout <= 0) {
 3             timeout = Constants.DEFAULT_TIMEOUT;
 4         }
 5         // isDone()方法就是判斷Response是否有值(即是否有返回結果)
 6         if (!isDone()) {
 7             long start = System.currentTimeMillis();
 8             lock.lock();
 9             try {
10                 while (!isDone()) {
11                     // 超時等待
12                     done.await(timeout, TimeUnit.MILLISECONDS);
13                     // 如果有返回結果了,或者,超時了,就退出循環
14                     if (isDone() || System.currentTimeMillis() - start > timeout) {
15                         break;
16                     }
17                 }
18             } catch (InterruptedException e) {
19                 throw new RuntimeException(e);
20             } finally {
21                 lock.unlock();
22             }
23             // 如果是超時了,就拋出異常
24             if (!isDone()) {
25                 throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
26             }
27         }
28         // 遠程服務正常返回結果,則返回給調用方
29         return returnFromResponse();
30     }

 

  received(Channel channel, Response response):

 1 public static void received(Channel channel, Response response) {
 2         try {
 3             // 根據請求id從FUTURES中獲取DefaultFuture,並刪除
 4             DefaultFuture future = FUTURES.remove(response.getId());
 5             if (future != null) {
 6                 future.doReceived(response);
 7             } else {
 8                 logger.warn("The timeout response finally returned at "
 9                         + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
10                         + ", response " + response
11                         + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
12                         + " -> " + channel.getRemoteAddress()));
13             }
14         } finally {
15             // CHANNELS也刪除
16             CHANNELS.remove(response.getId());
17         }
18     }
 1 private void doReceived(Response res) {
 2         lock.lock();
 3         try {
 4             response = res;
 5             if (done != null) {
 6                 // 喚醒阻塞的線程
 7                 done.signal();
 8             }
 9         } finally {
10             lock.unlock();
11         }
12         if (callback != null) {
13             invokeCallback(callback);
14         }
15     }

 

  總結:Dubbo異步轉同步的原理,其實就是利用Lock和Condition實現了等待通知機制。請求與返回結果進行匹配,則是通過傳遞以及接收請求id實現的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM