dubbo事件通知機制:http://dubbo.io/books/dubbo-user-book/demos/events-notify.html
一、使用方式
兩個服務:
- DemoService:真正要調用的服務
- Notify:事件通知服務(用在consumer端)
provider:
1 package com.alibaba.dubbo.demo; 2 3 public interface DemoService { 4 String sayHello(String name); 5 }
1 public class DemoServiceImpl implements DemoService { 2 @Override 3 public String sayHello(String name) { 4 throw new RpcException("ex, param: " + name);//測試onthrow方法 5 // return "Hello " + name;//測試onreturn方法 6 } 7 }
consumer:
通知服務:Notify
1 package com.alibaba.dubbo.demo.consumer.eventnotify; 2 3 public interface Notify { 4 void oninvoke(String name); // 調用之前 5 void onreturnWithoutParam(String result); // 調用之后 6 void onreturn(String result, String name); // 調用之后 7 void onthrow(Throwable ex, String name); // 出現異常 8 }
1 package com.alibaba.dubbo.demo.consumer.eventnotify; 2 3 public class NotifyService implements Notify { 4 @Override 5 public void oninvoke(String name) { 6 System.out.println("======oninvoke======, param: " + name); 7 } 8 9 @Override 10 public void onreturnWithoutParam(String result) { 11 System.out.println("======onreturn======, result: " + result); 12 } 13 14 @Override 15 public void onreturn(String result, String name) { 16 System.out.println("======onreturn======, param: " + name + ", result: " + result); 17 } 18 19 @Override 20 public void onthrow(Throwable ex, String name) { 21 System.out.println("======onthrow======, param: " + name + ", exception: " + ex.getMessage()); 22 } 23 }
xml配置:
1 <bean id="notifyService" class="com.alibaba.dubbo.demo.consumer.eventnotify.NotifyService"/> 2 <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"> 3 <dubbo:method name="sayHello" timeout="60000" oninvoke="notifyService.oninvoke" onreturn="notifyService.onreturnWithoutParam" onthrow="notifyService.onthrow"/> 4 </dubbo:reference>
之后就可以運行Consumer啟動類,之后調用demoService.sayHello(String name)了。
注意:
- oninvoke方法:
- 必須具有與真實的被調用方法sayHello相同的入參列表:例如,oninvoke(String name)
- onreturn方法:
- 至少要有一個入參且第一個入參必須與sayHello的返回類型相同,接收返回結果:例如,onreturnWithoutParam(String result)
- 可以有多個參數,多個參數的情況下,第一個后邊的所有參數都是用來接收sayHello入參的:例如, onreturn(String result, String name)
- onthrow方法:
- 至少要有一個入參且第一個入參類型為Throwable或其子類,接收返回結果;例如,onthrow(Throwable ex)
- 可以有多個參數,多個參數的情況下,第一個后邊的所有參數都是用來接收sayHello入參的:例如,onthrow(Throwable ex, String name)
- 如果是consumer在調用provider的過程中,出現異常時不會走onthrow方法的,onthrow方法只會在provider返回的RpcResult中含有Exception對象時,才會執行。(dubbo中下層服務的Exception會被放在響應RpcResult的exception對象中傳遞給上層服務)
二、源碼解析
整個事件通知的邏輯都在FutureFilter中,來看一下源碼:
1 /** 2 * EventFilter 3 */ 4 @Activate(group = Constants.CONSUMER) 5 public class FutureFilter implements Filter { 6 7 protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class); 8 9 public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException { 10 final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); 11 12 //1 調用服務之前:執行xxxService.oninvoke方法 13 fireInvokeCallback(invoker, invocation); 14 //2 調用服務 15 Result result = invoker.invoke(invocation); 16 //3 調用服務之后 17 if (isAsync) { 18 asyncCallback(invoker, invocation); 19 } else { 20 syncCallback(invoker, invocation, result); 21 } 22 //4 返回調用結果 23 return result; 24 } 25 26 private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) { 27 if (result.hasException()) { 28 //3.1 調用服務之后:如果返回結果異常信息(注意:如果是consumer自己throw的異常,會在2的時候直接拋走,不會走到這里),直接執行xxxService.onthrow方法 29 fireThrowCallback(invoker, invocation, result.getException()); 30 } else { 31 //3.2 調用服務之后:如果返回值正常,執行xxxService.onreturn方法 32 fireReturnCallback(invoker, invocation, result.getValue()); 33 } 34 } 35 36 private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) { 37 Future<?> f = RpcContext.getContext().getFuture(); 38 if (f instanceof FutureAdapter) { 39 ResponseFuture future = ((FutureAdapter<?>) f).getFuture(); 40 // 3.1 調用服務之后:設置回調ResponseCallback對象到DefaultFuture中,當provider返回響應時,執行DefaultFuture.doReceived方法,該方法會調用ResponseCallback對象的done或者caught方法 41 future.setCallback(new ResponseCallback() { 42 public void done(Object rpcResult) { 43 if (rpcResult == null) { 44 logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName())); 45 return; 46 } 47 ///must be rpcResult 48 if (!(rpcResult instanceof Result)) { 49 logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName())); 50 return; 51 } 52 Result result = (Result) rpcResult; 53 if (result.hasException()) { 54 fireThrowCallback(invoker, invocation, result.getException()); 55 } else { 56 fireReturnCallback(invoker, invocation, result.getValue()); 57 } 58 } 59 60 public void caught(Throwable exception) { 61 fireThrowCallback(invoker, invocation, exception); 62 } 63 }); 64 } 65 } 66 67 /** 68 * 反射執行xxxService.oninvoke方法:必須具有與真實的被調用方法sayHello相同的入參列表。 69 */ 70 private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) { 71 final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY)); 72 final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY)); 73 74 if (onInvokeMethod == null && onInvokeInst == null) { 75 return; 76 } 77 if (onInvokeMethod == null || onInvokeInst == null) { 78 throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); 79 } 80 if (onInvokeMethod != null && !onInvokeMethod.isAccessible()) { 81 onInvokeMethod.setAccessible(true); 82 } 83 // 獲取真實方法sayHello傳入的參數 84 Object[] params = invocation.getArguments(); 85 try { 86 onInvokeMethod.invoke(onInvokeInst, params); 87 } catch (InvocationTargetException e) { 88 fireThrowCallback(invoker, invocation, e.getTargetException()); 89 } catch (Throwable e) { 90 fireThrowCallback(invoker, invocation, e); 91 } 92 } 93 94 /** 95 * 反射執行xxxService.onreturn方法:至少要有一個入參,接收返回結果 96 */ 97 private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) { 98 final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY)); 99 final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY)); 100 101 //not set onreturn callback 102 if (onReturnMethod == null && onReturnInst == null) { 103 return; 104 } 105 106 if (onReturnMethod == null || onReturnInst == null) { 107 throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); 108 } 109 if (onReturnMethod != null && !onReturnMethod.isAccessible()) { 110 onReturnMethod.setAccessible(true); 111 } 112 113 Object[] args = invocation.getArguments(); 114 Object[] params; 115 Class<?>[] rParaTypes = onReturnMethod.getParameterTypes(); 116 if (rParaTypes.length > 1) { 117 // onreturn(xx, Object[]) 兩個參數:第一個參數與真實方法sayHello方法返回結果類型相同,第二個接收所有的真實請求參數 118 if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) { 119 params = new Object[2]; 120 params[0] = result; // 真實方法的執行結果 121 params[1] = args; // 真實方法sayHello傳入的參數 122 // onreturn(xx, Object... args) 多個參數:第一個參數與真實方法sayHello方法返回結果類型相同,后邊幾個接收所有的真實請求參數 123 } else { 124 params = new Object[args.length + 1]; 125 params[0] = result; // 真實方法的執行結果 126 System.arraycopy(args, 0, params, 1, args.length); 127 } 128 } else { 129 // onreturn(xx) 只有一個參數:接收返回執行結果 130 params = new Object[]{result}; // 執行結果 131 } 132 try { 133 onReturnMethod.invoke(onReturnInst, params); 134 } catch (InvocationTargetException e) { 135 fireThrowCallback(invoker, invocation, e.getTargetException()); 136 } catch (Throwable e) { 137 fireThrowCallback(invoker, invocation, e); 138 } 139 } 140 141 /** 142 * 反射執行xxxService.onthrow方法:至少要有一個入參且第一個入參類型為Throwable或其子類,接收返回結果 143 */ 144 private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) { 145 final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY)); 146 final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY)); 147 148 //onthrow callback not configured 149 if (onthrowMethod == null && onthrowInst == null) { 150 return; 151 } 152 if (onthrowMethod == null || onthrowInst == null) { 153 throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); 154 } 155 if (onthrowMethod != null && !onthrowMethod.isAccessible()) { 156 onthrowMethod.setAccessible(true); 157 } 158 Class<?>[] rParaTypes = onthrowMethod.getParameterTypes(); 159 if (rParaTypes[0].isAssignableFrom(exception.getClass())) { 160 try { 161 Object[] args = invocation.getArguments(); 162 Object[] params; 163 164 if (rParaTypes.length > 1) { 165 // onthrow(xx, Object[]) 兩個參數:第一個參數接收exception,第二個接收所有的真實請求參數 166 if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) { 167 params = new Object[2]; 168 params[0] = exception; 169 params[1] = args; 170 // onthrow(xx, Object... args) 多個參數:第一個參數接收exception,后邊幾個接收所有的真實請求參數 171 } else { 172 params = new Object[args.length + 1]; 173 params[0] = exception; 174 System.arraycopy(args, 0, params, 1, args.length); 175 } 176 } else { 177 // onthrow(xx) 只有一個參數:接收exception 178 params = new Object[]{exception}; 179 } 180 onthrowMethod.invoke(onthrowInst, params); 181 } catch (Throwable e) { 182 logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e); 183 } 184 } else { 185 logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception); 186 } 187 } 188 }
從@Activate(group = Constants.CONSUMER)來看FutureFilter只用在consumer端;不管是同步調用還是異步調用,都會走FutureFilter。
原理:
- 首先走oninvoke(String name)方法;
- 然后走sayHello(String name)
- 最后根據同步還是異步分別走不同的邏輯。
其中同步很簡單,看sayHello(String name)的返回結果RpcResult中是否有exception對象,如果有,執行onthrow(Throwable ex, String name);如果沒有執行onreturnWithoutParam(String result)。
異步的操作:由於不知道provider什么時候回執行完畢,所以要添加回調等待provider端返回結果后,再執行onthrow(Throwable ex, String name)或者onreturnWithoutParam(String result),這種模式很重要,這是統計異步方法調用時間的一種非常好的模式。
重點看一下異步!
三、異步回調模式
1 private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) { 2 Future<?> f = RpcContext.getContext().getFuture(); 3 if (f instanceof FutureAdapter) { 4 ResponseFuture future = ((FutureAdapter<?>) f).getFuture(); 5 // 3.1 調用服務之后:設置回調ResponseCallback對象到DefaultFuture中,當provider返回響應時,執行DefaultFuture.doReceived方法,該方法會調用ResponseCallback對象的done或者caught方法 6 future.setCallback(new ResponseCallback() { 7 public void done(Object rpcResult) { 8 if (rpcResult == null) { 9 logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName())); 10 return; 11 } 12 ///must be rpcResult 13 if (!(rpcResult instanceof Result)) { 14 logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName())); 15 return; 16 } 17 Result result = (Result) rpcResult; 18 if (result.hasException()) { 19 fireThrowCallback(invoker, invocation, result.getException()); 20 } else { 21 fireReturnCallback(invoker, invocation, result.getValue()); 22 } 23 } 24 25 public void caught(Throwable exception) { 26 fireThrowCallback(invoker, invocation, exception); 27 } 28 }); 29 } 30 }
上述的future對象是DefaultFuture,這里首先new了一個ResponseCallback回調函數,設置到了DefaultFuture的ResponseCallback callback屬性中。來看一下DefaultFuture類:
1 private volatile Response response; 2 private volatile ResponseCallback callback; 3 4 public boolean isDone() { 5 return response != null; 6 } 7 8 public void setCallback(ResponseCallback callback) { 9 if (isDone()) { 10 invokeCallback(callback); 11 } else { 12 boolean isdone = false; 13 lock.lock(); 14 try { 15 if (!isDone()) { 16 this.callback = callback; 17 } else { 18 isdone = true; 19 } 20 } finally { 21 lock.unlock(); 22 } 23 if (isdone) { 24 invokeCallback(callback); 25 } 26 } 27 }
1 private void invokeCallback(ResponseCallback c) { 2 ResponseCallback callbackCopy = c; 3 if (callbackCopy == null) { 4 throw new NullPointerException("callback cannot be null."); 5 } 6 c = null; 7 Response res = response; 8 if (res == null) { 9 throw new IllegalStateException("response cannot be null. url:" + channel.getUrl()); 10 } 11 12 if (res.getStatus() == Response.OK) { 13 try { 14 // 返回正常,回調ResponseCallback回調函數的done方法 15 callbackCopy.done(res.getResult()); 16 } catch (Exception e) { 17 logger.error("callback invoke error .reasult:" + res.getResult() + ",url:" + channel.getUrl(), e); 18 } 19 } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { 20 try { 21 TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); 22 // 如果超時,回調ResponseCallback回調函數的caught方法 23 callbackCopy.caught(te); 24 } catch (Exception e) { 25 logger.error("callback invoke error ,url:" + channel.getUrl(), e); 26 } 27 } else { 28 try { 29 RuntimeException re = new RuntimeException(res.getErrorMessage()); 30 // 其他異常,回調ResponseCallback回調函數的caught方法 31 callbackCopy.caught(re); 32 } catch (Exception e) { 33 logger.error("callback invoke error ,url:" + channel.getUrl(), e); 34 } 35 } 36 }
從setCallback(ResponseCallback callback),如果此時provider端已經返回了響應(response!=null),則直接執行ResponseCallback回調函數中的done方法或者caught方法;否則,將上邊創建的ResponseCallback實例賦值給DefaultFuture的ResponseCallback callback屬性中。那么之后會在什么時候執行回調函數的方法呢?當consumer接收到provider的響應的時候!
1 public static void received(Channel channel, Response response) { 2 try { 3 DefaultFuture future = FUTURES.remove(response.getId()); 4 if (future != null) { 5 future.doReceived(response); 6 } else { 7 logger.warn("The timeout response finally returned at " 8 + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 9 + ", response " + response 10 + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 11 + " -> " + channel.getRemoteAddress())); 12 } 13 } finally { 14 CHANNELS.remove(response.getId()); 15 } 16 } 17 18 private void doReceived(Response res) { 19 lock.lock(); 20 try { 21 response = res; 22 if (done != null) { 23 done.signal(); 24 } 25 } finally { 26 lock.unlock(); 27 } 28 // 調用回調函數 29 if (callback != null) { 30 invokeCallback(callback); 31 } 32 }
當provider返回響應時,會調用DefaultFuture.received(Channel channel, Response response)方法(9.3 客戶端接收響應信息(異步轉同步的實現)),此時會執行回調函數。事件通知的源碼就分析完了!最后看一個回調模式的使用場景:統計異步方法的調用時間。
1 private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) { 2 Future<?> f = RpcContext.getContext().getFuture(); 3 final long start = System.currentTimeMillis(); 4 if (f instanceof FutureAdapter) { 5 ResponseFuture future = ((FutureAdapter<?>) f).getFuture(); 6 future.setCallback(new ResponseCallback() { 7 public void done(Object rpcResult) { 8 long cost = System.currentTimeMillis() - start; 9 } 10 }); 11 } 12 }
上邊的代碼只是一個形式,實際上start時間需要在調用sayHello方法之前進行記錄。