netty 詳解(五)netty 使用 protobuf 序列化


錄:

1、編碼和解碼
2、Google Protobuf 介紹
3、案例--netty 使用 protobuf 序列化
    3.1、編寫 .proto 文件
    3.2、自動生成代碼
    3.3、netty 通過 Protobuf 傳遞消息
4、netty 使用 protobuf 傳輸多種類型對象

 

1、編碼和解碼    <--返回目錄

  編寫網絡應用程序時,因為數據在網絡中傳輸的都是二進制字節碼數據,在發送數據時就需要編碼,接收數據時就需要解碼。

  codec(編解碼器) 的組成部分有兩個:decoder(解碼器) 和 encoder(編碼器)。encoder 負責把業務數據轉換成字節碼數據,decoder 負責把字節碼數據轉換成業務數據。

  netty 提供的 StringEncoder/StringDecoder 是對字符串數據進行編解碼;ObjectEncoder/ObjectDecoder 是對 Java 對象進行編解碼。

  ObjectEncoder/ObjectDecoder 可以用來實現 POJO 對象或各種業務對象的編解碼,底層使用的是 Java 序列化技術,而 Java 序列化技術本身效率不高,並存在如下問題:

  • 無法跨語言
  • 序列化后的體積太大,是二進制編碼的 5 倍多
  • 序列化性能低

  所以,引出新的解決方案:Google 的 Protobuf。

 

2、Google Protobuf 介紹    <--返回目錄

  參考文檔:https://developers.google.com/protocol-buffers/docs/proto

  Protobuf 是 Google 發布的開源項目,全稱 Google Protocol Buffers,是一種輕便高效的結構化數據存儲格式,可以用於結構化數據串行化或者說序列化。它很適合做數據存儲或 RPC 數據交換格式。

  支持跨平台/跨語言,支持絕大數語言,例如 c++,c#, Java, python 等。‘

 

  Protobuf 自動生成代碼:

  • 使用 Protobuf 編譯器自動生成代碼,Protobuf 是將類的定義使用 .proto 文件進行描述。在 IDEA 中編寫 .proto 文件時,會自動提示是否下載 .protot 編寫插件(protobuf support 插件),可以讓語法高亮。
  • 然后通過 protoc.exe 編譯器根據 .proto 自動生成 .java 文件

 

  自動生成 .java 文件 參考:(注意 .proto 文件放在 src/main/proto 目錄下)

 

3、案例--netty 使用 protobuf 序列化    <--返回目錄

  需求:

  1)客戶端可以發送一個 User POJO 對象到服務器(通過 protobuf 編碼);

  2)服務端能接收 User POJO 對象,並顯示信息(通過 protobuf 解碼);

 

3.1、編寫 .proto 文件    <--返回目錄

  src/main/proto/User.proto

syntax = "proto3";
option java_package = "com.oy.protobuf";
option java_outer_classname = "UserModel";// 生成的外部類名,同時也是文件名

// protobuf 使用 message 管理數據
message User { // 會在 UserModel 里面生成一個內部類 User,即是真正發送的 POJO 對象
    int32 id = 1;
    string name = 2;
}

 

3.2、自動生成代碼    <--返回目錄

  pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>netty-helloworld</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <grpc.version>1.14.0</grpc.version>
        <protobuf.version>3.3.0</protobuf.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.22.Final</version>
        </dependency>

        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>${grpc.version}</version>
        </dependency>

        <!--<dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>-->
    </dependencies>

    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.5.0.Final</version>
            </extension>
        </extensions>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:3.5.1:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.14.0:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
View Code

  執行 mvn clean compile 命令

 

3.3、netty 通過 Protobuf 傳遞消息    <--返回目錄

   Server

package com.oy.protobuf;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;
import java.util.Date;

public class Server {
    private int port;

    public static void main(String[] args) {
        new Server(8003).start();
    }

    public Server(int port) {
        this.port = port;
    }

    public void start() {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup work = new NioEventLoopGroup();

        try {
            ServerBootstrap server = new ServerBootstrap()
                    .group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    //.localAddress(new InetSocketAddress(port))
                    //.option(ChannelOption.SO_BACKLOG, 128)
                    //.childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("encoder", new ProtobufEncoder()); // protobuf 編碼器
                            // 需要指定要對哪種對象進行解碼
                            pipeline.addLast("decoder", new ProtobufDecoder(UserModel.User.getDefaultInstance()));
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });

            // 綁定端口
            ChannelFuture future = server.bind(port).sync();
            System.out.println("server started and listen " + port);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }

    public static class NettyServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("HelloWorldServerHandler active");
        }

        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("server channelRead...");
            // 讀取客戶端發送的數據 UserMOdel.User
            UserModel.User user = (UserModel.User) msg; System.out.println("客戶端發送的數據: " + user.getId() + "--" + user.getName());
        }

        /**
         * 數據讀取完畢
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            UserModel.User user = UserModel.User.newBuilder().setId(20).setName("服務器").build(); ctx.writeAndFlush(user);
        }

        /**
         * 處理異常,關閉通道
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.channel().close();
        }
    }
}

 

  Client

package com.oy.protobuf;

import com.oy.helloworld.NettyClient;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.util.CharsetUtil;

public class Client {
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 8003;

    public static void main(String[] args) {
        new Client().start(HOST, PORT);
    }

    public void start(String host, int port) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("encoder", new ProtobufEncoder()); // protobuf 編碼器
                            // 需要指定要對哪種對象進行解碼
                            pipeline.addLast("decoder", new ProtobufDecoder(UserModel.User.getDefaultInstance()));
                            pipeline.addLast();
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });

            ChannelFuture future = client.connect(host, port).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static class NettyClientHandler extends ChannelInboundHandlerAdapter {
        /**
         * 通道就緒觸發該方法
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("HelloWorldClientHandler Active");
            // 發送 User POJO 對象到服務器
            UserModel.User user = UserModel.User.newBuilder().setId(10).setName("客戶端張三").build(); ctx.writeAndFlush(user);
        }

        /**
         * 當通道有讀取事件時觸發該方法
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 讀取服務器發送的數據 UserMOdel.User
            UserModel.User user = (UserModel.User) msg; System.out.println("收到服務器響應: " + user.getId() + "--" + user.getName());
        }
    }
}

  啟動服務端和客戶端程序:

 

 

 

4、netty 使用 protobuf 傳輸多種類型對象    <--返回目錄

  MyDataInfo.proto

  • 使用 message 管理其他的 message
  • 定義的 MyMessage 的第一個參數是枚舉類型,標識傳遞的是 Student 還是 Teacher;第二個參數是 Student 或 Teacher 中的一個;
syntax = "proto3";
option optimize_for = SPEED; // 加快解析
option java_package = "com.oy.protobuf2";
option java_outer_classname = "MyDataInfo";// 生成的外部類名,同時也是文件名

// protobuf 可以使用 message 管理其他的 message
message MyMessage {
    // 定義一個枚舉類型
    enum DataType {
        teacherType = 0;
        studentType = 1;
    }
    // 用 data_type 來標識傳的是哪一個枚舉類型
    DataType data_type = 1;
    // 表示每次枚舉類型最多只能出現其中一個,節省空間
    oneof dataBody {
        Teacher teacher = 2;
        Student student = 3;
    }
}

// protobuf 使用 message 管理數據
message Teacher {
    int32 id = 1;
    string name = 2;
}
message Student {
    int32 id = 1;
    string name = 2;
}

 

  Server

package com.oy.protobuf2;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;

public class Server {
    private int port;

    public static void main(String[] args) {
        new Server(8004).start();
    }

    public Server(int port) {
        this.port = port;
    }

    public void start() {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup work = new NioEventLoopGroup();

        try {
            ServerBootstrap server = new ServerBootstrap()
                    .group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("encoder", new ProtobufEncoder()); // protobuf 編碼器
                            // 需要指定要對哪種對象進行解碼
                            pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });

            // 綁定端口
            ChannelFuture future = server.bind(port).sync();
            System.out.println("server started and listen " + port);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }

    public static class NettyServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("HelloWorldServerHandler active");
        }

        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("server channelRead...");
            // 讀取客戶端發送的數據 Student POJO
            MyDataInfo.MyMessage myMessage = (MyDataInfo.MyMessage) msg;
            System.out.println("客戶端發送的數據: " + myMessage.getDataType() + "--"
                    + myMessage.getStudent().getId() + "--" + myMessage.getStudent().getName());
        }

        /**
         * 數據讀取完畢
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 服務端返回 Teacher POJO
            MyDataInfo.Teacher teacher = MyDataInfo.Teacher.newBuilder().setId(222).setName("老師").build();
            MyDataInfo.MyMessage myMessage = MyDataInfo.MyMessage.newBuilder()
                    .setDataType(MyDataInfo.MyMessage.DataType.teacherType).setTeacher(teacher)
                    .build();
            ctx.writeAndFlush(myMessage);
        }

        /**
         * 處理異常,關閉通道
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.channel().close();
        }
    }
}

 

  Client

package com.oy.protobuf2;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;

public class Client {
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 8004;

    public static void main(String[] args) {
        new Client().start(HOST, PORT);
    }

    public void start(String host, int port) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("encoder", new ProtobufEncoder()); // protobuf 編碼器
                            // 需要指定要對哪種對象進行解碼
                            pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
                            pipeline.addLast();
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });

            ChannelFuture future = client.connect(host, port).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static class NettyClientHandler extends ChannelInboundHandlerAdapter {
        /**
         * 通道就緒觸發該方法
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("HelloWorldClientHandler Active");
            // 發送 Student POJO 對象到服務器
            MyDataInfo.Student student = MyDataInfo.Student.newBuilder().setId(111).setName("學生").build();
            MyDataInfo.MyMessage myMessage = MyDataInfo.MyMessage.newBuilder()
                    .setDataType(MyDataInfo.MyMessage.DataType.studentType).setStudent(student)
                    .build();
            ctx.writeAndFlush(myMessage);
        }

        /**
         * 當通道有讀取事件時觸發該方法
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 讀取服務器發送的數據,服務器發送的是 Teacher POJO
            MyDataInfo.MyMessage myMessage = (MyDataInfo.MyMessage) msg;
            System.out.println("服務器返回的數據: " + myMessage.getDataType() + "--"
                    + myMessage.getTeacher().getId() + "--" + myMessage.getTeacher().getName());
        }
    }
}

 

  測試:客戶端給服務器發送的是 Student POJO,服務器返回的是 Teacher POJO

 

 ---


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM