Netty學習筆記(三) 自定義編碼器


編寫一個網絡應用程序需要實現某種編解碼器,編解碼器的作用就是講原始字節數據與自定義的消息對象進行互轉。網絡中都是以字節碼的數據形式來傳輸數據的,服務器編碼數據后發送到客戶端,客戶端需要對數據進行解碼,因為編解碼器由兩部分組成:

  • Decoder(解碼器)
  • Encoder(編碼器)

解碼器負責處理“入站”數據,編碼器負責處理“出站”數據。編碼器和解碼器的結構很簡單,消息被編碼后解碼后會自動通過ReferenceCountUtil.release(message)釋放。

需要補充說明的是,Netty中有兩個方向的數據流

  • 入站(ChannelInboundHandler):從遠程主機到用戶應用程序則是“入站(inbound)”

  • 出站(ChannelOutboundHandler):從用戶應用程序到遠程主機則是“出站(outbound)”

今天我們主要學習編碼器,也就是Encoder

實現邏輯

完成一個編碼器的編寫主要是實現一個抽象類MessageToMessageEncoder,其中我們需要重寫方法是

    /**
     * Encode from one message to an other. This method will be called for each written message that can be handled
     * by this encoder.
     *
     * @param ctx           the {@link ChannelHandlerContext} which this {@link MessageToMessageEncoder} belongs to
     * @param msg           the message to encode to an other one
     * @param out           the {@link List} into which the encoded msg should be added
     *                      needs to do some kind of aggragation
     * @throws Exception    is thrown if an error accour
     */
    protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;

其中泛型參數I表示我們需要接收的參數類型,如你需要將ByteBuf類型轉換為Date類型,那么泛型I就是ByteBuf(事實上當實現ByteBuf編碼為其他類型的時候是不需要使用MessageToMessageEncoder,Netty提供了ByteToMessageCodec,其本質也是實現了MessageToMessageEncoder)

代碼編寫

需求說明

客戶端發過來一個數字(ByteBuf),我們將此類型轉換為數字,獲取當前時間加上此數字的時間后返回客戶端,具體邏輯如下:

編碼器的編寫

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.CharsetUtil;

import java.time.LocalDateTime;
import java.util.List;

public class TimeEncoder extends MessageToMessageDecoder<ByteBuf> {

  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
    //將ByteBuf轉換為String,注意此處,我是Mac OS,數據結尾是\r\n,如果是其他類型的OS,此處可能需要調整
    String dataStr = msg.toString(CharsetUtil.UTF_8).replace("\r\n","");
    //將String轉換為Integer
    Integer dataInteger = Integer.valueOf(dataStr);
    //獲取當前時間N小時后的數據
    LocalDateTime now = LocalDateTime.now();
    LocalDateTime dataLocalDatetime = now.plusHours(dataInteger);
    out.add(dataLocalDatetime);
  }
}

服務端處理代碼

此處的服務端HandleAdapter和前面兩個章節的HandleAdapter有所區別的是:其繼承了SimpleChannelInboundHandler<I> 並且傳遞了一個泛型參數,這里需要說明一下,SimpleChannelInboundHandler是ChannelInboundHandler一個子類,他能夠自動幫我們處理一些數據,在ChannelInboundHandler中,我們使用channel方法來接收數據,那么在SimpleChannelInboundHandler中我們使用protected abstract void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception;來接收客戶端的參數,可以看到的是,其參數中自動的實現了我們需要處理的泛型I msg,另外看一下SimpleChannelInboundHandler中channelRead方法的實現代碼

   @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
			//acceptInboundMessage() 檢查參數的類型是否和設定的泛型是否匹配
			//可以看到匹配的話,會進行強制類型轉換並調用messageReceived方法
			//否則的話,不執行,也就是說,這里的泛型一定要和編碼器轉換的結果類型一致,否則將接收不到參數
			//當前如果你需要自己轉換,那么你也可以和ChannelInBoundHandleAdapter一樣,重寫channelRead方法
			
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                messageReceived(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

那么繼續實現我們的HandleAdapter,代碼非常簡單,這里不再贅述。需要注意的是,我們這里沒有做解碼器,也就是說入站的時候需要ByteBuf類型的數據,因此使用channel.writeAndFlush(Object)的時候,需要的就是ByteBuf類型的數據類型(當然如果pipeline中添加了StringDecoder解碼器,那么你就可以直接使用字符串類型的數據了)

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.Charset;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimeServerChannelHandleAdapter extends SimpleChannelInboundHandler<LocalDateTime> {

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("添加了新的連接信息:id = " + ctx.channel().id());
  }

  @Override
  protected void messageReceived(ChannelHandlerContext ctx, LocalDateTime msg) throws Exception {
    // 轉換時間格式
    DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    String dateFormat = msg.format(dateTimeFormatter);
    System.out.println("接收到參數:" + dateFormat);
    ctx.channel().writeAndFlush(Unpooled.copiedBuffer(dateFormat, Charset.defaultCharset()));
  }
}

服務端啟動代碼

服務前啟動代碼和以前的代碼非常類似,只需要在pipeline添加上適配的編碼器即可,當然需要注意順序(這個知識點以后我在仔細的闡述)

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {

  public void start() throws Exception {
    EventLoopGroup boosGroup = new NioEventLoopGroup();
    EventLoopGroup workGroup = new NioEventLoopGroup();

    try {
      ServerBootstrap bootstrap = new ServerBootstrap();
      bootstrap
          .group(boosGroup, workGroup)
          .channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, 128)
          .childHandler(
              new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                  ChannelPipeline pipeline = ch.pipeline();
                  //設置編碼
                  pipeline.addLast(new TimeEncoder());
                  //設置服務處理
                  pipeline.addLast(new TimeServerChannelHandleAdapter());
                }
              });

      ChannelFuture sync = bootstrap.bind(9998).sync();
      System.out.println("Netty Server start with 9998 port");
      sync.channel().closeFuture().sync();
    } finally {
      workGroup.shutdownGracefully();
      boosGroup.shutdownGracefully();
    }
  }

  public static void main(String[] args) throws Exception {
    TimeServer server = new TimeServer();
    server.start();
  }
}

效果展示

這里為了不寫太多的代碼,防止造成知識的不理解,迷惑,這里我們使用telnet命令來測試數據,

啟動服務器端

運行TimeServer代碼的中的main方法即可

使用Telnet發送數據

telnet的命令格式是

usage: telnet [-l user] [-a] [-s src_addr] host-name [port] 

可以看到大部分參數都是可選的,只有主機名稱必填

發送數據的效果

繼續在telnet中發送一個數據5,我們分別看下服務端的打印的數據和telnet接收到的數據

服務端打印的數據如下:

telnet端打印的接收到的數據

總結

至此,一個簡單的編碼器就完成,我們總結一下步驟

  • 繼承MessageToMessageDecoder抽象類,實現decode()方法
  • 配置HandleAdapter 實現channelRead或者messageReceived方法
  • 配置服務啟動類,配置ChannelPipeline,添加編碼器和HandleAdapter
  • 編寫客戶端或者使用telnet或者其他手段測試


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM