TCP粘包和拆包


         TCP(transport control protocol,傳輸控制協議)是面向連接的,面向流的,提供高可靠性服務。收發兩端(客戶端和服務器端)都要有一一成對的socket,因此,發送端為了將多個發往接收端的包,更有效的發到對方,使用了優化方法(Nagle算法),將多次間隔較小且數據量小的數據,合並成一個大的數據塊,然后進行封包。這樣,接收端,就難於分辨出來了,必須提供科學的拆包機制。即面向流的通信是無消息保護邊界的。
        圖解TCP的粘包和拆包

    假設客戶端分別發送了兩個數據包D1和D2給服務端,由於服務端一次讀取到字節數是不確定的,故可能存在以下四種情況:

    1.服務端分兩次讀取到了兩個獨立的數據包,分別是D1和D2,沒有粘包和拆包

    2.服務端一次接受到了兩個數據包,D1和D2粘合在一起,稱之為TCP粘包

    3.服務端分兩次讀取到了數據包,第一次讀取到了完整的D1包和D2包的部分內容,第二次讀取到了D2包的剩余內容,這稱之為TCP拆包

    4.服務端分兩次讀取到了數據包,第一次讀取到了D1包的部分內容D1_1,第二次讀取到了D1包的剩余部分內容D1_2和完整的D2包。

      特別要注意的是,如果TCP的接受滑窗非常小,而數據包D1和D2比較大,很有可能會發生第五種情況,即服務端分多次才能將D1和D2包完全接受,期間發生多次拆包。

     

     粘包、拆包問題的解決方案:定義通信協議

     目前業界主流的協議(protocol)方案可以歸納如下:

     1 定長協議:假設我們規定每3個字節,表示一個有效報文。

     2.特殊字符分隔符協議:在包尾部增加回車或者空格符等特殊字符進行分割 。

     3.長度編碼:將消息分為消息頭和消息體,消息頭中用一個int型數據(4字節),表示消息體長度的字段。在解析時,先讀取內容長度Length,其值為實際消息體內容(Content)占用的字節數,之后必須讀取到這么多字節的內容,才認為是一個完整的數據報文。

     

     下面我用長度編碼方式解決TCP的粘包、拆包問題

     首先看一下不使用長度編碼協議的時候會發生什么問題?

     Server端測試代碼   

public class MyServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).
                    childHandler(new MyServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);
        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("服務端接收到的消息內容: " + message);
        System.out.println("服務端接收到的消息數量: " + (++this.count));
        ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString(), Charset.forName("utf-8"));
        ctx.writeAndFlush(responseByteBuf);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        System.out.println(this);

        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new MyServerHandler());
    }
}

     Client端測試代碼

public class MyClient {

    public static void main(String[] args) throws Exception{
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).
                    handler(new MyClientInitializer());

            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; ++i) {
            ByteBuf buffer = Unpooled.copiedBuffer("sent from client ", Charset.forName("utf-8"));
            ctx.writeAndFlush(buffer);
        }
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);

        String message = new String(buffer, Charset.forName("utf-8"));

        System.out.println("客戶端接收到的消息內容: " + message);
        System.out.println("客戶端接收到的消息數量: " + (++this.count));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new MyClientHandler());
    }
}

          啟動服務端,然后

          第一次啟動客戶端,服務端日志:

          第二次啟動客戶端,服務端日志:

          第三次啟動客戶端,服務端日志:

         從結果中可以看出發生了粘包問題。

 

         用長度編碼方式解決TCP的粘包和拆包問題

         自定義協議CustomProtocol,解碼器MyDecoder,編碼器MyEncoder

public class CustomProtocol {

    private int length;

    private byte[] content;

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}

public class MyDecoder extends ReplayingDecoder<Void> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        int length = in.readInt();

        byte[] content = new byte[length];
        in.readBytes(content);

        CustomProtocol personProtocol = new CustomProtocol();
        personProtocol.setLength(length);
        personProtocol.setContent(content);

        out.add(personProtocol);
    }
}

public class MyEncoder extends MessageToByteEncoder<CustomProtocol> {

    @Override
    protected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) throws Exception {

        out.writeInt(msg.getLength());
        out.writeBytes(msg.getContent());
    }
}

          Server端測試代碼   

public class MyServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).
                    childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();

                            pipeline.addLast(new MyDecoder());
                            pipeline.addLast(new MyEncoder());

                            pipeline.addLast(new MyClientHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

public class MyServerHandler extends SimpleChannelInboundHandler<CustomProtocol> {

    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, CustomProtocol msg) throws Exception {
        int length = msg.getLength();
        byte[] content = msg.getContent();

        System.out.println("服務端接收到的數據:");
        System.out.println("長度: " + length);
        System.out.println("內容:" + new String(content, Charset.forName("utf-8")));

        System.out.println("服務端接收到的消息數量:" + (++this.count));

        String responseMessage = UUID.randomUUID().toString();
        int responseLength = responseMessage.getBytes("utf-8").length;
        byte[] responseContent = responseMessage.getBytes("utf-8");

        CustomProtocol personProtocol = new CustomProtocol();
        personProtocol.setLength(responseLength);
        personProtocol.setContent(responseContent);

        ctx.writeAndFlush(personProtocol);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

          Client端測試代碼   

public class MyClient {

    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).
                    handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();

                            pipeline.addLast(new MyDecoder());
                            pipeline.addLast(new MyEncoder());

                            pipeline.addLast(new MyClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

public class MyClientHandler extends SimpleChannelInboundHandler<CustomProtocol> {

    private int count;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; ++i) {
            String messageToBeSent = "sent from client ";
            byte[] content = messageToBeSent.getBytes(Charset.forName("utf-8"));
            int length = messageToBeSent.getBytes(Charset.forName("utf-8")).length;

            CustomProtocol personProtocol = new CustomProtocol();
            personProtocol.setLength(length);
            personProtocol.setContent(content);

            ctx.writeAndFlush(personProtocol);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, CustomProtocol msg) throws Exception {
        int length = msg.getLength();
        byte[] content = msg.getContent();

        System.out.println("客戶端接收到的消息: ");

        System.out.println("長度: " + length);
        System.out.println("內容:" + new String(content, Charset.forName("utf-8")));

        System.out.println("客戶端接受到的消息數量:" + (++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

 測試結果:  

 結果分析:沒有發生Tcp的粘包和拆包。                

      

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM