Dubbo源碼學習筆記 之 調用&發布 Rest&WebService協議服務


  目前流行的微服務,Rest風格的Http模式遠程調用大行其道。 Rest格式的調用,可以做到對Provider方的代碼0依賴,做到完全的解耦,規避Provider方接口升級帶來的各種問題。

      在日常的業務中,會涉及到各種協議的多系統間交互,比如Hessian、老系統常用的Webservice 等Http的遠程調用,Dubbo 都提供了封裝與擴展。

     相比SpringCloud , Dubbo 一個接口發布多協議,一個系統調用多個外部協議接口時,會非常的便利,幾乎可以做到代碼的0 增加。

     學習與理解這些不同協議的封裝,能加深對Dubbo 設計的理解,指導自己的編寫新的擴展實現。

 

 一、 Dubbo的 http 調用協議配置

         Dubbo 支持http調用的協議比較多,在官網 協議參考手冊 里面介紹的就有:hessian、http、webservice、rest 這4種, 在Dubbo 的2.7.3版本的代碼里面,有jsonrpc、xml這2中應該也是透過http協議進行調用的。

         通過繼承圖,我們可以看到,這幾個協議都是直接繼承AbstractProxyProtocol 的。  

                                         Dubbo的http協議繼承圖

以webservice 協議為例:在dubbo配置里面加上:

 <dubbo:protocol name="webservice" server="servlet" port="8080" contextpath="webservice"/>

然后在web.xml里面配置dubbo的DispatchServlet攔截,就可以發布webservice協議的服務。

web.xml 配置:

<servlet>
    <servlet-name>dubboDispatch</servlet-name>
    <servlet-class>org.apache.dubbo.remoting.http.servlet.DispatcherServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
  </servlet>
  <servlet-mapping>
    <servlet-name>dubboDispatch</servlet-name>
    <url-pattern>/webservice/*</url-pattern>
  </servlet-mapping>

代碼以外部Servlet模式發布服務,因為現在實際代碼運行的時候,更多的是在外部容器里面,servlet模式,應用更廣,代碼閱讀更清晰。 內嵌jetty或者tomcat模式,只是多了自己啟動服務器,手動配置servlet而已(個人猜測,沒看代碼)。

  發布的時候,port 為 應用的端口號,contextpath 為servlet 的映射路徑。 

 

二、 Webservice 協議服務 export

    Dubbo 的http 相關協議,沒有默認采用netty傳輸,dubbo編碼 那一堆理不斷剪還亂的channel、channelhandler 包裝鏈。 借助成熟的應用服務器,cxf相關的開源組件, 封裝與發布都非常簡單。

   成熟的應用服務器,承擔了原生netty傳輸里面的 線程控制,channel重用等邏輯。

   cxf開源組件, 承擔了數據的編碼、解碼,底層數據的封裝等邏輯。

   有了之前Dubbo 默認服務的基礎,我們直接從 WebServiceProtocol 協議的 export 方法開始。使用父類 AbstractProxyProtocol的 export方法。 所以dubbo的幾個http 協議都是相同的處理模式。

 1  public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
 2         final String uri = serviceKey(invoker.getUrl());
 3         Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
 4         if (exporter != null) {
 5             // When modifying the configuration through override, you need to re-expose the newly modified service.
 6             if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) {
 7                 return exporter;
 8             }
 9         }
10         //注釋 1 
11         final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
12         
13         //注釋  2
14         exporter = new AbstractExporter<T>(invoker) {
15             @Override
16             public void unexport() {
17                 super.unexport();
18                 exporterMap.remove(uri);
19                 if (runnable != null) {
20                     try {
21                         runnable.run();
22                     } catch (Throwable t) {
23                         logger.warn(t.getMessage(), t);
24                     }
25                 }
26             }
27         };
28         exporterMap.put(uri, exporter);  //將uri 與exporter放入map, 
29         return exporter;
30     }

整個方法很簡單,也比較清晰,沒有復雜的邏輯。  export 放入的入參 invoker ,是一個經過多從包裝的invoker 的filter鏈,最底層是實現類的ref調用。

  注釋1.  proxyFactory.getProxy(invoker,true) ,作用是將invoker 鏈,代理成接口的實現類。然后調用 各個子類的doExport 方法,完成接口服務的綁定與配置。 返回一個runnable,runnable的邏輯是注銷接口的操作。

  注釋2.  將invoker 與runnable 對象,封裝為exporter 

進入WebServiceProtocol 的doExport方法:

protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
        String addr = getAddr(url);
        HttpServer httpServer = serverMap.get(addr);
        if (httpServer == null) {
            //注釋1 。 將webServiceHandler 綁定url ,只會綁定一次
            httpServer = httpBinder.bind(url, new WebServiceHandler());
            serverMap.put(addr, httpServer);
        }

        //注釋 2 調用CXF的 組件,構建發布Service EndPoint
        final ServerFactoryBean serverFactoryBean = new ServerFactoryBean();
        serverFactoryBean.setAddress(url.getAbsolutePath());
        serverFactoryBean.setServiceClass(type);
        serverFactoryBean.setServiceBean(impl);
        serverFactoryBean.setBus(bus);
        serverFactoryBean.setDestinationFactory(transportFactory); //注釋3. transportFactory后續會重用到
        serverFactoryBean.create();
        return new Runnable() { //注釋4. 返回runnable,邏輯主要是銷毀這個Server
            @Override
            public void run() {
                if(serverFactoryBean.getServer()!= null) {
                    serverFactoryBean.getServer().destroy();
                }
                if(serverFactoryBean.getBus()!=null) {
                    serverFactoryBean.getBus().shutdown(true);
                }
            }
        };
    }

注釋1.   將WebServiceHandler 與url綁定,實際操作是 將webservicehandler 在DispatchServlet里面注冊。

          配置的是servlet,所以httpbinder 就是 ServletHttpBinder ,bind方法 new 一個 ServletHttpServer 。  在里面就一個關鍵代碼:

    DispatcherServlet.addHttpHandler(url.getParameter(Constants.BIND_PORT_KEY, 8080), handler);   ,將handler 放入DispatchServlet的map里面

         同一個server ,getAddr() 方法獲取到的addr 都是一樣的,所以 注釋1.只會被運行一次。

注釋2. 就看開始調用CXF的組件,構建接口的Endpoint 。 用的的變量 bus、transportFactory都是 new出來的,都是cxf的代碼,這個不深入,目前會用即可。

        transportFactory 應該是一個比較重要的對象,保存了接口的實現調用。  在webservicehandler里面,invoke的時候,會用到。

注釋4. 構建返回的runnable, 運行的時候,會銷戶這個serverFactoryBean 。

 

接下來看看WebServiceHandler 類,方法也比較簡單: 

private class WebServiceHandler implements HttpHandler {

        private volatile ServletController servletController;

        @Override
        public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
            if (servletController == null) {
                HttpServlet httpServlet = DispatcherServlet.getInstance();
                if (httpServlet == null) {
                    response.sendError(500, "No such DispatcherServlet instance.");
                    return;
                }
                synchronized (this) {
                    if (servletController == null) {  //注釋1. servletController 雙重檢測不存在,第一次調用的時候就 構建一個,
                        servletController = new ServletController(transportFactory.getRegistry(), httpServlet.getServletConfig(), httpServlet);
                    }
                }
            }
            RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
            servletController.invoke(request, response);//注釋2. 調用請求,轉交給cxf處理並返回
        }

    }

主要就2個點:

注釋1. 雙重檢測 servletController 是否存在,不存在就構建一個。 在構建的時候用到了httpservlet 。

      這是延遲構建,所以不用像rest協議需要在啟動的時候配置BootstrapListener,並且還需要在spring的listener之前配置。

注釋2. 調用cxf的方法,執行請求,並在response里面寫返回。

 

我們再看看 DispatcherServlet 的service 方法:

protected void service(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        HttpHandler handler = handlers.get(request.getLocalPort());
        if (handler == null) {// service not found.
            response.sendError(HttpServletResponse.SC_NOT_FOUND, "Service not found.");
        } else {
            handler.handle(request, response);
        }
    }

mapping到請求后,從handler的map里面,獲取handler,直接調用handler方法。 接上面代碼就是:WebServiceHandler.handle()方法,清晰明了。

發布: 1. 在transportFactory 里面注冊服務 。 2. handler 存放transportFactory , 3. key=端口,value  = handler  放入DispatchServlet的map里面。

被調用: 1. DispatchServlet 接收請求  2. 根據port查找handler  3. 調用handler 並將結果寫入response (使用transportFactory 保存的信息,處理請求)

 

二、 Webservice 協議服務 refer

    refer 方法, 方法是在AbstractProtocol 里面的,調用AbstractProxyProtocol.protocolBindingRefer() ,再調用WebServiceProcotol.doRefer()

   方法簡單,這個我們倒着看, 先看 WebServiceProcotol.doRefer()

protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException {
        ClientProxyFactoryBean proxyFactoryBean = new ClientProxyFactoryBean();
        proxyFactoryBean.setAddress(url.setProtocol("http").toIdentityString());
        proxyFactoryBean.setServiceClass(serviceType);
        proxyFactoryBean.setBus(bus);
        T ref = (T) proxyFactoryBean.create();
        Client proxy = ClientProxy.getClient(ref);
        HTTPConduit conduit = (HTTPConduit) proxy.getConduit();
        HTTPClientPolicy policy = new HTTPClientPolicy();
        policy.setConnectionTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
        policy.setReceiveTimeout(url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT));
        conduit.setClient(policy);
        return ref;
    }

WebServiceProcotol.doRefer() 這個方法比較簡單, 入參接口class 與provider 的url ,調用CXF的代碼,返回一個接口實現的代理。 

 

AbstractProxyProtocol.protocolBindingRefer()方法:

protected <T> Invoker<T> protocolBindingRefer(final Class<T> type, final URL url) throws RpcException {
        //doRefer的結果,接口實現代理, 由Dubbo的proxyFactory封裝成Invoker  返回
        final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
        Invoker<T> invoker = new AbstractInvoker<T>(type, url) {  //對Invodker 再次封裝,捕獲CXF調用時的各種異常
            @Override
            protected Result doInvoke(Invocation invocation) throws Throwable {
                try {
                    Result result = target.invoke(invocation);   //cxf遠程調用,異常被捕獲封裝
                    // FIXME result is an AsyncRpcResult instance.
                    Throwable e = result.getException();
                    if (e != null) {
                        for (Class<?> rpcException : rpcExceptions) {
                            if (rpcException.isAssignableFrom(e.getClass())) {
                                throw getRpcException(type, url, invocation, e);
                            }
                        }
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
                        e.setCode(getErrorCode(e.getCause()));
                    }
                    throw e;
                } catch (Throwable e) {
                    throw getRpcException(type, url, invocation, e);
                }
            }
        };
        invokers.add(invoker);
        return invoker;
    }

 這個方法邏輯比較清晰:

     1. 先調用doRefer方法,將接口、url 與cxf組合起來,返回接口的 cxf 代理實現, 再由proxyFactory 封裝為Invoker。

     2. Invoker 再次被封裝一次,捕獲調用時的各種異常。

    3. 封裝的Invoker 返回,在AbstractProtocol 的refer里面,再次被封裝成AsyncToSyncInvoker ,后被返回。

 

AbstractProtocol.refer() 最終返回的Invoker ,會被Directory 放入Invoker list里面,供路由選擇。 

至此,refer 方法結束, 總得來說,有3個主要步驟:

   1. 將接口class 與provider ,調用cxf的代碼,生成proxy 

    2. 將proxy生成Invoker,供上層代碼使用。

   3. invoker 的封裝

三、其他

  說下之前沒注意到的2個抽象Class : AbstractInvoker 與 AbstractProxyInvoker

 AbstractProxyInvoker 是將 接口實現 組裝成Invoker,invoke 里面有 CompletableFuture ,這個invoke 是全鏈路異步實現的關鍵代碼,將最終的調用異步化。

 AbstractInvoker 一般是對Invoker 的再次封裝,調用前做一些基礎數據的設置,異常轉換

 

四、總結

    以Dubbo的WebServiceProtocol 為例,介紹了dubbo的 關於http 的協議實現。 借助成熟的服務容器,協議開源組件,擴展協議比起 默認的netty傳輸,dubbo協議要簡單很多。基本只需要做好 開源組件的設置就好。 其他的協議,可以對照源碼,都是一個模式,具體細節會有所不同。

   對於REST 協議,需要在啟動最開始 配置 org.apache.dubbo.remoting.http.servlet.BootstrapListener ,是因為在 Rest 的服務發布時,需要用到ServletContext, 如果將這個 配置在Spring 的listener之后,spring 啟動的時候,發布服務初始化就會失敗。   

  一般dubbo都默認用dubbo協議, 在與外圍系統交互的時候,dubbo的多協議適配,會比較方便。

  外圍交互,會有一端是非dubbo框架,所以無法使用到dubbo的路由、負載均衡、動態配置等服務治理特性。 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM