RPC-非阻塞通信下的同步API實現原理,以Dubbo為例


  Netty在Java NIO領域基本算是獨占鰲頭,涉及到高性能網絡通信,基本都會以Netty為底層通信框架,Dubbo 也不例外。以下將以Dubbo實現為例介紹其是如何在NIO非阻塞通信基礎上實現同步通信的。

    Dubbo為一種RPC通信框架,提供進程間的通信,在使用dubbo協議+Netty作為傳輸層時,提供三種API調用方式:

  1. 同步接口
  2. 異步帶回調接口
  3. 異步不帶回調接口

    同步接口適用在大部分環境,通信方式簡單、可靠,客戶端發起調用,等待服務端處理,調用結果同步返回。這種方式下,在高吞吐、高性能(響應時間很快)的服務接口場景中最為適用,可以減少異步帶來的額外的消耗,也方便客戶端做一致性保證。

 

    異步帶回調接口,用在任務處理時間較長,客戶端應用線程不願阻塞等待,而是為了提高自身處理能力希望服務端處理完成后可以異步通知應用線程。這種方式可以大大提升客戶端的吞吐量,避免因為服務端的耗時問題拖死客戶端。

    異步不帶回調接口,一些場景為了進一步提升客戶端的吞吐能力,只需發起一次服務端調用,不需關系調用結果,可以使用此種通信方式。一般在不需要嚴格保證數據一致性或者有其他補償措施的情況下,選用這種,可以最小化遠程調用帶來的性能損耗。

    

    來看一下Dubbo是如何實現這三種API的。核心代碼在com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker,如下圖對應的位置,屬於協議層的實現部分。為方便大家可以准確定位代碼所在位置,使用截圖的方式,而不是直接貼代碼了。

    上文描述的是三種API方式,Dubbo里面通過參數isOneway、isAsync來控制,isOneway=true表示異步不帶回調,isAsync=true表示異步帶回調,否則是同步API。具體是如何控制,看以下代碼:

    isOneway==true時,客戶端send完請求后,直接return一個空結果的RpcResult;isAsync==true時,客戶端發起請求,設置一個ResponseFuture,直接return一個空結果的RpcResult,接下來當服務端處理完成,客戶端Netty層在收到響應后會通過Future通知應用線程;最后是同步情況下,客戶端發起請求,並通過get()方法阻塞等待服務端的響應結果。

    異步API情況下,結合NIO模型比較好理解是如何實現的(當然需要先了解NIO的reactor模型),接下來重點理解下,這個get()阻塞方法是如何做到基於非阻塞NIO實現同步阻塞效果。

    直接進入get()方法內部。

    可以看到是利用Java的鎖機制實現,循環判斷是否收到響應,如果收到或者等待超時則返回。done的實例對象如下:

private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition();

    使用可重入鎖ReentrantLock,獲取一個Condition對象在其上做await操作。這里有await操作,何時被喚醒呢,有兩個條件,第一個是等待timeout超時,默認dubbo是1s,第二個就是被其他線程喚醒,即收到了服務端的響應。

    signal信號一發出,上文循環檢測內的await操作會立即返回,下一次isDone判斷會變成true,直接跳出循環。

    仔細看代碼會發現,被喚醒的地方還有一個是在DefaultFuture內部有一個超時輪詢檢測的線程,這個線程主要是處理響應超時后觸發資源回收、記錄異常日志等操作。    

private static class RemotingInvocationTimeoutScan implements Runnable {

        public void run() {
            while (true) {
                try {
                    for (DefaultFuture future : FUTURES.values()) {
                        if (future == null || future.isDone()) {
                            continue;
                        }
                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                            // create exception response.
                            Response timeoutResponse = new Response(future.getId());
                            // set timeout status.
                            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
                            // handle response.
                            DefaultFuture.received(future.getChannel(), timeoutResponse);
                        }
                    }
                    Thread.sleep(30);
                } catch (Throwable e) {
                    logger.error("Exception when scan the timeout invocation of remoting.", e);
                }
            }
        }
    }

    static {
        Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
        th.setDaemon(true);
        th.start();
    }

 

      可能會有疑問,這個觸發操作為何不直接在get()方法內部檢測到超時直接調用DefaultFuture.received(Channel channel, Response response)來清理,而是要額外開啟一個后台線程。

    單獨啟動一個超時線程有兩個好處:

  1.  提高超時精度

      get()方法內部的輪詢有一個timeout,每次超時喚醒的時間間隔至少是timeout時長,最差的情況可能會等待2*timeout作出超時反應。在超時輪詢線程中,每隔30ms遍歷檢測一次,可以很大程度的提升超時精度。

       2.  提升性能,降低響應時間

      剝離超時處理邏輯到一個單獨線程,可以減少對業務線程的時間占用,這個超時后的處理對應用來說並無直接作用,完全可以放到后台異步去處理。另外單獨在一個線程中,實際上有批量處理的表現。

    以上是就NIO通信基礎上實現三種API調用的實現原理,或許有更多優於Dubbo的處理方式,可以拿出來討論。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM