Netty學習——Netty和Protobuf的整合(一)


Netty學習——Netty和Protobuf的整合


Protobuf作為序列化的工具,將序列化后的數據,通過Netty來進行在網絡上的傳輸

1.將proto文件里的java包的位置修改一下,然后再執行一下protoc

 

 

 異常捕獲:啟動服務器端正常,在啟動客戶端的時候,發送消息,報錯

警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:98)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:313)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:427)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
    at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
    at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:102)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:627)
    at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:109)
    at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:69)
    at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:881)
    at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:875)
    at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:197)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
    at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:121)
    at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:65)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
    ... 23 more

十二月 03, 2019 11:01:51 上午 io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:98)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
    at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.
    at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:84)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawByte(CodedInputStream.java:1238)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint64SlowPath(CodedInputStream.java:1126)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint32(CodedInputStream.java:1020)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:623)
    at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:109)
    at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:69)
    at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:881)
    at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:875)
    at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:197)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
    at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:121)
    at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:65)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
    ... 21 more

開始找問題:
1.io.netty.handler.codec.DecoderException: 在解碼過程中出錯,可能是服務器端出的錯
2.Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
3.While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.

檢查了一下代碼:猜測了一下:成了,效果圖如下

 

 

 可怕,原因竟然出現在編碼器添加的順序不同導致的: 正確的順序是這樣的:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    //這里和之前的不一樣了,但也就是處理器的不一樣
    //編解碼處理器,protobuf提供的專門的編解碼器.4個處理器
    pipeline.addLast(new ProtobufVarint32FrameDecoder());
    //Decoder是重點,解碼器,將字節碼轉換成想要的數據類型
    //參數  messageLite,外層的要轉換的類的實例
    pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
    pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
    pipeline.addLast(new ProtobufEncoder());
    pipeline.addLast(new TestServerHandler());
}

接下來存放一下完整的客戶端和服務器端的代碼
proto文件

yntax ="proto2";

package com.dawa.protobuf;

option optimize_for = SPEED;
option java_package ="com.dawa.netty.sixthexample";
option java_outer_classname = "MyDataInfo";

message Person{
    required string name = 1;
    optional int32 age = 2;
    optional string address = 3;
}

message Person2{
    required string name = 1;
    optional int32 age = 2;
    optional string address = 3;
}

服務器端代碼:

package com.dawa.netty.sixthexample;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * @Title: TestServer
 * @Author: 大娃
 * @Date: 2019/12/3 10:01
 * @Description:
 */
public class TestServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new TestServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package com.dawa.netty.sixthexample;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @Title: TestServerHandler
 * @Author: 大娃
 * @Date: 2019/12/3 10:09
 * @Description: handler本身是個泛型,這里的泛型就是取 要處理的類型
 */
public class TestServerHandler extends SimpleChannelInboundHandler<MyDataInfo.Person> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {
        //對接受到的消息進行處理:
        System.out.println(msg.getName());
        System.out.println(msg.getAge());
        System.out.println(msg.getAddress());
    }
}
package com.dawa.netty.sixthexample;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

/**
 * @Title: TestServerInitializer
 * @Author: 大娃
 * @Date: 2019/12/3 10:05
 * @Description:
 */
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //這里和之前的不一樣了,但也就是處理器的不一樣
        //編解碼處理器,protobuf提供的專門的編解碼器.4個處理器
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        //Decoder是重點,解碼器,將字節碼轉換成想要的數據類型
        //參數  messageLite,外層的要轉換的類的實例
        pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(new ProtobufEncoder());
        pipeline.addLast(new TestServerHandler());
    }
}

客戶端代碼

package com.dawa.netty.sixthexample;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @Title: TestClient
 * @Author: 大娃
 * @Date: 2019/12/3 10:15
 * @Description: 客戶端內容
 */
public class TestClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
            .handler(new TestClientInitializer());
            //注意此處,使用的是connect,不是使用的bind
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
package com.dawa.netty.sixthexample;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

/**
 * @Title: TestClientInitializer
 * @Author: 大娃
 * @Date: 2019/12/3 10:43
 * @Description:
 */
public class TestClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //這里和之前的不一樣了,但也就是處理器的不一樣
        //編解碼處理器,protobuf提供的專門的編解碼器.4個處理器
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        //Decoder是重點,解碼器,將字節碼轉換成想要的數據類型
        //參數  messageLite,外層的要轉換的類的實例
        pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        pipeline.addLast(new ProtobufEncoder());
        //自己的處理器
        pipeline.addLast(new TestClientHandler());
    }
}
package com.dawa.netty.sixthexample;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @Title: TestClientHandler
 * @Author: 大娃
 * @Date: 2019/12/3 10:44
 * @Description:
 */
public class TestClientHandler extends SimpleChannelInboundHandler<MyDataInfo.Person> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //處於活動狀態
        MyDataInfo.Person person = MyDataInfo.Person.newBuilder().setName("大娃").setAge(22).setAddress("北京").build();
        ctx.channel().writeAndFlush(person);
    }
}

這程序是有瑕疵的,解碼器那里不通用,耦合性太強,有兩個很明顯的問題,但是要怎么解決呢?

會專門寫個帖子進行分析,Netty學習——Netty和Protobuf的整合(二)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM