我們知道,Dubbo 缺省協議采用單一長連接,底層實現是 Netty 的 NIO 異步通訊機制;基於這種機制,Dubbo 實現了以下幾種調用方式:
- 同步調用(默認)
- 異步調用
- 參數回調
- 事件通知
同步調用
同步調用是一種阻塞式的調用方式,即 Consumer 端代碼一直阻塞等待,直到 Provider 端返回為止;
通常,一個典型的同步調用過程如下:
- Consumer 業務線程調用遠程接口,向 Provider 發送請求,同時當前線程處於
阻塞
狀態; - Provider 接到 Consumer 的請求后,開始處理請求,將結果返回給 Consumer;
- Consumer 收到結果后,當前線程繼續往后執行。
這里有 2 個問題:
- Consumer 業務線程是怎么進入
阻塞
狀態的? - Consumer 收到結果后,如何喚醒業務線程往后執行的?
其實,Dubbo 的底層 IO 操作都是異步的。Consumer 端發起調用后,得到一個 Future 對象。對於同步調用,業務線程通過Future#get(timeout)
,阻塞等待 Provider 端將結果返回;timeout
則是 Consumer 端定義的超時時間。當結果返回后,會設置到此 Future,並喚醒阻塞的業務線程;當超時時間到結果還未返回時,業務線程將會異常返回。
異步調用
1、NIO future主動獲取結果,返回結果放在RpcContext中
基於 Dubbo 底層的異步 NIO 實現異步調用,對於 Provider 響應時間較長的場景是必須的,它能有效利用 Consumer 端的資源,相對於 Consumer 端使用多線程來說開銷較小。
異步調用,對於 Provider 端不需要做特別的配置。下面的例子中,Provider 端接口定義如下:
public interface AsyncService { String goodbye(String name); }
Consumer 配置
<dubbo:reference id="asyncService" interface="com.alibaba.dubbo.samples.async.api.AsyncService">
<dubbo:method name="goodbye" async="true"/>
</dubbo:reference>
需要異步調用的方法,均需要使用 <dubbo:method/>
標簽進行描述
Consumer 端發起調用
AsyncService service = ...; String result = service.goodbye("samples");// 這里的返回值為空,請不要使用 Future<String> future = RpcContext.getContext().getFuture(); ... // 業務線程可以開始做其他事情 result = future.get(); // 阻塞需要獲取異步結果時,也可以使用 get(timeout, unit) 設置超時時間
Dubbo Consumer 端發起調用后,同時通過RpcContext.getContext().getFuture()
獲取跟返回結果關聯的Future
對象,然后就可以開始處理其他任務;當需要這次異步調用的結果時,可以在任意時刻通過future.get(timeout)
來獲取。
一些特殊場景下,為了盡快調用返回,可以設置是否等待消息發出:
sent="true"
等待消息發出,消息發送失敗將拋出異常;sent="false"
不等待消息發出,將消息放入 IO 隊列,即刻返回。
默認為false
。配置方式如下:
<dubbo:method name="goodbye" async="true" sent="true" />
如果你只是想異步,完全忽略返回值,可以配置 return="false"
,以減少 Future 對象的創建和管理成本:
<dubbo:method name="goodbye" async="true" return="false"/>
此時,RpcContext.getContext().getFuture()
將返回null
。
整個異步調用的時序圖如下:
此示例代碼位於:https://github.com/dubbo/dubbo-samples/tree/master/dubbo-samples-async
2、參數回調
參數回調有點類似於本地 Callback 機制,但 Callback 並不是 Dubbo 內部的類或接口,而是由 Provider 端自定義的;Dubbo 將基於長連接生成反向代理,從而實現從 Provider 端調用 Consumer 端的邏輯。
Provider 端定義 Service 和 Callback
public interface CallbackService { void addListener(String key, CallbackListener listener); } public interface CallbackListener { void changed(String msg); }
Provider 端 Service 實現
public class CallbackServiceImpl implements CallbackService { private final Map<String, CallbackListener> listeners = new ConcurrentHashMap<String, CallbackListener>(); public CallbackServiceImpl() { Thread t = new Thread(new Runnable() { public void run() { while (true) { try { for (Map.Entry<String, CallbackListener> entry : listeners.entrySet()) { try { entry.getValue().changed(getChanged(entry.getKey())); } catch (Throwable t) { listeners.remove(entry.getKey()); } } Thread.sleep(5000); // timely trigger change event } catch (Throwable t) { t.printStackTrace(); } } } }); t.setDaemon(true); t.start(); } public void addListener(String key, CallbackListener listener) { listeners.put(key, listener); listener.changed(getChanged(key)); // send notification for change } private String getChanged(String key) { return "Changed: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); } }
Provider 端暴露服務
<bean id="callbackService" class="com.alibaba.dubbo.samples.callback.impl.CallbackServiceImpl"/> <dubbo:service interface="com.alibaba.dubbo.samples.callback.api.CallbackService" ref="callbackService" connections="1" callbacks="1000"> <dubbo:method name="addListener"> <dubbo:argument index="1" callback="true"/> <!--<dubbo:argument type="com.demo.CallbackListener" callback="true" />--> </dubbo:method> </dubbo:service>
這里,Provider 需要在方法中聲明哪個參數是 Callback 參數。
Consumer 端實現 Callback 接口
CallbackService callbackService = ...; callbackService.addListener("foo.bar", new CallbackListener() { public void changed(String msg) { System.out.println("callback1:" + msg); } });
Callback 接口的實現類在 Consumer 端,當方法發生調用時,Consumer 端會自動 export 一個 Callback 服務。而 Provider 端在處理調用時,判斷如果參數是 Callback,則生成了一個 proxy,因此服務實現類里在調用 Callback 方法的時候,會被傳遞到 Consumer 端執行 Callback 實現類的代碼。
上述示例代碼位於:https://github.com/dubbo/dubbo-samples/tree/master/dubbo-samples-callback
這種調用方式有點像消息的發布和訂閱,但又有區別。比如當 Consumer 端 完成了Callback 服務的 export 后,如果后續重啟了,這時 Provider 端就會調不通;同時 Provider 端如何清理掉這個 proxy 也是一個問題。
3、事件通知
事件通知允許 Consumer 端在調用之前、調用之后或出現異常時,觸發 oninvoke
、onreturn
、onthrow
三個事件。
可以通過在配置 Consumer 時,指定事件需要通知的方法,如:
<bean id="demoCallback" class="com.alibaba.dubbo.samples.notify.impl.NotifyImpl" /> <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.samples.notify.api.DemoService" version="1.0.0" group="cn"> <dubbo:method name="sayHello" onreturn="demoCallback.onreturn" onthrow="demoCallback.onthrow"/> </dubbo:reference>
其中,NotifyImpl 的代碼如下:
public class NotifyImpl implements Notify{ public Map<Integer, String> ret = new HashMap<Integer, String>(); public void onreturn(String name, int id) { ret.put(id, name); System.out.println("onreturn: " + name); } public void onthrow(Throwable ex, String name, int id) { System.out.println("onthrow: " + name); } }
這里要強調一點,自定義 Notify 接口中的三個方法的參數規則如下:
oninvoke
方法參數與調用方法的參數相同;onreturn
方法第一個參數為調用方法的返回值,其余為調用方法的參數;onthrow
方法第一個參數為調用異常,其余為調用方法的參數。
上述配置中,sayHello
方法為同步調用,因此事件通知方法的執行也是同步執行。可以配置 async=true
讓方法調用為異步,這時事件通知的方法也是異步執行的。特別強調一下,oninvoke
方法不管是否異步調用,都是同步執行的。
事件通知的示例代碼請參考:https://github.com/dubbo/dubbo-samples/tree/master/dubbo-samples-notify