dubbo同步/異步調用的方式


我們知道,Dubbo 缺省協議采用單一長連接,底層實現是 Netty 的 NIO 異步通訊機制;基於這種機制,Dubbo 實現了以下幾種調用方式: 

  • 同步調用(默認)
  • 異步調用
  • 參數回調
  • 事件通知

同步調用

同步調用是一種阻塞式的調用方式,即 Consumer 端代碼一直阻塞等待,直到 Provider 端返回為止;

通常,一個典型的同步調用過程如下:

  1. Consumer 業務線程調用遠程接口,向 Provider 發送請求,同時當前線程處於阻塞狀態;
  2. Provider 接到 Consumer 的請求后,開始處理請求,將結果返回給 Consumer;
  3. Consumer 收到結果后,當前線程繼續往后執行。

這里有 2 個問題:

  1. Consumer 業務線程是怎么進入阻塞狀態的?
  2. Consumer 收到結果后,如何喚醒業務線程往后執行的?

其實,Dubbo 的底層 IO 操作都是異步的。Consumer 端發起調用后,得到一個 Future 對象。對於同步調用,業務線程通過Future#get(timeout),阻塞等待 Provider 端將結果返回;timeout則是 Consumer 端定義的超時時間。當結果返回后,會設置到此 Future,並喚醒阻塞的業務線程;當超時時間到結果還未返回時,業務線程將會異常返回。

異步調用

1、NIO future主動獲取結果,返回結果放在RpcContext中

  基於 Dubbo 底層的異步 NIO 實現異步調用,對於 Provider 響應時間較長的場景是必須的,它能有效利用 Consumer 端的資源,相對於 Consumer 端使用多線程來說開銷較小。

   異步調用,對於 Provider 端不需要做特別的配置。下面的例子中,Provider 端接口定義如下:

public interface AsyncService {
    String goodbye(String name);
}

Consumer 配置

<dubbo:reference id="asyncService" interface="com.alibaba.dubbo.samples.async.api.AsyncService">
    <dubbo:method name="goodbye" async="true"/>
</dubbo:reference>

需要異步調用的方法,均需要使用 <dubbo:method/>標簽進行描述

Consumer 端發起調用

AsyncService service = ...;
String result = service.goodbye("samples");// 這里的返回值為空,請不要使用
Future<String> future = RpcContext.getContext().getFuture();
... // 業務線程可以開始做其他事情
result = future.get(); // 阻塞需要獲取異步結果時,也可以使用 get(timeout, unit) 設置超時時間

Dubbo Consumer 端發起調用后,同時通過RpcContext.getContext().getFuture()獲取跟返回結果關聯的Future對象,然后就可以開始處理其他任務;當需要這次異步調用的結果時,可以在任意時刻通過future.get(timeout)來獲取。

一些特殊場景下,為了盡快調用返回,可以設置是否等待消息發出:

  • sent="true" 等待消息發出,消息發送失敗將拋出異常;
  • sent="false" 不等待消息發出,將消息放入 IO 隊列,即刻返回。

默認為false。配置方式如下:

<dubbo:method name="goodbye" async="true" sent="true" />

如果你只是想異步,完全忽略返回值,可以配置 return="false",以減少 Future 對象的創建和管理成本:

<dubbo:method name="goodbye" async="true" return="false"/>

此時,RpcContext.getContext().getFuture()將返回null

整個異步調用的時序圖如下:

 

 

 此示例代碼位於:https://github.com/dubbo/dubbo-samples/tree/master/dubbo-samples-async

2、參數回調

參數回調有點類似於本地 Callback 機制,但 Callback 並不是 Dubbo 內部的類或接口,而是由 Provider 端自定義的;Dubbo 將基於長連接生成反向代理,從而實現從 Provider 端調用 Consumer 端的邏輯。

Provider 端定義 Service 和 Callback

public interface CallbackService {
    void addListener(String key, CallbackListener listener);
}

public interface CallbackListener {
    void changed(String msg);
}

Provider 端 Service 實現

public class CallbackServiceImpl implements CallbackService {

    private final Map<String, CallbackListener> listeners = new ConcurrentHashMap<String, CallbackListener>();

    public CallbackServiceImpl() {
        Thread t = new Thread(new Runnable() {
            public void run() {
                while (true) {
                    try {
                        for (Map.Entry<String, CallbackListener> entry : listeners.entrySet()) {
                            try {
                                entry.getValue().changed(getChanged(entry.getKey()));
                            } catch (Throwable t) {
                                listeners.remove(entry.getKey());
                            }
                        }
                        Thread.sleep(5000); // timely trigger change event
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                }
            }
        });
        t.setDaemon(true);
        t.start();
    }

    public void addListener(String key, CallbackListener listener) {
        listeners.put(key, listener);
        listener.changed(getChanged(key)); // send notification for change
    }

    private String getChanged(String key) {
        return "Changed: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    }
}

Provider 端暴露服務

<bean id="callbackService" class="com.alibaba.dubbo.samples.callback.impl.CallbackServiceImpl"/>

<dubbo:service interface="com.alibaba.dubbo.samples.callback.api.CallbackService" ref="callbackService" connections="1" callbacks="1000">
    <dubbo:method name="addListener">
        <dubbo:argument index="1" callback="true"/>
        <!--<dubbo:argument type="com.demo.CallbackListener" callback="true" />-->
    </dubbo:method>
</dubbo:service>

這里,Provider 需要在方法中聲明哪個參數是 Callback 參數。

Consumer 端實現 Callback 接口

CallbackService callbackService = ...;
callbackService.addListener("foo.bar", new CallbackListener() {
        public void changed(String msg) {
            System.out.println("callback1:" + msg);
        }
});

Callback 接口的實現類在 Consumer 端,當方法發生調用時,Consumer 端會自動 export 一個 Callback 服務。而 Provider 端在處理調用時,判斷如果參數是 Callback,則生成了一個 proxy,因此服務實現類里在調用 Callback 方法的時候,會被傳遞到 Consumer 端執行 Callback 實現類的代碼。

上述示例代碼位於:https://github.com/dubbo/dubbo-samples/tree/master/dubbo-samples-callback

這種調用方式有點像消息的發布和訂閱,但又有區別。比如當 Consumer 端 完成了Callback 服務的 export 后,如果后續重啟了,這時 Provider 端就會調不通;同時 Provider 端如何清理掉這個 proxy 也是一個問題。

3、事件通知

事件通知允許 Consumer 端在調用之前、調用之后或出現異常時,觸發 oninvokeonreturnonthrow 三個事件。

可以通過在配置 Consumer 時,指定事件需要通知的方法,如:

<bean id="demoCallback" class="com.alibaba.dubbo.samples.notify.impl.NotifyImpl" />

<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.samples.notify.api.DemoService" version="1.0.0" group="cn">
    <dubbo:method name="sayHello" onreturn="demoCallback.onreturn" onthrow="demoCallback.onthrow"/>
</dubbo:reference>

其中,NotifyImpl 的代碼如下:

public class NotifyImpl implements Notify{

    public Map<Integer, String> ret = new HashMap<Integer, String>();
    
    public void onreturn(String name, int id) {
        ret.put(id, name);
        System.out.println("onreturn: " + name);
    }

    public void onthrow(Throwable ex, String name, int id) {
        System.out.println("onthrow: " + name);
    }
}

這里要強調一點,自定義 Notify 接口中的三個方法的參數規則如下:

  • oninvoke 方法參數與調用方法的參數相同;
  • onreturn方法第一個參數為調用方法的返回值,其余為調用方法的參數;
  • onthrow方法第一個參數為調用異常,其余為調用方法的參數。

上述配置中,sayHello方法為同步調用,因此事件通知方法的執行也是同步執行。可以配置 async=true讓方法調用為異步,這時事件通知的方法也是異步執行的。特別強調一下,oninvoke方法不管是否異步調用,都是同步執行的。

事件通知的示例代碼請參考:https://github.com/dubbo/dubbo-samples/tree/master/dubbo-samples-notify


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM