首先,我們通過一個DEMO來模擬TCP的拆包粘包的情況:客戶端連續向服務端發送100個相同消息。服務端的代碼如下:
AtomicLong count = new AtomicLong(0);
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
long l = count.incrementAndGet();
System.out.println(l + ": " + byteBuf.toString(StandardCharsets.UTF_8));
}
});
}
});
serverBootstrap.bind(8080);
客戶端代碼如下:
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 1000; i++) {
byte[] bytes = "歡迎關注我,微信公眾號:元本一兀!".getBytes(StandardCharsets.UTF_8);
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(bytes);
ctx.channel().writeAndFlush(buffer);
}
}
}
);
}
});
bootstrap.connect("localhost", 8080);
運行結果如下:

首先,我們發了1000個消息,但是在服務端有49行輸出,同時,有些消息是合並在一起的,有些消息解析出了亂碼。上面的輸出中,包含三種情況:
- 正確的結果輸出
- 多個消息拼在一起,也就是粘包情況;
- 另一種情況是半包,導致包不完成,解析出來的數據是亂碼
為什么會粘包、拆包?
這個是因為Netty底層是走的TCP協議,說白了傳輸的是就是字節流,消息與消息之間是沒有邊界的。發生TCP粘包拆包的原因主要有:
- 當連續發送數據時,由於TCP協議的nagle算法,會將較小的內容拼接成較大的包一次性發送到服務器端,因而導致粘包;
- 當發送的內容較大時,由於服務器端的recv(buffer_size)方法中buffer_size較小,不能一次性讀完所有數據,從而導致一個消息分拆成多次讀取,產生非拆包的情況。
本質上來講,TCP協議的包並不是按照業務消息來拆分的,TCP層並不感知發送的消息的大小。
解決粘包拆包的方法
解決粘包拆包的思路,其實就是在接收數據的時候,將字節流拆分成完整的包:
- 如果當前讀到的數據不是一個完整的業務數據包,則繼續從TCP緩沖區中讀取數據,知道讀到的數據中包含完整的數據包;
- 如果檔次讀取到的數據加上內存中已有的數據,包含一個完整的業務數據包,則將完整的業務包拼成一個包,並返回應用層處理;對於多余的數據,仍保留在內存中,待和后續加載的數據合並處理。
Netty中提供了一些拆包器,能夠滿足大部分的使用場景:
- FixedLengthFrameDecoder
- LineBasedFrameDecoder
- DelimiterBasedFrameDecoder
- LengthFieldBasedFrameDecoder
Netty中常用的拆包器
定長拆包器-FixedLengthFrameDecoder
如果你的業務消息格式很簡單,是固定長度的,則使用該拆包器很方便。
比如上面的代碼,發送的數據是固定的51個字節,我們在服務端的pipeline中加上定長拆包器:
AtomicLong count = new AtomicLong(0);
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(51)).addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
long l = count.incrementAndGet();
System.out.println(l + ": " + byteBuf.toString(StandardCharsets.UTF_8));
}
});
}
});
serverBootstrap.bind(8080);
結果如下:
可以看到,服務端收到了1000個完整的獨立的包。
行拆包器-LineBasedFrameDecoder
這個拆包器拆包的邏輯就是按行拆分,發送端每個數據之間用換行符作為分隔符,接收端通過也會按照換行符將字節流拆分成業務消息。
修改一下的上面的客戶端,消息后追加一個\r\n:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 1000; i++) {
byte[] bytes = ("歡迎關注我,微信公眾號:元本一兀!" + i+"\r\n").getBytes(StandardCharsets.UTF_8);
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(bytes);
ctx.channel().writeAndFlush(buffer);
}
}
接收數據段,添加行拆包器:
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE))
.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
long l = count.incrementAndGet();
System.out.println(l + ": " + byteBuf.toString(StandardCharsets.UTF_8));
}
});
}
運行后,我們可以看到接收端能夠接收到1000個完整的包。
基於分隔符的拆包器-DelimiterBasedFrameDecoder
DelimiterBasedFrameDecoder允許指定一個分隔符,在收到消息的時候按照指定的分隔符進行拆包。其實上面說的LineBasedFrameDecoder是一個特定的分隔符拆包器,它指定的是使用換行符作為分隔符,下面使用DelimiterBasedFrameDecoder來實現行拆包器:
ch.pipeline().addLast(
new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, delimiterLine, delimiterSharp))
.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
long l = count.incrementAndGet();
System.out.println(l + ": " + byteBuf.toString(StandardCharsets.UTF_8));
}
});
基於分隔符的拆包器允許設置多個分隔符,在設置多個分隔符的情況下,會將包拆分成最小的滿足分隔符的包。
基於長度域的拆包器-LengthFieldBasedFrameDecoder
最后一種拆包器是通用性最強的一種拆包器,只要我們協議的中有一個固定的區域來表示數據長度,就可以方便的使用該拆包器。LengthFieldBaesdFrameDecoder有很多可配置的參數,用來應對各種情況的長度域。
長度域的offset為0
假設消息中長度域就在開頭,這種情況下不需要考慮長度域之前是否有其他內容,配置LengthFieldBasedFrameDecoder很簡單,設置offset為0,以及長度域的字節數。這里以長度域占2字節為例:
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2)
一個解析的例子:
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
長度域offset為0,去掉協議頭
上面的例子中,我們保留了協議頭,雖然這里的協議頭就有長度域。如果我們只想保留數據域,這里需要設置跳過的字節數為2字節:
new LengthFieldBasedFrameDecoder(
Integer.MAX_VALE, // maxFrameLength
0, // lengthFieldOffset
2, // lengthFieldLength
0, // lengthAdjustment
2) // initalBytesToStrip 跳過2字節,也就是跳過長度域
拆包示例:
* BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
* +--------+----------------+ +----------------+
* | Length | Actual Content |----->| Actual Content |
* | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
* +--------+----------------+ +----------------+
長度域占2字節,offset為0,不跳過header,長度域表示所有消息的大小
在前面兩個例子,長度的長度表示的是長度之后的數據長度。這里我們考慮長度里面設置的長度表示的是整個消息的大小,包括頭部和數據部分。這種情況下我們需要制定lengthAdjustment,數據部分的長度為 長度域里的長度 - lengthAdjustment。
new LengthFieldBasedFrameDecoder(
Integer.MAX_VALE, // maxFrameLength
0, // lengthFieldOffset
2, // lengthFieldLength
2, // lengthAdjustment 長度 - lengthAdjustment為數據部分的長度
0) // initalBytesToStrip
拆包示例:
* BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
* +--------+----------------+ +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
* +--------+----------------+ +--------+----------------+
HEADER中不只有長度域的情況
前面的case中,協議頭部只有長度域,但是更多的情況中,HEADER中不只有長度域。比如下面這個例子,HEADER部分有三部分,HDR1,Length,HDR2三部分,分別占1個字節,2個字節,1個字節。
在長度域之前還有HDR1,要定位到長度域,需要指定長度域的offset(lengthFieldOffset=1)。
這里長度域存的是所有的數據長度,如果我們希望拆包的結果中包含HDR2+Data兩部分,可以通過設置lengthAdjustment=-3,長度域之后的內容長度是HDR2+DATA。拆包的結果中,只想包含HDR2+DATA,所以整個消息跳過錢3個字節(HDR1+Lenght部分)。
* +------+--------+------+----------------+
* | HDR1 | Length | HDR2 | Data |
* | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |
* +------+--------+------+----------------+
代碼如下:
new LengthFieldBasedFrameDecoder(
Integer.MAX_VALE, // maxFrameLength
3, // lengthFieldOffset
2, // lengthFieldLength
-3, // lengthAdjustment 長度 - lengthAdjustment為數據部分的長度
3) // initalBytesToStrip
解析結果:
* BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
* +------+--------+------+----------------+ +------+----------------+
* | HDR1 | Length | HDR2 | Data |----->| HDR2 | Data |
* | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
* +------+--------+------+----------------+ +------+----------------+
