Netty(七):流數據的傳輸處理


Socket Buffer的缺陷

對於例如TCP/IP這種基於流的傳輸協議實現,接收到的數據會被存儲在socket的接受緩沖區內。不幸的是,這種基於流的傳輸緩沖區並不是一個包隊列,而是一個字節隊列。這意味着,即使你以兩個數據包的形式發送了兩條消息,操作系統卻不會把它們看成是兩條消息,而僅僅是一個批次的字節序列。因此,在這種情況下我們就無法保證收到的數據恰好就是遠程節點所發送的數據。例如,讓我們假設一個操作系統的TCP/IP堆棧收到了三個數據包:

由於這種流傳輸協議的普遍性質,在你的應用中有較高的可能會把這些數據讀取為另外一種形式:

因此對於數據的接收方,不管是服務端還是客戶端,應當重構這些接收到的數據,讓其變成一種可讓你的應用邏輯易於理解的更有意義的數據結構。在上面所述的這個例子中,接收到的數據應當重構為下面的形式:

第一種解決方案(使用特殊字符分割)

Netty提供了一個分隔符類DelimiterBasedFrameDecoder(自定義分隔符)

下面的開發我是居於我的Netty第一個開發程序來講的,沒看過我的這篇文章可以先看看,想信你在Netty第一個開發程序會捕獲很多你想不到的知識。

服務端

 

public class Server {

    public static void main(String[] args) throws Exception{
        //1 創建2個線程,一個是負責接收客戶端的連接。一個是負責進行數據傳輸的
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();

        //2 創建服務器輔助類
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         .option(ChannelOption.SO_SNDBUF, 32*1024)
         .option(ChannelOption.SO_RCVBUF, 32*1024)
         .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //1 設置特殊分隔符  
                ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
                //2
                sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                //3 設置字符串形式的解碼
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ServerHandler());
            }
        });

        //4 綁定連接
        ChannelFuture cf = b.bind(8765).sync();

        //等待服務器監聽端口關閉
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();

    }

}

關於EventLoopGroup、ServerBootstrap等等之類的我都在Netty的第一個程序都講得很清楚了,需要了解的可以參考我的第一篇文章。

代碼說明:

1、 Unpooled.copiedBuffer(“$_”.getBytes()) 這個是設置特殊分隔符返回的是Netty中的ByteBuf類型這里我設置的是 $_

2、DelimiterBasedFrameDecoder()是處理分隔符的類

3、StringDecoder() 設置字符串形式的解碼

服務端業務處理

public class ServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" server channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String request = (String)msg;
        System.out.println("Server :" + msg);
        String response = "服務器響應:" + msg + "$_";
        ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
    }

}

 

由於在服務端就使用了StringDecoder()解碼成字符串形式,這里不需要用ByteBuf去轉換成字符串。

客戶端

 

public class Client {

    public static void main(String[] args) throws Exception {

        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //1
                ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
                //2
                sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                //3
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ClientHandler());
            }
        });

        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("777$_".getBytes()));
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("666$_".getBytes()));
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("888$_".getBytes()));


        //等待客戶端端口關閉
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();

    }
}

由於這里客戶端也接收服務端返回的數據所以也采用了與服務端一樣的處理方式。

客戶端業務處理

 

public class ClientHandler extends ChannelHandlerAdapter{

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            String response = (String)msg;
            System.out.println("Client: " + response);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
}

好!到這第一種解決方案就編寫結束了,先啟動服務端,再啟動客戶端

客戶端打印如下:

 

客戶端簽到后服務端的打印如下:

 

第二種解決方案(定長)

Netty提供了一個定長類FixdeLengthFraneDecoder。

使用這個定長的有個弊端:如果由多個字段比如可變長度的字段組成時這個時候並解決不了什么問題,建議使用第一個解決方案。

FixdeLengthFraneDecoder的使用跟DelimiterBasedFrameDecoder差不多,由於代碼都差不多一樣這里我不做太多的說明。

服務端

 

public class Server {

    public static void main(String[] args) throws Exception{
        //創建2個線程,一個是負責接收客戶端的連接。一個是負責進行數據傳輸的
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();

        //創建服務器輔助類
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         .option(ChannelOption.SO_SNDBUF, 32*1024)
         .option(ChannelOption.SO_RCVBUF, 32*1024)
         .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //1  設置定長字符串接收  
                sc.pipeline().addLast(new FixedLengthFrameDecoder(3));
                //2  設置字符串形式的解碼
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ServerHandler());
            }
        });

        //4 綁定連接
        ChannelFuture cf = b.bind(8765).sync();

        //等待服務器監聽端口關閉
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();

    }

}

1、FixedLengthFrameDecoder(3) 這里設置定長字符串接收具體設置多長自己定。

2、StringDecoder() 設置字符串形式的解碼。

服務端業務處理

public class ServerHandler extends ChannelHandlerAdapter {


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" server channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String request = (String)msg;
        System.out.println("Server :" + msg);
        String response =  request ;
        ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
    }
}

 

服務端

 

public class Client {

    public static void main(String[] args) throws Exception {

        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(new FixedLengthFrameDecoder(3));
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ClientHandler());
            }
        });

        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();

        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("777".getBytes()));
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("666".getBytes()));
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("888".getBytes()));

        //等待客戶端端口關閉
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();

    }
}

客戶端業務處理

public class ClientHandler extends ChannelHandlerAdapter{

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            String response = (String)msg;
            System.out.println("Client: " + response);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
}

 

好!到這第二種解決方案就編寫結束了,先啟動服務端,再啟動客戶端

客戶端打印如下:

 

客戶端簽到后服務端的打印如下:

 




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM