之前我們有個netty5的拆包解決方案(參加netty5拆包問題解決實例),現在我們采用另一種思路,不需要新增LengthFieldBasedFrameDecoder,直接修改NettyMessageDecoder:
package com.wlf.netty.nettyapi.msgpack; import com.wlf.netty.nettyapi.constant.Delimiter; import com.wlf.netty.nettyapi.javabean.Header; import com.wlf.netty.nettyapi.javabean.NettyMessage; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class NettyMessageDecoder extends ByteToMessageDecoder { /** * 消息體字節大小:分割符字段4字節+長度字段4字節+請求類型字典1字節+預留字段1字節=10字節 */ private static final int HEAD_LENGTH = 10; @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { while (true) { // 標記字節流開始位置 byteBuf.markReaderIndex(); // 若讀取到分割標識,說明讀取當前字節流開始位置了 if (byteBuf.readInt() == Delimiter.DELIMITER) { break; } // 重置讀索引為0 byteBuf.resetReaderIndex(); // 長度校驗,字節流長度至少10字節,小於10字節則等待下一次字節流過來 if (byteBuf.readableBytes() < HEAD_LENGTH) { byteBuf.resetReaderIndex(); return; } } // 2、獲取data的字節流長度 int dataLength = byteBuf.readInt(); // 校驗數據包是否全部發送過來,總字節流長度(此處讀取的是除去delimiter和length之后的總長度)- // type和reserved兩個字節=data的字節流長度 int totalLength = byteBuf.readableBytes(); if ((totalLength - 2) < dataLength) { // 長度校驗,字節流長度少於數據包長度,說明數據包拆包了,等待下一次字節流過來 byteBuf.resetReaderIndex(); return; } // 3、請求類型 byte type = byteBuf.readByte(); // 4、預留字段 byte reserved = byteBuf.readByte(); // 5、數據包內容 byte[] data = null; if (dataLength > 0) { data = new byte[dataLength]; byteBuf.readBytes(data); } NettyMessage nettyMessage = new NettyMessage(); Header header = new Header(); header.setDelimiter(Delimiter.DELIMITER); header.setLength(dataLength); header.setType(type); header.setReserved(reserved); nettyMessage.setHeader(header); nettyMessage.setData(data); list.add(nettyMessage); // 回收已讀字節 byteBuf.discardReadBytes(); } }
我們的改動很小,只不過將原來的讀索引改為標記索引,然后在拆包時退出方法前重置讀索引,這樣下次數據包過來,我們的讀索引依然從0開始,delimiter的標記就可以讀出來,而不會陷入死循環了。
ByteBuf是ByteBuffer的進化版,ByteBuffer(參見ByteBuffer使用實例)才一個索引,讀寫模式需要通過flip來轉換,而ByteBuf有兩個索引,readerIndex讀索引和writerIndex寫索引,讀寫轉換無縫連接,青出於藍而勝於藍:
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
既然有兩個索引,那么標記mask、重置reset必然也是兩兩對應,上面的代碼中我們只需要用到讀標記和讀重置。
我們把客戶端handler也修改下,先把LengthFieldBasedFrameDecoder去掉:
// channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 4, 4, 2, 0));
再讓數據包更大一些:
/** * 構造PCM請求消息體 * * @return */ private byte[] buildPcmData() throws Exception { byte[] resultByte = longToBytes(System.currentTimeMillis()); // 讀取一個本地文件 String AUDIO_PATH = "D:\\input\\test_1.pcm"; try (RandomAccessFile raf = new RandomAccessFile(AUDIO_PATH, "r")) { int len = -1; byte[] content = new byte[1024]; while((len = raf.read(content)) != -1) { resultByte = addAll(resultByte, content); } } return resultByte; }
再debug下看看,第一次解析客戶端發送的數據,讀取1024字節,我們可以看到讀索引是8(delimiter+length=8),寫索引就是1024,我們的大包里有3939116個字節,去掉10個字節的header,剩下小包是3939106::
第二次再讀1024,代碼已經執行reset重置讀索引了,所以讀索引由8改為0,寫索引累增到2048:
第三次再讀1024,寫索引繼續累增到3072:
最后一次發1024,寫索引已經到達3939116,大包傳輸結束了:
從上面看出,我們對ByteBuf的capacity一直在翻倍,讀指針一直標記在大包的起始位置0,這樣做的目的是每次都能讀取小包的長度length(3939106),拿來跟整個ByteBuf的長度作比較,只要它取到的小包沒到達到length,我們就繼續接受新包,寫索引不停的累加,直到整個大包長度>=3939116(也就是小包>=3939106),這時我們開始移動讀索引,將字節流寫入對象,最后回收已讀取的字節(調用discardReaderBytes方法):
BEFORE discardReadBytes()
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
AFTER discardReadBytes()
+------------------+--------------------------------------+
| readable bytes | writable bytes (got more space) |
+------------------+--------------------------------------+
| | |
readerIndex (0) <= writerIndex (decreased) <= capacity
其他方法參見測試類:
package com.wlf.netty.nettyserver; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.junit.Assert; import org.junit.Test; public class ByteBufTest { @Test public void byteBufTest() { ByteBuf byteBuf = Unpooled.buffer(10); byteBuf.writeInt(0xabef0101); byteBuf.writeInt(1024); byteBuf.writeByte((byte) 1); byteBuf.writeByte((byte) 0); // 開始讀取 printDelimiter(byteBuf); printLength(byteBuf); // 派生一個ByteBuf,取剩下2個字節,但讀索引不動 ByteBuf duplicatBuf = byteBuf.duplicate(); printByteBuf(byteBuf); // 派生一個ByteBuf,取剩下2個字節,讀索引動了 ByteBuf sliceBuf = byteBuf.readSlice(2); printByteBuf(byteBuf); // 兩個派生的對象其實是一樣的 Assert.assertEquals(duplicatBuf, sliceBuf); } private void printDelimiter(ByteBuf buf) { int newDelimiter = buf.readInt(); System.out.printf("delimeter: %s\n", Integer.toHexString(newDelimiter)); printByteBuf(buf); } private void printLength(ByteBuf buf) { int length = buf.readInt(); System.out.printf("length: %d\n", length); printByteBuf(buf); } private void printByteBuf(ByteBuf buf) { System.out.printf("reader Index: %d, writer Index: %d, capacity: %d\n", buf.readerIndex(), buf.writerIndex(), buf.capacity()); } }
輸出:
delimeter: abef0101 reader Index: 4, writer Index: 10, capacity: 10 length: 1024 reader Index: 8, writer Index: 10, capacity: 10 reader Index: 8, writer Index: 10, capacity: 10 reader Index: 10, writer Index: 10, capacity: 10