Netty學習——Netty和Protobuf的整合
Protobuf作為序列化工具,先將數據序列化,再通過Netty在網絡上進行傳輸。
1.將proto文件裡的java包位置(java_package)修改一下,然後再執行一次protoc
異常捕獲:啟動服務器端正常,在啟動客戶端的時候,發送消息,報錯
警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception. io.netty.handler.codec.DecoderException: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero). at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:98) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:313) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:427) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero). at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:102) at com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:627) at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:109) at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:69) at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:881) at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:875) at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:197) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48) at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:121) at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:65) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88) ... 
23 more 十二月 03, 2019 11:01:51 上午 io.netty.channel.DefaultChannelPipeline onUnhandledInboundException 警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception. io.netty.handler.codec.DecoderException: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length. at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:98) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) Caused by: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length. at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:84) at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawByte(CodedInputStream.java:1238) at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint64SlowPath(CodedInputStream.java:1126) at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint32(CodedInputStream.java:1020) at com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:623) at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:109) at com.dawa.netty.sixthexample.MyDataInfo$Person.<init>(MyDataInfo.java:69) at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:881) at com.dawa.netty.sixthexample.MyDataInfo$Person$1.parsePartialFrom(MyDataInfo.java:875) at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:197) at 
com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48) at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:121) at io.netty.handler.codec.protobuf.ProtobufDecoder.decode(ProtobufDecoder.java:65) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88) ... 21 more
開始找問題:
1.io.netty.handler.codec.DecoderException: 在解碼過程中出錯,可能是服務器端出的錯
2.Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
3.While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.
檢查了一下代碼,猜測是處理器的添加順序有問題,調整之後就成了,效果圖如下
可怕,原因竟然是編解碼器添加的順序不對: 正確的順序是這樣的:
@Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //這里和之前的不一樣了,但也就是處理器的不一樣 //編解碼處理器,protobuf提供的專門的編解碼器.4個處理器 pipeline.addLast(new ProtobufVarint32FrameDecoder()); //Decoder是重點,解碼器,將字節碼轉換成想要的數據類型 //參數 messageLite,外層的要轉換的類的實例 pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance())); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new TestServerHandler()); }
接下來存放一下完整的客戶端和服務器端的代碼
proto文件
// Schema for the Netty + Protobuf example messages.
syntax = "proto2";

package com.dawa.protobuf;

option optimize_for = SPEED;
// Java package and outer class name used for the generated code.
option java_package = "com.dawa.netty.sixthexample";
option java_outer_classname = "MyDataInfo";

// Message exchanged between the example client and server.
message Person {
    required string name = 1;
    optional int32 age = 2;
    optional string address = 3;
}

// Second message with the same fields as Person.
message Person2 {
    required string name = 1;
    optional int32 age = 2;
    optional string address = 3;
}
服務器端代碼:
package com.dawa.netty.sixthexample; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * @Title: TestServer * @Author: 大娃 * @Date: 2019/12/3 10:01 * @Description: */ public class TestServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new TestServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(8899).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.dawa.netty.sixthexample; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * @Title: TestServerHandler * @Author: 大娃 * @Date: 2019/12/3 10:09 * @Description: handler本身是個泛型,這里的泛型就是取 要處理的類型 */ public class TestServerHandler extends SimpleChannelInboundHandler<MyDataInfo.Person> { @Override protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception { //對接受到的消息進行處理: System.out.println(msg.getName()); System.out.println(msg.getAge()); System.out.println(msg.getAddress()); } }
package com.dawa.netty.sixthexample; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; /** * @Title: TestServerInitializer * @Author: 大娃 * @Date: 2019/12/3 10:05 * @Description: */ public class TestServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //這里和之前的不一樣了,但也就是處理器的不一樣 //編解碼處理器,protobuf提供的專門的編解碼器.4個處理器 pipeline.addLast(new ProtobufVarint32FrameDecoder()); //Decoder是重點,解碼器,將字節碼轉換成想要的數據類型 //參數 messageLite,外層的要轉換的類的實例 pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance())); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new TestServerHandler()); } }
客戶端代碼
package com.dawa.netty.sixthexample; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; /** * @Title: TestClient * @Author: 大娃 * @Date: 2019/12/3 10:15 * @Description: 客戶端內容 */ public class TestClient { public static void main(String[] args) throws Exception { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) .handler(new TestClientInitializer()); //注意此處,使用的是connect,不是使用的bind ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync(); channelFuture.channel().closeFuture().sync(); } finally { eventLoopGroup.shutdownGracefully(); } } }
package com.dawa.netty.sixthexample; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; /** * @Title: TestClientInitializer * @Author: 大娃 * @Date: 2019/12/3 10:43 * @Description: */ public class TestClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //這里和之前的不一樣了,但也就是處理器的不一樣 //編解碼處理器,protobuf提供的專門的編解碼器.4個處理器 pipeline.addLast(new ProtobufVarint32FrameDecoder()); //Decoder是重點,解碼器,將字節碼轉換成想要的數據類型 //參數 messageLite,外層的要轉換的類的實例 pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance())); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); //自己的處理器 pipeline.addLast(new TestClientHandler()); } }
package com.dawa.netty.sixthexample; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * @Title: TestClientHandler * @Author: 大娃 * @Date: 2019/12/3 10:44 * @Description: */ public class TestClientHandler extends SimpleChannelInboundHandler<MyDataInfo.Person> { @Override protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception { } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //處於活動狀態 MyDataInfo.Person person = MyDataInfo.Person.newBuilder().setName("大娃").setAge(22).setAddress("北京").build(); ctx.channel().writeAndFlush(person); } }
這個程序是有瑕疵的:解碼器那裡不通用,耦合性太強,有兩個很明顯的問題,但是要怎麼解決呢?
會專門寫個帖子進行分析,Netty學習——Netty和Protobuf的整合(二)