Netty之粘包分包


粘包現象


客戶端在一個for循環內連續發送1000個hello給Netty服務器端,  

1         Socket socket = new Socket("127.0.0.1", 10101);
2         for(int i = 0; i < 1000; i++){
3             socket.getOutputStream().write(“hello”.getBytes());
4         }        
5         socket.close();

 而在服務器端接受到的信息並不是預期的1000個獨立的Hello字符串.

實際上是無序的hello字符串混合在一起, 如圖所示. 這種現象我們稱之為粘包.

為什么會出現這種現象呢? TCP是個”流”協議,流其實就是沒有界限的一串數據。 

TCP底層中並不了解上層業務數據的具體含義,它會根據TCP緩沖區的實際情況進行包划分,

所以在TCP中就有可能一個完整地包會被TCP拆分成多個包,也有可能吧多個小的包封裝成一個大的數據包發送。

分包處理


顧名思義, 我們要對傳輸的數據進行分包. 一個簡單的處理邏輯是在發送數據包之前, 先用四個字節占位, 表示數據包的長度. 

數據包結構為:

|    長度(4字節)    |    數據    |

 

 1         Socket socket = new Socket("127.0.0.1", 10101);
 2         String message = "hello";
 3         byte[] bytes = message.getBytes();
 4         ByteBuffer buffer = ByteBuffer.allocate(4 + bytes.length);
 5         // 消息長度
 6         buffer.putInt(bytes.length);
 7         // 消息正文
 8         buffer.put(bytes);
 9         byte[] array = buffer.array();
10         for(int i = 0; i < 1000; i++){
11             socket.getOutputStream().write(array);
12         }
13         socket.close();

 

服務器端代碼, 我們需要借助於FrameDecoder類來分包.

 1 public class MyDecoder extends FrameDecoder {
 2 
 3     @Override
 4     protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
 5 
 6         if(buffer.readableBytes() > 4){            
 7             //標記
 8             buffer.markReaderIndex();
 9             //長度
10             int length = buffer.readInt();
11             
12             if(buffer.readableBytes() < length){
13                 buffer.resetReaderIndex();
14                 //緩存當前剩余的buffer數據,等待剩下數據包到來
15                 return null;
16             }
17             
18             //讀數據
19             byte[] bytes = new byte[length];
20             buffer.readBytes(bytes);
21             //往下傳遞對象
22             return new String(bytes);
23         }
24         //緩存當前剩余的buffer數據,等待剩下數據包到來
25         return null;
26     }
27 
28 }

 

如此一來, 我們再次在服務器端接受到的消息就是按序打印的hello了.

這邊可能有個疑問, 為什么MyDecoder中數據沒有讀取完畢, 需要return null,

正常的pipeline在數據處理完都是要sendUpstream, 給下一個pipeline的.

這個需要看下FrameDecoder.messageReceived 的源碼. 他在其中緩存了一個cumulation對象, 

如果return了null, 他會繼續往緩存里寫數據來實現分包

 1     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
 2         Object m = e.getMessage();
 3         if (!(m instanceof ChannelBuffer)) {
 4           // 數據讀完了, 轉下一個pipeline
 5             ctx.sendUpstream(e);
 6         } else {
 7             ChannelBuffer input = (ChannelBuffer)m;
 8             if (input.readable()) {
 9                 if (this.cumulation == null) {
10                     try {
11                         this.callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
12                     } finally {
13                         this.updateCumulation(ctx, input);
14                     }
15                 } else {
16                  // 緩存上一次沒讀完整的數據
17                     input = this.appendToCumulation(input);
18 
19                     try {
20                         this.callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
21                     } finally {
22                         this.updateCumulation(ctx, input);
23                     }
24                 }
25 
26             }
27         }
28     }

 

那么是不是這樣就萬事大吉了呢?

Socket字節流攻擊


在上述代碼中, 我們會在服務器端為客戶端發送的數據包長度, 預先分配byte數組.

如果遇到惡意攻擊, 傳入的數據長度與內容 不匹配. 例如聲明數據長度為Integer.MAX_VALUE.

這樣會消耗大量的服務器資源生成byte[], 顯然是不合理的.

因此我們還要加個最大長度限制.

1           if(buffer.readableBytes() > 2048){
2              buffer.skipBytes(buffer.readableBytes());
3           }

新的麻煩也隨之而來, 雖然可以跳過指定長度, 但是數據包本身就亂掉了.

因為長度和內容不匹配, 跳過一個長度后, 不知道下一段數據的開頭在哪里了.

因此我們自定義數據包里面, 不僅要引入數據包長度, 還要引入一個包頭來划分各個包的范圍.

包頭用任意一段特殊字符標記即可, 例如$$$.

 1         // 防止socket字節流攻擊
 2         if(buffer.readableBytes() > 2048){
 3           buffer.skipBytes(buffer.readableBytes());
 4         }
 5         // 記錄包頭開始的index
 6         int beginReader = buffer.readerIndex();
 7         
 8         while(true) {
 9             if(buffer.readInt() == ConstantValue.FLAG) {
10                 break;
11             }
12         }

 

新的數據包結構為:

|    包頭(4字節)    |    長度(4字節)    |    數據    |

Netty自帶拆包類


自己實現拆包雖然可以細粒度控制, 但是也會有些不方便, 可以直接調用Netty提供的一些內置拆包類.

  • FixedLengthFrameDecoder 按照特定長度組包
  • DelimiterBasedFrameDecoder 按照指定分隔符組包, 例如本文中的$$$
  • LineBasedFrameDecoder 按照換行符進行組包, \r \n等等
  • ......


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM