Google的Protobuf在業界非常流行,很多商業項目選擇Protobuf作為編解碼框架,Protobuf的優點。
(1)在谷歌內部長期使用,產品成熟度高;
(2)跨語言,支持多種語言,包括C++、Java和Python;
(3)編碼后的消息更小,更加有利於存儲和傳輸;
(4)編解碼的性能非常高;
(5)支持不同協議版本的前向兼容;
(6)支持定義可選和必選字段。
Protobuf的入門
Protobuf是一個靈活、高效、結構化的數據序列化框架,相比於XML等傳統的序列化工具,它更小,更快,更簡單。Protobuf支持數據結構化一次可以到處使用,甚至跨語言使用,通過代碼生成工具可以自動生成不同語言版本的源代碼,甚至可以在使用不同版本的數據結構進程間進行數據傳遞,實現數據結構的前向兼容。
Netty的Protobuf應用開發


服務端代碼示例:
import cn.sf.redis.socket.javaser.SubReqServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class SubReqServer { public void bind(int port) throws Exception { // 配置服務端的NIO 線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(Channel ch) { //首先向ChannelPipeline添加ProtobufVarint32FrameDecoder,它主要用於半包處理 ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); //隨后繼續添加ProtobufDecoder解碼器,它的參數是com.google.protobuf.MessageLite, //實際上就是要告訴ProtobufDecoder需要解碼的目標類是什么, //否則僅僅從字節數組中是無法判斷出要解碼的目標類型信息的。 ch.pipeline().addLast( new ProtobufDecoder( SubscribeReqProto.SubscribeReq .getDefaultInstance())); ch.pipeline().addLast( new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqServerHandler()); } }); // 綁定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服務端監聽端口關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默認值 } } new SubReqServer().bind(port); } } import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; @ChannelHandler.Sharable public class SubReqServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //由於ProtobufDecoder已經對消息進行了自動解碼,因此接收到的訂購請求消息可以直接使用。 SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg; //對用戶名進行校驗,校驗通過后構造應答消息返回給客戶端 if ("Lilinfeng".equalsIgnoreCase(req.getUserName())) { System.out.println("Service accept client subscribe req : ["+ req.toString() + "]"); //由於使用了ProtobufEncoder,所以不需要對SubscribeRespProto.SubscribeResp進行手工編碼。 ctx.writeAndFlush(resp(req.getSubReqID())); } } private SubscribeRespProto.SubscribeResp resp(int subReqID) { SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder(); builder.setSubReqID(subReqID); builder.setRespCode(0); builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address"); return builder.build(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close();// 發生異常,關閉鏈路 } }
客戶端代碼示例:
import cn.sf.redis.socket.javaser.SubReqClientHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; public class SubReqClient { public void connect(int port, String host) throws Exception { // 配置客戶端NIO 線程組 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); //客戶端需要解碼的對象是訂購響應, //所以使用SubscribeResp Proto.SubscribeResp的實例做入參。 ch.pipeline().addLast( new ProtobufDecoder( SubscribeRespProto.SubscribeResp .getDefaultInstance())); ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new SubReqClientHandler()); } }); // 發起異步連接操作 ChannelFuture f = b.connect(host, port).sync(); // 等待客戶端鏈路關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放NIO 線程組 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默認值 } } new SubReqClient().connect(port, "127.0.0.1"); } } import com.google.common.collect.Lists; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.util.List; public class SubReqClientHandler extends ChannelHandlerAdapter { public SubReqClientHandler() { } @Override public void channelActive(ChannelHandlerContext ctx) { for (int i = 0; i < 10; i++) { ctx.write(subReq(i)); } ctx.flush(); } private SubscribeReqProto.SubscribeReq subReq(int i) { SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq .newBuilder(); builder.setSubReqID(i); builder.setUserName("Lilinfeng"); builder.setProductName("Netty Book For Protobuf"); List<String> address = Lists.newArrayList(); address.add("NanJing YuHuaTai"); address.add("BeiJing LiuLiChang"); address.add("ShenZhen HongShuLin"); builder.addAllAddress(address); return builder.build(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Receive server response : [" + msg + "]"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
運行結果:
服務端運行結果如下:
Service accept client subscribe req : [subReqID: 0
userName: "Lilinfeng"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChang"
address: "ShenZhen HongShuLin"
]
.....................................................................
Service accept client subscribe req : [subReqID: 9
userName: "Lilinfeng"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChang"
address: "ShenZhen HongShuLin"
]
客戶端運行結果如下。
Receive server response : [subReqID: 0
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
.....................................................................
Receive server response : [subReqID: 9
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
利用Netty提供的Protobuf編解碼能力,我們在不需要了解Protobuf實現和使用細節的情況下就能輕松支持Protobuf編解碼,可以方便地實現跨語言的遠程服務調用和與周邊的異構系統進行通信對接。
Protobuf的使用注意事項
ProtobufDecoder僅僅負責解碼,它不支持讀半包。因此,在ProtobufDecoder前面,一定要有能夠處理讀半包的解碼器,有三種方式可以選擇。
- 使用Netty提供的ProtobufVarint32FrameDecoder,它可以處理半包消息;
- 繼承Netty提供的通用半包解碼器LengthFieldBasedFrameDecoder;
- 繼承ByteToMessageDecoder類,自己處理半包消息。
如果你只使用ProtobufDecoder解碼器而忽略對半包消息的處理,程序是不能正常工作的。
服務端注釋掉ProtobufVarint32FramepDecoder:

