Dubbo事件回調


方法說明
  • oninvoke方法:必須具有與真實的被調用方法sayHello相同的入參列表:例如,oninvoke(String name)
  • onreturn方法:至少要有一個入參且第一個入參必須與sayHello的返回類型相同,接收返回結果:例如,onreturnWithoutParam(String result),可以有多個參數,多個參數的情況下,第一個后邊的所有參數都是用來接收sayHello入參的:例如, onreturn(String result, String name)
  • onthrow方法:至少要有一個入參且第一個入參類型為Throwable或其子類,接收返回結果;例如,onthrow(Throwable ex),可以有多個參數,多個參數的情況下,第一個后邊的所有參數都是用來接收sayHello入參的:例如,onthrow(Throwable ex, String name)
  • 如果是consumer在調用provider的過程中,出現異常時不會走onthrow方法的,onthrow方法只會在provider返回的RpcResult中含有Exception對象時,才會執行。(dubbo中下層服務的Exception會被放在響應RpcResult的exception對象中傳遞給上層服務)
 
 
一、使用示例
 
服務消費者端
public interface Notify {
    void onreturnNoParam(String result); // 調用之后,沒有參數的
    void onreturn(String result, String name); // 調用之后
    void onthrow(Throwable ex, String name); // 出現異常
}

public class NotifyService implements Notify {

    @Override
    public void onreturnNoParam(String result) {
        System.out.println("======onreturnNoParam======, result: " + result);
    }

    @Override
    public void onreturn(String result, String name) {
        System.out.println("======onreturn======, param: " + name + ", result: " + result);
    }

    @Override
    public void onthrow(Throwable ex, String name) {
        System.out.println("======onthrow======, param: " + name + ", exception: " + ex.getMessage());
    }
}

 

<bean id="notifyService"  class="com.notify.NotifyService"/>
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
    <dubbo:method name="sayHello" retries="0" 
                  async="true"  onreturn="notifyService.onreturn" onthrow="notifyService.onthrow"></dubbo:method>
</dubbo:reference>

 

 
測試結果輸出:
main方法直接返回的結果:null
======onreturn======, param: world, result: Hello world, response form provider: 192.168.215.1:20880

注意:如果上面使用的是異步,則返回的結果為null

 
二、源碼分析
 
 
整個事件通知的邏輯都在FutureFilter中,來看一下源碼:
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
    final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
    // 1 調用服務之前:執行xxxService.oninvoke方法
    fireInvokeCallback(invoker, invocation);
    // need to configure if there's return value before the invocation in order to help invoker to judge if it's
    // necessary to return future.
    // 2 調用服務
    Result result = invoker.invoke(invocation);
    // 3 調用服務之后
    if (isAsync) {
        asyncCallback(invoker, invocation);
    } else {
        syncCallback(invoker, invocation, result);
    }
    // 4 返回調用結果
    return result;
}

 

fireInvokeCallback
反射執行xxxService.oninvoke方法:必須具有與真實的被調用方法sayHello相同的入參列表。
private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
    final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
    final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));

    if (onInvokeMethod == null && onInvokeInst == null) {
        return;
    }
    if (onInvokeMethod == null || onInvokeInst == null) {
        throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() 
                                        + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") 
                                        + " found. url:" + invoker.getUrl());
    }
    if (onInvokeMethod != null && !onInvokeMethod.isAccessible()) {
        onInvokeMethod.setAccessible(true);
    }
    // 獲取真實方法sayHello傳入的參數
    Object[] params = invocation.getArguments();
    try {
        onInvokeMethod.invoke(onInvokeInst, params);
    } catch (InvocationTargetException e) {
        fireThrowCallback(invoker, invocation, e.getTargetException());
    } catch (Throwable e) {
        fireThrowCallback(invoker, invocation, e);
    }
}

 

 
再來看一下同步調用和異步調用
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
    Future<?> f = RpcContext.getContext().getFuture();
    if (f instanceof FutureAdapter) {
        ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
        // 3.1 調用服務之后:設置回調ResponseCallback對象到DefaultFuture中,
        // 當provider返回響應時,執行DefaultFuture.doReceived方法,該方法會調用ResponseCallback對象的done或者caught方法
        future.setCallback(new ResponseCallback() {
            public void done(Object rpcResult) {
                if (rpcResult == null) {
                    logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
                    return;
                }
                ///must be rpcResult
                if (!(rpcResult instanceof Result)) {
                    logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " 
                                                           + Result.class.getName()));
                    return;
                }
                Result result = (Result) rpcResult;
                if (result.hasException()) {
                    fireThrowCallback(invoker, invocation, result.getException());
                } else {
                    fireReturnCallback(invoker, invocation, result.getValue());
                }
            }

            public void caught(Throwable exception) {
                fireThrowCallback(invoker, invocation, exception);
            }
        });
    }
}

 

private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
    if (result.hasException()) {
        //3.1 調用服務之后:如果返回結果異常信息(注意:如果是consumer自己throw的異常,會在2的時候直接拋走,不會走到這里),直接執行xxxService.onthrow方法
        fireThrowCallback(invoker, invocation, result.getException());
    } else {
        //3.2 調用服務之后:如果返回值正常,執行xxxService.onreturn方法
        fireReturnCallback(invoker, invocation, result.getValue());
    }
}

 

 
 
異步與同步的區別
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM