netty自定義簡單解碼器處理粘包、拆包


tcp連接的粘包、拆包發生在長連接中,先了解一下長、短連接的概念

短連接:請求/響應之后,關閉已經建立的tcp連接,下次請求再建立新的連接

長連接:請求/響應之后,不關閉已經建立的tcp連接,多次請求,復用同一個連接

粘包:Nagle算法,客戶端累積一定量或者緩沖一段時間再傳輸。服務端緩沖區堆積,導致多個請求粘在一起

拆包:發送的請求大於發送緩沖區,進行分片傳輸。服務端緩沖區堆積,導致服務端讀取的請求數據不完成

 

可以模擬粘包場景,新建一個socket工程如下所示

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

public class Client {

    public static void main(String[] args) throws IOException, InterruptedException {

        Socket socket = new Socket();
        socket.connect(new InetSocketAddress(10000));

        OutputStream outputStream = socket.getOutputStream();

        byte[] request = new byte[200];
        byte[] message = "測試測試測試測試測試測試測試測試測試測試測試測試測試測試測試測試測試".getBytes();
        System.arraycopy(message, 0, request, 0, message.length);


        //開十個線程向服務端發送消息
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    outputStream.write(request);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }

    }
}

新建一個netty server如下所示

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

    public static void main(String[] args) {

        ServerBootstrap serverBootstrap = new ServerBootstrap();

        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            serverBootstrap.group(eventLoopGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(10000)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new Handler());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind().syncUninterruptibly();
            future.channel().closeFuture().syncUninterruptibly();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

新建處理客戶端消息的Handler,這里只是簡單的將消息打印出來

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class Handler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        if (msg instanceof ByteBuf) {
            byte[] bytes = new byte[((ByteBuf) msg).readableBytes()];
            ((ByteBuf) msg).readBytes(bytes);
            System.out.println(new String(bytes));
        } else {
            System.out.println(new String((byte[]) msg));
        }

        ctx.fireChannelRead(msg);
    }
}

運行程序后,服務端打印的數據如下,只打印了兩條消息,這顯然不是我們想要的結果

對於這種情況,如果我們需要自己處理的話,可以繼承netty提供的ByteToMessageDecoder類並實現其decode方法,這其實就是所謂的編碼解碼過程。

代碼如下所示

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

public class Decoder extends ByteToMessageDecoder {

    //為了簡單處理,假設協議為每次固定傳輸200字節
    private static final int POCKET_SIZE = 200;

    //記錄上次未讀完的字節
    private ByteBuf tempMessage = Unpooled.buffer();


    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        int inSize = byteBuf.readableBytes();

        System.out.println("=========收到" + inSize + "字節========");
        ByteBuf inMessage;

        //加上上次未讀取完成的字節
        if (tempMessage.readableBytes() == 0) {
            inMessage = byteBuf;
        } else {
            inMessage = Unpooled.buffer();
            inMessage.writeBytes(tempMessage);
            inMessage.writeBytes(byteBuf);
        }

        int counter = inMessage.readableBytes() / POCKET_SIZE;

        for (int i = 0; i < counter; i++) {
            byte[] bytes = new byte[POCKET_SIZE];
            inMessage.readBytes(bytes);
            //將處理的好的消息放入list中向下傳遞
            list.add(bytes);
        }

        tempMessage.clear();
        if (inMessage.readableBytes() != 0) {
            inMessage.readBytes(tempMessage, inMessage.readableBytes());
        }

    }
}

 解碼器編寫完成之后,還需要再server啟動時加上,代碼如下所示:

serverBootstrap.group(eventLoopGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(10000)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new Decoder());
                            pipeline.addLast(new Handler());
                        }
                    });

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM