粘包現象
客戶端在一個for循環內連續發送1000個hello給Netty服務器端,
1 Socket socket = new Socket("127.0.0.1", 10101); 2 for(int i = 0; i < 1000; i++){ 3 socket.getOutputStream().write(“hello”.getBytes()); 4 } 5 socket.close();
而在服務器端接受到的信息並不是預期的1000個獨立的Hello字符串.
實際上是無序的hello字符串混合在一起, 如圖所示. 這種現象我們稱之為粘包.
為什么會出現這種現象呢? TCP是個”流”協議,流其實就是沒有界限的一串數據。
TCP底層中並不了解上層業務數據的具體含義,它會根據TCP緩沖區的實際情況進行包划分,
所以在TCP中就有可能一個完整地包會被TCP拆分成多個包,也有可能吧多個小的包封裝成一個大的數據包發送。
分包處理
顧名思義, 我們要對傳輸的數據進行分包. 一個簡單的處理邏輯是在發送數據包之前, 先用四個字節占位, 表示數據包的長度.
數據包結構為:
| 長度(4字節) | 數據 |
1 Socket socket = new Socket("127.0.0.1", 10101); 2 String message = "hello"; 3 byte[] bytes = message.getBytes(); 4 ByteBuffer buffer = ByteBuffer.allocate(4 + bytes.length); 5 // 消息長度 6 buffer.putInt(bytes.length); 7 // 消息正文 8 buffer.put(bytes); 9 byte[] array = buffer.array(); 10 for(int i = 0; i < 1000; i++){ 11 socket.getOutputStream().write(array); 12 } 13 socket.close();
服務器端代碼, 我們需要借助於FrameDecoder類來分包.
1 public class MyDecoder extends FrameDecoder { 2 3 @Override 4 protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { 5 6 if(buffer.readableBytes() > 4){ 7 //標記 8 buffer.markReaderIndex(); 9 //長度 10 int length = buffer.readInt(); 11 12 if(buffer.readableBytes() < length){ 13 buffer.resetReaderIndex(); 14 //緩存當前剩余的buffer數據,等待剩下數據包到來 15 return null; 16 } 17 18 //讀數據 19 byte[] bytes = new byte[length]; 20 buffer.readBytes(bytes); 21 //往下傳遞對象 22 return new String(bytes); 23 } 24 //緩存當前剩余的buffer數據,等待剩下數據包到來 25 return null; 26 } 27 28 }
如此一來, 我們再次在服務器端接受到的消息就是按序打印的hello了.
這邊可能有個疑問, 為什么MyDecoder中數據沒有讀取完畢, 需要return null,
正常的pipeline在數據處理完都是要sendUpstream, 給下一個pipeline的.
這個需要看下FrameDecoder.messageReceived 的源碼. 他在其中緩存了一個cumulation對象,
如果return了null, 他會繼續往緩存里寫數據來實現分包
1 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 2 Object m = e.getMessage(); 3 if (!(m instanceof ChannelBuffer)) { 4 // 數據讀完了, 轉下一個pipeline 5 ctx.sendUpstream(e); 6 } else { 7 ChannelBuffer input = (ChannelBuffer)m; 8 if (input.readable()) { 9 if (this.cumulation == null) { 10 try { 11 this.callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); 12 } finally { 13 this.updateCumulation(ctx, input); 14 } 15 } else { 16 // 緩存上一次沒讀完整的數據 17 input = this.appendToCumulation(input); 18 19 try { 20 this.callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); 21 } finally { 22 this.updateCumulation(ctx, input); 23 } 24 } 25 26 } 27 } 28 }
那么是不是這樣就萬事大吉了呢?
Socket字節流攻擊
在上述代碼中, 我們會在服務器端為客戶端發送的數據包長度, 預先分配byte數組.
如果遇到惡意攻擊, 傳入的數據長度與內容 不匹配. 例如聲明數據長度為Integer.MAX_VALUE.
這樣會消耗大量的服務器資源生成byte[], 顯然是不合理的.
因此我們還要加個最大長度限制.
1 if(buffer.readableBytes() > 2048){ 2 buffer.skipBytes(buffer.readableBytes()); 3 }
新的麻煩也隨之而來, 雖然可以跳過指定長度, 但是數據包本身就亂掉了.
因為長度和內容不匹配, 跳過一個長度后, 不知道下一段數據的開頭在哪里了.
因此我們自定義數據包里面, 不僅要引入數據包長度, 還要引入一個包頭來划分各個包的范圍.
包頭用任意一段特殊字符標記即可, 例如$$$.
1 // 防止socket字節流攻擊 2 if(buffer.readableBytes() > 2048){ 3 buffer.skipBytes(buffer.readableBytes()); 4 } 5 // 記錄包頭開始的index 6 int beginReader = buffer.readerIndex(); 7 8 while(true) { 9 if(buffer.readInt() == ConstantValue.FLAG) { 10 break; 11 } 12 }
新的數據包結構為:
| 包頭(4字節) | 長度(4字節) | 數據 |
Netty自帶拆包類
自己實現拆包雖然可以細粒度控制, 但是也會有些不方便, 可以直接調用Netty提供的一些內置拆包類.
- FixedLengthFrameDecoder 按照特定長度組包
- DelimiterBasedFrameDecoder 按照指定分隔符組包, 例如本文中的$$$
- LineBasedFrameDecoder 按照換行符進行組包, \r \n等等
- ......