5.源碼分析---SOFARPC調用服務


我們這一次來接着上一篇文章《4. 源碼分析---SOFARPC服務端暴露》講一下服務暴露之后被客戶端調用之后服務端是怎么返回數據的。

示例我們還是和上篇文章一樣使用一樣的bolt協議來講:

    public static void main(String[] args) {
        ServerConfig serverConfig = new ServerConfig()
                .setProtocol("bolt") // 設置一個協議,默認bolt
                .setPort(12200) // 設置一個端口,默認12200
                .setDaemon(false); // 非守護線程

        ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
            .setInterfaceId(HelloService.class.getName()) // 指定接口
            .setRef(new HelloServiceImpl()) // 指定實現
            .setServer(serverConfig); // 指定服務端

        providerConfig.export(); // 發布服務
    }

在Bolt協議下面,當服務端被調用的時候一個服務的流程如下所示:
BoltServerProcessor->FilterChain->ProviderExceptionFilter->FilterInvoker->RpcServiceContextFilter->FilterInvoker->ProviderBaggageFilter->FilterInvoker->ProviderTracerFilter->ProviderInvoker

BoltServerProcessor#handleRequest

@Override
public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, SofaRequest request) {
    // RPC內置上下文
    RpcInternalContext context = RpcInternalContext.getContext();
    context.setProviderSide(true);

    String appName = request.getTargetAppName();
    if (appName == null) {
        // 默認全局appName
        appName = (String) RpcRuntimeContext.get(RpcRuntimeContext.KEY_APPNAME);
    }

    // 是否鏈路異步化中
    boolean isAsyncChain = false;
    try { // 這個 try-finally 為了保證Context一定被清理
        processingCount.incrementAndGet(); // 統計值加1

        context.setRemoteAddress(bizCtx.getRemoteHost(), bizCtx.getRemotePort()); // 遠程地址
        context.setAttachment(RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT, asyncCtx); // 遠程返回的通道

        if (RpcInternalContext.isAttachmentEnable()) {
            InvokeContext boltInvokeCtx = bizCtx.getInvokeContext();
            if (boltInvokeCtx != null) {
                putToContextIfNotNull(boltInvokeCtx, InvokeContext.BOLT_PROCESS_WAIT_TIME,
                    context, RpcConstants.INTERNAL_KEY_PROCESS_WAIT_TIME); // rpc線程池等待時間 Long
            }
        }
        if (EventBus.isEnable(ServerReceiveEvent.class)) {
            EventBus.post(new ServerReceiveEvent(request));
        }

        // 開始處理
        SofaResponse response = null; // 響應,用於返回
        Throwable throwable = null; // 異常,用於記錄
        ProviderConfig providerConfig = null;
        String serviceName = request.getTargetServiceUniqueName();

        try { // 這個try-catch 保證一定有Response
            invoke:
            {
                if (!boltServer.isStarted()) { // 服務端已關閉
                    throwable = new SofaRpcException(RpcErrorType.SERVER_CLOSED, LogCodes.getLog(
                        LogCodes.WARN_PROVIDER_STOPPED, SystemInfo.getLocalHost() + ":" +
                            boltServer.serverConfig.getPort()));
                    response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage());
                    break invoke;
                }
                if (bizCtx.isRequestTimeout()) { // 加上丟棄超時的請求的邏輯
                    throwable = clientTimeoutWhenReceiveRequest(appName, serviceName, bizCtx.getRemoteAddress());
                    break invoke;
                }
                // 查找服務
                //在server.registerProcessor方法中設置 ProviderProxyInvoker
                Invoker invoker = boltServer.findInvoker(serviceName);
                if (invoker == null) {
                    throwable = cannotFoundService(appName, serviceName);
                    response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage());
                    break invoke;
                }
                if (invoker instanceof ProviderProxyInvoker) {
                    providerConfig = ((ProviderProxyInvoker) invoker).getProviderConfig();
                    // 找到服務后,打印服務的appName
                    appName = providerConfig != null ? providerConfig.getAppName() : null;
                }
                // 查找方法
                String methodName = request.getMethodName();
                //在server.registerProcessor方法中設置
                Method serviceMethod = ReflectCache.getOverloadMethodCache(serviceName, methodName,
                    request.getMethodArgSigs());
                if (serviceMethod == null) {
                    throwable = cannotFoundServiceMethod(appName, methodName, serviceName);
                    response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage());
                    break invoke;
                } else {
                    request.setMethod(serviceMethod);
                }

                // 真正調用
                response = doInvoke(serviceName, invoker, request);

                if (bizCtx.isRequestTimeout()) { // 加上丟棄超時的響應的邏輯
                    throwable = clientTimeoutWhenSendResponse(appName, serviceName, bizCtx.getRemoteAddress());
                    break invoke;
                }
            }
        } catch (Exception e) {
            // 服務端異常,不管是啥異常
            LOGGER.errorWithApp(appName, "Server Processor Error!", e);
            throwable = e;
            response = MessageBuilder.buildSofaErrorResponse(e.getMessage());
        }

        // Response不為空,代表需要返回給客戶端
        if (response != null) {
            RpcInvokeContext invokeContext = RpcInvokeContext.peekContext();
            isAsyncChain = CommonUtils.isTrue(invokeContext != null ?
                (Boolean) invokeContext.remove(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN) : null);
            // 如果是服務端異步代理模式,特殊處理,因為該模式是在業務代碼自主異步返回的
            if (!isAsyncChain) {
                // 其它正常請求
                try { // 這個try-catch 保證一定要記錄tracer
                    asyncCtx.sendResponse(response);
                } finally {
                    if (EventBus.isEnable(ServerSendEvent.class)) {
                        EventBus.post(new ServerSendEvent(request, response, throwable));
                    }
                }
            }
        }
    } catch (Throwable e) {
        // 可能有返回時的異常
        if (LOGGER.isErrorEnabled(appName)) {
            LOGGER.errorWithApp(appName, e.getMessage(), e);
        }
    } finally {
        processingCount.decrementAndGet();
        if (!isAsyncChain) {
            if (EventBus.isEnable(ServerEndHandleEvent.class)) {
                EventBus.post(new ServerEndHandleEvent());
            }
        }
        RpcInvokeContext.removeContext();
        RpcInternalContext.removeAllContext();
    }
}

這個方法主要做了如下幾件事:

  1. 設置上下文參數
  2. 從緩存中得到服務暴露時設置的invoker
  3. 為request設置method參數
  4. 調用doInvoke返回response
  5. 將response返回給客戶端

BoltServerProcessor#doInvoke

我們直接進入到doInvoke方法中,看是如何生成response對象的。

private SofaResponse doInvoke(String serviceName, Invoker invoker, SofaRequest request) throws SofaRpcException {
    // 開始調用,先記下當前的ClassLoader
    ClassLoader rpcCl = Thread.currentThread().getContextClassLoader();
    try {
        // 切換線程的ClassLoader到 服務 自己的ClassLoader
        ClassLoader serviceCl = ReflectCache.getServiceClassLoader(serviceName);
        Thread.currentThread().setContextClassLoader(serviceCl);
        return invoker.invoke(request);
    } finally {
        Thread.currentThread().setContextClassLoader(rpcCl);
    }
}

這里主要是為了獲取緩存里面加載被暴露服務的類加載器,這樣可以防止不同的類加載器之間一個類被加載多次。

然后調用過濾器鏈,最后進入到ProviderInvoker中

ProviderInvoker#invoke

@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
    SofaResponse sofaResponse = new SofaResponse();
    long startTime = RpcRuntimeContext.now();
    try {
        // 反射 真正調用業務代碼
        Method method = request.getMethod();
        if (method == null) {
            throw new SofaRpcException(RpcErrorType.SERVER_FILTER, "Need decode method first!");
        }
        Object result = method.invoke(providerConfig.getRef(), request.getMethodArgs());

        sofaResponse.setAppResponse(result);
    } catch (IllegalArgumentException e) { // 非法參數,可能是實現類和接口類不對應)
        sofaResponse.setErrorMsg(e.getMessage());
    } catch (IllegalAccessException e) { // 如果此 Method 對象強制執行 Java 語言訪問控制,並且底層方法是不可訪問的
        sofaResponse.setErrorMsg(e.getMessage());
    } catch (InvocationTargetException e) { // 業務代碼拋出異常
        cutCause(e.getCause());
        sofaResponse.setAppResponse(e.getCause());
    } finally {
        if (RpcInternalContext.isAttachmentEnable()) {
            long endTime = RpcRuntimeContext.now();
            RpcInternalContext.getContext().setAttachment(RpcConstants.INTERNAL_KEY_IMPL_ELAPSE,
                endTime - startTime);
        }
    }

    return sofaResponse;
}

到最后我們發現,服務端會通過反射調用被暴露服務的方法,封裝成Response類返回。

我們再次回到BoltServerProcessor#handleRequest方法中

....//忽略其他內容
// Response不為空,代表需要返回給客戶端
if (response != null) {
    RpcInvokeContext invokeContext = RpcInvokeContext.peekContext();
    isAsyncChain = CommonUtils.isTrue(invokeContext != null ?
        (Boolean) invokeContext.remove(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN) : null);
    // 如果是服務端異步代理模式,特殊處理,因為該模式是在業務代碼自主異步返回的
    if (!isAsyncChain) {
        // 其它正常請求
        try { // 這個try-catch 保證一定要記錄tracer
            asyncCtx.sendResponse(response);
        } finally {
            if (EventBus.isEnable(ServerSendEvent.class)) {
                EventBus.post(new ServerSendEvent(request, response, throwable));
            }
        }
    }
}
....//忽略其他內容

最后我們的response實例會使用netty傳給客戶端。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM