netty 默認支持protobuf 的封裝與解碼,如果通信雙方都使用netty則沒有什么障礙,但如果客戶端是其它語言(C#)則需要自己仿寫與netty一致的方式(解碼+封裝),提前是必須很了解netty是如何進行封裝與解碼的。這里主要通過讀源碼主要類ProtobufVarint32FrameDecoder(解碼)+ProtobufVarint32LengthFieldPrepender(封裝) 來解析其原理與實現。
一,支持protobuf 協議的默認實現
//配置服務端NIO線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder( SubscribeReqProto.SubscribeReq.getDefaultInstance())) .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) .addLast(new SubReqServerHandler()); } }); //綁定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); //等待服務端監聽端口關閉 f.channel().closeFuture().sync(); }finally{ //退出時釋放資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
以上是提供的默認實現。關鍵在於ProtobufVarint32FrameDecoder,ProtobufVarint32LengthFieldPrepender類。
二,ProtobufVarint32LengthFieldPrepender 編碼類
An encoder that prepends the the Google Protocol Buffers 128 Varints integer length field.
* BEFORE DECODE (300 bytes) AFTER DECODE (302 bytes) * +---------------+ +--------+---------------+ * | Protobuf Data |-------------->| Length | Protobuf Data | * | (300 bytes) | | 0xAC02 | (300 bytes) | * +---------------+ +--------+---------------+
從類的說明來看, proto 消息格式如:Length + Protobuf Data (消息頭+消息數據) 方式,這里特別需要注意的是頭長使用的是varints方式不是int ,消息頭描述消息數據體的長度。為了更減少傳輸量,消息頭采用的是varint 格式。
什么是varint?
Varint 是一種緊湊的表示數字的方法。它用一個或多個字節來表示一個數字,值越小的數字使用越少的字節數。這能減少用來表示數字的字節數。 Varint 中的每個 byte 的最高位 bit 有特殊的含義,如果該位為 1,表示后續的 byte 也是該數字的一部分,如果該位為 0,則結束。其他的 7 個 bit 都用來表示數字。因此小於 128 的數字都可以用一個 byte 表示。大於 128 的數字,會用兩個字節。
更多可參見我上篇文章
最大的區別是消息頭它不是固定長度(常見是的使用INT 4個字節固定長度),Varint它用一個或多個字節來表示一個數字決定它不是固定長度!
ProtobufVarint32LengthFieldPrepender 類的主要方法如下:
@Override protected void encode( ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { int bodyLen = msg.readableBytes(); int headerLen = CodedOutputStream.computeRawVarint32Size(bodyLen); out.ensureWritable(headerLen + bodyLen); CodedOutputStream headerOut = CodedOutputStream.newInstance(new ByteBufOutputStream(out), headerLen); headerOut.writeRawVarint32(bodyLen); headerOut.flush(); out.writeBytes(msg, msg.readerIndex(), bodyLen); }
CodedOutputStream 主要是針對與varints相關操作類。 先看是如何寫消息頭的,得到bodyLen 消息體長度然后調用computeRawVarint32Size()計算需要多少個字節,
public static int computeRawVarint32Size(final int value) { if ((value & (0xffffffff << 7)) == 0) return 1; if ((value & (0xffffffff << 14)) == 0) return 2; if ((value & (0xffffffff << 21)) == 0) return 3; if ((value & (0xffffffff << 28)) == 0) return 4; return 5; }
0xffffffff << 7 二進制表示11111111111111111111111110000000 ,當與value &計算=0則表示value最大只會是000000000000000000000001111111,一個字節足以。
通過&運算得出使用多少個字節就可以表示當前數字。左移7位是與Varint定義相關,第一位需要保留給標識(1表示后續的 byte 也是該數字的一部分,0則結束)。要表示 int 32位 和多加的每個字節第一個標識位,多出了4位,所以就最大會有5個字節。
得到了varints值,然后如何寫入out? 再看關鍵方法writeRawVarint32()。
public void writeRawVarint32(int value) throws IOException { while (true) { //0x7F為127 if ((value & ~0x7F) == 0) {//是否小於127,小於則一個字節就可以表示了 writeRawByte(value); return; } else { writeRawByte((value & 0x7F) | 0x80);//因不於小127,加一高位標識 value >>>= 7;//右移7位,再遞歸 } } } /** Write a single byte. */ public void writeRawByte(final byte value) throws IOException { if (position == limit) { refreshBuffer(); } buffer[position++] = value; } private void refreshBuffer() throws IOException { if (output == null) { // We're writing to a single buffer. throw new OutOfSpaceException(); } // Since we have an output stream, this is our buffer // and buffer offset == 0 output.write(buffer, 0, position); position = 0; }
byte 的取值(-128~127) , 0x7F為127 , 0x80為128
循環取后7位,如果小於127則結束,不小於第一位加標識位1。 因為循環右移所以,實際位置顛倒了,解碼時需要倒過來再拼接。
消息頭因為是varint32可變字節,所以比較復雜些,消息體簡單直接writeBytes即可。
二,ProtobufVarint32FrameDecoder 解碼類
同樣對應CodedOutputStream有CodedInputStream類,操作解碼時的varints。
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { in.markReaderIndex(); final byte[] buf = new byte[5]; for (int i = 0; i < buf.length; i ++) { if (!in.isReadable()) { in.resetReaderIndex(); return; } buf[i] = in.readByte(); if (buf[i] >= 0) { int length = CodedInputStream.newInstance(buf, 0, i + 1).readRawVarint32(); if (length < 0) { throw new CorruptedFrameException("negative length: " + length); } if (in.readableBytes() < length) { in.resetReaderIndex(); return; } else { out.add(in.readBytes(length)); return; } } } // Couldn't find the byte whose MSB is off. throw new CorruptedFrameException("length wider than 32-bit"); }
前面說明了最大長度為5個字節所以這里聲明了5個長度的字節來讀取消息頭。
buf[i] >= 0 這里為什么是>0然后就可以解碼了呢?
還是這句話:varints第一位表示后續的byte是否是該數字的一部分!
如果字節第一位為1則表示后續還有字節是表示消息頭,當這個字節的第一位為1則這個字節肯定是負數(字節最高位表示正負),大於等於0表示描述消息體長度的數字已經讀完了。
然后調用readRawVarint32() 還原成int ,與之前 writeRawVarint32()反其道而行。
public int readRawVarint32() throws IOException { byte tmp = readRawByte(); if (tmp >= 0) { return tmp; } int result = tmp & 0x7f; if ((tmp = readRawByte()) >= 0) { result |= tmp << 7; } else { result |= (tmp & 0x7f) << 7; if ((tmp = readRawByte()) >= 0) { result |= tmp << 14; } else { result |= (tmp & 0x7f) << 14; if ((tmp = readRawByte()) >= 0) { result |= tmp << 21; } else { result |= (tmp & 0x7f) << 21; result |= (tmp = readRawByte()) << 28; if (tmp < 0) { // Discard upper 32 bits. for (int i = 0; i < 5; i++) { if (readRawByte() >= 0) { return result; } } throw InvalidProtocolBufferException.malformedVarint(); } } } } return result; }
取第N字節左移7*N位或|第一個字節拼接,實現了倒序拼接,最后得到了消息體長度。然后根據得到的消息體長度讀取數據,如果消息體長度不夠則回滾到markReaderIndex,等待數據。
四,總結
本文主要詳細介紹了netty 對 protobuf 協議的解碼與包裝。重點在消息頭 varint32的 算法表示上進行了說明。了解了varint32在協議中的實現,方便應用在其語言對接。