dubbo協議下的單一長連接與多線程並發如何協同工作


上班的路上突然就冒出了這么個問題:既然在dubbo中描述消費者和提供者之間采用的是單一長連接,那么如果消費者端是高並發多線程模型的web應用,單一長連接如何解決多線程並發請求問題呢?

其實如果不太了解socket或者多線程編程的相關知識,不太容易理解這個問題。傳統的最簡單的RPC方式,應該是為每次遠程調用請求創建一個對應的線程,我們先不說這種方式的缺點。至少優點很明顯,就是簡單。簡單體現在哪兒?

通信雙方一對一(相比NIO來說)。

通俗點來說,socket通信的雙方發送和接受數據不會被其它(線程)干擾,這種干擾不同於數數據包的“粘包問題”。其實說白了就相當於電話線路的場景:

試想一下如果多個人同時對着同一個話筒大喊,對方接受到的聲音就會是重疊且雜亂的。

對於單一的socket通道來說,如果發送方多線程的話,不加控制就會導致通道中的數據亂七八糟,接收端無法區分數據的單位,也就無法正確的處理請求。

乍一看,似乎dubbo協議所說的單一長連接與客戶端多線程並發請求之間,是水火不容的。但其實稍加設計,就可以讓它們和諧相處。

socket中的粘包問題是怎么解決的?用的最多的其實是定義一個定長的數據包頭,其中包含了完整數據包的長度,以此來完成服務器端拆包工作。

那么解決多線程使用單一長連接並發請求時包干擾的方法也有點雷同,就是給包頭中添加一個標識id,服務器端響應請求時也要攜帶這個id,供客戶端多線程領取對應的響應數據提供線索。

其實如果不考慮性能的話,dubbo完全也可以為每個客戶端線程創建一個對應的服務器端線程,但這是海量高並發場景所不能接受的~~

那么腦補一張圖:

下面咱們試圖從代碼中找到痕跡。

一路追蹤,我們來到這個類:com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.java,先來看看其中的request方法,大概在第101行左右:

 public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); //這個future就是前面我們提到的:客戶端並發請求線程阻塞的對象 DefaultFuture future = new DefaultFuture(channel, req, timeout); try{ channel.send(req); //非阻塞調用 }catch (RemotingException e) { future.cancel(); throw e; } return future; } 

注意這個方法返回的ResponseFuture對象,當前處理客戶端請求的線程在經過一系列調用后,會拿到ResponseFuture對象,最終該線程會阻塞在這個對象的下面這個方法調用上,如下:

public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (! isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (! isDone()) { //無限連 done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (! isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); } 

上面我已經看到請求線程已經阻塞,那么又是如何被喚醒的呢?再看一下com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.java,其實所有實現了ChannelHandler接口的類都被設計為裝飾器模式,所以你可以看到類似這樣的代碼:

 protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler( new HeartbeatHandler( ExtensionLoader.getExtensionLoader(Dispather.class).getAdaptiveExtension().dispath(handler, url) )); } 

現在來仔細看一下HeaderExchangeHandler類的定義,先看一下它定義的received方法,下面是代碼片段:

public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { ..... } else if (message instanceof Response) { //這里就是作為消費者的dubbo客戶端在接收到響應后,觸發通知對應等待線程的起點 handleResponse(channel, (Response) message); } else if (message instanceof String) { ..... } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } 

我們主要看中間的那個條件分支,它是用來處理響應消息的,也就是說當dubbo客戶端接收到來自服務端的響應后會執行到這個分支,它簡單的調用了handleResponse方法,我們追過去看看:

static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { //排除心跳類型的響應 DefaultFuture.received(channel, response); } } 

熟悉的身影:DefaultFuture,它是實現了我們上面說的ResponseFuture接口類型,實際上細心的童鞋應該可以看到,上面request方法中其實實例化的就是這個DefaultFutrue對象:

DefaultFuture future = new DefaultFuture(channel, req, timeout); 

那么我們可以繼續來看一下DefaultFuture.received方法的實現細節:

public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } } 

留一下我們之前提到的id的作用,這里可以看到它已經開始發揮作用了。通過idDefaultFuture.FUTURES可以拿到具體的那個DefaultFuture對象,它就是上面我們提到的,阻塞請求線程的那個對象。好,找到目標后,調用它的doReceived方法,這就是標准的java多線程編程知識了:

private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } } 

這樣我們就可以證實上圖中左邊的綠色箭頭所標注的兩點。


接下來我們再來看看右邊綠色箭頭提到的兩點是如何實現的?其實dubbo在NIO的實現上默認依賴的是netty,也就是說真正在長連接兩端發包和接包的苦力是netty。由於哥們我對netty不是很熟悉,所以暫時我們就直接把netty當做黑箱,只需要知道它可以很好的完成NIO通信即可。

 

 

 

參考鏈接:https://blog.csdn.net/joeyon1985/article/details/51046548


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM