一、前言
前面已經學習完了Netty框架中的主要組件,接着學習codec框架。
二、codec框架
每個網絡應用程序必須定義如何將在對等體之間傳輸的原始字節解析並轉換為目標程序的數據格式,這種轉換邏輯有codec處理,其由編碼器和解碼器組成,每個編碼器和解碼器將字節流從一種格式轉換到另一種格式。若將消息視為具有特定意義的結構化字節序列,那么編碼器將該消息轉換成適合於傳輸的格式(很可能是字節流),反之,解碼器將網絡流轉換回應用程序的消息格式,然后,編碼器處理出站數據,解碼器處理入站數據。
2.1 解碼器
解碼器類涵蓋兩個不同的使用用例。
· 將字節解碼為消息 - ByteToMessageDecoder和ReplayingDecoder。
· 將一個消息類型解碼為另一個 - MessageToMessageDecoder。
解碼器負責將入站數據從一種格式轉換到另一種格式,所以Netty的解碼器實現了ChannelInboundHandler接口。當需要在ChannelPipeline中為下一個ChannelInboundHandler轉換入站數據時需要使用解碼器。由於Netty支持代碼模塊化和重用,因此可以鏈接多個解碼器來實現任意復雜的轉換邏輯。
1. ByteToMessageDecoder抽象類
從字節到消息(或另一個字節序列)的解碼是一個常見的任務,Netty使用ByteToMessageDecoder抽象類完成該任務,由於無法知道遠程對等體是否一次發送完整的消息,因此該類會緩沖入站數據,直到所有待處理的數據已經准備好。
假設你收到一個包含簡單int的字節流,每個int都要單獨處理。 此時將從入站ByteBuf讀取每個int,並將其傳遞給下一個ChannelInboundHandler。而為解碼字節流,需要擴展ByteToMessageDecoder(當int添加到List時,它將自動裝箱到Integer類型),整個過程如下圖所示。

一次從ByteBuf中讀取四個字節解析成一個int類型,並添加到List中,當讀取完成后,將會被傳遞至下個ChannelHandler中,下面是ToIntegerDecoder的源代碼。
public class ToIntegerDecoder extends ByteToMessageDecoder { @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() >= 4) { out.add(in.readInt()); } } }
2. ReplayingDecoder抽象類
ReplayingDecoder繼承ByteToMessageDecoder類,並且不再需要調用readableBytes()方法,其通過自定義的ReplayingDecoderBuffer來實現該功能,其代碼如下所示。
public class ToIntegerDecoder2 extends ReplayingDecoder<Void> { @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { out.add(in.readInt()); } }
其中Void表示不執行任何操作,即不管理任何狀態類型。
3. MessageToMessageDecoder抽象類
可以使用MessageToMessageDecoder在消息格式之間進行轉換,僅需要實現其decode方法。如IntegerToStringDecoder繼承MessageToMessageDecoder,其將Integer類型轉化為對應的String表示,轉化圖如下圖所示。

下面是IntegerToStringDecoder的代碼。
public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> { @Override public void decode(ChannelHandlerContext ctx, Integer msg List<Object> out) throws Exception { out.add(String.valueOf(msg)); } }
4. TooLongFrameException類
由於Netty是一個異步框架,所以需要緩沖內存中的字節,直到能夠對其進行解碼。不能讓解碼器緩沖太多的數據以致耗盡可用內存,為解決該問題,Netty提供了TooLongFrameException類異常,如果超過指定的大小限制,則由解碼器拋出該異常。為了避免這種情況,可以設置最大字節數的閾值,如果超出,將拋出TooLongFrameException異常,然后由解碼器的用戶決定如何處理異常。某些協議(例如HTTP)可能允許返回特殊響應,而在其他情況下,可能只能關閉連接。
下面代碼展示了ByteToMessageDecoder是如何使用TooLongFrameException來通知ChannelPipeline中的其他ChannelHandler關於幀大小超出運行,特別是當使用具有可變框架大小的協議時,此類保護會尤為重要。
public class SafeByteToMessageDecoder extends ByteToMessageDecoder { private static final int MAX_FRAME_SIZE = 1024; @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int readable = in.readableBytes(); if (readable > MAX_FRAME_SIZE) { in.skipBytes(readable); throw new TooLongFrameException("Frame too big!"); } // do something ... } }
2.2 編碼器
編碼器實現了ChannelOutboundHandler,並將出站數據從一種格式轉換為另一種格式。Netty提供可幫助編寫具有以下功能的編碼器的類:
· 將消息編碼為字節。
· 將消息編碼為消息。
1. MessageToByteEncoder抽象類
與ByteToMessageDecoder作用相反,MessageToByteEncoder將消息編碼為字節,其只有encode方法,下圖顯示了ShortToByteEncoder將Shor類型編碼為字節類型,並寫入ByteBuf,然后轉發給管道中的下一個ChannelOutboundHandler。

ShortToByteEncoder的代碼如下所示。
public class ShortToByteEncoder extends MessageToByteEncoder<Short> { @Override public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception { out.writeShort(msg); } }
2. MessageToMessageEncoder抽象類
MessageToMessageEncoder可將一個類型編碼為另一個類型,其只有一個encode方法,下圖展示了IntegerToStringEncoder如何將Integer類型轉化為String類型。

IntegerToStringEncoder的代碼如下所示。
public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> { @Override public void encode(ChannelHandlerContext ctx, Integer msg List<Object> out) throws Exception { out.add(String.valueOf(msg)); } }
2.3 codec抽象類
上面單獨討論了解碼器和編碼器,對於出站數據使用編碼器,入站數據使用解碼器,但可否對於兩者只使用一個類進行處理。Netty的codec類可以滿足此需求,每個codec類綁定了編碼器和解碼器,用來處理不同類型的操作,這些類實現了ChannelInboundHandler和ChannelOutboundHandler。
1. ByteToMessageCodec抽象類
假如首先需要將字節類型解碼成消息,然后在編碼成另一種類型,ByteToMessageCodec可以用來處理這種情況,其結合了ByteToMessageDecoder和MessageToByteEncoder。其包含兩個類的共三個方法,decode、decodeLast、encode。任何請求/響應協議都可以使用ByteToMessageCodec。
2. MessageToMessageCodec抽象類
MessageToMessageCodec在消息之間進行編碼和解碼操作,其簽名為MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>,其包含decode和encode兩個方法。decode方法將INBOUND_IN類型轉化為OUTBOUND_IN,而encode則相反。可將INBOUND_IN作為通過線路發送的消息類型,OUTBOUND_IN作為應用程序處理的消息類型。如下代碼展示了具體的使用。
public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> { @Override protected void encode(ChannelHandlerContext ctx, WebSocketConvertHandler.MyWebSocketFrame msg, List<Object> out) throws Exception { ByteBuf payload = msg.getData().duplicate().retain(); switch (msg.getType()) { case BINARY: out.add(new BinaryWebSocketFrame(payload)); break; case TEXT: out.add(new TextWebSocketFrame(payload)); break; case CLOSE: out.add(new CloseWebSocketFrame(true, 0, payload)); break; case CONTINUATION: out.add(new ContinuationWebSocketFrame(payload)); break; case PONG: out.add(new PongWebSocketFrame(payload)); break; case PING: out.add(new PingWebSocketFrame(payload)); break; default: throw new IllegalStateException( "Unsupported websocket msg " + msg); } } @Override protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception { ByteBuf payload = msg.getData().duplicate().retain(); if (msg instanceof BinaryWebSocketFrame) { out.add(new MyWebSocketFrame( MyWebSocketFrame.FrameType.BINARY, payload)); } else if (msg instanceof CloseWebSocketFrame) { out.add(new MyWebSocketFrame ( MyWebSocketFrame.FrameType.CLOSE, payload)); } else if (msg instanceof PingWebSocketFrame) { out.add(new MyWebSocketFrame ( MyWebSocketFrame.FrameType.PING, payload)); } else if (msg instanceof PongWebSocketFrame) { out.add(new MyWebSocketFrame ( MyWebSocketFrame.FrameType.PONG, payload)); } else if (msg instanceof TextWebSocketFrame) { out.add(new MyWebSocketFrame ( MyWebSocketFrame.FrameType.TEXT, payload)); } else if (msg instanceof ContinuationWebSocketFrame) { out.add(new MyWebSocketFrame ( MyWebSocketFrame.FrameType.CONTINUATION, payload)); } else { throw new IllegalStateException( "Unsupported websocket msg " + msg); } } public static final class MyWebSocketFrame { public enum FrameType { BINARY, CLOSE, PING, PONG, TEXT, CONTINUATION } private final FrameType type; private final ByteBuf data; public WebSocketFrame(FrameType type, ByteBuf data) { this.type = type; this.data = data; } public FrameType getType() { return type; } public ByteBuf getData() { return data; } } }
3. CombinedChannelDuplexHandler類
組合解碼器和編碼器可能對可重用性有影響,可以使用CombinedChannelDuplexHandler可通過分別擴展解碼器類和編碼器類的類型來實現編解碼器,而不必直接擴展抽象編解碼器類。
如下代碼中,ByteToCharDecoder將從ByteBuf中讀取字符。
public class ByteToCharDecoder extends ByteToMessageDecoder { @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { while (in.readableBytes() >= 2) { out.add(in.readChar()); } } }
一次性從ByteBuf中讀取兩個字節(char由兩個字節組成),然后裝箱后添加至List中。
如下代碼中,CharToByteEncoder則將Char轉化為字節類型並寫入ByteBuf中。
public class CharToByteEncoder extends MessageToByteEncoder<Character> { @Override public void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception { out.writeChar(msg); } }
我們可以直接使用codec來實現上述代碼示例,假設已經有了ByteToCharDecoder和CharToByteEncoder,其代碼如下所示。
public class CombinedByteCharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> { public CombinedByteCharCodec() { super(new ByteToCharDecoder(), new CharToByteEncoder()); } }
可以看到代碼非常簡潔便完成了編碼和解碼操作。
三、總結
本篇學習了Netty中的編碼和解碼操作及其相關的類型,編解碼器是進行數據處理的基礎。也謝謝各位園友的觀看~
