一、概念
網絡傳輸的單位是字節,如何將應用程序的數據轉換為字節,以及將字節轉換為應用程序的數據,就要說到到我們該篇介紹的編碼器和解碼器。
將應用程序的數據轉換為網絡格式,以及將網絡格式轉換為應用程序的數據的組件分別叫作編碼器和解碼器,同時具有這兩種功能的單一組件叫作編解碼器。 Netty 提供了一系列用來創建所有這些編碼器、解碼器以及編解碼器的工具,還可以按需定制通用的消息轉換編解碼器。
Netty 的編(解)碼器實現了 ChannelHandlerAdapter,也是一種特殊的 ChannelHandler,所以依賴於 ChannelPipeline,可以將多個編(解)碼器鏈接在一起,以實現復雜的轉換邏輯。
對於編碼器和解碼器來說,其過程也是相當的簡單:一旦消息被編碼或者解碼,它就會被 ReferenceCountUtil.release(message)調用自動釋放。如果你需要保留引用以便稍后使用,那么你可以調用 ReferenceCountUtil.retain(message)方法。這將會增加該引用計數,從而防止該消息被釋放。
二、編碼器
編碼器將應用程序的數據轉換為網絡格式,出站消息時被調用,主要有兩類:
1、將消息編碼為字節:MessageToByteEncoder
public abstract class MessageToByteEncoder<I> extends ChannelHandlerAdapter{}
public class ShortToByteEncoder extends MessageToByteEncoder<Short> { /** * 1、類型為 I 的出站消息被編碼為 ByteBuf * 2、該 ByteBuf 隨后將會被轉發給 ChannelPipeline中的下一個 ChannelOutboundHandler。 */ @Override protected void encode(ChannelHandlerContext channelHandlerContext, Short aShort, ByteBuf byteBuf) throws Exception { byteBuf.writeShort(aShort); } }
2、將消息編碼為消息:MessageToMessageEncoder
public abstract class MessageToMessageEncoder<I> extends ChannelHandlerAdapter{}
public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> { /** * 1、類型為 I 的出站消息被編碼為目標類型 存入List 中 * 2、該 List 隨后將會被轉發給 ChannelPipeline中的下一個 ChannelOutboundHandler。 */ @Override protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception { out.add(String.valueOf(msg)); } }
三、解碼器
解碼器將網絡格式轉換為應用程序的數據,入站消息時被調用。
解碼器比編碼器多了一個 decodeLast 方法,原因是解碼器通常需要在 Channel 關閉之后產生最后一個消息。這顯然不適用於編碼器的場景 —— 在連接被關閉之后仍然產生一個消息是毫無意義的。所以,當 Channel 的狀態變為非活動時,這個方法將會被調用一次。可以重寫該方法以提供特殊的處理。
解碼器主要有兩類:
1、將字節解碼為消息:ByteToMessageDecoder 和 ReplayingDecoder
public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {}
public class MyDecoder extends ByteToMessageDecoder { private static final int MAX_FRAME_SIZE = 1024; /** * 1、該方法被調用時,將會傳入一個包含了傳入數據的 ByteBuf,以及一個用來添加解碼消息的 List. * 2、對該方法的調用將會重復進行,直到確定沒有新的元素被添加到該 List,或者Butebuf 沒有更多可讀取的字節為止。 * 3、List 的內容將會被傳遞給 ChannelPipeline 中的下一個 ChannelInboundHandler。 */ @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { int readableBytes = byteBuf.readableBytes(); //不能讓解碼器緩沖大量的數據以致於耗盡可用的內存 if (readableBytes > MAX_FRAME_SIZE){ //跳過所有的可讀字節 byteBuf.skipBytes(readableBytes); throw new TooLongFrameException("數據超過可緩存字節..."); } //假設需要解析 int 類型的消息(int 4個字節) if (readableBytes > 4){ list.add(byteBuf.readInt()); } } }
----分割線----
//類型參數 S 指定了用於狀態管理的類型,其中 Void 代表不需要狀態管理。 public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder{}
public class MyReplayingDecoder extends ReplayingDecoder<Void> { /** * 1、ReplayingDecoder 擴展了 ByteToMessageDecoder,並且自定義了 ByteBuf 的實現 ReplayingDecoderByteBuf。 * 2、ReplayingDecoderByteBuf 對要轉換的消息的字節數進行內部管理,如果沒有足夠的字節使用,將會拋出一個 Signal,由ReplayingDecoder進行處理。 * * @param byteBuf 傳入的 ByteBuf 實際上是 ReplayingDecoderByteBuf*/ @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { list.add(byteBuf.readInt()); } }
繼承 ByteToMessageDecoder 和繼承 ReplayingDecoder 有什么區別呢?ReplayingDecoder 唯一的好處就是解碼的時候不用進行字節數的判斷,如上 ,因為它交由自定義的 ReplayingDecoderByteBuf 去處理了。但是 ReplayingDecoder 的效率稍慢於ByteToMessageDecoder。一般我們在這兩個解碼器中進行抉擇的准則是:如果使用 ByteToMessageDecoder 不會引入太多的復雜性,那么請使用它;否則,請使用 ReplayingDecoder!
2、將消息解碼為消息:MessageToMessageDecoder
public abstract class MessageToMessageEncoder<I> extends ChannelHandlerAdapter{}
public class IntegerToStringDecoder extends MessageToMessageEncoder<Integer> { /** * 1、對於每個需要被解碼為另一種格式的入站消息來說,該方法將會被調用。 * 2、解碼消息隨后會被傳遞給 ChannelPipeline 中的下一個 ChannelInboundHandler */ @Override protected void encode(ChannelHandlerContext channelHandlerContext, Integer integer, List<Object> list) throws Exception { list.add(String.valueOf(integer)); } }
四、編解碼器
Netty 的抽象編解碼器類捆綁一個解碼器/編碼器對,主要用於在同一個類中管理入站和出站數據和消息的轉換。
個人覺得這個編解碼器略顯雞肋呀,還是喜歡將編碼器和解碼器分開來寫。因為 Netty 設計的一個基本准則就是:盡可能地將兩種功能(編碼器、解碼器)分開,最大化代碼的可重用性和可擴展性。
編解碼器也主要有兩類:
1、字節消息編解碼器:ByteToMessageCodec
public abstract class ByteToMessageCodec<I> extends ChannelHandlerAdapter {}
public class MyBytetoMessageCodec extends ByteToMessageCodec<Short> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { while (in.readableBytes() > 4){ out.add(in.readInt()); } } @Override protected void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception { out.writeShort(msg); } }
2、消息轉換編解碼器:MessageToMessageCodec
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelHandlerAdapter {}

public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> { /** * 1、對於每個 OUTBOUND_IN 類型的消息,這個方法都會被調用。 * 2、這個消息將會被編碼為 INBOUND_IN 類型的消息。 * 3、然后被轉發給 ChannelPipeline 中的下一個 ChannelOutboundHandler */ @Override protected void encode(ChannelHandlerContext ctx, MyWebSocketFrame msg, List<Object> out) throws Exception { ByteBuf byteBuf = msg.getByteBuf().duplicate().retain(); switch (msg.getFrameType()) { case BINARY: out.add(new BinaryWebSocketFrame(byteBuf)); break; case TEXT: out.add(new TextWebSocketFrame(byteBuf)); break; case CLOSE: out.add(new CloseWebSocketFrame(true, 0, byteBuf)); break; case CONTINUATION: out.add(new ContinuationWebSocketFrame(byteBuf)); break; case PONG: out.add(new PongWebSocketFrame(byteBuf)); break; case PING: out.add(new PingWebSocketFrame(byteBuf)); default: break; } } /** * 1、傳入 INBOUND_IN 類型的消息,該方法會被調用。 * 2、這個消息會被解碼為 OUTBOUND_IN 類型的消息。 * 3、然后被轉發給 ChannelPipeline 中的下一個 ChannelInboundHandler */ @Override protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception { ByteBuf byteBuf = msg.content().duplicate().retain(); if (msg instanceof BinaryWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, byteBuf)); } else if (msg instanceof CloseWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, byteBuf)); } else if (msg instanceof TextWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, byteBuf)); } else if (msg instanceof PingWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, byteBuf)); } else if (msg instanceof PongWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, byteBuf)); } else if (msg instanceof ContinuationWebSocketFrame) { out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, byteBuf)); } else { throw new IllegalStateException("Unsupported websocket msg " + msg); } } public static final class MyWebSocketFrame { public enum FrameType { BINARY, CLOSE, PING, PONG, TEXT, CONTINUATION } private final FrameType frameType; private final ByteBuf byteBuf; public MyWebSocketFrame(FrameType frameType, ByteBuf byteBuf) { this.frameType = frameType; this.byteBuf = byteBuf; } public FrameType getFrameType() { return frameType; } public ByteBuf getByteBuf() { return byteBuf; } } }
參考資料:《Netty IN ACTION》