簡述RPC原理實現


 

 

 

前言

架構的改變,往往是因為業務規模的擴張。

隨着業務規模的擴張,為了滿足業務對技術的要求,技術架構需要從單體應用架構升級到分布式服務架構,來降低公司的技術成本,更好的適應業務的發展。

分布式服務架構的諸多優勢,這里就不一一列舉了,今天圍繞的話題是服務框架,為了推行服務化,必然需要一套易用的服務框架,來支撐業務技術架構升級。

 

服務框架

服務架構的核心是服務調用,分布式服務架構中的服務分布在不同主機的不同進程上,服務的調用跟單體應用進程內方法調用的本質區別就是需要借助網絡來進行通信。

 

 

RPC Demo實現思路

原作者梁飛,在此記錄下他非常簡潔的rpc實現思路。

 

核心框架類

  /*
   * Copyright 2011 Alibaba.com All right reserved. This software is the
   * confidential and proprietary information of Alibaba.com ("Confidential
   * Information"). You shall not disclose such Confidential Information and shall
   * use it only in accordance with the terms of the license agreement you entered
   * into with Alibaba.com.
   */
  package com.alibaba.study.rpc.framework;
  ​
  import java.io.ObjectInputStream;
  import java.io.ObjectOutputStream;
  import java.lang.reflect.InvocationHandler;
  import java.lang.reflect.Method;
  import java.lang.reflect.Proxy;
  import java.net.ServerSocket;
  import java.net.Socket;
  ​
  /**
   * RpcFramework
   * 
   * @author william.liangf
   */
  public class RpcFramework {
  ​
      /**
       * 暴露服務
       * 
       * @param service 服務實現
       * @param port 服務端口
       * @throws Exception
       */
      public static void export(final Object service, int port) throws Exception {
          if (service == null)
              throw new IllegalArgumentException("service instance == null");
          if (port <= 0 || port > 65535)
              throw new IllegalArgumentException("Invalid port " + port);
          System.out.println("Export service " + service.getClass().getName() + " on port " + port);
          ServerSocket server = new ServerSocket(port);
          for(;;) {
              try {
                  final Socket socket = server.accept();
                  new Thread(new Runnable() {
                      @Override
                      public void run() {
                          try {
                              try {
                                  ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                                  try {
                                      String methodName = input.readUTF();
                                      Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                                      Object[] arguments = (Object[])input.readObject();
                                      ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                                      try {
                                          Method method = service.getClass().getMethod(methodName, parameterTypes);
                                          Object result = method.invoke(service, arguments);
                                          output.writeObject(result);
                                      } catch (Throwable t) {
                                          output.writeObject(t);
                                      } finally {
                                          output.close();
                                      }
                                  } finally {
                                      input.close();
                                  }
                              } finally {
                                  socket.close();
                              }
                          } catch (Exception e) {
                              e.printStackTrace();
                          }
                      }
                  }).start();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
  ​
      /**
       * 引用服務
       * 
       * @param <T> 接口泛型
       * @param interfaceClass 接口類型
       * @param host 服務器主機名
       * @param port 服務器端口
       * @return 遠程服務
       * @throws Exception
       */
      @SuppressWarnings("unchecked")
      public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {
          if (interfaceClass == null)
              throw new IllegalArgumentException("Interface class == null");
          if (! interfaceClass.isInterface())
              throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
          if (host == null || host.length() == 0)
              throw new IllegalArgumentException("Host == null!");
          if (port <= 0 || port > 65535)
              throw new IllegalArgumentException("Invalid port " + port);
          System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
          return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() {
              public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
                  Socket socket = new Socket(host, port);
                  try {
                      ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                      try {
                          output.writeUTF(method.getName());
                          output.writeObject(method.getParameterTypes());
                          output.writeObject(arguments);
                          ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                          try {
                              Object result = input.readObject();
                              if (result instanceof Throwable) {
                                  throw (Throwable) result;
                              }
                              return result;
                          } finally {
                              input.close();
                          }
                      } finally {
                          output.close();
                      }
                  } finally {
                      socket.close();  
                }  
            }  
        });  
    }  
​  
}

 

 

定義服務接口

 /*
   * Copyright 2011 Alibaba.com All right reserved. This software is the
   * confidential and proprietary information of Alibaba.com ("Confidential
   * Information"). You shall not disclose such Confidential Information and shall
   * use it only in accordance with the terms of the license agreement you entered
   * into with Alibaba.com.
   */
  package com.alibaba.study.rpc.test;
  ​
  /**
   * HelloService
   * 
   * @author william.liangf
   */
  public interface HelloService {
  ​
      String hello(String name);
  ​
  }

 

 

實現服務

 1   /*
 2    * Copyright 2011 Alibaba.com All right reserved. This software is the
 3    * confidential and proprietary information of Alibaba.com ("Confidential
 4    * Information"). You shall not disclose such Confidential Information and shall
 5    * use it only in accordance with the terms of the license agreement you entered
 6    * into with Alibaba.com.
 7    */
 8   package com.alibaba.study.rpc.test;
 9 10   /**
11    * HelloServiceImpl
12    * 
13    * @author william.liangf
14    */
15   public class HelloServiceImpl implements HelloService {
16 17       public String hello(String name) {
18           return "Hello " + name;
19       }
20 21   }

 

 

暴露服務

 /*
   * Copyright 2011 Alibaba.com All right reserved. This software is the
   * confidential and proprietary information of Alibaba.com ("Confidential
   * Information"). You shall not disclose such Confidential Information and shall
   * use it only in accordance with the terms of the license agreement you entered
   * into with Alibaba.com.
   */
  package com.alibaba.study.rpc.test;
  ​
  import com.alibaba.study.rpc.framework.RpcFramework;
  ​
  /**
   * RpcProvider
   * 
   * @author william.liangf
   */
  public class RpcProvider {
  ​
      public static void main(String[] args) throws Exception {
          HelloService service = new HelloServiceImpl();
          RpcFramework.export(service, 1234);
      }
  ​
  }

 

 

引用服務

 1   /*
 2    * Copyright 2011 Alibaba.com All right reserved. This software is the
 3    * confidential and proprietary information of Alibaba.com ("Confidential
 4    * Information"). You shall not disclose such Confidential Information and shall
 5    * use it only in accordance with the terms of the license agreement you entered
 6    * into with Alibaba.com.
 7    */
 8   package com.alibaba.study.rpc.test;
 9 10   import com.alibaba.study.rpc.framework.RpcFramework;
11 12   /**
13    * RpcConsumer
14    * 
15    * @author william.liangf
16    */
17   public class RpcConsumer {
18       
19       public static void main(String[] args) throws Exception {
20           HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
21           for (int i = 0; i < Integer.MAX_VALUE; i ++) {
22               String hello = service.hello("World" + i);
23               System.out.println(hello);
24               Thread.sleep(1000);
25           }
26       }
27       
28   }

 

 

小結

梁飛大大的博客使用原生的jdk api就展現給各位讀者一個生動形象的rpc demo,實在是強。

這個簡單的例子的實現思路是:

  • 使用阻塞的socket IO流來進行server和client的通信,也就是rpc應用中服務提供方和服務消費方。並且是端對端的,用端口號來直接進行通信

  • 方法的遠程調用使用的是jdk的動態代理

  • 參數的序列化也是使用的最簡單的objectStream

 


 

服務框架

服務框架的核心是服務調用,分布式服務架構中的服務分布在不同主機的不同進程上,服務的調用跟單體應用進程內方法調用的本質區別就是需要借助網絡來進行通信。

下圖是服務框架的架構圖,主流的服務框架的實現都是這套架構,如 Dubbo、SpringCloud 等。

 

 

  • Invoker 是服務的調用方

  • Provider 是服務的提供方

  • Registry 是服務的注冊中心

  • Monitor 是服務的監控模塊

Invoker 和 Provider 分別作為服務的調用和被調用方,這點很明確。

但是僅有這兩者還是不夠的,因為作為調用方需要知道服務部署在哪,去哪調用服務,所以有了 Registry 模塊,它的功能是給服務提供方注冊服務,給服務調用方發現服務。

Monitor 作為服務的監控模塊,負責服務的調用統計以及鏈路分析功能,也是服務治理重要的一環。

 

核心模塊

下圖是服務框架的流程圖,我們分服務注冊、發現、調用三個方面來進行流程分解。

 

 

服務注冊是服務提供方向注冊中心注冊服務信息;當提供服務應用下線時,負責將服務注冊信息從注冊中心刪去。

服務發現是服務調用方從注冊中心訂閱服務,獲取服務提供方的相關信息;當服務注冊信息有變更時,注冊中心負責通知到服務調用方。

服務調用是服務調用方通過從注冊中心拿到服務提供方的信息,向服務提供方發起服務調用,獲取調用結果。

對照上述流程圖,我們按照請求的具體過程進行分析。

作為服務調用方 Invoker 的具體流程是:

  1. Request 從下往上,由於服務調用方只能拿到服務提供方提供的 API 接口或者 API 接口的 JAR 包,所以服務調用方需要經過一層代理 Proxy 來偽裝服務的實現;

  2. 經過代理 Proxy 之后,會經過路由 Router、負載均衡 LoadBalance 模塊,目的是從一堆從注冊中心拿到的服務提供方信息中選出最合適的服務提供方機器進行調用。另外,還會經過 Monitor 監控等模塊;

  3. 接着會經過服務編碼 Codec 模塊,這個模塊的目的是因為請求在網絡傳輸前需要按照通信協議以及對象的序列化方式,對傳輸的請求進行編解碼;

  4. 最終會經過網絡通信 Transporter 模塊,這個模塊將 Codec 編碼好的請求進行傳輸。

 

作為服務提供方 Provider 的具體流程是:

  1. Request 從上往下,經過網絡通信 Transporter 模塊,獲取到的是由調用方發送的Request字節數組。

  2. 接着經過服務編碼 Codec 模塊,根據通信協議解出一個完整的請求包,然后使用具體的序列化方式反序列化成請求對象。

  3. 緊接着會經過監控、限流、鑒權等模塊。

  4. 最終會執行服務的真正業務實現 ServiceImpl,執行完后,結果按原路返回。

     

按照上述流程分解一個服務框架的相關工作,再去看一些開源的服務框架也就不難理解了。

一般服務框架的核心模塊應該有注冊中心、網絡通信、服務編碼(通信協議、序列化)、服務路由、負載均衡,服務鑒權,可用性保障(服務降級、服務限流、服務隔離)、服務監控(Metrics、Trace)、配置中心、服務治理平台等。

 

注冊中心

注冊中心是用來注冊和發現服務的,需要具備的基本功能有注冊服務、下線服務、發現服務、通知服務變更等。

當前使用比較多的開源注冊中心有 ZookeeperETCDEureka 等。

Zookeeper 與 ETCD 在整體架構上都比較類似,使用方式非常便捷,應用比較廣泛。

這兩套系統按照 CAP 理論,屬於 CP 系統,可用性會差一點,但是作為中小規模服務注冊中心,還是游刃有余,並沒有某些人說的那么差勁。 Eureka 是 Spring Cloud Netflix 微服務套件中的一部分,很不幸的是 Eureka 2.0 開源工作宣告停止。

 

網絡通信

服務的調用方和提供方都來自不同的主機的不同的進程,所以要進行調用,必然少不了網絡通信。可以說網絡通信是分布式系統的重中之重,網絡通信框架的好壞直接影響服務框架的性能。從零實現一套性能高,穩定性強的通信框架還是非常難的,好在目前已經有很多開源的高性能的網絡通信框架。 針對 Java 生態有 Mina、Netty 等,目前使用最廣泛的也當屬 Netty。Netty 使用的是 per thread one eventloop 線程模型,這點與 Nginx 等其他高性能網絡框架類似。另外,Netty 非常易用,所以網絡通信選擇 Netty 框架自然是毫無疑問的。

 

Netty實踐學習案例,是Netty初學者及核心技術鞏固的最佳實踐。
可以見我的netty學習工程:
https://github.com/sanshengshui/netty-learning-example

 

服務編碼

內存對象要經過網絡傳輸前需要做兩件事:第一是確定好通信協議,第二序列化。

 

通信協議

通信協議說白了在發送數據前按照一定的格式來處理數據,然后進行發送,保證接收方拿到數據知道按照什么樣的格式進行處理。

有些同學可能不理解,為什么需要通信協議,不是有 TCP、UDP 協議了嗎?這里說的不是傳輸層的通信協議,應該是應用層的協議類似 HTTP。

因為的 TCP 協議雖然已經保證了可靠有序的傳輸,但是如果沒有一套應用層的協議,就不知道發過來的字節數據是不是一個完整的數據請求,或者說是多個請求的字節數據都在一起,無法拆分,這就是是所謂的粘包,需要按照協議進行拆包,拆成一個個完整的請求包進行處理。

協議的實現上一般大廠或者開源的服務框架選擇自建協議,更偏向服務領域。如 Dubbo,當然也有些框架直接使用 HTTP,HTTP/2,比如 GRPC 使用的就是 HTTP/2。

序列化

由於向網絡層發送的數據必須是字節數據,不可能直接將一個對象發送到網絡,所以在發送對象數據前,一般需要將對象序列化成字節數據,然后進行傳輸。

在服務方收到網絡的字節數據時,需要經過反序列化拿到相關的對象。

序列化的實現目前現成比較多,如 Hessian、JSON、Thrift、ProtoBuf 等。Thrift 和 ProtoBuf 能支持跨語言,性能比較好,不過使用時需要編寫 IDL 文件,有點麻煩。Hessian、JSON 使用起來比較友好,但是性能會差一點。


 

服務路由

服務路由指的是向服務提供方發起調用時,需要根據一定的算法從注冊中心拿到的服務方地址信息中選擇其中的一批機器進行調用。

路由的算法一般是根據場景來進行選擇的,比如有些公司實施兩地三中心這種高可用部署,但是由於兩地的網絡延時比較大,那這時就可以實施同地區路由策略,比如上海的調用方請求會優先選擇上海的服務進行調用,來降低網絡延時導致的服務端到端的調用耗時。

還有些框架支持腳本配置來進行定向路由策略。


 

負載均衡

負載均衡是緊接着服務路由的模塊,負載均衡負責將發送請求均勻合理的發送到服務提供方的節點上,而備選機器,一般就是經過路由模塊選擇出來的。

負載均衡的算法有很多,如 RoundRobin、Random、LeastActive、ConsistentHash 等。

而且這些算法一般都是基於權重的增強版本,因為需要根據權重來調節每台服務節點的流量。


 

服務鑒權

服務鑒權是服務安全調用的基礎,雖然絕大部分服務都是公司內部服務,但是對於敏感度較高的數據還是需要進行鑒權的。

鑒權的服務需要對服務的調用方進行授權,未經授權的調用方是不能夠調用該服務的。

關於服務鑒權的實現大都是基於 token 的認證方案,如 JWT(JSON Web Token) 認證。


 

可用性保障

可用性保障模塊是服務高可用的一個重要保證。

服務在交互中主要分成調用方和提供方兩種角色,作為服務調用方,可以通過服務降級提升可用性。作為服務提供方,可以通過服務限流、服務隔離來保證可用性。

服務降級

服務降級指的是當依賴的服務不可用時,使用預設的值來替代服務調用。

試想一下,假設調用一個非關鍵路徑上的服務(也就是說該調用獲取的結果是否實時,是否正確不是特別重要)出現問題,導致調用超時、失敗等,在沒有降級措施的情況下,會直接應用服務調用方業務。

因此,有些非關鍵路徑上服務調用,可以通過服務降級實現有損服務,柔性可用。 開源的降級組件有 Netflix 的 Hystrix,Hystrix 使用比較廣泛。

服務限流

服務降級保護的是服務的調用方,也就是服務的依賴方。而服務的提供方呢,如何保證服務的可用性呢? 服務限流指的是對服務調用流量的限制,限制其調用頻次,來保護服務。

在高並發的場景中,很容易出現流量過高,導致服務被打垮。這里就需要限流來保證服務自身的穩定運行。 Hystrix 也是可以用來限流的,但是用的比較多的有 guava 的 RateLimiter,其使用的是令牌桶算法,能夠保證平滑限流。

服務隔離

除了服務限流對服務提供方進行保護,就夠了嗎? 可能還不夠,考慮一下這樣的場景,假設某一個有問題的方法出現問題,處理非常耗時,這樣會堵住整個服務處理線程,導致正常的服務方法也不能夠正常調用。因此還需要服務隔離。 服務隔離指的是對服務執行的方法進行線程池隔離,保證異常耗時方法不會對正常的方法調用產生干擾,進而保護服務的穩定運行,提升可用性。


 

服務監控

服務監控是高可用系統不可或缺的重要支撐。

服務監控不僅包括服務調用等業務統計信息 Metrics,還包括分布式鏈路追蹤 Trace。

分布式系統監控比單體應用要復雜的多,需要將大量的監控信息進行聚合展示,尤其是在分布式鏈路追蹤方面,由於服務調用過程中涉及到多個分布在不同機器上的服務,需要一個調用鏈路展示系統方便查看調用鏈路中耗時和出問題的環節。

Metrics

Metrics 監控主要是服務調用的一些統計報表,包括服務調用次數、成功數、失敗數,以及服務方法的調用耗時,如平均耗時,耗時99線,999線等。全方位展示服務的可用性以及性能等信息。

目前開源的 Metrics 監控有美團點評的 Cat、SoundCloud 的 Prometheus 以及基於 OpenTracking 的 SkyWalking。

Trace

Trace 監控是對分布式服務調用過程中的整體鏈路展示和分析。方便查看鏈路上各個環境的性能問題。

分布式鏈路追蹤的原理大都是基於 Google 的論文 Dapper, a Large-Scale Distributed Systems Tracing Infrastructure。 開源的分布式鏈路追蹤系統有美團點評的 Cat,基於 OpenTracking 的SkyWalking、Twitter 的 ZipKin。


 

配置中心

配置中心不光是常見的系統需要,服務框架也需要,它能夠對系統中使用的配置進行管理,也能夠針對修改配置動態通知到應用系統。 一套完善的服務框架,必然少不了配置,如一些動態開關、降級配置、限流配置、鑒權配置等。

開源的配置中心有阿里的 Diamond,攜程的 Apollo。


 

治理平台

治理平台指的是對服務進行管理的平台。

微服務微了之后,必然會導致服務數量的上升,如果沒有一個完善的治理平台,服務規模擴大之后,很難去維護,也必然導致故障頻頻,並且極度影響開發效率。

治理平台主要是服務功能的相關操作平台,包括服務權重修改、服務下線、鑒權降級等配置修改等。 治理平台跟服務框架的耦合比較強,所以開源的比較少。

其他

關於RPC原理實現詳解到這里就結束了。

原創不易,如果感覺不錯,希望給個推薦!您的支持是我寫作的最大動力!

版權聲明:

作者:穆書偉

博客園出處:www.cnblogs.com/sanshengshu…

github出處:github.com/sanshengshu…    

個人博客出處:sanshengshui.github.io/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM