RPC框架從0到10


RPC(Remote Procedure Call)

從單機走向分布式,產生了很多分布式的通信方式

  • 最古老也是最有效,並且永不過時的,TCP/UDP的二進制傳輸,事實上所有的通信方式歸根結底都是TCP/UDP
  • CORBA Common Object Request Broker Architecture。古老而復雜的,支持面向對象的通信協議。
  • Web Service(SOA SOAP RDDI WSDL ...)基於http+xml的標准化Web API
  • RestFul 回歸簡單化本源的Web API的事實標准,http+json
  • RMI Remote Method Invocation Java內部的分布式通信協議
  • JMS Java Message Service JavaEE中的消息框架標准,為很多MQ所支持
  • RPC Remote Procudure Call 遠程過程方法調用,這只是一個統稱概念,遠程通信的方式,重點在於方法調用(不支持對象的概念),具體實現甚至可以用RMI RestFul等去實現,但一般不用,因為RMI不能跨語言,而RestFul效率太低。多用於服務器集群間的通信,因此常使用更加高效,短小精悍的傳輸模式以提高效率。
    從單機到分布式->分布式通信(一台機器解決不了的問題,需要多台機器解決,多台機器內部需要進行通信)->最基本:二進制,兩台機器之間通過網絡通信一定是二進制數據傳輸TCP/IP(socket)

RPC01

首先構造一個用於傳輸的類User,該類實現了序列化可以進行序列化從而通過網絡傳輸二進制數據,他會跑在一台機器上去訪問數據庫,有人想拿到這個User的話我會對外提供一些服務

然后通過AUserService暴露FindById接口來進行數據查詢,你給我一個ID,我給你一個User
User.java

package com.airsky.demo.rpc.Common;

import java.io.Serializable;

public class User implements Serializable {
    private static final long serialVersionUID = 1L;
    int id;
    String name;

    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}

AUserService.java

package com.airsky.demo.rpc.Common;

public interface AUserService {
    User findById(int id);
}

最終在AUserServiceImpl進行了實現,對傳入的id進行了模擬數據庫查詢,返回一個User對象。這里只進行一個模擬,具體實現的話也很簡單。

package com.airsky.demo.rpc.Common;

public class AUserServiceImpl implements AUserService{

    @Override
    public User findById(int id) {
        //模擬數據庫查詢
        return new User(id,"AirSky");
    }
}

如果我們想通過最原始的方式來完成上述操作該怎么做呢?這個也是很多游戲和軟件類似功能最底層的實現。
首先我們這台機器對外提供服務,就需要提供一個Server打開一個Socket端口進行監聽,下面的實現是在Server類中新建了一個ServerSocket對象,並監聽8888端口,accept接收一個客戶端連接,然后通過send方法對連接進行處理。

public class Server {
    //加上條件變量,不然s.close()不可達
    private static boolean running = true;
    public static void main(String[] args) throws Exception {
        ServerSocket s=new ServerSocket(8888);
        while (running){
            System.out.println("接收連接中...");
            Socket client=s.accept();
            System.out.println("連接主機:" + client.getInetAddress());
            send(client);
            client.close();
        }
        s.close();
    }

下面來實現send方法對連接進行進一步處理。大概就是你給我一個id,我給你一個id和id對應的名字,就是這么簡單。send方法傳入了一個socket對象,我們可以通過socket對象的getInputStream方法來打開一個輸入流,getOutputStream方法打開一個輸出流。使用DataOutputStream的readInt對輸入流中的數據進行指定方式讀取,使用該包裝流來簡化讀寫操作,如果客戶端以正確的方式傳送了ID的即可讀取到,然后調用AUserService的findById方法來查詢id對應的User對象,最后通過DataOutputStream的writeInt和writeUTF以指定的方式分別向輸出流中寫入獲取到的id和name。

    private static void send(Socket client) throws Exception {
        //獲取Socket的輸入流,用Data流包裝讀取二進制數據
        DataInputStream dis = new DataInputStream(client.getInputStream());
        //獲取Socket的輸出流,用Data流包裝寫出二進制數據
        DataOutputStream dos = new DataOutputStream(client.getOutputStream());

        //讀取Id
        int id = dis.readInt();
        System.out.println("接收到ID為:"+id);
        AUserService service= new AUserServiceImpl();
        User user = service.findById(id);
        //寫出數據
        System.out.println("返回的對象為:"+user);
        dos.writeInt(user.getId());
        dos.writeUTF(user.getName());
        dos.flush();
        dis.close();
        dos.close();
    }

服務端實現了我們再來實現以下客戶端。服務端監聽了8888端口我們是不是就需要新建一個Socket來連接8888端口,同樣使用getOutputStream來獲取一個輸出流,再用DataOutputStream向流中寫指定格式數據(二進制),最后來使用getInputStream獲取輸入流得到數據n。得到數據之后new一個新對象拿來用。

public class Client {
    public static void main(String[] args) throws Exception {
        Socket s = new Socket("127.0.0.1",8888);
        //使用DataOutputStream包裝以二進制寫入
        DataOutputStream dos = new DataOutputStream(s.getOutputStream());
        dos.writeInt(8888);
        DataInputStream dis = new DataInputStream(s.getInputStream());
        int id = dis.readInt();
        String name = dis.readUTF();
        User user = new User(id,name);
        System.out.println(user);
    }
}

現在我們啟動服務端,然后啟動客戶端后,服務端和客戶端就會收到以下數據


以上這種是最簡單最原始的方式,這種方式非常的不靈活,這種方式只能傳輸單一的對象,對於傳輸的對象必須了解才能傳的過去拿的過來,如果在代碼量大的情況下需要對某個或者多個對象加屬性,那么我們的就需要大量整 改,尤其是傳輸過程和業務邏輯代碼全部混合在一起的時候,所以我們就需要一種寫起來更加爽的方式。RPC02應運而生。

RPC02

假如我現在只是個業務開發,對網絡這塊兒不太熟,現在我想告訴自己,能不能給個簡單的方法,讓我直接訪問接口編寫邏輯代碼就行了。所以這里我們就需要對網絡這塊兒進行一個簡單的封裝。這就是RPC的演進過程。
最簡單的方法是將網絡操作封裝為代理類Stub,Stub屏蔽了關於網絡的細節

package com.airsky.demo.rpc.rpc02;

import com.airsky.demo.rpc.Common.User;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;

public class Stub {
    public User findUserById(Integer id) throws Exception {
        Socket s = new Socket("127.0.0.1",8888);
        //使用DataOutputStream包裝以二進制寫入
        DataOutputStream dos = new DataOutputStream(s.getOutputStream());
        dos.writeInt(id);
        DataInputStream dis = new DataInputStream(s.getInputStream());
        int id1 = dis.readInt();
        String name = dis.readUTF();
        User user = new User(id1,name);
        return user;
    }
}

現在只要在new一個Stub出來,用Stub的findUserById方法就能拿到對象
Client中調用代理類

package com.airsky.demo.rpc.rpc02;

public class Client {
    public static void main(String[] args) throws Exception {
        Stub stub = new Stub();
        System.out.println(stub.findUserById(33));
    }
}

這就是第一步的演進,開發是一個不斷螺旋遞增的迭代過程,瀑布式的模型在很多場景下,尤其是互聯網下早就被敏捷開發所替代了,敏捷一定要迭代。我們很多時候了解到的都是結果,直接接觸到了最終的版本,所以我們根本理解不了中間的演進過程,也理解不了前人們為什么做出來這么多的改進,這就是我們不能理解RPC的原因。從歷史演進學習技術,會理解得更透徹,知道前因后果,怎么問題的步驟與思路。

RPC03

到了這里大家可能會說,作為一個Stub,這非常的不完善,你只能代理一個方法,返回一個類,這也太弱了。那我們就一點點來演進。
如果說Stub能給我提供這樣的一個接口,我們想使用Service的findUserById,而Stub的接口提供給我這個方法。通過Stub代理過的findUserById方法我們就能遠程訪問了。當我們調用findUserById的時候,他幫我們加進去了一些代碼,這些代碼就是網絡服務,所以Stub這里實現了代理模式里面的動態代理,這里最難的。
Stub

package com.airsky.demo.rpc.rpc03;

import com.airsky.demo.rpc.Common.IUserService;
import com.airsky.demo.rpc.Common.User;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class Stub {
    public static IUserService getStub(){
        InvocationHandler h=new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket s = new Socket("127.0.0.1",8888);
                //使用ByteArrayOutputStream在內存中聲明一個數組流
//        ByteArrayOutputStream bos = new ByteArrayOutputStream();
                //使用DataOutputStream包裝以二進制寫入
                DataOutputStream dos = new DataOutputStream(s.getOutputStream());
                dos.writeInt(456);//這里寫死了需要改進
//        s.getOutputStream().write(bos.toByteArray());
//
//        System.out.println(s.getInputStream().read());
                DataInputStream dis = new DataInputStream(s.getInputStream());
                int id1 = dis.readInt();
                String name = dis.readUTF();
                User user = new User(id1,name);

                dos.close();
                dis.close();
                s.close();
                return user;
            }
        };
        Object o= Proxy.newProxyInstance(IUserService.class.getClassLoader(),new Class[]{IUserService.class},h);
        System.out.println(o.getClass().getName());
        System.out.println(o.getClass().getInterfaces()[0]);
        return (IUserService)o;

    }
}

Client

package com.airsky.demo.rpc.rpc03;

import com.airsky.demo.rpc.Common.IUserService;
import com.airsky.demo.rpc.Common.User;

public class Client {
    public static void main(String[] args) {
        IUserService service=Stub.getStub();//返回一個動態生成的對象
        User user=service.findUserById(111);
        System.out.println(user);
    }
}

當我們在Client端調用findUserById的時候,首頁一點要明白,service是一個具體的類,是我們動態產生的,當調用這個方法的時候,他本質上會調用動態代理,那么既然調用了動態代理,那么就要使用InvocationHandler調用處理器進行處理,從而調用我們在invoke方法中實現的內容。invoke的第一個參數聲明了調用該方法的代理實例,第二參數指向了它里邊的findUserById方法,第三個參數為傳入的參數。這樣我們就可以代理實現修改類方法。這樣寫的好處在於以后在這個接口里添加任何方法的時候都可以在這里進行處理。此時通過動態代理產生的是一個實現了UserService接口的新的類,通過打印o類名即可看出。

通過反射獲取className看出這是動態產生的新類$Proxy0

我們通過invoke第二個參數告訴他這個是實現UserService接口。


通過反射獲取interface看出這是實現了UserService接口

我們對這個類進行任何方法調用的時候,都是要經過InvocationHandler的處理,然后我在里面可以判斷你調用的是哪個方法,我做相應的處理,調用findUserById我要聯網,調用別的我要檢查你的權限等等。

RPC04

上一個版本里的代碼是有缺陷的,為什么說呢,可以看到在Stub的此處

不管我們調用哪個方法,都只能傳456,如果我們想實現一個Save功能那不就完蛋了嗎。所以我們需要改進這里為可控參數。那么怎么改進呢,無論什么方法,我都用一個通用的版本實現即可。既然我們要遠程調用其他方法,那么就可以通過invoke方法的三個參數來獲取被代理的的方法信息,然后將方法信息傳輸過去。
Stub

package com.airsky.demo.rpc.rpc04;

import com.airsky.demo.rpc.Common.IUserService;
import com.airsky.demo.rpc.Common.User;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class Stub {
    public static IUserService getStub(){
        InvocationHandler h=new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket s = new Socket("127.0.0.1",8888);

                //使用ObjectOutputStream將對象數據序列化傳輸過去。
                ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
                String methodName = method.getName();
                Class[] parametersTypes = method.getParameterTypes();
                oos.writeUTF(methodName);
                oos.writeObject(parametersTypes);
                oos.writeObject(args);
                oos.flush();

                DataInputStream dis = new DataInputStream(s.getInputStream());
                int id1 = dis.readInt();
                String name = dis.readUTF();
                User user = new User(id1,name);

                oos.close();
                dis.close();
                s.close();
                return user;
            }
        };
        Object o= Proxy.newProxyInstance(IUserService.class.getClassLoader(),new Class[]{IUserService.class},h);
        System.out.println(o.getClass().getName());
        System.out.println(o.getClass().getInterfaces()[0]);
        return (IUserService)o;

    }
}

下面來假設一下,假如我們要調用saveUser方法,我們通過getName獲取了方法名,通過getParamterTypes獲取了參數類型,args獲取到了參數,那么我們把這些數據通過ObjectOutputStream流告訴服務端,馬上給我調這個方法,參數類型數據都給你。服務端馬不停蹄的給我找到了這個方法,並且傳入參數得到了返回值。
既然客戶端做了這么大的改變,那么服務端是不是也要修改呀?是滴,服務端同樣使用ObjectInputStream讀入對象信息,首先按照傳輸的順序使用readUTF讀取方法名,然后使用readObject讀取參數類型,返回類型為Class數組,接着使用readObject讀取方法參數args,返回類型為Object,接下來先實例化需要調用方法的類,然后通過反射找到對應方法名和參數類型的方法,最后通過invoke進行反射調用,args為參數值,調用完成之后即可得到該方法的返回值得到一個User對象。當然如果我們想要實現一個saveUser方法也同樣可以使用這種方式進行調用,只需要改變一下Client的方法名,其它的不需要進行改動。非常方便。

package com.airsky.demo.rpc.rpc04;

import com.airsky.demo.rpc.Common.UserService;
import com.airsky.demo.rpc.Common.UserServiceImpl;
import com.airsky.demo.rpc.Common.User;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.ObjectInputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    //加上條件變量,不然s.close()不可達
    private static boolean running = true;
    public static void main(String[] args) throws Exception {
        ServerSocket s=new ServerSocket(8888);
        while (running){
            System.out.println("接收連接中...");
            Socket client=s.accept();
            System.out.println("連接主機:" + client.getInetAddress());
            send(client);
            client.close();
        }
        s.close();
    }

    private static void send(Socket client) throws Exception {

        ObjectInputStream ois= new ObjectInputStream(client.getInputStream());
        //獲取Socket的輸出流,用Data流包裝寫出二進制數據
        DataOutputStream dos = new DataOutputStream(client.getOutputStream());


        String methodName = ois.readUTF();
        Class[] parameterTypes= (Class[]) ois.readObject();//讀取類型
        Object[] args= (Object[]) ois.readObject();//讀取Id
        System.out.println("接收到ID為:"+args[0]);
        UserService service=new com.airsky.demo.rpc.rpc04.UserServiceImpl();
        Method method = service.getClass().getMethod(methodName,parameterTypes);
        User user = (User) method.invoke(service,args);
        //寫出數據
        System.out.println("返回的對象為:"+user);
//        client.getOutputStream().write(user.getName().getBytes());
        dos.writeInt(user.getId());
        dos.writeUTF(user.getName());
        dos.flush();
        ois.close();
        dos.close();
//
//
//        System.out.println(client.getInputStream().read());
    }
}

所以在這個版本里面我可以提供很多方法的支持,對於同一個接口里面很多方法支持。

RPC05

但是如果我們想要對隨意接口的任意方法進行調用,那么上面的寫法就無法滿足我們了。還有我們的Client端對於返回值的處理還不完善,現在是將user對象拆解來進行傳輸的。如果我們的User實現改變了,那么所有的代碼也需要進行改變,這肯定不行。所以在Server發送和Client接收對象時我們均使用ObjectOutputStream來直接傳輸user對象的序列化數據。這樣的話我們的User就可以隨意改變了。
Server

package com.airsky.demo.rpc.rpc05;

import com.airsky.demo.rpc.Common.User;
import com.airsky.demo.rpc.Common.UserServiceImpl;

import java.io.DataOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    //加上條件變量,不然s.close()不可達
    private static boolean running = true;
    public static void main(String[] args) throws Exception {
        ServerSocket s=new ServerSocket(8888);
        while (running){
            System.out.println("接收連接中...");
            Socket client=s.accept();
            System.out.println("連接主機:" + client.getInetAddress());
            send(client);
            client.close();
        }
        s.close();
    }

    private static void send(Socket client) throws Exception {

        ObjectInputStream ois= new ObjectInputStream(client.getInputStream());

        ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream());


        String methodName = ois.readUTF();
        Class[] parameterTypes= (Class[]) ois.readObject();//讀取類型
        Object[] args= (Object[]) ois.readObject();//讀取Id
        System.out.println("接收到ID為:"+args[0]);
        UserServiceImpl service=new UserServiceImpl();
        Method method = service.getClass().getMethod(methodName,parameterTypes);
        User user = (User) method.invoke(service,args);
        //使用ObjectOutputStream直接序列化寫出對象
        System.out.println("返回的對象為:"+user);
        oos.writeObject(user);
        oos.flush();
        ois.close();
        oos.close();
    }
}

Stub

package com.airsky.demo.rpc.rpc05;

import com.airsky.demo.rpc.Common.IUserService;
import com.airsky.demo.rpc.Common.User;

import java.io.DataInputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class Stub {
    public static IUserService getStub(){
        InvocationHandler h=new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket s = new Socket("127.0.0.1",8888);

                //使用ObjectOutputStream將對象數據序列化傳輸過去。
                ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
                String methodName = method.getName();
                Class[] parametersTypes = method.getParameterTypes();
                oos.writeUTF(methodName);
                oos.writeObject(parametersTypes);
                oos.writeObject(args);
                oos.flush();

                //使用ObjectOutputStream將序列化數據轉換為對象。
                ObjectInputStream ois = new ObjectInputStream(s.getInputStream());
                User user = (User) ois.readObject();
                oos.close();
                ois.close();
                s.close();
                return user;
            }
        };
        Object o= Proxy.newProxyInstance(IUserService.class.getClassLoader(),new Class[]{IUserService.class},h);
        System.out.println(o.getClass().getName());
        System.out.println(o.getClass().getInterfaces()[0]);
        return (IUserService)o;

    }
}

到了這里大家發現沒有,我們的UserService可以隨意添加方法,與此同時User還可以自由改變實現,我們的程序會變得更靈活,尤其是把Stub的內容對特別弱雞級(我)的程序員屏蔽掉,告訴雞你就調我的Stub方法就能進行遠程調用了,里邊怎么處理的你不用管。

RPC06

當然上個版本也不是很完美,我們還可以做進一步的加工。在上個版本中我們只能拿到一個UserService,那我們你不能通過getStub拿到任意類型的公開接口,可不可以?非常可以。我們都知道在Proxy的newProxyInstance方法中傳入了接口的class對象,那么我們是不是可以在這個地方進行動態的傳入,傳入我們想要拿到的任意接口。
Stub

package com.airsky.demo.rpc.rpc06;

import com.airsky.demo.rpc.Common.IUserService;
import com.airsky.demo.rpc.Common.User;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class Stub {
    public static Object getStub(Class<?> clz){
        InvocationHandler h=new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket s = new Socket("127.0.0.1",8888);

                //使用ObjectOutputStream將對象數據序列化傳輸過去。
                ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
                String clazzName = clz.getName();
                String methodName = method.getName();
                Class[] parametersTypes = method.getParameterTypes();
                oos.writeUTF(clazzName);
                oos.writeUTF(methodName);
                oos.writeObject(parametersTypes);
                oos.writeObject(args);
                oos.flush();

                //使用ObjectOutputStream將序列化數據轉換為對象。
                ObjectInputStream ois = new ObjectInputStream(s.getInputStream());
                Object o = ois.readObject();
                oos.close();
                ois.close();
                s.close();
                return o;
            }
        };
        Object o= Proxy.newProxyInstance(clz.getClassLoader(),new Class[]{clz},h);
        System.out.println(o.getClass().getName());
        System.out.println(o.getClass().getInterfaces()[0]);
        return o;

    }
}

可以看到我們給getStub新添加了一個class參數clz,而將這個參數傳入了newProxyInstance,並且我們的getStub返回的是一個Object,這樣你給我一個class類型我就可以返回給你一個實現了這個class類型的動態代理的那個類對象。在invoke中我們首先通過clz.getName()獲取class的名字,因為我們需要調用的是任意接口,所以我們得先把接口名字寫出去讓服務端去進行調用。然后往外寫的時候先寫需要調用的接口名稱,在寫需要調用的接口方法,最后寫參數類型和參數。我們再來看看Server端需要怎么改動。
Server

package com.airsky.demo.rpc.rpc06;

import com.airsky.demo.rpc.Common.ProductServiceImpl;
import com.airsky.demo.rpc.Common.User;
import com.airsky.demo.rpc.Common.UserService;
import com.airsky.demo.rpc.Common.UserServiceImpl;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    //加上條件變量,不然s.close()不可達
    private static boolean running = true;
    public static void main(String[] args) throws Exception {
        ServerSocket s=new ServerSocket(8888);
        while (running){
            System.out.println("接收連接中...");
            Socket client=s.accept();
            System.out.println("連接主機:" + client.getInetAddress());
            send(client);
            client.close();
        }
        s.close();
    }

    private static void send(Socket client) throws Exception {

        ObjectInputStream ois= new ObjectInputStream(client.getInputStream());
        
        ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream());


        String clazzName = ois.readUTF();
        String methodName = ois.readUTF();
        Class[] parameterTypes= (Class[]) ois.readObject();//讀取類型
        Object[] args= (Object[]) ois.readObject();//讀取Id
        System.out.println("接收到ID為:"+args[0]);
//去服務注冊表找到具體的類,如果使用spring甚至還可以直接根據配置注入bean然后根據bean查找。
        Class clazz = ProductServiceImpl.class;
        Method method = clazz.getMethod(methodName,parameterTypes);
        Object o = method.invoke(clazz.newInstance(),args);
        //使用ObjectOutputStream直接序列化寫出對象
        System.out.println("返回的對象為:"+o);
        oos.writeObject(o);
        oos.flush();
        ois.close();
        oos.close();
    }
}

Server端無非就是在讀的時候先讀到我們的className,然后就去服務注冊表找到具體的實現類,最后找對應的方法,最后傳入參數進行調用就行了。當然這個過程完全可以使用Spring來注入來查找具體的實現類。我們通過新增一個ProductService接口來測試代碼是否完善。
Product

package com.airsky.demo.rpc.Common;

import java.io.Serializable;

public class Product implements Serializable {
    private static final long serialVersionUID = 1L;
    int id;
    String name;
    int count;

    public Product(int id, String name, int count) {
        this.id = id;
        this.name = name;
        this.count = count;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "Product{" +
                "id=" + id +
                ", name=" + name +
                ", count=" + count +
                '}';
    }
}

ProductService

package com.airsky.demo.rpc.Common;

public interface ProductService {
    Product findProductByName(String name);
}

ProductServiceImpl

package com.airsky.demo.rpc.Common;

public class ProductServiceImpl implements ProductService {
    @Override
    public Product findProductByName(String name) {
        return new Product(4,name,4);
    }
}

Client

package com.airsky.demo.rpc.rpc06;

import com.airsky.demo.rpc.Common.Product;
import com.airsky.demo.rpc.Common.ProductService;
public class Client {
    public static void main(String[] args) {
        ProductService service= (ProductService) Stub.getStub(ProductService.class);//返回一個動態生成的對象
        Product product=service.findProductByName("SuperMan");
        System.out.println(product);

    }
}


可以看到成功的進行了遠程方法調用。
現在我們來梳理回顧一下全過程,我在Client端要對Server端的服務做調用,Server端服務提供了很多個,簡單點叫服務1,服務2...,調用服務的時候我非常不爽,因為每次都要寫網絡細節。這時我就生成了一個代理類,這個代理類用動態生成的方式來生成網絡細節,你只要告訴我這個工具類,我要服務1的方法1,我傳了一個參數1,工具類就會生成一個代理類,將我們的方法調用信息傳給服務器,服務器找到對應的實現類,對應的方法。將執行完成的結果返回。 所謂的RPC:Remote Procudure Call,遠程的、方法的、調用。幫你屏蔽了底層。這里面的實現有好多種,所以這只是一種通信的方式,我可以通過屏蔽底層的方式(Stub),就跟我調用本機的方法似的。現在只是一個最基礎的RPC實現,這里面有大量的可以改進的地方,一步一步的改進你就可以自己寫一個RPC框架了。

RPC08

此時我們在底層的實現是序列化,轉換為二進制,因為所有網絡傳輸的都是二進制。我們現在用的序列化是jdk自帶的Serializable,這個方式是最土的,因為它只支持java語言,而且效率低,長度長。所以對於RPC序列化上的實現就有好多好多可以替代的內容。
了解一下RPC的序列化框架。

  • java.io.Serializable
  • Hessian
  • google protobuf
  • fackbook Thrift
  • kyro
  • fst
  • json序列化框架
  • jackson
  • google Gson
  • FastJson
  • xmlrpc(xstream)
    Json框架就是先轉換為JSON字符串格式,通過JSON框架再轉換為二進制,XML就是轉換為XML格式。
    以上只是RPC框架的序列化實現部分,完整的RPC框架還要提供服務注冊、服務發現、服務治理、服務監控、服務的負載均衡各種各樣的東西。
    這里我們就來了解一下Hessian序列化框架,序列化就是把一個對象轉換成字節數組,反序列化就是反之。我們在HelloHessian中實現了一個方法serialize,作用是將一個對象轉換成字節數組。怎么轉換的呢?還是使用ByteArrayOutputStream創建字節數組輸出流對象然后捕獲內存緩沖區的數據(Object),轉換成字節數組,在外面我們將原來嵌套的ObjectOutputStream換成了Hessian2Output,然后將對象數據寫入了字節數組輸出流中轉換成了字節數組。
    HelloHessian
package com.airsky.demo.rpc.rpc08_Hessian01;

import com.airsky.demo.rpc.Common.User;
import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class HelloHessian {
    public static void main(String[] args) throws IOException {
        User a = new User(4,"AirSky");
        byte[] bytes = serialize(a);
        System.out.println(bytes.length);
        User a1 = (User)deserialize(bytes);
        System.out.println(a1);
    }

    private static byte[] serialize(User a) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Hessian2Output output = new Hessian2Output(baos);
        output.writeObject(a);
        output.flush();
        byte[] bytes = baos.toByteArray();
        baos.close();
        output.close();
        return bytes;

    }
    private static Object deserialize(byte[] bytes) throws IOException {
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        Hessian2Input input = new Hessian2Input(bais);
        Object o = input.readObject();
        bais.close();
        input.close();
        return o;
    }
}

這時你可能會說,你在搞笑嗎?我直接用JDK自帶的序列化不就行了,干嘛搞的這么麻煩。別急,跟着我來看看Hessian和JDK的對比。

package com.airsky.demo.rpc.rpc08_Hessian01;

import com.airsky.demo.rpc.Common.User;
import com.caucho.hessian.io.Hessian2Output;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class HessianVSJDK {
    public static void main(String[] args) throws IOException {
        User a = new User(1,"AirSky");

        long b =System.currentTimeMillis();
        byte[] HessianBytes = HessianSerialize(a);
        long c =System.currentTimeMillis();
        System.out.println("HessianLength:"+HessianBytes.length);
        System.out.println("HessianString:"+new String(HessianBytes));
        System.out.println("HessianTime:"+(c-b));

        System.out.println("------------------------------------------------------------------");

        long b1 =System.currentTimeMillis();
        byte[] JdkBytes = JdkSerialize(a);
        long c1 =System.currentTimeMillis();
        System.out.println("JdkLength:"+JdkBytes.length);
        System.out.println("JdkString:"+new String(JdkBytes));
        System.out.println("JdkTime:"+(c1-b1));
    }
    private static byte[] HessianSerialize(Object a) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Hessian2Output output = new Hessian2Output(baos);
        output.writeObject(a);
        output.flush();
        byte[] bytes = baos.toByteArray();
        baos.close();
        output.close();
        return bytes;

    }
    private static byte[] JdkSerialize(Object a) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(a);
        oos.flush();
        byte[] bytes = baos.toByteArray();
        baos.close();
        oos.close();
        return bytes;
    }
}

額外
我們在HessianVSJDK中實現了一個使用Hessian和ObjectOutputStream序列化的方法,通過比較這兩個方法序列化User對象a之后的長度和字符串內容可以看出Hessian去掉了原生序列化中沒有必要傳輸的大量字符。但通過序列化時間也可以看出java自身所帶的方法明顯比hessian自帶的序列化效率更高。這里也有實驗數據https://my.oschina.net/caomuquan/blog/378416。RPC中必須有的就是序列化,所以在RPC的序列化中產生了一系列的序列化框架,這些框架有什么區別呢?區別就是有的快、有的慢、有的大、有的小、有的效率高、有的效率低、有的用純文本、有的用xml。
說到這里大家是不是明白了幾個概念了,第一、RPC的基本概念,第二、RPC的序列化框架。

RPC10

RPC除了有序列化的一面還有什么呢?作為RPC中的Server來說,現在用的傳輸數據的方式是最基礎的TCP/IP,但是我們傳輸數據的協議有好多種是不是?我也可以用HTTP來傳輸我的字符串或者二進制,所以RPC的概念除了序列化之外還有RPC的網絡協議,這個你可以自己選。可以選HTTP,也可以選底層的TCP/IP,可以選WebService,甚至可以選mail協議。協議無所謂,無非就是通過這些協議把我們序列化好的數據發出去。不同的框架可能就使用的不同的協議,需要自己去選,去定義才行。

整理自馬士兵老師:《36行代碼透徹解析RPC》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM