Tcp 粘包以及解決方法


1. 簡介

1. TCP 是面向連接的,面向流的,提供可靠性服務,收發兩端(客戶端和服務器端) 都要有一一成對的Socket, 因此,發送端為了將多個發送給接收端的包更有效的發給對方,使用了優化算法(Nagle 算法),將多次間隔較小且數據量小的數據,合並成一個大的數據塊,然后進行封包,這樣雖然提高了效率,但是接收端就難於分辨出完整的數據包了。 因為面向流的通信是無消息保護邊界的。

2. 由於TCP 無消息保護邊界,需要在接收端處理消息邊界問題, 也就是我們所說的粘包拆包問題。

2. 粘包問題演示

TcpServer:

package netty.tcp;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TcpServer {

    private static final Integer PORT = 6666;

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyNettyServerInitializer());

        ChannelFuture sync = serverBootstrap.bind(PORT).sync();
        sync.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("服務端啟動成功,監聽地址: " + PORT);
                }
            }
        });
    }
}

MyNettyServerInitializer

package netty.tcp;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class MyNettyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // 向管道加入處理器
        ChannelPipeline pipeline = ch.pipeline();
        // 1. 增加一個自定義的handler
        pipeline.addLast(new MyServerHandler());

        System.out.println("server is ok~~~~");
    }
}

MyServerHandler

package netty.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private int count;

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        int i = msg.readableBytes();
        System.out.println("讀取到的字節數: " + i);
        // 回顯一個消息給客戶端
        String msgStr = msg.toString(CharsetUtil.UTF_8);
        String printMsg = "count: " + (++count) + "; msgStr:" + msgStr;
        System.out.println(printMsg);

        ctx.channel().writeAndFlush(Unpooled.copiedBuffer(printMsg, CharsetUtil.UTF_8));
    }
}

TcpClient

package netty.tcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TcpClient {

    private static final Integer PORT = 6666;

    public static void main(String[] args) throws InterruptedException {
        // 創建一個事件循環組
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            // 創建一個啟動Bootstrap(注意是Netty包下的)
            Bootstrap bootstrap = new Bootstrap();
            // 鏈式設置參數
            bootstrap.group(eventExecutors) // 設置線程組
                    .channel(NioSocketChannel.class) // 設置通道class
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 1. 加入一個自定義的處理器
                            pipeline.addLast(new MyClientHandler());
                        }
                    });
            System.out.println("客戶端is ok...");

            // 啟動客戶端連接服務器(ChannelFuture 是netty的異步模型)
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", PORT).sync();
            // 監聽關閉通道
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 關閉
            eventExecutors.shutdownGracefully();
        }
    }
}

MyClientHandler

package netty.tcp;


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * 自定義服務器端處理handler,需要繼承netty定義的 ChannelInboundHandlerAdapter 類
 */
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * 通道就緒事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 循環發送十條消息
        int count = 10;
        String msg = "client msg ";
        for (int i = 0; i < count; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer(msg + i, CharsetUtil.UTF_8));
        }
    }

    /**
     * 發生異常事件
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("從客戶端 " + ctx.channel().remoteAddress() + " 讀取到的消息, long: " + msg);
        // 回顯一個消息給客戶端
        String msgStr = msg.toString(CharsetUtil.UTF_8);
        System.out.println(msgStr);
    }
}

 

啟動服務器端,然后啟動兩個客戶端,查看日志:

(1) 服務器端:

服務端啟動成功,監聽地址: 6666
server is ok~~~~
讀取到的字節數: 120
count: 1; msgStr:client msg 0client msg 1client msg 2client msg 3client msg 4client msg 5client msg 6client msg 7client msg 8client msg 9
server is ok~~~~
讀取到的字節數: 12
count: 1; msgStr:client msg 0
讀取到的字節數: 96
count: 2; msgStr:client msg 1client msg 2client msg 3client msg 4client msg 5client msg 6client msg 7client msg 8
讀取到的字節數: 12
count: 3; msgStr:client msg 9

(2) 客戶端1

客戶端is ok...
從客戶端 /127.0.0.1:6666 讀取到的消息, long: PooledUnsafeDirectByteBuf(ridx: 0, widx: 137, cap: 1024)
count: 1; msgStr:client msg 0client msg 1client msg 2client msg 3client msg 4client msg 5client msg 6client msg 7client msg 8client msg 9

(3) 客戶端2

客戶端is ok...
從客戶端 /127.0.0.1:6666 讀取到的消息, long: PooledUnsafeDirectByteBuf(ridx: 0, widx: 171, cap: 1024)
count: 1; msgStr:client msg 0count: 2; msgStr:client msg 1client msg 2client msg 3client msg 4client msg 5client msg 6client msg 7client msg 8count: 3; msgStr:client msg 9

  可以看到發生了消息錯亂。

3. 解決辦法

1. 加入一個消息包裝類 TransferMsg

package netty.tcp;

import lombok.Data;

@Data
public class TransferMsg {

    private Integer length;

    private byte[] msg;
}

2. TransferMsgEncoder 編碼器

package netty.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class TransferMsgEncoder extends MessageToByteEncoder<TransferMsg> {

    @Override
    protected void encode(ChannelHandlerContext ctx, TransferMsg msg, ByteBuf out) throws Exception {
        System.out.println("netty.tcp.TransferMsgEncoder.encode 被調用");
        out.writeInt(msg.getLength());
        out.writeBytes(msg.getMsg());
    }
}

3. TransferMsgDecoder 解碼器

package netty.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

public class TransferMsgDecoder extends ReplayingDecoder<Void> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("netty.tcp.TransferMsgDecoder.decode 被調用");
        int count = in.readInt();
        byte[] bytes = new byte[count];
        in.readBytes(bytes);

        TransferMsg transferMsg = new TransferMsg();
        transferMsg.setMsg(bytes);
        transferMsg.setLength(count);
        out.add(transferMsg);
    }
}

4. 服務器端修改

修改MyNettyServerInitializer 加入自己的解碼器

package netty.tcp;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class MyNettyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // 向管道加入處理器
        ChannelPipeline pipeline = ch.pipeline();
        // 1. 增加一個自定義的handler
        pipeline.addLast(new TransferMsgDecoder());
        pipeline.addLast(new MyServerHandler());

        System.out.println("server is ok~~~~");
    }
}

修改MyServerHandler

package netty.tcp;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class MyServerHandler extends SimpleChannelInboundHandler<TransferMsg> {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TransferMsg msg) throws Exception {
        Integer length = msg.getLength();
        byte[] msg1 = msg.getMsg();
        String s = new String(msg1);
        System.out.println("讀到消息,length: " + length + "\tmsg:" + s);
    }
}

2. 客戶端修改

TcpClient 加入自己的編碼器

package netty.tcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TcpClient {

    private static final Integer PORT = 6666;

    public static void main(String[] args) throws InterruptedException {
        // 創建一個事件循環組
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            // 創建一個啟動Bootstrap(注意是Netty包下的)
            Bootstrap bootstrap = new Bootstrap();
            // 鏈式設置參數
            bootstrap.group(eventExecutors) // 設置線程組
                    .channel(NioSocketChannel.class) // 設置通道class
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // 1. 加入一個自定義的處理器
                            pipeline.addLast(new TransferMsgEncoder());
                            pipeline.addLast(new MyClientHandler());
                        }
                    });
            System.out.println("客戶端is ok...");

            // 啟動客戶端連接服務器(ChannelFuture 是netty的異步模型)
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", PORT).sync();
            // 監聽關閉通道
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 關閉
            eventExecutors.shutdownGracefully();
        }
    }
}

MyClientHandler 修改發送的消息格式

package netty.tcp;


import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * 自定義服務器端處理handler,需要繼承netty定義的 ChannelInboundHandlerAdapter 類
 */
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * 通道就緒事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 循環發送十條消息
        int count = 10;
        String msg = "client msg ";
        String sendMsg = null;
        TransferMsg transferMsg = null;
        for (int i = 0; i < count; i++) {
            transferMsg = new TransferMsg();
            sendMsg = msg + i;
            transferMsg.setMsg(sendMsg.getBytes());
            transferMsg.setLength(sendMsg.getBytes().length);
            ctx.writeAndFlush(transferMsg);
        }
    }

    /**
     * 發生異常事件
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("從客戶端 " + ctx.channel().remoteAddress() + " 讀取到的消息, long: " + msg);
        // 回顯一個消息給客戶端
        String msgStr = msg.toString(CharsetUtil.UTF_8);
        System.out.println(msgStr);
    }
}

測試: 啟動一個服務器端,然后啟動一個客戶端,查看服務器日志如下:

服務端啟動成功,監聽地址: 6666
server is ok~~~~
netty.tcp.TransferMsgDecoder.decode 被調用
讀到消息,length: 12    msg:client msg 0
netty.tcp.TransferMsgDecoder.decode 被調用
讀到消息,length: 12    msg:client msg 1
netty.tcp.TransferMsgDecoder.decode 被調用
讀到消息,length: 12    msg:client msg 2
netty.tcp.TransferMsgDecoder.decode 被調用
讀到消息,length: 12    msg:client msg 3
netty.tcp.TransferMsgDecoder.decode 被調用
讀到消息,length: 12    msg:client msg 4
netty.tcp.TransferMsgDecoder.decode 被調用
讀到消息,length: 12    msg:client msg 5
netty.tcp.TransferMsgDecoder.decode 被調用
讀到消息,length: 12    msg:client msg 6
netty.tcp.TransferMsgDecoder.decode 被調用
讀到消息,length: 12    msg:client msg 7
netty.tcp.TransferMsgDecoder.decode 被調用
讀到消息,length: 12    msg:client msg 8
netty.tcp.TransferMsgDecoder.decode 被調用
讀到消息,length: 12    msg:client msg 9

 

  如果服務器向客戶端返回相同的消息,在服務器端也需要加入自己的編碼器;客戶端加入自己的解碼器。實測解決了粘包問題。

 

總結: Netty 解決方法:

1》 使用自定義協議 + 編解碼器來解決

2》 關鍵就是解決服務器每次讀取數據長度的問題,這個問題解決,就不會出現服務器多讀或者少讀的問題,從而避免TCP的粘包、拆包

 

  還有其他的解決辦法又自定義消息的結束符,比如我們約定以"XXXXXX" 結尾,則收到消息可以判斷是否以這個結尾。

參考: https://juejin.cn/post/6975109908106575903

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM