Netty 中 TCP 粘包拆包問題
信息通過tcp傳輸過程中出現的狀況 .
TCP是個“流”協議,所謂流,就是沒有界限的一串數據。TCP底層並不了解上層業務數據的具體含義,它會根據TCP緩沖區的實際情況進行包的划分,所以在業務上認為,一個完整的包可能會被TCP拆分成多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送
產生粘包和拆包問題的主要原因是,操作系統在發送TCP數據的時候,底層會有一個緩沖區,例如1024個字節大小,如果一次請求發送的數據量比較小,沒達到緩沖區大小,TCP則會將多個請求合並為同一個請求進行發送,這就形成了粘包問題;如果一次請求發送的數據量比較大,超過了緩沖區大小,TCP就會將其拆分為多次發送,這就是拆包,也就是將一個大的包拆分為多個小包進行發送。
入圖所示:
上圖中演示了粘包和拆包的三種情況:
- D1和D2兩個包都剛好滿足TCP緩沖區的大小,或者說其等待時間已經達到TCP等待時長,從而還是使用兩個獨立的包進行發送;
- D1和D2兩次請求間隔時間內較短,並且數據包較小,因而合並為同一個包發送給服務端;
- 某一個包比較大,因而將其拆分為兩個包D*_1和D*_2進行發送,而這里由於拆分后的某一個包比較小,其又與另一個包合並在一起發送。
發生這種情況的代碼:
客戶端發送數據 快速的發送 10條數據 :
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用客戶端發送10條數據 hello,server 編號
for(int i= 0; i< 10; ++i) {
ByteBuf buffer = Unpooled.copiedBuffer("hello,server " +i, Charset.forName("utf-8"));
ctx.writeAndFlush(buffer);
}
}
}
服務端接受打印:
服務器接收到數據 hello,server 0
服務器接收到數據 hello,server 1
服務器接收到數據 hello,server 2hello,server 3
服務器接收到數據 hello,server 4hello,server 5
服務器接收到數據 hello,server 6
服務器接收到數據 hello,server 7hello,server 8
服務器接收到數據 hello,server 9
很明顯 其中有三條記錄被粘在其他數據上,這就是TCP的粘包拆包現象
怎么解決:
-
Netty自帶的 解決方案:
-
固定長度的拆包器 FixedLengthFrameDecoder,每個應用層數據包的都拆分成都是固定長度的大小
-
行拆包器 LineBasedFrameDecoder,每個應用層數據包,都以換行符作為分隔符,進行分割拆分
-
分隔符拆包器 DelimiterBasedFrameDecoder,每個應用層數據包,都通過自定義的分隔符,進行分割拆分
-
基於數據包長度的拆包器 LengthFieldBasedFrameDecoder,將應用層數據包的長度,作為接收端應用層數據包的拆分依據。按照應用層數據包的大小,拆包。這個拆包器,有一個要求,就是應用層協議中包含數據包的長度
-
FixedLengthFrameDecoder 解碼器
服務端 添加 FixedLengthFrameDecoder 解碼器 並指定長度
public class EchoServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//指定長度為9 則每次截取長度為9的字節
ch.pipeline().addLast(new FixedLengthFrameDecoder(9));
// 將 每次截取的字節編碼為字符串
ch.pipeline().addLast(new StringDecoder());
//自定義處理類打印
ch.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture future = bootstrap.bind(8000).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
自定義服務端Handler 打印字符串:
public class EchoServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("message: " + msg.trim());
}
}
客戶端發送信息 並添加字符串編碼器 將信息已字符串的形式編碼:
public class EchoClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 8000).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
客戶端Handler 發送信息 剛好長度為9 :
public class EchoClientHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("123456789");
}
}
總結: FixedLengthFrameDecoder 解碼器 將按照指定長度截取字節 並添加到List中向后傳遞 , 以本案例為例,如果字節數剛好為9,則全部打印,如果 字節數為18, 則拆分打印兩次,如果為19 則最后一個字節不打印,如果不足9 則什么都不打印.
LineBasedFrameDecoder 行拆分器
通過行換行符 \n 或者 \r\n 進行分割,
將上面案例的FixedLengthFrameDecoder 解碼器 換成 LineBasedFrameDecoder
並指定 截取每段的最大長度 (超過報錯 不往后傳遞)
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
ch.pipeline().addLast(new LineBasedFrameDecoder(5));
// 將前一步解碼得到的數據轉碼為字符串
ch.pipeline().addLast(new StringDecoder());
// 最終的數據處理
ch.pipeline().addLast(new EchoServerHandler());
}
});
...
客戶端Handler 發送字符串, 最后的"1234" 不會打印,,
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("1\n123456\r\n1234");
}
服務端接收並打印結果 分別打印了 "1" 和 "1234" 而超過字節長度5 的 "123456"則報出TooLongFrameException錯誤
server receives message: 1
An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.TooLongFrameException: frame length (6) exceeds the allowed maximum (5)
server receives message: 1234
DelimiterBasedFrameDecoder 自定義分割符
和行分割符類似, 此解碼器可以自定義分割符,常用構造方法:
public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters)
接收一個最大長度,和 任意個數的 分隔符(用ByteBuf的形式傳入),解碼器識別到任意一個 分割符 都會進行拆分
注冊解碼器:
傳入 "$" 和 "*" 作為分割符,並指定最大長度為 5個字節
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(5,
Unpooled.wrappedBuffer("$".getBytes()),Unpooled.wrappedBuffer("*".getBytes())));
// 將前一步解碼得到的數據轉碼為字符串
ch.pipeline().addLast(new StringDecoder());
// 最終的數據處理
ch.pipeline().addLast(new EchoServerHandler());
}
});
客戶端 發送數據:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("1$123456*1234$789$");
}
服務端只打印了 "1" 當解析到 "123456" 時 就報錯了 后面就沒有再解析了,會緩存着 等到該通道關閉 或者有后續數據發送過來時 才繼續解析
LengthFieldBasedFrameDecoder
自定義數據長度,發送的 字節數組中 包含 描述 數據長度的字段 和 數據本身,
解碼過程
常用字段:
- maxFrameLength:指定了每個包所能傳遞的最大數據包大小,(上圖中的最大長度為11)
- lengthFieldOffset:指定了長度字段在字節碼中的偏移量;(11這個描述長度的數據是在數組的第幾位開始)
- lengthFieldLength:指定了長度字段所占用的字節長度;(11 占 1個字節)
- lengthAdjustment: 長度域的偏移量矯正。 如果長度域的值,除了包含有效數據域的長度外,還包含了其他域(如長度域自身)長度,那么,就需要進行矯正。矯正的值為:包長 - 長度域的值 – 長度域偏移 – 長度域長。 ( 11 這個域 不光光描述 Hello,world, 一般設置為0,)
- initialBytesToStrip : 丟棄的起始字節數。丟棄處於有效數據前面的字節數量。比如前面有1個節點的長度域,則它的值為1. ( 如果為0代表不丟棄,則將長度域也向后傳遞)
服務端添加 解碼器:
- 最大長度 為 長度描述域 的值11 + 長度描述域本身占用的長度 1 = 12
- 長度描述域放在數據包的第一位, 沒有偏移 為0
- 長度描述域 長度為1
- 無需矯正
- 一個字節也不丟棄
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 這里將FixedLengthFrameDecoder添加到pipeline中,指定長度為20
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(12,0,1,0,0));
// 將前一步解碼得到的數據轉碼為字符串
ch.pipeline().addLast(new StringDecoder());
// 最終的數據處理
ch.pipeline().addLast(new EchoServerHandler());
}
});
客戶端發送數據 發送最Netty 底層操作 的ByteBuf對象 發送時 無需任何編碼:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = Unpooled.buffer();
buffer.writeByte(11);
buffer.writeBytes("Hello,World".getBytes());
ctx.writeAndFlush(buffer);
}
服務端接收數據為 (11代表的制表符)Hello,World
這樣發送 每次都要計算 數據長度,並手動添加到 數據的前面,很不方便 配合LengthFieldPrepender
使用,這個編碼碼器可以計算 長度,並自動添加到 數據的前面
改造客戶端 先攔截數據按字符串編碼,再計算字節長度 添加 長度描述字段 並占用一個字節 (這個長度要與客戶端的解碼器 lengthFieldLength
參數 值保持一致) :
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldPrepender(1));
ch.pipeline().addLast(new StringEncoder());
// 客戶端發送消息給服務端,並且處理服務端響應的消息
ch.pipeline().addLast(new EchoClientHandler());
}
});
客戶端發送 有字符串編碼器 可以直接發送字符串:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("Hello,World");
}
自定義協議
上面介紹的 各種解碼器 已經可以應付絕大多數場景, 如果遇到 特殊的狀況 我們也可以自定義協議
定義 協議對象:
//協議包
public class MessageProtocol {
private int len; //關鍵
private byte[] content;
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
客戶端發送:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i = 0; i< 5; i++) {
String mes = "Hello,World";
byte[] content = mes.getBytes(Charset.forName("utf-8"));
int length = mes.getBytes(Charset.forName("utf-8")).length;
//創建協議包對象
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
ctx.writeAndFlush(messageProtocol);
}
}
該協議的 自定義 編碼器 將協議包發送出去:
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
System.out.println("MyMessageEncoder encode 方法被調用");
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}
將客戶端 發送數據的Handler 和 編碼器 注冊 這里就不寫了
服務端解碼器 讀取長度 並 判斷可讀數據的長度是否足夠 :
public class MyMessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
in.markReaderIndex();
//讀取長度
int length = in.readInt();
//如果可讀長度大於 數據長度 說明數據完整
if (in.readableBytes()>length){
byte[] content = new byte[length];
in.readBytes(content);
//封裝成 MessageProtocol 對象,放入 out, 傳遞下一個handler業務處理
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
out.add(messageProtocol);
}else{
//如果數據不夠長 將已經讀過的的int 數據還原回去 留下次讀取
in.resetReaderIndex();
}
}
}
服務端成功讀取:
本例中存在很多問題, 明白這個意思就行, 感興趣的話 可以 自己動手優化