一、Dubbo官方说明
Provider端异步执行将阻塞的业务从Dubbo内部线程池切换到业务自定义线程,避免Dubbo线程池的过度占用,有助于避免不同服务间的互相影响。异步执行无益于节省资源或提升RPC响应性能,因为如果业务执行需要阻塞,则始终还是要有线程来负责执行。
第一种使用方式
服务接口定义:
public interface AsyncService { CompletableFuture<String> sayHello(String name); }
服务实现:
public class AsyncServiceImpl implements AsyncService { @Override public CompletableFuture<String> sayHello(String name) { RpcContext savedContext = RpcContext.getContext(); // 建议为supplyAsync提供自定义线程池,避免使用JDK公用线程池 return CompletableFuture.supplyAsync(() -> { System.out.println(savedContext.getAttachment("consumer-key1")); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "async response from provider."; }); } }
通过return CompletableFuture.supplyAsync()
,业务执行已从Dubbo线程切换到业务线程,避免了对Dubbo线程池的阻塞。
第二种使用方式:使用AsyncContext
Dubbo提供了一个类似Serverlet 3.0的异步接口AsyncContext
,在没有CompletableFuture签名接口的情况下,也可以实现Provider端的异步执行。
服务接口定义:
public interface AsyncService { String sayHello(String name); }
服务暴露,和普通服务完全一致:
<bean id="asyncService" class="org.apache.dubbo.samples.governance.impl.AsyncServiceImpl"/> <dubbo:service interface="org.apache.dubbo.samples.governance.api.AsyncService" ref="asyncService"/>
服务实现:
public class AsyncServiceImpl implements AsyncService { public String sayHello(String name) { final AsyncContext asyncContext = RpcContext.startAsync(); new Thread(() -> { // 如果要使用上下文,则必须要放在第一句执行 asyncContext.signalContextSwitch(); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } // 写回响应 asyncContext.write("Hello " + name + ", response from provider."); }).start(); return null; } }
二、Dubbo源码的实现:
看处理请求的关键入口HeaderExchangeHandler的handleRequest方法:
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { ... } // find handler by message class. Object msg = req.getData(); try { CompletionStage<Object> future = handler.reply(channel, msg); future.whenComplete((appResult, t) -> { try { if (t == null) { res.setStatus(Response.OK); res.setResult(appResult); } else { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(t)); } channel.send(res); } catch (RemotingException e) { logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e); } finally { // HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); } }
最关键的handler.reply(channel, msg);这句代码返回是CompletionStage<Object>,然后我们进到DubboProtocol的reply方法:
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); Result result = invoker.invoke(inv); return result.completionFuture().thenApply(Function.identity()); }
里面主要调用AbstractProxyInvoker的invoke方法返回了Result,代码如下:
@Override public Result invoke(Invocation invocation) throws RpcException { try { Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()); CompletableFuture<Object> future = wrapWithFuture(value, invocation); AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation); future.whenComplete((obj, t) -> { AppResponse result = new AppResponse(); if (t != null) { if (t instanceof CompletionException) { result.setException(t.getCause()); } else { result.setException(t); } } else { result.setValue(obj); } asyncRpcResult.complete(result); }); return asyncRpcResult; } catch (InvocationTargetException e) { if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) { logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e); } return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } }
这里面最主要是CompletableFuture方法:
private CompletableFuture<Object> wrapWithFuture (Object value, Invocation invocation) { //如果启用了异步,则直接返回异步上下文中启动时创建的CompletableFuture if (RpcContext.getContext().isAsyncStarted()) { return ((AsyncContextImpl)(RpcContext.getContext().getAsyncContext())).getInternalFuture(); } //如果返回类型是CompletableFuture,则直接返回结果 else if (value instanceof CompletableFuture) { return (CompletableFuture<Object>) value; } //否则直接从结果中包装成CompletableFuture return CompletableFuture.completedFuture(value); }
这里面的逻辑就是就是Provider异步实现的逻辑。