Dubbo2.7.3之后Provider端異步的實現


一、Dubbo官方說明

Provider端異步執行將阻塞的業務從Dubbo內部線程池切換到業務自定義線程,避免Dubbo線程池的過度占用,有助於避免不同服務間的互相影響。異步執行無益於節省資源或提升RPC響應性能,因為如果業務執行需要阻塞,則始終還是要有線程來負責執行。

第一種使用方式

服務接口定義:

public interface AsyncService {
    CompletableFuture<String> sayHello(String name);
}

服務實現:

public class AsyncServiceImpl implements AsyncService {
    @Override
    public CompletableFuture<String> sayHello(String name) {
        RpcContext savedContext = RpcContext.getContext();
        // 建議為supplyAsync提供自定義線程池,避免使用JDK公用線程池
        return CompletableFuture.supplyAsync(() -> {
            System.out.println(savedContext.getAttachment("consumer-key1"));
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "async response from provider.";
        });
    }
}

 通過return CompletableFuture.supplyAsync(),業務執行已從Dubbo線程切換到業務線程,避免了對Dubbo線程池的阻塞。

第二種使用方式:使用AsyncContext

Dubbo提供了一個類似Serverlet 3.0的異步接口AsyncContext,在沒有CompletableFuture簽名接口的情況下,也可以實現Provider端的異步執行。

服務接口定義:

public interface AsyncService {
    String sayHello(String name);
}

服務暴露,和普通服務完全一致:

<bean id="asyncService" class="org.apache.dubbo.samples.governance.impl.AsyncServiceImpl"/>
<dubbo:service interface="org.apache.dubbo.samples.governance.api.AsyncService" ref="asyncService"/>

服務實現:

public class AsyncServiceImpl implements AsyncService {
    public String sayHello(String name) {
        final AsyncContext asyncContext = RpcContext.startAsync();
        new Thread(() -> {
            // 如果要使用上下文,則必須要放在第一句執行
            asyncContext.signalContextSwitch();
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 寫回響應
            asyncContext.write("Hello " + name + ", response from provider.");
        }).start();
        return null;
    }
}

 

二、Dubbo源碼的實現:

看處理請求的關鍵入口HeaderExchangeHandler的handleRequest方法:

    void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
...
        }
        // find handler by message class.
        Object msg = req.getData();
        try {
            CompletionStage<Object> future = handler.reply(channel, msg);
            future.whenComplete((appResult, t) -> {
                try {
                    if (t == null) {
                        res.setStatus(Response.OK);
                        res.setResult(appResult);
                    } else {
                        res.setStatus(Response.SERVICE_ERROR);
                        res.setErrorMessage(StringUtils.toString(t));
                    }
                    channel.send(res);
                } catch (RemotingException e) {
                    logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
                } finally {
                    // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
                }
            });
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
            channel.send(res);
        }
    }

 最關鍵的handler.reply(channel, msg);這句代碼返回是CompletionStage<Object>,然后我們進到DubboProtocol的reply方法:

        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);

            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {

            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            Result result = invoker.invoke(inv);
            return result.completionFuture().thenApply(Function.identity());
        }

  里面主要調用AbstractProxyInvoker的invoke方法返回了Result,代碼如下:

 @Override
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            CompletableFuture<Object> future = wrapWithFuture(value, invocation);
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
            future.whenComplete((obj, t) -> {
                AppResponse result = new AppResponse();
                if (t != null) {
                    if (t instanceof CompletionException) {
                        result.setException(t.getCause());
                    } else {
                        result.setException(t);
                    }
                } else {
                    result.setValue(obj);
                }
                asyncRpcResult.complete(result);
            });
            return asyncRpcResult;
        } catch (InvocationTargetException e) {
            if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
                logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
            }
            return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

  這里面最主要是CompletableFuture方法:

private CompletableFuture<Object> wrapWithFuture (Object value, Invocation invocation) {
       //如果啟用了異步,則直接返回異步上下文中啟動時創建的CompletableFuture 
        if (RpcContext.getContext().isAsyncStarted()) {
            return ((AsyncContextImpl)(RpcContext.getContext().getAsyncContext())).getInternalFuture();
        } 
//如果返回類型是CompletableFuture,則直接返回結果
else if (value instanceof CompletableFuture) {
            return (CompletableFuture<Object>) value;
        }
//否則直接從結果中包裝成CompletableFuture
        return CompletableFuture.completedFuture(value);
    }

  這里面的邏輯就是就是Provider異步實現的邏輯。

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM