Dubbo缺省協議采用單一長連接和NIO異步通訊,適合於小數據量大並發的服務調用,以及服務消費者機器數遠大於服務提供者機器數的情況。

Dubbo缺省協議,使用基於mina1.1.7+hessian3.2.1的tbremoting交互。
- 連接個數:單連接
- 連接方式:長連接
- 傳輸協議:TCP
- 傳輸方式:NIO異步傳輸
- 序列化:Hessian二進制序列化
- 適用范圍:傳入傳出參數數據包較小(建議小於100K),消費者比提供者個數多,單一消費者無法壓滿提供者,盡量不要用dubbo協議傳輸大文件或超大字符串。
- 適用場景:常規遠程服務方法調用
通常,一個典型的同步遠程調用應該是這樣的:
1,
客戶端線程調用遠程接口,向服務端發送請求,同時當前線程應該處於“暫停“狀態,即線程不能向后執行了,必需要拿到服務端給自己的結果后才能向后執行
2, 服務端接到客戶端請求后,處理請求,將結果給客戶端
3, 客戶端收到結果,然后當前線程繼續往后執行
Dubbo里使用到了Socket(采用apache mina框架做底層調用)來建立長連接,發送、接收數據,底層使用apache mina框架的IoSession進行發送消息。
Dubbo底層使用Socket發送消息的形式進行數據傳遞,結合了mina框架,使用IoSession.write()方法,這個方法調用后對於整個遠程調用(從發出請求到接收到結果)來說是一個異步的,即對於當前線程來說,將請求發送出來,線程就可以往后執行了,至於服務端的結果,是服務端處理完成后,再以消息的形式發送給客戶端的。
於是這里出現了2個問題:
- 當前線程怎么讓它“暫停”,等結果回來后,再向后執行?
- 正如前面所說,Socket通信是一個全雙工的方式,如果有多個線程同時進行遠程方法調用,這時建立在client server之間的socket連接上會有很多雙方發送的消息傳遞,前后順序也可能是亂七八糟的,server處理完結果后,將結果消息發送給client,client收到很多消息,怎么知道哪個消息結果是原先哪個線程調用的?
基本原理如下:
- client一個線程調用遠程接口,生成一個唯一的ID(比如一段隨機字符串,UUID等),Dubbo是使用AtomicLong從0開始累計數字的
- 將打包的方法調用信息(如調用的接口名稱,方法名稱,參數值列表等),和處理結果的回調對象callback,全部封裝在一起,組成一個對象object
- 向專門存放調用信息的全局ConcurrentHashMap里面put(ID, object)
- 將ID和打包的方法調用信息封裝成一對象connRequest,使用IoSession.write(connRequest)異步發送出去
- 當前線程再使用callback的get()方法試圖獲取遠程返回的結果,在get()內部,則使用synchronized獲取回調對象callback的鎖, 再先檢測是否已經獲取到結果,如果沒有,然后調用callback的wait()方法,釋放callback上的鎖,讓當前線程處於等待狀態。
- 服務端接收到請求並處理后,將結果(此結果中包含了前面的ID,即回傳)發送給客戶端,客戶端socket連接上專門監聽消息的線程收到消息,分析結果,取到ID,再從前面的ConcurrentHashMap里面get(ID),從而找到callback,將方法調用結果設置到callback對象里。
- 監聽線程接着使用synchronized獲取回調對象callback的鎖(因為前面調用過wait(),那個線程已釋放callback的鎖了),再notifyAll(),喚醒前面處於等待狀態的線程繼續執行(callback的get()方法繼續執行就能拿到調用結果了),至此,整個過程結束。
需要注意的是,這里的callback對象是每次調用產生一個新的,不能共享,否則會有問題;另外ID必需至少保證在一個Socket連接里面是唯一的。
現在,前面兩個問題已經有答案了,
- 當前線程怎么讓它“暫停”,等結果回來后,再向后執行?
- 正如前面所說,Socket通信是一個全雙工的方式,如果有多個線程同時進行遠程方法調用,這時建立在client server之間的socket連接上會有很多雙方發送的消息傳遞,前后順序也可能是亂七八糟的,server處理完結果后,將結果消息發送給client,client收到很多消息,怎么知道哪個消息結果是原先哪個線程調用的?
答:使用一個ID,讓其唯一,然后傳遞給服務端,再服務端又回傳回來,這樣就知道結果是原先哪個線程的了。
另外:
服務端在處理客戶端的消息,然后再處理時,使用了線程池來並行處理,不用一個一個消息的處理
同樣,客戶端接收到服務端的消息,也是使用線程池來處理消息,再回調
消息中間件rabbitmq遠程接口調用,同步調用的原理跟這類似,詳見:rabbitmq 學習-9- RpcClient發送消息和同步接收消息原理
關鍵代碼:
com.taobao.remoting.impl.DefaultClient.java //同步調用遠程接口 public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException { byte protocol = getProtocol(control); if (!TRConstants.isValidProtocol(protocol)) { throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync."); } ResponseFuture future = invokeWithFuture(appRequest, control); return future.get(); //獲取結果時讓當前線程等待,ResponseFuture其實就是前面說的callback } public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) { byte protocol = getProtocol(control); long timeout = getTimeout(control); ConnectionRequest request = new ConnectionRequest(appRequest); request.setSerializeProtocol(protocol); Callback2FutureAdapter adapter = new Callback2FutureAdapter(request); connection.sendRequestWithCallback(request, adapter, timeout); return adapter; }
Callback2FutureAdapter implements ResponseFuture public Object get() throws RemotingException, InterruptedException { synchronized (this) { // 旋鎖 while (!isDone) { // 是否有結果了 wait(); //沒結果是釋放鎖,讓當前線程處於等待狀態 } } if (errorCode == TRConstants.RESULT_TIMEOUT) { throw new TimeoutException("Wait response timeout, request[" + connectionRequest.getAppRequest() + "]."); } else if (errorCode > 0) { throw new RemotingException(errorMsg); } else { return appResp; } } 客戶端收到服務端結果后,回調時相關方法,即設置isDone = true並notifyAll() public void handleResponse(Object _appResponse) { appResp = _appResponse; //將遠程調用結果設置到callback中來 setDone(); } public void onRemotingException(int _errorType, String _errorMsg) { errorCode = _errorType; errorMsg = _errorMsg; setDone(); } private void setDone() { isDone = true; synchronized (this) { //獲取鎖,因為前面wait()已經釋放了callback的鎖了 notifyAll(); // 喚醒處於等待的線程 } }
CallbackExecutorTask static private class CallbackExecutorTask implements Runnable { final ConnectionResponse resp; final ResponseCallback callback; final Thread createThread; CallbackExecutorTask(ConnectionResponse _resp, ResponseCallback _cb) { resp = _resp; callback = _cb; createThread = Thread.currentThread(); } public void run() { // 預防這種情況:業務提供的Executor,讓調用者線程來執行任務 if (createThread == Thread.currentThread() && callback.getExecutor() != DIYExecutor.getInstance()) { StringBuilder sb = new StringBuilder(); sb.append("The network callback task [" + resp.getRequestId() + "] cancelled, cause:"); sb.append("Can not callback task on the network io thhread."); LOGGER.warn(sb.toString()); return; } if (TRConstants.RESULT_SUCCESS == resp.getResult()) { callback.handleResponse(resp.getAppResponse()); //設置調用結果 } else { callback.onRemotingException(resp.getResult(), resp .getErrorMsg()); //處理調用異常 } } }
轉自http://blog.163.com/tsing_hua/blog/static/1396222242012819557547/