目前流行的微服務,Rest風格的Http模式遠程調用大行其道。 Rest格式的調用,可以做到對Provider方的代碼0依賴,做到完全的解耦,規避Provider方接口升級帶來的各種問題。
在日常的業務中,會涉及到各種協議的多系統間交互,比如Hessian、老系統常用的Webservice 等Http的遠程調用,Dubbo 都提供了封裝與擴展。
相比SpringCloud , Dubbo 一個接口發布多協議,一個系統調用多個外部協議接口時,會非常的便利,幾乎可以做到代碼的0 增加。
學習與理解這些不同協議的封裝,能加深對Dubbo 設計的理解,指導自己的編寫新的擴展實現。
一、 Dubbo的 http 調用協議配置
Dubbo 支持http調用的協議比較多,在官網 協議參考手冊 里面介紹的就有:hessian、http、webservice、rest 這4種, 在Dubbo 的2.7.3版本的代碼里面,有jsonrpc、xml這2中應該也是透過http協議進行調用的。
通過繼承圖,我們可以看到,這幾個協議都是直接繼承AbstractProxyProtocol 的。
以webservice 協議為例:在dubbo配置里面加上:
<dubbo:protocol name="webservice" server="servlet" port="8080" contextpath="webservice"/>
然后在web.xml里面配置dubbo的DispatchServlet攔截,就可以發布webservice協議的服務。
web.xml 配置:
<servlet> <servlet-name>dubboDispatch</servlet-name> <servlet-class>org.apache.dubbo.remoting.http.servlet.DispatcherServlet</servlet-class> <load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>dubboDispatch</servlet-name> <url-pattern>/webservice/*</url-pattern> </servlet-mapping>
代碼以外部Servlet模式發布服務,因為現在實際代碼運行的時候,更多的是在外部容器里面,servlet模式,應用更廣,代碼閱讀更清晰。 內嵌jetty或者tomcat模式,只是多了自己啟動服務器,手動配置servlet而已(個人猜測,沒看代碼)。
發布的時候,port 為 應用的端口號,contextpath 為servlet 的映射路徑。
二、 Webservice 協議服務 export
Dubbo 的http 相關協議,沒有默認采用netty傳輸,dubbo編碼 那一堆理不斷剪還亂的channel、channelhandler 包裝鏈。 借助成熟的應用服務器,cxf相關的開源組件, 封裝與發布都非常簡單。
成熟的應用服務器,承擔了原生netty傳輸里面的 線程控制,channel重用等邏輯。
cxf開源組件, 承擔了數據的編碼、解碼,底層數據的封裝等邏輯。
有了之前Dubbo 默認服務的基礎,我們直接從 WebServiceProtocol 協議的 export 方法開始。使用父類 AbstractProxyProtocol的 export方法。 所以dubbo的幾個http 協議都是相同的處理模式。
1 public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException { 2 final String uri = serviceKey(invoker.getUrl()); 3 Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri); 4 if (exporter != null) { 5 // When modifying the configuration through override, you need to re-expose the newly modified service. 6 if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) { 7 return exporter; 8 } 9 } 10 //注釋 1 11 final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl()); 12 13 //注釋 2 14 exporter = new AbstractExporter<T>(invoker) { 15 @Override 16 public void unexport() { 17 super.unexport(); 18 exporterMap.remove(uri); 19 if (runnable != null) { 20 try { 21 runnable.run(); 22 } catch (Throwable t) { 23 logger.warn(t.getMessage(), t); 24 } 25 } 26 } 27 }; 28 exporterMap.put(uri, exporter); //將uri 與exporter放入map, 29 return exporter; 30 }
整個方法很簡單,也比較清晰,沒有復雜的邏輯。 export 放入的入參 invoker ,是一個經過多從包裝的invoker 的filter鏈,最底層是實現類的ref調用。
注釋1. proxyFactory.getProxy(invoker,true) ,作用是將invoker 鏈,代理成接口的實現類。然后調用 各個子類的doExport 方法,完成接口服務的綁定與配置。 返回一個runnable,runnable的邏輯是注銷接口的操作。
注釋2. 將invoker 與runnable 對象,封裝為exporter
進入WebServiceProtocol 的doExport方法:
protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException { String addr = getAddr(url); HttpServer httpServer = serverMap.get(addr); if (httpServer == null) { //注釋1 。 將webServiceHandler 綁定url ,只會綁定一次 httpServer = httpBinder.bind(url, new WebServiceHandler()); serverMap.put(addr, httpServer); } //注釋 2 調用CXF的 組件,構建發布Service EndPoint final ServerFactoryBean serverFactoryBean = new ServerFactoryBean(); serverFactoryBean.setAddress(url.getAbsolutePath()); serverFactoryBean.setServiceClass(type); serverFactoryBean.setServiceBean(impl); serverFactoryBean.setBus(bus); serverFactoryBean.setDestinationFactory(transportFactory); //注釋3. transportFactory后續會重用到 serverFactoryBean.create(); return new Runnable() { //注釋4. 返回runnable,邏輯主要是銷毀這個Server @Override public void run() { if(serverFactoryBean.getServer()!= null) { serverFactoryBean.getServer().destroy(); } if(serverFactoryBean.getBus()!=null) { serverFactoryBean.getBus().shutdown(true); } } }; }
注釋1. 將WebServiceHandler 與url綁定,實際操作是 將webservicehandler 在DispatchServlet里面注冊。
配置的是servlet,所以httpbinder 就是 ServletHttpBinder ,bind方法 new 一個 ServletHttpServer 。 在里面就一個關鍵代碼:
DispatcherServlet.addHttpHandler(url.getParameter(Constants.BIND_PORT_KEY, 8080), handler); ,將handler 放入DispatchServlet的map里面
同一個server ,getAddr() 方法獲取到的addr 都是一樣的,所以 注釋1.只會被運行一次。
注釋2. 就看開始調用CXF的組件,構建接口的Endpoint 。 用的的變量 bus、transportFactory都是 new出來的,都是cxf的代碼,這個不深入,目前會用即可。
transportFactory 應該是一個比較重要的對象,保存了接口的實現調用。 在webservicehandler里面,invoke的時候,會用到。
注釋4. 構建返回的runnable, 運行的時候,會銷戶這個serverFactoryBean 。
接下來看看WebServiceHandler 類,方法也比較簡單:
private class WebServiceHandler implements HttpHandler { private volatile ServletController servletController; @Override public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { if (servletController == null) { HttpServlet httpServlet = DispatcherServlet.getInstance(); if (httpServlet == null) { response.sendError(500, "No such DispatcherServlet instance."); return; } synchronized (this) { if (servletController == null) { //注釋1. servletController 雙重檢測不存在,第一次調用的時候就 構建一個, servletController = new ServletController(transportFactory.getRegistry(), httpServlet.getServletConfig(), httpServlet); } } } RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort()); servletController.invoke(request, response);//注釋2. 調用請求,轉交給cxf處理並返回 } }
主要就2個點:
注釋1. 雙重檢測 servletController 是否存在,不存在就構建一個。 在構建的時候用到了httpservlet 。
這是延遲構建,所以不用像rest協議需要在啟動的時候配置BootstrapListener,並且還需要在spring的listener之前配置。
注釋2. 調用cxf的方法,執行請求,並在response里面寫返回。
我們再看看 DispatcherServlet 的service 方法:
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { HttpHandler handler = handlers.get(request.getLocalPort()); if (handler == null) {// service not found. response.sendError(HttpServletResponse.SC_NOT_FOUND, "Service not found."); } else { handler.handle(request, response); } }
mapping到請求后,從handler的map里面,獲取handler,直接調用handler方法。 接上面代碼就是:WebServiceHandler.handle()方法,清晰明了。
發布: 1. 在transportFactory 里面注冊服務 。 2. handler 存放transportFactory , 3. key=端口,value = handler 放入DispatchServlet的map里面。
被調用: 1. DispatchServlet 接收請求 2. 根據port查找handler 3. 調用handler 並將結果寫入response (使用transportFactory 保存的信息,處理請求)
二、 Webservice 協議服務 refer
refer 方法, 方法是在AbstractProtocol 里面的,調用AbstractProxyProtocol.protocolBindingRefer() ,再調用WebServiceProcotol.doRefer()
方法簡單,這個我們倒着看, 先看 WebServiceProcotol.doRefer()
protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException { ClientProxyFactoryBean proxyFactoryBean = new ClientProxyFactoryBean(); proxyFactoryBean.setAddress(url.setProtocol("http").toIdentityString()); proxyFactoryBean.setServiceClass(serviceType); proxyFactoryBean.setBus(bus); T ref = (T) proxyFactoryBean.create(); Client proxy = ClientProxy.getClient(ref); HTTPConduit conduit = (HTTPConduit) proxy.getConduit(); HTTPClientPolicy policy = new HTTPClientPolicy(); policy.setConnectionTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT)); policy.setReceiveTimeout(url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT)); conduit.setClient(policy); return ref; }
WebServiceProcotol.doRefer() 這個方法比較簡單, 入參接口class 與provider 的url ,調用CXF的代碼,返回一個接口實現的代理。
AbstractProxyProtocol.protocolBindingRefer()方法:
protected <T> Invoker<T> protocolBindingRefer(final Class<T> type, final URL url) throws RpcException { //doRefer的結果,接口實現代理, 由Dubbo的proxyFactory封裝成Invoker 返回 final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url); Invoker<T> invoker = new AbstractInvoker<T>(type, url) { //對Invodker 再次封裝,捕獲CXF調用時的各種異常 @Override protected Result doInvoke(Invocation invocation) throws Throwable { try { Result result = target.invoke(invocation); //cxf遠程調用,異常被捕獲封裝 // FIXME result is an AsyncRpcResult instance. Throwable e = result.getException(); if (e != null) { for (Class<?> rpcException : rpcExceptions) { if (rpcException.isAssignableFrom(e.getClass())) { throw getRpcException(type, url, invocation, e); } } } return result; } catch (RpcException e) { if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) { e.setCode(getErrorCode(e.getCause())); } throw e; } catch (Throwable e) { throw getRpcException(type, url, invocation, e); } } }; invokers.add(invoker); return invoker; }
這個方法邏輯比較清晰:
1. 先調用doRefer方法,將接口、url 與cxf組合起來,返回接口的 cxf 代理實現, 再由proxyFactory 封裝為Invoker。
2. Invoker 再次被封裝一次,捕獲調用時的各種異常。
3. 封裝的Invoker 返回,在AbstractProtocol 的refer里面,再次被封裝成AsyncToSyncInvoker ,后被返回。
AbstractProtocol.refer() 最終返回的Invoker ,會被Directory 放入Invoker list里面,供路由選擇。
至此,refer 方法結束, 總得來說,有3個主要步驟:
1. 將接口class 與provider ,調用cxf的代碼,生成proxy
2. 將proxy生成Invoker,供上層代碼使用。
3. invoker 的封裝
三、其他
說下之前沒注意到的2個抽象Class : AbstractInvoker 與 AbstractProxyInvoker
AbstractProxyInvoker 是將 接口實現 組裝成Invoker,invoke 里面有 CompletableFuture ,這個invoke 是全鏈路異步實現的關鍵代碼,將最終的調用異步化。
AbstractInvoker 一般是對Invoker 的再次封裝,調用前做一些基礎數據的設置,異常轉換
四、總結
以Dubbo的WebServiceProtocol 為例,介紹了dubbo的 關於http 的協議實現。 借助成熟的服務容器,協議開源組件,擴展協議比起 默認的netty傳輸,dubbo協議要簡單很多。基本只需要做好 開源組件的設置就好。 其他的協議,可以對照源碼,都是一個模式,具體細節會有所不同。
對於REST 協議,需要在啟動最開始 配置 org.apache.dubbo.remoting.http.servlet.BootstrapListener ,是因為在 Rest 的服務發布時,需要用到ServletContext, 如果將這個 配置在Spring 的listener之后,spring 啟動的時候,發布服務初始化就會失敗。
一般dubbo都默認用dubbo協議, 在與外圍系統交互的時候,dubbo的多協議適配,會比較方便。
外圍交互,會有一端是非dubbo框架,所以無法使用到dubbo的路由、負載均衡、動態配置等服務治理特性。