RPC實例及總結


RPC的概念

RPC是指遠程過程調用,也就是說兩台服務器A,B,一個應用部署在A服務器上,想要調用B服務器上應用提供的函數/方法,由於不在一個內存空間,不能直接調用,需要通過網絡來表達調用的語義和傳達調用的數據。

下面是對於RPC的理解:

 

RPC調用遠程服務的過程:

 

1、客戶端client發起服務調用請求。

2、client stub 可以理解成一個代理,會將調用方法、參數按照一定格式進行封裝,通過服務提供的地址,發起網絡請求。

3、消息通過網絡傳輸到服務端。

4、server stub接受來自socket的消息

5、server stub將消息進行解包、告訴服務端調用的哪個服務,參數是什么

6、結果返回給server stub。

7、sever stub把結果進行打包交給socket

8、socket通過網絡傳輸消息

9、client slub 從socket拿到消息。

10、client stub解包消息將結果返回給client。

一個RPC框架就是把步驟2到9都封裝起來。

為什么需要RPC

1、首先要明確一點:RPC可以用HTTP協議實現,並且用HTTP是建立在 TCP 之上最廣泛使用的 RPC,但是互聯網公司往往用自己的私有協議,比如鵝廠的JCE協議,私有協議不具備通用性為什么還要用呢?因為相比於HTTP協議,RPC采用二進制字節碼傳輸,更加高效也更加安全。
2、現在業界提倡“微服務“的概念,而服務之間通信目前有兩種方式,RPC就是其中一種。RPC可以保證不同服務之間的互相調用。即使是跨語言跨平台也不是問題,讓構建分布式系統更加容易。
3、RPC框架都會有服務降級、流量控制的功能,保證服務的高可用。

RPC框架職責

    通過上面的討論,RPC框架要向調用方屏蔽各種復雜性,要向服務提供方也屏蔽各類復雜性:

  1)調用方感覺就像調用本地函數一樣

  2)服務提供方感覺就像實現一個本地函數一樣來實現服務

 所以整個RPC框架又分為client部分與server部分,負責把整個非(1)(2)的各類復雜性屏蔽,這些復雜性就是RPC框架的職責。

再細化一些,client端又包含:序列化、反序列化、連接池管理、負載均衡、故障轉移、隊列管理,超時管理、異步管理等等等等職責。server端包含:服務端組件、服務端收發包隊列、io線程、工作線程、序列化反序列化、上下文管理器、超時管理、異步回調等等等等職責。

RPC要點

1、代理

因為涉及到兩個機器,是不能直接調用的,或者說是不能直接訪問到目標主機,所以需要一個代理對象來完成這個功能,目前可以使用jdk提供的動態代理,或者是三方提供的動態代理(cglib)的方式來完成整個的代理過程。

2、序列化

在調用別人服務的時候,肯定需要一些參數(入參-出參等等),這些參數需要在網絡上進行傳輸,就涉及到了序列化的問題,因此在實現RPC的是,我們的參數是需要序列話的,序列化的方式我們有很多中選擇, 可以選擇jdk中自帶的,三方的Hessian等等。

3、通信

序列化好的數據(二進制流),是怎么到目標機器上面的,然而這個就需要通信技術來解決了,這就引入了I/o網絡模型,有阻塞的,非阻塞的,信號的,多路復用的,以及異步的等,選擇一種適合的就顯示額外重要了。

4、服務實例化

當目標服務器接受到了我們的請求后,並返回數據,我們需要在本機上面將進行反序列化,然后生成對象(不一定需要),調用服務。

RPC的應用

RPC在分布式系統中的系統環境建設和應用程序設計中有着廣泛的應用,應用包括如下方面:

1、分布式操作系統的進程間通訊

進程間通訊是操作系統必須提供的基本設施之一,分布式操作系統必須提供分布於異構的結點機上進程間的通訊機制,RPC是實現消息傳送模式的分布式進程間通訊的手段之一。

2、構造分布式計算的軟件環境

由於分布式軟件環境本身地理上的分布性, 它的各個組成成份之間存在大量的交互和通訊,R P C 是其基本的實現方法之一。ONC+和DCE兩個流行的分式布計算軟件環境都是使用RPC構造的,其它一些分布式軟件環境也采用了RPC方式。

3、遠程數據庫服務

在分布式數據庫系統中,數據庫一般駐存在服務器上,客戶機通過遠程數據庫服務功能訪問數據庫服務器,現有的遠程數據庫服務是使用RPC模式的。例如,Sybase和Oracle都提供了存儲過程機制,系統與用戶定義的存儲過程存儲在數據庫服務器上,用戶在客戶端使用RPC模式調用存儲過程。

4、分布式應用程序設計

RPC機制與RPC工具為分布式應用程序設計提供了手段和方便, 用戶可以無需知道網絡結構和協議細節而直接使用RPC工具設計分布式應用程序。

5、分布式程序的調試

RPC可用於分布式程序的調試。使用反向RPC使服務器成為客戶並向它的客戶進程發出RPC,可以調試分布式程序。例如,在服務器上運行一個遠端調試程序,它不斷接收客戶端的RPC,當遇到一個調試程序斷點時,它向客戶機發回一個RPC,通知斷點已經到達,這也是RPC用於進程通訊的例子。

RPC實例

下面就舉一個1+1=2 的遠程調用的例子。客戶端發送兩個參數,服務端返回兩個數字的相加結果。RpcConsumer類調用CalculateService中的Calculate方法, 首先通過RpcFramework中的call方法,注冊自己想要調用那個服務,返回代理,然后就像本地調用一樣去調用Calculate方法,計算People,”People”有兩個屬性都被賦值成1,返回這兩個屬性相加后的結果。

public class RpcConsumer {public static void main(String args[]) {
    CalculateService service=RpcFramework.call(CalculateService.class,"127.0.0.1",8888);      
    People people=new People(1,1);
    String hello=service.Calculate(people);
    System.out.println(hello);

     }

生成動態代理的代碼如下。客戶端在調用方法前會先執行invoke方法,建立socket連接,把方法名和參數傳遞給服務端,然后獲取返回結果。

 //動態代理機制
    public static <T> T  call(final Class<?> interfaceClass,String host,int port){
         if(interfaceClass==null){
             throw new IllegalArgumentException("調用服務為空");
         }
        if(host==null||host.length()==0){
            throw new IllegalArgumentException("主機不能為null");
        }
         if(port<=0||port>65535){
            throw new IllegalArgumentException("端口不合法"+port);
         }

         return (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[]{interfaceClass},new CallerHandler(host,port));
    }
    static class CallerHandler implements InvocationHandler {
        private String host;
        private int port;
        public CallerHandler(String host, int port) {
            this.host = host;
            this.port = port;
            SERVER = new InetSocketAddress(host, port);
        }

        public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {

            Socket socket = new Socket(host, port);
            try {
                ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                try {
                    output.writeUTF(method.getName());
                    output.writeObject(method.getParameterTypes());
                    output.writeObject(arguments);
                    ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                    try {
                        Object result = input.readObject();
                        if (result instanceof Throwable) {
                            throw (Throwable) result;
                        }
                        return result;

                    } finally {
                        input.close();
                    }

                } finally {
                    output.close();
                }

            } finally {
                socket.close();
            }
        }
    }

 

RpcProvider 類實現具體的Calculate方法。通過RpcFramework中的publish方法,發布自己的服務。

 

public class RpcProvider{
    public static void main(String[] args) throws Exception {
        CalculateService service =new CalculateServiceImpl();
        RpcFramework.publish(service,8888);
    }}
interface CalculateService{
    String Calculate(People p);}
class CalculateServiceImpl implements CalculateService{
    public String Calculate(People people){
        int res=people.getA()+people.getB();
        return "計算結果 "+res;
    }
}

 

 

發布服務的代碼如下。服務端循環監聽某個端口,采用java原生的序列化方法,讀取客戶端需要調用的方法和參數,執行該方法並將結果返回。

public static void publish(final Object service,int port) throws IOException {
        if(service==null)
            throw new IllegalArgumentException("發布服務不能是空");
        if(port<=0 || port >65535)
            throw new IllegalArgumentException("端口不合法"+port);
        ServerSocket server=new ServerSocket(port);
        while (true) {
            try{
                final Socket socket=server.accept();
                new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        try {
                            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                            try {
                                String methodName = input.readUTF();
                                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                                Object[] arguments = (Object[]) input.readObject();
                                ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                                try {
                                    Method method = service.getClass().getMethod(methodName, parameterTypes);
                                    Object result = method.invoke(service, arguments);
                                    output.writeObject(result);
                                } catch (Throwable t) {
                                    output.writeObject(t);
                                } finally {
                                    output.close();
                                }
                            } finally {
                                input.close();
                            }
                        } finally {
                            socket.close();
                        }
                    } catch (Exception e) {
                            e.printStackTrace();
                    }
                }
                }).start();
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }

可以看到正確返回計算結果2。

 

以上例子轉載自簡書:https://www.jianshu.com/p/32ca4fd5a7e2  作者:樂浩beyond

總結

RPC 的主要目標是讓構建分布式計算(應用)更容易、透明,在提供強大的遠程調用能力時不損失本地調用的語義簡潔性。為實現該目標,RPC 框架需提供一種透明調用機制讓使用者不必顯式的區分本地調用和遠程調用。當我們的業務越來越多、應用也越來越多時,自然的,我們會發現有些功能已經不能簡單划分開來或者划分不出來。此時,可以將公共業務邏輯抽離出來,將之組成獨立的服務Service應用 。而原有的、新增的應用都可以與那些獨立的Service應用 交互,以此來完成完整的業務功能。所以此時,我們急需RPC來完成這種需求。可以說,今后我們用到RPC的地方只會越來越多,它可以有效的減輕我們的工作量。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM