學習netty遇到的關於 LineBasedFrameDecoder 的問題


最近在看《Netty權威指南》這本書,關於TCP粘包/拆包,書中使用的是 LineBasedFrameDecoder 來解決的,但是我在實踐的過程中出現了問題,上代碼吧。

這個是 server 的代碼

package com.cd.netty4.zhc.demo.ex01;

import java.text.SimpleDateFormat;
import java.util.Date;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;

/** 
 * 本例子參考《Netty權威指南(第2版)》第4章 
 * 先運行 TimeServerExc02,然后運行 TimeClientExc02,可解決 TCP 粘包/拆包問題
 * 使用 LineBasedFrameDecoder + StringDecoder 解決 TCP 粘包/拆包問題
 * */
public class TimeServerExc02 {
    public static void main(String args[]) {
        System.out.println("---------------------- server 測試開始 ---------------------");
        new TimeServerExc02().bind("127.0.0.1", 1234);
        System.out.println("---------------------- server 測試end ---------------------");
    }

    public void bind(String host, int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serboot = new ServerBootstrap().group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel arg0) throws Exception {
                        arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
                        arg0.pipeline().addLast("decoder", new StringDecoder());
                        arg0.pipeline().addLast("handler", new TimeServerHandlerExc02());
                    }
                });

        try {
            // 綁定端口,同步等待成功
            ChannelFuture future = serboot.bind(host, port).sync();
            // 等待服務端監聽端口關閉
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

class TimeServerHandlerExc02 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server channelActive(有client連接上了)..");
        ctx.writeAndFlush(Unpooled.copiedBuffer("您已經成功連接上了 server!", CharsetUtil.UTF_8)); // 必須有flush
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server channelRead..");
        String msgStr = msg.toString();
        System.out.println("讀入client消息:" + msgStr);
        String currentTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        ByteBuf resp = Unpooled.copiedBuffer(currentTime, CharsetUtil.UTF_8);
        ctx.writeAndFlush(resp);
        System.out.println("向client發送消息:" + currentTime);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

這個是client的代碼:

package com.cd.netty4.zhc.demo.ex01;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;

/** 
 * 本例子參考《Netty權威指南(第2版)》第4.2章 
 * */
public class TimeClientExc02 {

    public static void main(String[] args) {
        try {
            System.out.println("---------------------- client 測試開始 ---------------------");
            new TimeClientExc02().connect("127.0.0.1", 1234);
            System.out.println("---------------------- client 測試end ---------------------");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private int count;

    public void connect(String host, int port) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap boot = new Bootstrap().group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast("decoder", new StringDecoder());
                            ch.pipeline().addLast(new TimeClientHandlerExc02());
                        }
                    });
            ChannelFuture future = boot.connect(host, port);
            // 等待客戶端鏈路關閉
            future.channel().closeFuture().sync();
        } finally {
            // 優雅的退出,釋放NIO線程組
            group.shutdownGracefully();
        }
    }

    class TimeClientHandlerExc02 extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client channelActive(client 連接成功)..");
            for (int i = 0; i < 50; i++) {
                System.out.print(i + ",");
                ctx.writeAndFlush(
                        Unpooled.copiedBuffer("It's a good day , I want to know time--" + i , CharsetUtil.UTF_8)); // 必須有flush
            }
            ctx.flush();
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("client channelRead.." + ++count);
            String msgStr = msg.toString();
            System.out.println("讀入 server 消息:" + msgStr);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

    }

}

我先運行的是server,然后是client,發現 server 的 channelActive(..) 以及 client 的 channelActive(..) 都有運行到,但是后續的 channelRead(..) 方法卻遲遲沒有運行到,我把 LineBasedFrameDecoder 和 StringDecoder 這兩個 解碼器去掉,則代碼正常,但是會有 TCP 粘包/拆包問題。

在網上查了問題原因,無果,認真看了兩遍書,發現 LineBasedFrameDecoder  的工作原理是“ 它依次遍歷ByteBuf中的可讀字節,判斷看是否有‘\n’或者‘\r\n’,如果有,就在此位置為結束位置,從可讀索引到結束位置區間的字節就組成了一行 ”。

所以,我的問題就出在消息結尾處沒有加上換行符,修改代碼后,可運行。修改后代碼如下:

package com.cd.netty4.zhc.demo.ex01;

import java.text.SimpleDateFormat;
import java.util.Date;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;

/** 
 * 本例子參考《Netty權威指南(第2版)》第4章 
 * 先運行 TimeServerExc02,然后運行 TimeClientExc02,可解決 TCP 粘包/拆包問題
 * 使用 LineBasedFrameDecoder + StringDecoder 解決 TCP 粘包/拆包問題
 * 注意:使用 LineBasedFrameDecoder時,發送的消息結尾一定要是\n(官方是 System.getProperty("line.separator")),server端 和 client 都必須如此
 * 因為LineBasedFrameDecoder 的工作原理是,依次遍歷Bytebuf中的可讀字節,判斷是否有“\n”或者“\r\n”,如果有則在此位置結束
 * */
public class TimeServerExc02 {
    public static void main(String args[]) {
        System.out.println("---------------------- server 測試開始 ---------------------");
        new TimeServerExc02().bind("127.0.0.1", 1234);
        System.out.println("---------------------- server 測試end ---------------------");
    }

    public void bind(String host, int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serboot = new ServerBootstrap().group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel arg0) throws Exception {
                        arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
                        arg0.pipeline().addLast("decoder", new StringDecoder());
                        arg0.pipeline().addLast("handler", new TimeServerHandlerExc02());
                    }
                });

        try {
            // 綁定端口,同步等待成功
            ChannelFuture future = serboot.bind(host, port).sync();
            // 等待服務端監聽端口關閉
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

class TimeServerHandlerExc02 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server channelActive(有client連接上了)..");
        ctx.writeAndFlush(Unpooled.copiedBuffer("您已經成功連接上了 server!", CharsetUtil.UTF_8)); // 必須有flush
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server channelRead..");
        String msgStr = msg.toString();
        System.out.println("讀入client消息:" + msgStr);
        String currentTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        ByteBuf resp = Unpooled.copiedBuffer(currentTime + System.getProperty("line.separator"), CharsetUtil.UTF_8);
        ctx.writeAndFlush(resp);
        System.out.println("向client發送消息:" + currentTime);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}
package com.cd.netty4.zhc.demo.ex01;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;

/** 
 * 本例子參考《Netty權威指南(第2版)》第4.2章 
 * 注意:使用 LineBasedFrameDecoder時,發送的消息結尾一定要是\n(官方是 System.getProperty("line.separator")),server端 和 client 都必須如此
 * 因為LineBasedFrameDecoder 的工作原理是,依次遍歷Bytebuf中的可讀字節,判斷是否有“\n”或者“\r\n”,如果有則在此位置結束
 * */
public class TimeClientExc02 {

    public static void main(String[] args) {
        try {
            System.out.println("---------------------- client 測試開始 ---------------------");
            new TimeClientExc02().connect("127.0.0.1", 1234);
            System.out.println("---------------------- client 測試end ---------------------");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private int count;

    public void connect(String host, int port) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap boot = new Bootstrap().group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast("decoder", new StringDecoder());
                            ch.pipeline().addLast(new TimeClientHandlerExc02());
                        }
                    });
            ChannelFuture future = boot.connect(host, port);
            // 等待客戶端鏈路關閉
            future.channel().closeFuture().sync();
        } finally {
            // 優雅的退出,釋放NIO線程組
            group.shutdownGracefully();
        }
    }

    class TimeClientHandlerExc02 extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client channelActive(client 連接成功)..");
            for (int i = 0; i < 50; i++) {
                System.out.print(i + ",");
                ctx.writeAndFlush(
                        Unpooled.copiedBuffer("It's a good day , I want to know time--" + i + "\n", CharsetUtil.UTF_8)); // 必須有flush
            }
            ctx.flush();
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("client channelRead.." + ++count);
            String msgStr = msg.toString();
            System.out.println("讀入 server 消息:" + msgStr);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

    }

}

 

查看了 LineBasedFrameDecoder 的部分源碼,確實是以換行作為分割符的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM