一、Dubbo官方說明
Provider端異步執行將阻塞的業務從Dubbo內部線程池切換到業務自定義線程,避免Dubbo線程池的過度占用,有助於避免不同服務間的互相影響。異步執行無益於節省資源或提升RPC響應性能,因為如果業務執行需要阻塞,則始終還是要有線程來負責執行。
第一種使用方式
服務接口定義:
public interface AsyncService { CompletableFuture<String> sayHello(String name); }
服務實現:
public class AsyncServiceImpl implements AsyncService { @Override public CompletableFuture<String> sayHello(String name) { RpcContext savedContext = RpcContext.getContext(); // 建議為supplyAsync提供自定義線程池,避免使用JDK公用線程池 return CompletableFuture.supplyAsync(() -> { System.out.println(savedContext.getAttachment("consumer-key1")); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "async response from provider."; }); } }
通過return CompletableFuture.supplyAsync()
,業務執行已從Dubbo線程切換到業務線程,避免了對Dubbo線程池的阻塞。
第二種使用方式:使用AsyncContext
Dubbo提供了一個類似Serverlet 3.0的異步接口AsyncContext
,在沒有CompletableFuture簽名接口的情況下,也可以實現Provider端的異步執行。
服務接口定義:
public interface AsyncService { String sayHello(String name); }
服務暴露,和普通服務完全一致:
<bean id="asyncService" class="org.apache.dubbo.samples.governance.impl.AsyncServiceImpl"/> <dubbo:service interface="org.apache.dubbo.samples.governance.api.AsyncService" ref="asyncService"/>
服務實現:
public class AsyncServiceImpl implements AsyncService { public String sayHello(String name) { final AsyncContext asyncContext = RpcContext.startAsync(); new Thread(() -> { // 如果要使用上下文,則必須要放在第一句執行 asyncContext.signalContextSwitch(); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } // 寫回響應 asyncContext.write("Hello " + name + ", response from provider."); }).start(); return null; } }
二、Dubbo源碼的實現:
看處理請求的關鍵入口HeaderExchangeHandler的handleRequest方法:
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { ... } // find handler by message class. Object msg = req.getData(); try { CompletionStage<Object> future = handler.reply(channel, msg); future.whenComplete((appResult, t) -> { try { if (t == null) { res.setStatus(Response.OK); res.setResult(appResult); } else { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(t)); } channel.send(res); } catch (RemotingException e) { logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e); } finally { // HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); } }
最關鍵的handler.reply(channel, msg);這句代碼返回是CompletionStage<Object>,然后我們進到DubboProtocol的reply方法:
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); Result result = invoker.invoke(inv); return result.completionFuture().thenApply(Function.identity()); }
里面主要調用AbstractProxyInvoker的invoke方法返回了Result,代碼如下:
@Override public Result invoke(Invocation invocation) throws RpcException { try { Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()); CompletableFuture<Object> future = wrapWithFuture(value, invocation); AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation); future.whenComplete((obj, t) -> { AppResponse result = new AppResponse(); if (t != null) { if (t instanceof CompletionException) { result.setException(t.getCause()); } else { result.setException(t); } } else { result.setValue(obj); } asyncRpcResult.complete(result); }); return asyncRpcResult; } catch (InvocationTargetException e) { if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) { logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e); } return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } }
這里面最主要是CompletableFuture方法:
private CompletableFuture<Object> wrapWithFuture (Object value, Invocation invocation) { //如果啟用了異步,則直接返回異步上下文中啟動時創建的CompletableFuture if (RpcContext.getContext().isAsyncStarted()) { return ((AsyncContextImpl)(RpcContext.getContext().getAsyncContext())).getInternalFuture(); } //如果返回類型是CompletableFuture,則直接返回結果 else if (value instanceof CompletableFuture) { return (CompletableFuture<Object>) value; } //否則直接從結果中包裝成CompletableFuture return CompletableFuture.completedFuture(value); }
這里面的邏輯就是就是Provider異步實現的邏輯。