Netty TCP 粘包和拆包 及解決方案


1 TCP 粘包和拆包基本介紹

1) TCP 是面向連接的, 面向流的, 提供高可靠性服務。 收發兩端(客戶端和服務器端) 都要有一一成對的 socket,因此, 發送端為了將多個發給接收端的包, 更有效的發給對方, 使用了優化方法(Nagle 算法) , 將多次間隔較小且數據量小的數據, 合並成一個大的數據塊, 然后進行封包。 這樣做雖然提高了效率, 但是接收端就難於分辨出完整的數據包了, 因為面向流的通信是無消息保護邊界的。
2) 由於 TCP 無消息保護邊界, 需要在接收端處理消息邊界問題, 也就是我們所說的粘包、 拆包問題, 看一張圖
3) 示意圖 TCP 粘包、 拆包圖解

 對圖的說明:

 假設客戶端分別發送了兩個數據包 D1 D2 給服務端, 由於服務端一次讀取到字節數是不確定的, 故可能存在以
下四種情況:
    1) 服務端分兩次讀取到了兩個獨立的數據包, 分別是 D1 D2, 沒有粘包和拆包
    2) 服務端一次接受到了兩個數據包, D1 D2 粘合在一起, 稱之為 TCP 粘包
    3) 服務端分兩次讀取到了數據包, 第一次讀取到了完整的 D1 包和 D2 包的部分內容, 第二次讀取到了 D2 的剩余內容, 這稱之為 TCP 拆包 

    4) 服務端分兩次讀取到了數據包, 第一次讀取到了 D1 包的部分內容 D1_1, 第二次讀取到了 D1 包的剩余部

       分內容 D1_2 和完整的 D2 包。


2 TCP 粘包和拆包現象實例 (現象)

在編寫 Netty 程序時, 如果沒有做處理, 就會發生粘包和拆包的問題
看一個具體的實例:
MyClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset;
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用客戶端發送 10 條數據 hello,server 編號
for(int i= 0; i< 10; ++i) {
ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8"));
ctx.writeAndFlush(buffer);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
String message = new String(buffer, Charset.forName("utf-8"));
System.out.println("客戶端接收到消息=" + message);
System.out.println("客戶端接收消息數量=" + (++this.count));
} 
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
View Code
MyServerHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset;
import java.util.UUID;
public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf>{
private int count;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
//將 buffer 轉成字符串
String message = new String(buffer, Charset.forName("utf-8"));
System.out.println("服務器接收到數據 " + message);
System.out.println("服務器接收到消息量=" + (++this.count));
//服務器回送數據給客戶端, 回送一個隨機 id ,
ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ",
Charset.forName("utf-8"));
ctx.writeAndFlush(responseByteBuf);
}
}
View Code

3 TCP 粘包和拆包解決方案

1) 使用自定義協議 + 編解碼器 來解決
2) 關鍵就是要解決 服務器端每次讀取數據長度的問題, 這個問題解決, 就不會出現服務器多讀或少讀數據的問
題, 從而避免的 TCP 粘包、 拆包 。 
實列:

1) 要求客戶端發送 5 Message 對象, 客戶端每次發送一個 Message 對象
2) 服務器端每次接收一個 Message, 5 次進行解碼, 每讀取到 一個 Message , 會回復一個 Message 對象 給客
戶端.

 

 MessageProtocol //協議包

public class MessageProtocol {
private int len; //關鍵
private byte[] content;
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
} p
ublic byte[] getContent() {
return content;
} p
ublic void setContent(byte[] content) {
this.content = content;
}
}
View Code

 MyClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.Charset;

public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {

    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客戶端發送10條數據 "今天天氣冷,吃火鍋" 編號

        for(int i = 0; i< 5; i++) {
            String mes = "今天天氣冷,吃火鍋";
            byte[] content = mes.getBytes(Charset.forName("utf-8"));
            int length = mes.getBytes(Charset.forName("utf-8")).length;

            //創建協議包對象
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(content);
            ctx.writeAndFlush(messageProtocol);

        }

    }

//    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {

        int len = msg.getLen();
        byte[] content = msg.getContent();

        System.out.println("客戶端接收到消息如下");
        System.out.println("長度=" + len);
        System.out.println("內容=" + new String(content, Charset.forName("utf-8")));

        System.out.println("客戶端接收消息數量=" + (++this.count));

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("異常消息=" + cause.getMessage());
        ctx.close();
    }
}
View Code
MyClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyClient {
    public static void main(String[] args)  throws  Exception{

        EventLoopGroup group = new NioEventLoopGroup();

        try {

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer()); //自定義一個初始化類

            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();

            channelFuture.channel().closeFuture().sync();

        }finally {
            group.shutdownGracefully();
        }
    }
}
View Code
MyClientInitializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;


public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyMessageEncoder()); //加入編碼器
        pipeline.addLast(new MyMessageDecoder()); //加入解碼器
        pipeline.addLast(new MyClientHandler());
    }
}
View Code
MyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyServer {
    public static void main(String[] args) throws Exception{

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerInitializer()); //自定義一個初始化類


            ChannelFuture channelFuture = serverBootstrap.bind(9994).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}
View Code
MyServerInitializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;


public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new MyMessageDecoder());//解碼器
        pipeline.addLast(new MyMessageEncoder());//編碼器
        pipeline.addLast(new MyServerHandler());
    }
}
View Code
MyServerHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.Charset;
import java.util.UUID;


//處理業務的handler
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol>{
    private int count;

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {

        //接收到數據,並處理
        int len = msg.getLen();
        byte[] content = msg.getContent();

        System.out.println();
        System.out.println();
        System.out.println();
        System.out.println("服務器接收到信息如下");
        System.out.println("長度=" + len);
        System.out.println("內容=" + new String(content, Charset.forName("utf-8")));

        System.out.println("服務器接收到消息包數量=" + (++this.count));

        //回復消息

//        String responseContent = UUID.randomUUID().toString();
//        int responseLen = responseContent.getBytes("utf-8").length;
//        byte[]  responseContent2 = responseContent.getBytes("utf-8");
        //構建一個協議包
//        MessageProtocol messageProtocol = new MessageProtocol();
//        messageProtocol.setLen(responseLen);
//        messageProtocol.setContent(responseContent2);
//
//        ctx.writeAndFlush(messageProtocol);


    }
}
View Code
MyMessageDecoder
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

public class MyMessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyMessageDecoder decode 被調用");
        //需要將得到二進制字節碼-> MessageProtocol 數據包(對象)
        int length = in.readInt();

        byte[] content = new byte[length];
        in.readBytes(content);

        //封裝成 MessageProtocol 對象,放入 out, 傳遞下一個handler業務處理
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);

        out.add(messageProtocol);

    }
}
View Code
MyMessageEncoder
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder encode 方法被調用");
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM