Dubbo是一款開源的RPC中間件框架,底層數據傳輸默認使用的Netty,那么請求的處理理論上是異步的,為什么我們在使用的時候是同步的呢?肯定是Dubbo框架,做了異步轉同步的處理。
首先我們來梳理下,異步轉同步,我們的需求是怎樣的?
1、調用方請求遠程服務之后,需要等待結果,此刻,請求線程應該阻塞
2、遠程服務返回結果后,喚醒請求線程,調用方得到結果
Dubbo異步轉同步,核心類是DefaultFuture,核心方法是get(),received(Channel channel, Response response)。
DefaultFuture構造函數:
1 private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>(); 2 3 // 每次請求都會生成一個DefaultFuture對象,然后保存到FUTURES中,請求返回結果時,根據id從FUTURES中找到對應的DefaultFuture對象,並刪除 4 private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>(); 5 6 // AtomicLong從0開始遞增,創建Request對象時生成的id 7 private final long id; 8 private final Channel channel; 9 // 請求對象 10 private final Request request; 11 // 超時的設置 12 private final int timeout; 13 // 這里使用Lock和Condition實現等待通知機制 14 private final Lock lock = new ReentrantLock(); 15 private final Condition done = lock.newCondition(); 16 private final long start = System.currentTimeMillis(); 17 private volatile long sent; 18 // 請求的返回結果 19 private volatile Response response; 20 private volatile ResponseCallback callback; 21 22 public DefaultFuture(Channel channel, Request request, int timeout) { 23 this.channel = channel; 24 this.request = request; 25 this.id = request.getId(); 26 this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 27 // put into waiting map. 28 FUTURES.put(id, this); 29 CHANNELS.put(id, channel); 30 }
get():
1 public Object get(int timeout) throws RemotingException { 2 if (timeout <= 0) { 3 timeout = Constants.DEFAULT_TIMEOUT; 4 } 5 // isDone()方法就是判斷Response是否有值(即是否有返回結果) 6 if (!isDone()) { 7 long start = System.currentTimeMillis(); 8 lock.lock(); 9 try { 10 while (!isDone()) { 11 // 超時等待 12 done.await(timeout, TimeUnit.MILLISECONDS); 13 // 如果有返回結果了,或者,超時了,就退出循環 14 if (isDone() || System.currentTimeMillis() - start > timeout) { 15 break; 16 } 17 } 18 } catch (InterruptedException e) { 19 throw new RuntimeException(e); 20 } finally { 21 lock.unlock(); 22 } 23 // 如果是超時了,就拋出異常 24 if (!isDone()) { 25 throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); 26 } 27 } 28 // 遠程服務正常返回結果,則返回給調用方 29 return returnFromResponse(); 30 }
received(Channel channel, Response response):
1 public static void received(Channel channel, Response response) { 2 try { 3 // 根據請求id從FUTURES中獲取DefaultFuture,並刪除 4 DefaultFuture future = FUTURES.remove(response.getId()); 5 if (future != null) { 6 future.doReceived(response); 7 } else { 8 logger.warn("The timeout response finally returned at " 9 + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 10 + ", response " + response 11 + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 12 + " -> " + channel.getRemoteAddress())); 13 } 14 } finally { 15 // CHANNELS也刪除 16 CHANNELS.remove(response.getId()); 17 } 18 }
1 private void doReceived(Response res) { 2 lock.lock(); 3 try { 4 response = res; 5 if (done != null) { 6 // 喚醒阻塞的線程 7 done.signal(); 8 } 9 } finally { 10 lock.unlock(); 11 } 12 if (callback != null) { 13 invokeCallback(callback); 14 } 15 }
總結:Dubbo異步轉同步的原理,其實就是利用Lock和Condition實現了等待通知機制。請求與返回結果進行匹配,則是通過傳遞以及接收請求id實現的。