從零開始實現一個分布式RPC框架


該rpc框架是一個mini版的dubbo。學習rpc之前,建議先了解NIO,Netty和Dubbo等知識。請移步網絡編程

前言:(借用阿里大佬的一段話)

為什么要自己寫一個RPC框架,我覺得從個人成長上說,如果一個程序員能清楚的了解RPC框架所具備的要素,掌握RPC框架中涉及的服務注冊發現、負載均衡、序列化協議、RPC通信協議、Socket通信、異步調用、熔斷降級等技術,可以全方位的提升基本素質。雖然也有相關源碼,但是只看源碼容易眼高手低,動手寫一個才是自己真正掌握這門技術的最優路徑。

一.概述

什么是RPC?

  • 遠程服務調用
  • 官方:一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的思想
  • 通俗一點:客戶端在不知道調用細節的情況下,調用存在於遠程計算機上的某個對象,就像調用本地應用程序中的對象一樣。
  • 市面上常見的rpc框架:dobbo,springCloud,gRPC...

那為什么要有 RPC,HTTP 不好么?

  • 因為 RPC 和 HTTP 就不是一個層級的東西,所以嚴格意義上這兩個沒有可比性,也不應該來作比較。
  • HTTP 只是傳輸協議,協議只是規范了一定的交流格式
  • RPC 對比的是本地過程調用,是用來作為分布式系統之間的通信,它可以用 HTTP 來傳輸,也可以基於 TCP 自定義協議傳輸。
  • HTTP 協議比較冗余,所以 RPC 大多都是基於 TCP 自定義協議,定制化的才是最適合自己的。

項目總體結構


整體架構

 接下來,分別解釋上述的過程

 二.自定義注解

服務的提供者和消費者公用一個接口,@ServiceExpose是為了暴露服務,放在生產者的某個實現類上;@ServiceReference是為了引用服務,放在消費者的需要注入的屬性上。

  • Target:指定被修飾的Annotation可以放置的位置(被修飾的目標)
    • @Target(ElementType.TYPE)                                //接口、類

    • @Target(ElementType.FIELD)                               //屬性

    • @Target(ElementType.METHOD)                           //方法

    •  @Target(ElementType.PARAMETER)                   //方法參數
    •  @Target(ElementType.CONSTRUCTOR)             //構造函數

    • @Target(ElementType.LOCAL_VARIABLE)          //局部變量

    •  @Target(ElementType.ANNOTATION_TYPE)       //注解

    • @Target(ElementType.PACKAGE)                         //包

  • Retention:定義注解的保留策略
    • @Retention(RetentionPolicy.SOURCE)             //注解僅存在於源碼中,在class字節碼文件中不包含

    • @Retention(RetentionPolicy.CLASS)              //默認的保留策略,注解會在class字節碼文件中存在,但運行時無法獲得

    • @Retention(RetentionPolicy.RUNTIME)            //注解會在class字節碼文件中存在,在運行時可以通過反射獲取到

  • Documented:指定被修飾的該Annotation可以被javadoc工具提取成文檔
  • Inherited:指定被修飾的Annotation將具有繼承性

 二.啟動配置

主要是加載一些rpc相關的配置類,使用SpringBoot自動裝配。可以使用SPI機制加入一些自定義的類,放到指定文件夾中。

 三.rpc接口注入/rpc服務掃描

這里主要就是通過反射獲得對應注解的屬性/類,進行服務暴露/服務引用。 這里需要關注的是什么時候進行服務暴露/引用?如下:

  • 客戶端:一般有倆種方案
    • 餓漢式:餓漢式是通過實現 Spring 的InitializingBean接口中的 afterPropertiesSet方法,容器通過調用 ReferenceBean的 afterPropertiesSet方法時引入服務。(在Spring啟動時,給所有的屬性注入實現類,包含遠程和本地的實現類)
    • 懶漢式:只有當這個服務被注入到其他類中時啟動引入流程,也就是說用到了才會開始服務引入。
      • 在應用的Spring IOC 容器刷新完畢(spring Context初始化)之后,掃描所有的Bean,將Bean中帶有@ServiceExpose/@ServiceReference注解的field獲取到,然后創建field類型的代理對象,創建完成后,將代理對象set給此field。后續就通過該代理對象創建服務端連接,並發起調用。(dubbo默認)
  • 服務端:與懶漢式一樣。

 那么怎么知道Spring IOC刷新完成,這里就使用一個Spring提供的監聽器,當Spring IOC刷新完成,就會觸發監聽器。

 四.服務注冊到ZK/從Zk獲得服務

Zookeeper采用節點樹的數據模型,類似linux文件系統,/,/node1,/node2 比較簡單。不懂Zookeeper請移步:Zookeeper原理

 我們采用的是對每個服務名創建一個持久節點,服務注冊時實際上就是在zookeeper中該持久節點下創建了一個臨時節點,該臨時節點存儲了服務的IP、端口、序列化方式等。

 客戶端獲取服務時通過獲取持久節點下的臨時節點列表,解析服務地址數據:

 客戶端監聽服務變化:

 五.生成代理類對象

這里使用JDK的動態代理,也可以使用cglib或者Javassist(dobbo使用)。

public class ClientProxyFactory {
    /**
     * 獲取代理對象,綁定 invoke 行為
     *
     * @param clazz 接口 class 對象
     * @param <T>   類型
     * @return 代理對象
     */public <T> T getProxyInstance(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
            final Random random = new Random();

            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 第一步:通過服務發現機制選擇一個服務提供者暴露的服務
                String serviceName = clazz.getName();
                final List<ServiceInfo> serviceInfos = serviceDiscovery.listServices(serviceName);
                logger.info("Rpc server instance list: {}", serviceInfos);
                if (CollectionUtils.isEmpty(serviceInfos)) {
                    throw new RpcException("No rpc servers found.");
                }

                // TODO: 這里模擬負載均衡,從多個服務提供者暴露的服務中隨機挑選一個,后期寫方法實現負載均衡
                final ServiceInfo serviceInfo = serviceInfos.get(random.nextInt(serviceInfos.size()));

                // 第二步:構造 rpc 請求對象
                final RpcRequest rpcRequest = new RpcRequest();
                rpcRequest.setServiceName(serviceName);
                rpcRequest.setMethod(method.getName());
                rpcRequest.setParameterTypes(method.getParameterTypes());
                rpcRequest.setParameters(args);

                // 第三步:編碼請求消息, TODO: 這里可以配置多種編碼方式
                byte[] data = messageProtocol.marshallingReqMessage(rpcRequest);

                // 第四步:調用 rpc client 開始發送消息
                byte[] byteResponse = rpcClient.sendMessage(data, serviceInfo);

                // 第五步:解碼響應消息
                final RpcResponse rpcResponse = messageProtocol.unmarshallingRespMessage(byteResponse);

                // 第六步:解析返回結果進行處理
                if (rpcResponse.getException() != null) {
                    throw rpcResponse.getException();
                }
                return rpcResponse.getRetValue();
            }
        });
    }
}
六.負載均衡

本實現支持兩種主要負載均衡策略,隨機輪詢,其中他們都支持帶權重的隨機和輪詢,其實也就是四種策略。

七.Netty通信

服務端和客戶端基本一樣,這里只展示服務端的代碼。代理對象在Spring啟動的時候就生成了,但是沒有調用,每一個調用(請求)都會生成一個Netty的連接。

public class NettyRpcServer extends RpcServer {
   
    @Override
    public void start() {
        // 創建兩個線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 創建服務端的啟動對象
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    // 設置兩個線程組
                    .group(bossGroup, workerGroup)
                    // 設置服務端通道實現類型
                    .channel(NioServerSocketChannel.class)
                    // 服務端用於接收進來的連接,也就是boosGroup線程, 線程隊列大小
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // child 通道,worker 線程處理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        // 給 pipeline 管道設置自定義的處理器
                        @Override
                        public void initChannel(SocketChannel channel) {
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });

            // 綁定端口號,同步啟動服務
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channel = channelFuture.channel();
            // 對關閉通道進行監聽,變為同步
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            logger.error("server error.", e);
        } finally {
            // 釋放線程組資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

 實現具體handler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    //當通道就緒就會觸發該方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //進行記錄
        logger.info("channel active: {}", ctx);
    }

    //讀取數據實際(這里我們可以讀取客戶端發送的消息)
    @Override
    public void channelRead(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
        //將數據讀到buffer中
        final ByteBuf msgBuf = (ByteBuf) msg;
        final byte[] reqBytes = new byte[msgBuf.readableBytes()];
        msgBuf.readBytes(reqBytes);
    }

    //數據讀取完畢
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //使用反射獲找到目標方法進行返回
        final byte[] respBytes = requestHandler.handleRequest(reqBytes);
        ctx.writeAndFlush(respBytes);
    }

    //處理異常, 一般是需要關閉通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
  八.序列化協議

 對計算機網絡稍微有一點了解的同學都知道,數據在網絡中傳輸是二進制的:01010101010101010,類似這種,只有二進制數據才能在網絡中傳輸。但是在編碼之前我們一般先進行序列化,目的是為了優化傳輸的數據量。因為有的數據太大,需要進行空間優化。

那么我們來區分一下序列化和編碼:我畫一張圖大家都全明白了

 


定義一個序列化協議,放入作為一個handler放入pipeline中。

Netty支持多種序列化,比如jdk,Json,ProtoBuf 等,這里使用ProtoBuf,其序列化后碼流小性能高,非常適合RPC調用。接下來看怎么使用ProtoBuf?

  • 1.編寫需要序列化的類xxx.proto:ProtoBuf有自己的語法規則(自行百度)

  • 2.通過官網提供的protoc.exe生成對應的Java代碼
  • 3.前面通過工具生成的代碼(AnimalProto)已經幫我們封裝好了序列化和反序列化的方法,我們只需要調用對應方法即可

引入Protobuf的依賴

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.4.1</version>
</dependency>

序列化:

/**
 * 調用對象構造好的Builder,完成屬性賦值和序列化操作
 * @return
 */
public static byte[] protobufSerializer(){
    AnimalProto.Animal.Builder builder = AnimalProto.Animal.newBuilder();
    builder.setId(1L);
    builder.setName("小豬");
    List<String> actions = new ArrayList<>();
    actions.add("eat");
    actions.add("run");
    builder.addAllActions(actions);
    return builder.build().toByteArray();
}

反序列化:

/**
 * 通過調用parseFrom則完成反序列化
 * @param bytes
 * @return
 * @throws InvalidProtocolBufferException
 */
public static Animal deserialize(byte[] bytes) throws Exception {
    AnimalProto.Animal pAnimal = AnimalProto.Animal.parseFrom(bytes);
    Animal animal = new Animal();
    animal.setId(pAnimal.getId());
    animal.setName(pAnimal.getName());
    animal.setActions(pAnimal.getActionsList());
    return animal;
}

測試:

public static void main(String[] args) throws Exception {
    byte[] bytes = serializer();
    Animal animal = deserialize(bytes);
    System.out.println(animal);
}

以下看到是能正常序列化和反序列化的:

 九.通信協議

通信協議主要是解決網絡傳輸問題,比如TCP拆包粘包問題。

TCP問題:

  • TCP拆包粘包主要就是把一些數據合並或者分割開進行發送,這時候有的數據就不完整,有的數據就多出一部分,就會造成問題。一般使用TCP協議都需要考慮拆包粘包問題
  • tcp粘包和半包問題就是因為滑動窗口。 因為不管你的數據是多少長度,怎么分割每一條數據。但是tcp只按照我滑動窗口的長度發送。
  • 本質是因為TCP是流式協議,消息無邊界

解決方案:業界的主流協議的解決方案可以歸納如下

  • 消息定長:例如每個報文的大小為固定長度100字節,如果不夠用空格補足。(定長解碼器)
  • 在包尾加特殊結束符進行分割。(分隔符編碼器)

 

  •  消息長度+消息:將消息分為消息頭和消息體,消息頭中包含表示消息總長度(或者消息體長度)的字段。
    • Netty自帶:

    • 自定義編解碼器

 

 這里只是列舉出來編碼過程,解碼是逆過程。(說白了,編碼就是找着固定的格式進行寫入,解碼就是照着固定的格式讀)


恭喜你,已經學會寫RPC框架了,想深入了解的朋友可以參照源碼。進行學習,升級。

該rpc最終打成一個Spring Boot starter,如果不會的請參照手寫一個Spring Boot starter

 

寄語:生命只有一次,你要活得暢快淋漓

參考文章:

https://mp.weixin.qq.com/s/yaIOCfEigkQMm2kt6I7Orw

https://mp.weixin.qq.com/s/ltos1nEgktec5pn47xAgMw


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM