本地調用遠程服務器上的功能


本地調用遠程服務器上的功能

RPC,全稱為Remote Procedure Call(遠程過程調用)。通俗一點講就是在本地調用遠程服務器上的功能。實現遠程調用至少需要滿足以下幾個條件:

1.網絡通信

2.序列化與反序列化

3.反射

遠程通信是遠程調用的前題,只有經過序列化后的數據才能在網絡上傳輸,傳輸到服務器端后需要反序列化成對象,然后通過反射機制調用服務器上客戶端指定的服務,再將結果返回給客戶端,用一張圖表示:

服務器每收到一次客戶端的請求,就啟動一個線程處理,調用具體的服務對象相應的方法,並將返回結果返回給客戶端。

基於此原理,下面寫一個示例實現RPC功能:

項目結構圖如下:分別有客戶端、服務器、服務以及請求與響應。

首先抽象出請求類與響應類:

請求類:屬性有請求的類名、方法名、輸入參數類型、輸入數據。

復制代碼
package cn.yang.common;

import java.io.Serializable;

//請求
public class Request implements Serializable {
private String className;//類名
private String methodName;//方法名

private Class[] inType;//輸入參數類型
private Object[] inData;//輸入數據

public String getClassName() {
    return className;
}

public void setClassName(String className) {
    this.className = className;
}

public String getMethodName() {
    return methodName;
}

public void setMethodName(String methodName) {
    this.methodName = methodName;
}

public Class[] getInType() {
    return inType;
}

public void setInType(Class[] inType) {
    this.inType = inType;
}

public Object[] getInData() {
    return inData;
}

public void setInData(Object[] inData) {
    this.inData = inData;
}

}
復制代碼

響應類:屬性有返回對象

復制代碼
package cn.yang.common;

import java.io.Serializable;

//響應
public class Response implements Serializable {
private Object obj;//返回對象

public Object getObj() {
    return obj;
}

public void setObj(Object obj) {
    this.obj = obj;
}

}
復制代碼

服務器類以及服務處理類:

服務器類負責接收客戶端的請求,並將請求分配給服務處理類進行處理。

服務器類:

復制代碼
package cn.yang.server;

import cn.yang.server.handler.ServerHandler;

import java.io.BufferedReader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

//服務器類,負責接收任務,分配任務
public class Server {
public static void main(String args[]){
try {
ServerSocket socket = new ServerSocket();
socket.bind(new InetSocketAddress(1234));
BufferedReader in;
Socket accept;
while (true){
accept = socket.accept();
System.out.println("accept a request from"+accept.getRemoteSocketAddress());
new Thread(new ServerHandler(accept)).start();
}
}catch(IOException e){
e.printStackTrace();
}

}

}
復制代碼

服務處理類:

復制代碼
package cn.yang.server.handler;

import cn.yang.common.Request;
import cn.yang.common.Response;

import java.io.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;

//服務處理類
public class ServerHandler implements Runnable {
private Socket socket;
private ObjectInputStream in;
private ObjectOutputStream out;

public ServerHandler(Socket socket){
    this.socket=socket;
}

@Override
@SuppressWarnings("unchecked")
public void run() {
    try {
        in = new ObjectInputStream(socket.getInputStream());
        Request request = (Request)in.readObject();
        
        //類名
        String className=request.getClassName();
        //方法名
        String methodNmae=request.getMethodName();
        //參數類型
        Class[] inType=request.getInType();
        //參數值
        Object[] inDate=request.getInData();
        
        //根據類名導入類的字節碼
        Class cls=Class.forName(className);
        
        //根據方法名得到方法
        Method method = cls.getMethod(methodNmae, inType);
        
        //創建類對象
        Object obj = cls.newInstance();
        
        //執行方法得到結果
        Object result = method.invoke(obj, inDate);

        //得到socket輸出流
        out=new ObjectOutputStream(socket.getOutputStream());

        //構造Response對象並輸出
        Response response=new Response();
        response.setObj(result);
        out.writeObject(response);
    } catch (InstantiationException | InvocationTargetException
            | IllegalAccessException | NoSuchMethodException |
            ClassNotFoundException | IOException e) {
        e.printStackTrace();
    }finally {
        if(in!=null){
            try {
                in.close();
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                in=null;
            }
        }

        if(out!=null){
            try {
                out.close();
            } catch (IOException e2) {
                e2.printStackTrace();
            }finally {
                out=null;
            }
        }

        if(socket!=null){
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                socket=null;
            }
        }
    }
}

}
復制代碼

服務類:

復制代碼
package cn.yang.service;

import java.util.Arrays;
import java.util.List;

//服務類
public class Hello{
//sayHello服務方法
public String sayHello(String name){
return "hello:"+name;
}

//list服務方法
public List<String> list(){
    List<String> list= Arrays.asList("1","2","3");
    return list;
}

}
復制代碼

最后是客戶端類:

復制代碼
package cn.yang.client;

import cn.yang.common.Request;
import cn.yang.common.Response;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

//客戶端
public class client {
public static void main(String args[]){
Socket socket = new Socket();
ObjectOutputStream objectOutputStream=null;
ObjectInputStream objectInputStream=null;
try {
socket.connect(new InetSocketAddress(1234));

        //封裝請求類
        Request request=new Request();
        request.setClassName("cn.yang.service.Hello");//全類名
        request.setMethodName("sayHello");//方法
        request.setInType(new Class[]{String.class});//參數類型
        request.setInData(new Object[]{"LiMing"});//參數值

        /*Request request=new Request();
        request.setClassName("cn.yang.service.Hello");
        request.setMethodName("list");
        request.setInType(new Class[]{});
        request.setInData(new Object[]{});*/

        //打開socket輸出流
        objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
        //寫入Request對象
        objectOutputStream.writeObject(request);

        //打開socket輸入流
        objectInputStream = new ObjectInputStream(socket.getInputStream());

        //讀取Response對象
        Response response=(Response)objectInputStream.readObject();
        Object obj = response.getObj();
        System.out.println("result:"+obj);

    } catch (IOException | ClassNotFoundException e) {
        e.printStackTrace();
    }finally {
        if(objectOutputStream!=null){
            try {
                objectOutputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        if(objectInputStream!=null){
            try {
                objectInputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        if(socket!=null){
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

}
復制代碼

運行Server,然后再運行Client,得到結果:

總結:RPC實際上是在網絡通信基礎之上,運用反射技術達到遠程功能調用的目的。Request和Response類必須是可序列化類,以便在網絡上傳輸,並且客戶端和服務器都必須有這兩個類。這個示例只是以最簡單的方式說明RPC本質是什么。

現在有許多RPC框架,如Thrift,Hessian,JsonRPC,Dubbo,rPcx,gRPC等,他們就是在此基礎之上進行再封裝,優化,使用戶更簡便的使用,達到調用遠程功能,就如同調用本地功能一樣方便。

上面所說的網絡通信、序列化與反序例化以及反射的實現各個框架都有所不同。

網絡通信有的選擇TCP,有的選擇HTTP。TCP 是傳輸層協議,HTTP 是應用層協議,而傳輸層較應用層更加底層,在數據傳輸方面,越底層越快,因此,在一般情況下,TCP 一定比 HTTP 快。

就序列化而言,Java 提供了默認的序列化方式,但在高並發的情況下,這種方式將會帶來一些性能上的瓶頸,於是市面上出現了一系列優秀的序列化框架,比如:Protobuf、Kryo、Hessian、Jackson 等,它們可以取代 Java 默認的序列化,從而提供更高效的性能。

反射技術有Java自帶的反射技術,Objenesis以及CGLIB。

實際應用中,服務不會直接是一個具體的類,而是一個接口,也稱為協議,即客戶端和服務端達成的一種共識。客戶端請求時,傳輸接口名字,服務端通過接口名字找到接口的實現類,然后創建對象,並執行方法返回結果或者客戶端通過服務注冊中心找到服務實現類名,傳送給服務器,這涉及到服務注冊與發現,不同的框架實現的方式也有所不同,甚至一些框架還有流量監控與控制,服務故障轉換,負載均衡等。

這一篇文章只是執磚引玉,希望大家在了解RPC基礎原理之后,走上更高的台階!

參考:

輕量級分布式 RPC 框架

阿里Dubbo瘋狂更新,關Spring Cloud什么事


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM