Netty學習篇五——netty解決粘包和拆包問題的四種方案


  在RPC框架中,粘包和拆包問題是必須解決一個問題,因為RPC框架中,各個微服務相互之間都是維系了一個TCP長連接,比如dubbo就是一個全雙工的長連接。由於微服務往對方發送信息的時候,所有的請求都是使用的同一個連接,這樣就會產生粘包和拆包的問題。本文首先會對粘包和拆包問題進行描述,然后介紹其常用的解決方案,最后會對Netty提供的幾種解決方案進行講解。這里說明一下本文統一使用“解碼一器”表示該解碼器。

1. 粘包和拆包

   產生粘包和拆包問題的主要原因是,操作系統在發送TCP數據的時候,底層會有一個緩沖區,例如1024個字節大小,如果一次請求發送的數據量比較小,沒達到緩沖區大小,TCP則會將多個請求合並為同一個請求進行發送,這就形成了粘包問題;如果一次請求發送的數據量比較大,超過了緩沖區大小,TCP就會將其拆分為多次發送,這就是拆包,也就是將一個大的包拆分為多個小包進行發送。如下圖展示了粘包和拆包的一個示意圖:

 

 上圖中演示了粘包和拆包的三種情況:

  • A和B兩個包都剛好滿足TCP緩沖區的大小,或者說其等待時間已經達到TCP等待時長,從而還是使用兩個獨立的包進行發送;

  • A和B兩次請求間隔時間內較短,並且數據包較小,因而合並為同一個包發送給服務端;

  • B包比較大,因而將其拆分為兩個包B_1和B_2進行發送,而這里由於拆分后的B_2比較小,其又與A包合並在一起發送。

2. 常見解決方案

對於粘包和拆包問題,常見的解決方案有四種:

  • 客戶端在發送數據包的時候,每個包都固定長度,比如1024個字節大小,如果客戶端發送的數據長度不足1024個字節,則通過補充空格的方式補全到指定長度;

  • 客戶端在每個包的末尾使用固定的分隔符,例如\r\n,如果一個包被拆分了,則等待下一個包發送過來之后找到其中的\r\n,然后對其拆分后的頭部部分與前一個包的剩余部分進行合並,這樣就得到了一個完整的包;

  • 將消息分為頭部和消息體,在頭部中保存有當前整個消息的長度,只有在讀取到足夠長度的消息之后才算是讀到了一個完整的消息;

  • 通過自定義協議進行粘包和拆包的處理。

3. Netty提供的粘包拆包解決方案

3.1  固定長度的拆包器 FixedLengthFrameDecoder,每個應用層數據包的都拆分成都是固定長度的大小

  對於使用固定長度的粘包和拆包場景,可以使用FixedLengthFrameDecoder,該解碼一器會每次讀取固定長度的消息,如果當前讀取到的消息不足指定長度,那么就會等待下一個消息到達后進行補足。其使用也比較簡單,只需要在構造函數中指定每個消息的長度即可。這里需要注意的是,FixedLengthFrameDecoder只是一個解碼一器,Netty也只提供了一個解碼一器,這是因為對於解碼是需要等待下一個包的進行補全的,代碼相對復雜,而對於編碼器,用戶可以自行編寫,因為編碼時只需要將不足指定長度的部分進行補全即可。下面的示例中展示了如何使用FixedLengthFrameDecoder來進行粘包和拆包處理:

public class EchoServer {

  public void bind(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap bootstrap = new ServerBootstrap();
      bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            // 這里將FixedLengthFrameDecoder添加到pipeline中,指定長度為20
            ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
            // 將前一步解碼得到的數據轉碼為字符串
            ch.pipeline().addLast(new StringDecoder());
            // 這里FixedLengthFrameEncoder是我們自定義的,用於將長度不足20的消息進行補全空格
            ch.pipeline().addLast(new FixedLengthFrameEncoder(20));
            // 最終的數據處理
            ch.pipeline().addLast(new EchoServerHandler());
          }
        });

      ChannelFuture future = bootstrap.bind(port).sync();
      future.channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new EchoServer().bind(8080);
  }
}

 上面的pipeline中,對於入棧數據,這里主要添加了FixedLengthFrameDecoderStringDecoder,前面一個用於處理固定長度的消息的粘包和拆包問題,第二個則是將處理之后的消息轉換為字符串。最后由EchoServerHandler處理最終得到的數據,處理完成后,將處理得到的數據交由FixedLengthFrameEncoder處理,該編碼器是我們自定義的實現,主要作用是將長度不足20的消息進行空格補全。下面是FixedLengthFrameEncoder的實現代碼: 

public class FixedLengthFrameEncoder extends MessageToByteEncoder<String> {
  private int length;

  public FixedLengthFrameEncoder(int length) {
    this.length = length;
  }

  @Override
  protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out)
      throws Exception {
    // 對於超過指定長度的消息,這里直接拋出異常
    if (msg.length() > length) {
      throw new UnsupportedOperationException(
          "message length is too large, it's limited " + length);
    }

    // 如果長度不足,則進行補全
    if (msg.length() < length) {
      msg = addSpace(msg);
    }

    ctx.writeAndFlush(Unpooled.wrappedBuffer(msg.getBytes()));
  }

  // 進行空格補全
  private String addSpace(String msg) {
    StringBuilder builder = new StringBuilder(msg);
    for (int i = 0; i < length - msg.length(); i++) {
      builder.append(" ");
    }

    return builder.toString();
  }
}

  這里FixedLengthFrameEncoder實現了encode()方法,在該方法中,主要是將消息長度不足20的消息進行空格補全。EchoServerHandler的作用主要是打印接收到的消息,然后發送響應給客戶端: 

  對於客戶端,其實現方式基本與服務端的使用方式類似,只是在最后進行消息發送的時候與服務端的處理方式不同。如下是客戶端EchoClient的代碼:

public class EchoClient {

  public void connect(String host, int port) throws InterruptedException {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
      Bootstrap bootstrap = new Bootstrap();
      bootstrap.group(group)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .handler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            // 對服務端發送的消息進行粘包和拆包處理,由於服務端發送的消息已經進行了空格補全,
            // 並且長度為20,因而這里指定的長度也為20
            ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
            // 將粘包和拆包處理得到的消息轉換為字符串
            ch.pipeline().addLast(new StringDecoder());
            // 對客戶端發送的消息進行空格補全,保證其長度為20
            ch.pipeline().addLast(new FixedLengthFrameEncoder(20));
            // 客戶端發送消息給服務端,並且處理服務端響應的消息
            ch.pipeline().addLast(new EchoClientHandler());
          }
        });

      ChannelFuture future = bootstrap.connect(host, port).sync();
      future.channel().closeFuture().sync();
    } finally {
      group.shutdownGracefully();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new EchoClient().connect("127.0.0.1", 8080);
  }
}

  對於客戶端而言,其消息的處理流程其實與服務端是相似的,對於入站消息,需要對其進行粘包和拆包處理,然后將其轉碼為字符串,對於出站消息,則需要將長度不足20的消息進行空格補全。客戶端與服務端處理的主要區別在於最后的消息處理handler不一樣,也即這里的EchoClientHandler,如下是該handler的源碼:  

public class EchoClientHandler extends SimpleChannelInboundHandler<String> {

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    System.out.println("client receives message: " + msg.trim());
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.writeAndFlush("hello server!");
  }
}

  這里客戶端的處理主要是重寫了channelActive()channelRead0()兩個方法,這兩個方法的主要作用在於,channelActive()會在客戶端連接上服務器時執行,也就是說,其連上服務器之后就會往服務器發送消息。而channelRead0()主要是在服務器發送響應給客戶端時執行,這里主要是打印服務器的響應消息。對於服務端而言,前面我們我們可以看到,EchoServerHandler只重寫了channelRead0()方法,這是因為服務器只需要等待客戶端發送消息過來,然后在該方法中進行處理,處理完成后直接將響應發送給客戶端。如下是分別啟動服務端和客戶端之后控制台打印的數據: 

3.2 行拆包器 LineBasedFrameDecoder與分隔符拆包器 DelimiterBasedFrameDecoder

       對於通過分隔符進行粘包和拆包問題的處理,Netty提供了兩個編解碼的類,LineBasedFrameDecoderDelimiterBasedFrameDecoder。這里LineBasedFrameDecoder的作用主要是通過換行符,即\n或者\r\n對數據進行處理;而DelimiterBasedFrameDecoder的作用則是通過用戶指定的分隔符對數據進行粘包和拆包處理。同樣的,這兩個類都是解碼一器類,而對於數據的編碼,也即在每個數據包最后添加換行符或者指定分割符的部分需要用戶自行進行處理。這里以DelimiterBasedFrameDecoder為例進行講解,如下是EchoServer中使用該類的代碼片段,其余部分與前面的例子中的完全一致:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    String delimiter = "_$";
    // 將delimiter設置到DelimiterBasedFrameDecoder中,經過該解碼一器進行處理之后,源數據將會
    // 被按照_$進行分隔,這里1024指的是分隔的最大長度,即當讀取到1024個字節的數據之后,若還是未
    // 讀取到分隔符,則舍棄當前數據段,因為其很有可能是由於碼流紊亂造成的
    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
        Unpooled.wrappedBuffer(delimiter.getBytes())));
    // 將分隔之后的字節數據轉換為字符串數據
    ch.pipeline().addLast(new StringDecoder());
    // 這是我們自定義的一個編碼器,主要作用是在返回的響應數據最后添加分隔符
    ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));
    // 最終處理數據並且返回響應的handler
    ch.pipeline().addLast(new EchoServerHandler());
}

   上面pipeline的設置中,添加的解碼一器主要有DelimiterBasedFrameDecoderStringDecoder,經過這兩個處理器處理之后,接收到的字節流就會被分隔,並且轉換為字符串數據,最終交由EchoServerHandler處理。這里DelimiterBasedFrameEncoder是我們自定義的編碼器,主要作用是在返回的響應數據之后添加分隔符。該編碼器源碼如下:

public class DelimiterBasedFrameEncoder extends MessageToByteEncoder<String> {

  private String delimiter;

  public DelimiterBasedFrameEncoder(String delimiter) {
    this.delimiter = delimiter;
  }

  @Override
  protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) 
      throws Exception {
    // 在響應的數據后面添加分隔符
    ctx.writeAndFlush(Unpooled.wrappedBuffer((msg + delimiter).getBytes()));
  }
}

對於客戶端而言,這里的處理方式與服務端類似,其pipeline的添加方式如下:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    String delimiter = "_$";
    // 對服務端返回的消息通過_$進行分隔,並且每次查找的最大大小為1024字節
    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, 
        Unpooled.wrappedBuffer(delimiter.getBytes())));
    // 將分隔之后的字節數據轉換為字符串
    ch.pipeline().addLast(new StringDecoder());
    // 對客戶端發送的數據進行編碼,這里主要是在客戶端發送的數據最后添加分隔符
    ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));
    // 客戶端發送數據給服務端,並且處理從服務端響應的數據
    ch.pipeline().addLast(new EchoClientHandler());
}

這里客戶端的處理方式與服務端基本一致,關於這里沒展示的代碼,其與示例一中的代碼完全一致,這里則不予展示。  

3.3 基於數據包長度的拆包器 LengthFieldBasedFrameDecoder與LengthFieldPrepender

  這里LengthFieldBasedFrameDecoderLengthFieldPrepender需要配合起來使用,其實本質上來講,這兩者一個是解碼,一個是編碼的關系。它們處理粘拆包的主要思想是在生成的數據包中添加一個長度字段,用於記錄當前數據包的長度。LengthFieldBasedFrameDecoder會按照參數指定的包長度偏移量數據對接收到的數據進行解碼,從而得到目標消息體數據;而LengthFieldPrepender則會在響應的數據前面添加指定的字節數據,這個字節數據中保存了當前消息體的整體字節數據長度。LengthFieldBasedFrameDecoder的解碼過程如下圖所示:

LengthFieldPrepender的編碼過程如下圖所示:

 關於LengthFieldBasedFrameDecoder,這里需要對其構造函數參數進行介紹:

  • maxFrameLength:指定了每個包所能傳遞的最大數據包大小;

  • lengthFieldOffset:指定了長度字段在字節碼中的偏移量;

  • lengthFieldLength:指定了長度字段所占用的字節長度;

  • lengthAdjustment:對一些不僅包含有消息頭和消息體的數據進行消息頭的長度的調整,這樣就可以只得到消息體的數據,這里的lengthAdjustment指定的就是消息頭的長度;

  • initialBytesToStrip:對於長度字段在消息頭中間的情況,可以通過initialBytesToStrip忽略掉消息頭以及長度字段占用的字節。

 這里我們以json序列化為例對LengthFieldBasedFrameDecoderLengthFieldPrepender的使用方式進行講解。如下是EchoServer的源碼:

public class EchoServer {

  public void bind(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap bootstrap = new ServerBootstrap();
      bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            // 這里將LengthFieldBasedFrameDecoder添加到pipeline的首位,因為其需要對接收到的數據
            // 進行長度字段解碼,這里也會對數據進行粘包和拆包處理
            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
            // LengthFieldPrepender是一個編碼器,主要是在響應字節數據前面添加字節長度字段
            ch.pipeline().addLast(new LengthFieldPrepender(2));
            // 對經過粘包和拆包處理之后的數據進行json反序列化,從而得到User對象
            ch.pipeline().addLast(new JsonDecoder());
            // 對響應數據進行編碼,主要是將User對象序列化為json
            ch.pipeline().addLast(new JsonEncoder());
            // 處理客戶端的請求的數據,並且進行響應
            ch.pipeline().addLast(new EchoServerHandler());
          }
        });

      ChannelFuture future = bootstrap.bind(port).sync();
      future.channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new EchoServer().bind(8080);
  }
}

   這里EchoServer主要是在pipeline中添加了兩個編碼器和兩個解碼一器,編碼器主要是負責將響應的User對象序列化為json對象,然后在其字節數組前面添加一個長度字段的字節數組;解碼一器主要是對接收到的數據進行長度字段的解碼,然后將其反序列化為一個User對象。下面是JsonDecoder的源碼:

public class JsonDecoder extends MessageToMessageDecoder<ByteBuf> {

  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) 
      throws Exception {
    byte[] bytes = new byte[buf.readableBytes()];
    buf.readBytes(bytes);
    User user = JSON.parseObject(new String(bytes, CharsetUtil.UTF_8), User.class);
    out.add(user);
  }
}

  JsonDecoder首先從接收到的數據流中讀取字節數組,然后將其反序列化為一個User對象。JsonEncoder源碼如下: 

public class JsonEncoder extends MessageToByteEncoder<User> {

  @Override
  protected void encode(ChannelHandlerContext ctx, User user, ByteBuf buf)
      throws Exception {
    String json = JSON.toJSONString(user);
    ctx.writeAndFlush(Unpooled.wrappedBuffer(json.getBytes()));
  }
}

  JsonEncoder將響應得到的User對象轉換為一個json對象,然后寫入響應中。對於EchoServerHandler,其主要作用就是接收客戶端數據,並且進行響應,如下是其源碼:

public class EchoServerHandler extends SimpleChannelInboundHandler<User> {

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
    System.out.println("receive from client: " + user);
    ctx.write(user);
  }
}

 對於客戶端,其主要邏輯與服務端的基本類似,這里主要展示其pipeline的添加方式,以及最后發送請求,並且對服務器響應進行處理的過程: 

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
    ch.pipeline().addLast(new LengthFieldPrepender(2));
    ch.pipeline().addLast(new JsonDecoder());
    ch.pipeline().addLast(new JsonEncoder());
    ch.pipeline().addLast(new EchoClientHandler());
}

 

public class EchoClientHandler extends SimpleChannelInboundHandler<User> {

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.write(getUser());
  }

  private User getUser() {
    User user = new User();
    user.setAge(27);
    user.setName("zhangxufeng");
    return user;
  }

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
    System.out.println("receive message from server: " + user);
  }
}

這里客戶端首先會在連接上服務器時,往服務器發送一個User對象數據,然后在接收到服務器響應之后,會打印服務器響應的數據。  

3.4 自定義粘包與拆包器

  對於粘包與拆包問題,前面三種基本上已經能夠滿足大多數情形了,但是對於一些更加復雜的協議,可能有一些定制化的需求。對於這些場景,其實本質上,我們也不需要手動從頭開始寫一份粘包與拆包處理器,而是通過繼承LengthFieldBasedFrameDecoderLengthFieldPrepender來實現粘包和拆包的處理。如果用戶確實需要不通過繼承的方式實現自己的粘包和拆包處理器,這里可以通過實現MessageToByteEncoderByteToMessageDecoder來實現。這里MessageToByteEncoder的作用是將響應數據編碼為一個ByteBuf對象,而ByteToMessageDecoder則是將接收到的ByteBuf數據轉換為某個對象數據。通過實現這兩個抽象類,用戶就可以達到實現自定義粘包和拆包處理的目的。如下是這兩個類及其抽象方法的聲明:

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) 
        throws Exception;
} 
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) 
        throws Exception;
}

4. 小結

       本文首先對粘包和拆包的問題原理進行描述,幫助讀者理解粘包和拆包問題所在。然后對處理粘包和拆包的幾種常用解決方案進行講解。接着通過輔以示例的方式對Netty提供的幾種解決粘包和拆包問題的解決方案進行了詳細講解。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM