Netty使用LineBasedFrameDecoder解決TCP粘包/拆包


TCP粘包/拆包

TCP是個”流”協議,所謂流,就是沒有界限的一串數據。TCP底層並不了解上層業務數據的具體含義,它會根據TCP緩沖區的實際情況進行包的划分,所以在業務上認為,一個完整的包可能會被TCP拆分成多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這就是所謂的TCP粘包和拆包問題

 

TCP粘包/拆包發生的原因

1. 應用程序write寫入的字節大小大於套接口發送緩沖區大小
2. 進行MSS大小的TCP分段
3. 以太網幀的payload大於MTU進行IP分片

粘包問題的解決策略

由於底層的TCP無法理解上層的業務數據,所以在底層是無法保證數據包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,可以歸納如下

先來看一個粘包的例子

新建maven工程,添加依賴包

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>5.0.0.Alpha1</version>
    </dependency>

TimeServer

package com.zhen.netty1129_TCP_HALF_PACKAGE;

import java.awt.Event;
import java.net.Socket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {

    public void bind(int port) throws Exception{
        //配置服務端的NIO線程組
        //NioEventLoopGroup是個線程組,它包含了一組NIO線程,專門用於網絡事件的處理,實際上它們就是Reactor線程組
        //bossGroup用於服務端接受客戶端的連接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //workerGroup進行SocketChannel的網絡讀寫
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //Netty用於啟動NIO服務端的輔助啟動類,目的是降低服務端的開發復雜度
            ServerBootstrap bootstrap = new ServerBootstrap();
            //將兩個NIO線程組當作入參傳遞到ServerBootstrap
            bootstrap.group(bossGroup, workerGroup)
                //設置創建的Channel為NioServerSocketChannel,它的功能對應於JDK NIO類庫中的ServerSocketChannel類。
                .channel(NioServerSocketChannel.class)
                //配置NioServerSocketChannel的TCP參數,此處將它的backlog設置為1024
                .option(ChannelOption.SO_BACKLOG, 1024)
                //綁定I/O事件的處理類ChildChannelHandler,它的作用類似於Reactor模式中的Handler類,主要用於處理網絡I/O事件,例如記錄日志、對消息進行編解碼等
                .childHandler(new ChildChannelHandler());
            //調用bind方法綁定監聽端口,隨后,調用它的同步阻塞方法sync等待綁定操作完成。
            //完成之后Netty會返回一個ChannelFuture,它的功能類似於JDK的java.util.concurrent.Future,主要用於異步操作的通知回調
            ChannelFuture future = bootstrap.bind(port).sync();
            //等待服務端監聽端口關閉,等待服務端鏈路關閉之后main函數才退出
            future.channel().closeFuture().sync();
        } finally {
            //優雅退出,釋放線程池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new TimeServerHandler());
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port = 9090;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        new TimeServer().bind(port);
    }
}

TimeServerHandler

package com.zhen.netty1129_TCP_HALF_PACKAGE;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

//TimeServerHandler 繼承自ChannelHandlerAdapter,它用於對網絡事件進行讀寫操作
public class TimeServerHandler extends ChannelHandlerAdapter{
    
    private int counter;
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //將msg轉換成Netty的ByteBuf對象。ByteBuf類似於jdk中的java.nio.ByteBuffer對象,不過它提供了更加強大和靈活的功能
        ByteBuf buf = (ByteBuf) msg;
        //通過ByteBuf的readableBytes方法可以獲取緩沖區可讀的字節數,根據可讀的字節數創建byte數組
        byte[] req = new byte[buf.readableBytes()];
        //通過ByteBuf的readBytes方法將緩沖區中的字節數據復制到新建的byte數組中
        buf.readBytes(req);
        //通過new String構造函數獲取請求消息
        String body = new String(req, "UTF-8").substring(0, req.length
                - System.getProperty("line.separator").length());
        System.out.println("The time server receive order : " + body
                + "; the counter is : "+ ++counter);
        //對請求消息進行判斷,如果是QUERY TIME ORDER則創建應答消息
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? 
                new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        //通過ChannelHandlerContext的write方法異步發送應答消息給客戶端
        ctx.writeAndFlush(resp);
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //當發生異常時,關閉ChannelHandlerContext,釋放和ChannelHandlerContext相關聯的句柄等資源
        ctx.close();
    }
}

TimeClient

package com.zhen.netty1129_TCP_HALF_PACKAGE;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {

    public void connect(int port,String host) throws Exception{
        //配置客戶端NIO線程組,客戶端處理I/O讀寫的NioEventLoopGroup線程組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //客戶端輔助啟動類Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //設置線程組
            bootstrap.group(group)
                //與服務端不同的是,它的channel需要設置為NioSocketChannel
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                //然后為其添加Handler,此處為了簡單直接創建匿名內部類,實現initChannel方法
                //作用是當創建NioSocketChannel成功之后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡I/O事件
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new TimeClientHandler());
                    }
                });
            //調用connect發起異步連接操作,然后調用sync同步方法等待連接成功。
            ChannelFuture future = bootstrap.connect(host, port).sync();
            //等待客戶端鏈路關閉,當客戶端連接關閉之后,客戶端主函數退出,退出之前釋放NIO線程組的資源
            future.channel().closeFuture().sync();
        } finally {
            //優雅退出,釋放NIO線程組
            group.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception{
        int port = 9090;
        String host = "127.0.0.1";
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        new TimeClient().connect(port, host);
    }
    
}

TimeClientHandler

package com.zhen.netty1129_TCP_HALF_PACKAGE;

import java.util.logging.Logger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class TimeClientHandler extends ChannelHandlerAdapter{
    
    private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
    
    private int counter;
    
    private byte[] req;
    
    
    public TimeClientHandler(){
        req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes();
        
    }
    
    //當客戶端和服務端TCP鏈路建立成功之后,Netty的NIO線程會調用channelActive方法,發送查詢時間的指令給服務端
    //調用ChannelHandlerContext的writeAndFlush方法將請求消息發送給客戶端
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }
    
    //當客戶端返回應答消息,channelRead方法被調用
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("Now is :" + body + " ; the counter is : " + ++counter);
    }
    
    //發生異常時,釋放客戶端資源
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.warning("Unexpected exception from downstream : " + cause.getMessage());
        ctx.close();
    }
    
}

此時啟動server,再啟動client,可看到以下結果

server端

client端

 

 

 

可以發現,server只受到了兩條消息,說明發生了粘包,但是我們期望的是收到100條消息,每條包含一條”QUERY TIME ORDER”指令,這說明發生了TCP粘包
客戶端應該收到100條當前系統時間,但實際上只收到了一條,因為服務端只收到了2條請求消息,所以實際服務端只發送了2條應答,由於請求消息不滿足查詢條件,所以返回了2條”BAD ORDER”應答消息。但是實際上客戶端只收到了一條包含兩條”BAD ORDER”指令的消息,說明服務端返回的應答消息也發生了粘包

 

解決TCP粘包問題

利用LineBasedFrameDecoder解決TCP粘包問題

為了解決TCP粘包/拆包導致的半包讀寫問題,Netty默認提供了多種編解碼器用於處理半包,只要能熟練掌握這些類庫的使用,TCP粘包問題從此會變得非常容易

來看代碼

TimeServer

package com.zhen.netty1129_TCP_HALF_PACKAGE_SOLVE;

import java.awt.Event;
import java.net.Socket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimeServer {

    public void bind(int port) throws Exception{
        //配置服務端的NIO線程組
        //NioEventLoopGroup是個線程組,它包含了一組NIO線程,專門用於網絡事件的處理,實際上它們就是Reactor線程組
        //bossGroup用於服務端接受客戶端的連接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //workerGroup進行SocketChannel的網絡讀寫
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //Netty用於啟動NIO服務端的輔助啟動類,目的是降低服務端的開發復雜度
            ServerBootstrap bootstrap = new ServerBootstrap();
            //將兩個NIO線程組當作入參傳遞到ServerBootstrap
            bootstrap.group(bossGroup, workerGroup)
                //設置創建的Channel為NioServerSocketChannel,它的功能對應於JDK NIO類庫中的ServerSocketChannel類。
                .channel(NioServerSocketChannel.class)
                //配置NioServerSocketChannel的TCP參數,此處將它的backlog設置為1024
                .option(ChannelOption.SO_BACKLOG, 1024)
                //綁定I/O事件的處理類ChildChannelHandler,它的作用類似於Reactor模式中的Handler類,主要用於處理網絡I/O事件,例如記錄日志、對消息進行編解碼等
                .childHandler(new ChildChannelHandler());
            //調用bind方法綁定監聽端口,隨后,調用它的同步阻塞方法sync等待綁定操作完成。
            //完成之后Netty會返回一個ChannelFuture,它的功能類似於JDK的java.util.concurrent.Future,主要用於異步操作的通知回調
            ChannelFuture future = bootstrap.bind(port).sync();
            //等待服務端監聽端口關閉,等待服務端鏈路關閉之后main函數才退出
            future.channel().closeFuture().sync();
        } finally {
            //優雅退出,釋放線程池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            //在原來的TimeServerHandler之前新增了兩個解碼器LineBasedFrameDecoder、StringDecoder
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            ch.pipeline().addLast(new StringDecoder());
            ch.pipeline().addLast(new TimeServerHandler());
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port = 9090;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        new TimeServer().bind(port);
    }
}

TimeServerHandler

package com.zhen.netty1129_TCP_HALF_PACKAGE_SOLVE;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

//TimeServerHandler 繼承自ChannelHandlerAdapter,它用於對網絡事件進行讀寫操作
public class TimeServerHandler extends ChannelHandlerAdapter{
    
    private int counter;
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String)msg;
        System.out.println("The time server receive order : " + body
                + "; the counter is : "+ ++counter);
        //對請求消息進行判斷,如果是QUERY TIME ORDER則創建應答消息
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? 
                new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        //通過ChannelHandlerContext的write方法異步發送應答消息給客戶端
        ctx.writeAndFlush(resp);
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //當發生異常時,關閉ChannelHandlerContext,釋放和ChannelHandlerContext相關聯的句柄等資源
        ctx.close();
    }
}

TimeClient

package com.zhen.netty1129_TCP_HALF_PACKAGE_SOLVE;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimeClient {

    public void connect(int port,String host) throws Exception{
        //配置客戶端NIO線程組,客戶端處理I/O讀寫的NioEventLoopGroup線程組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //客戶端輔助啟動類Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //設置線程組
            bootstrap.group(group)
                //與服務端不同的是,它的channel需要設置為NioSocketChannel
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                //然后為其添加Handler,此處為了簡單直接創建匿名內部類,實現initChannel方法
                //作用是當創建NioSocketChannel成功之后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡I/O事件
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        //在原來的TimeClientHandler之前新增了兩個解碼器LineBasedFrameDecoder、StringDecoder
                        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                        ch.pipeline().addLast(new StringDecoder());                         ch.pipeline().addLast(new TimeClientHandler());
                    }
                });
            //調用connect發起異步連接操作,然后調用sync同步方法等待連接成功。
            ChannelFuture future = bootstrap.connect(host, port).sync();
            //等待客戶端鏈路關閉,當客戶端連接關閉之后,客戶端主函數退出,退出之前釋放NIO線程組的資源
            future.channel().closeFuture().sync();
        } finally {
            //優雅退出,釋放NIO線程組
            group.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception{
        int port = 9090;
        String host = "127.0.0.1";
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        new TimeClient().connect(port, host);
    }
    
}

TimeClientHandler

package com.zhen.netty1129_TCP_HALF_PACKAGE_SOLVE;

import java.util.logging.Logger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class TimeClientHandler extends ChannelHandlerAdapter{
    
    private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
    
    private int counter;
    
    private byte[] req;
    
    
    public TimeClientHandler(){
        req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes();
        
    }
    
    //當客戶端和服務端TCP鏈路建立成功之后,Netty的NIO線程會調用channelActive方法,發送查詢時間的指令給服務端
    //調用ChannelHandlerContext的writeAndFlush方法將請求消息發送給客戶端
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }
    
    //當客戶端返回應答消息,channelRead方法被調用
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //拿到的msg已經是解碼成字符串之后的應答消息了。
        String body = (String)msg;
        System.out.println("Now is :" + body + " ; the counter is : " + ++counter);
    }
    
    //發生異常時,釋放客戶端資源
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.warning("Unexpected exception from downstream : " + cause.getMessage());
        ctx.close();
    }
    
}

此時再次運行,查看結果

Server端

client端

 

此時TCP粘包問題已經解決

 

LineBasedFrameDecoder和StringDecoder的原理分析
LineBasedFrameDecoder的工作原理是依次便利ByteBuf中的刻度子節,判斷看是否有”\n” 或者“\r”,如果有,就以此為止為結束位置,從可讀索引到結束位置區間的字節久組成了一行。它是以換行符為結束標志的解碼器,支持攜帶結束符或者不攜帶結束符兩種編碼方式,同時支持配置單行的最大長度后仍然沒有發現換行符,就會拋出異常,同時忽略掉之前讀到的異常碼流。
StringDecoder的功能非常簡單,就是將接收到的對象轉換成字符串,然后繼續調用后面的handler。LineBasedFrameDecoder+StringDecoder組合就是按行切換的文本解碼器,它被設計用來支持TCP的粘包和拆包.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM