Netty 框架學習 —— 編解碼器框架



編解碼器

每個網絡應用程序都必須定義如何解析在兩個節點之間來回傳輸的原始字節,以及如何將其和目標應用程序的數據格式做相互轉換。這種轉換邏輯由編解碼器處理,編解碼器由編碼器和解碼器組成,它們每種都可以將字節流從一種格式轉換為另一種格式

  • 編碼器將消息轉換為適合於傳輸的格式(最有可能的就是字節流)
  • 解碼器則是將 網絡字節流轉換回應用程序的消息格式

因此,編碼器操作出站數據,而解碼器處理入站數據

1. 解碼器

在這一節,我們將研究 Netty 所提供的解碼器類,並提供關於何時以及如何使用它們的具體示例,這些類覆蓋了兩個不同的用例:

  • 將字節解碼為消息 —— ByteToMessageDecoder 和 ReplayingDecoder
  • 將一種消息類型解碼為另一種 —— MessageToMessageDecoder

什么時候會用到解碼器呢?很簡單,每當需要為 ChannelPipeline 中的下一個 ChannelInboundHandler 轉換入站數據時會用到。此外,得益於 ChannelPipeline 的設計,可以將多個解碼器鏈接在一起,以實現任意復雜的轉換邏輯

1.1 抽象類 ByteToMessageDecoder

將字節解碼為消息是一項常見的任務,Netty 它提供了一個 抽象基類 ByteToMessageDecoder,這個類會對入站數據進行緩沖,直到它准備好處理

下面舉一個如何使用這個類的示例,假設你接收了一個包含簡單 int 的字節流,每個 int 都需要被單獨處理。在這種情況下,你需要從入站 ByteBuf 中讀取每個 int,並將它傳遞給 ChannelPipeline 中的下一個 ChannelInboundHandler。為了解碼這個字節流,你要擴展 ByteToMessageDecoder 類(需要注意的是,原子類型的 int 在被添加到 List 中時,會被自動裝箱為 Integer)

// 擴展 ByteToMessageDecoder,以將字節解碼為特定的格式
public class ToIntegerDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        //檢查是否至少有 4 字節可讀(1 個int的字節長度)
        if (in.readableBytes() >= 4) {
            //從入站 ByteBuf 中讀取一個 int,並將其添加到解碼消息的 List 中
            out.add(in.readInt());
        }
    }
}

雖然 ByteToMessageDecoder 使得可以很簡單地實現這種模式,但是你可能會發現,在調用 readInt()方法前不得不驗證所輸入的 ByteBuf 是否具有足夠的數據有點繁瑣。下面說的 ReplayingDecoder,它是一個特殊的解碼器,以少量的開銷消除了這個步驟

1.2 抽象類 ReplayingDecoder

ReplayingDecoder 擴展了 ByteToMessageDecoder 類,使得我們不必調用 readableBytes() 方法。它通過使用一個自定義的 ByteBuf 實現,ReplayingDecoderByteBuf,包裝傳入的 ByteBuf 實現了這一點,其將在內部執行該調用

這個類的完整聲明是:

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder

類型參數 S 指定了用於狀態管理的類型,其中 Void 代表不需要狀態管理。下述代碼展示了基於 ReplayingDecoder 重新實現的 ToIntegerDecoder

// 擴展ReplayingDecoder<Void> 以將字節解碼為消息
public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {
    // 傳入的 ByteBuf 是 ReplayingDecoderByteBuf
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 從入站 ByteBuf 中讀取一個 int,並將其添加到解碼消息的 List 中
        out.add(in.readInt());
    }
}

和之前一樣,從 ByteBuf 中提取的int將會被添加到List中。如果沒有足夠的字節可用,這 個 readInt() 方法的實現將會拋出一個 Error,其將在基類中被捕獲並處理。當有更多的數據可供讀取時,該 decode() 方法將會被再次調用

請注意 ReplayingDecoderByteBuf 的下面這些方面:

  • 並不是所有的 ByteBuf 操作都被支持,如果調用了一個不被支持的方法,將會拋出一個 UnsupportedOperationException
  • ReplayingDecoder 稍慢於 ByteToMessageDecoder

下面這些類用於處理更加復雜的用例:

  • io.netty.handler.codec.LineBasedFrameDecoder —— 這個類在 Netty 內部也有使用,它使用了行尾控制字符(\n 或者 \r\n)來解析消息數據
  • io.netty.handler.codec.http.HttpObjectDecoder —— HTTP 數據解碼器
1.3 抽象類 MessageToMessageDecoder

在這一節,我們將解釋如何在兩個消息格式之間進行轉換,例如,從一種 POJO 類型轉換為另一種

public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter

參數類型 I 指定了 decode() 方法的輸入參數 msg 的類型,它是你必須實現的唯一方法

我們將編寫一個 IntegerToStringDecoder 解碼器來擴展 MessageToMessageDecoder,它的 decode() 方法會把 Integer 參數轉換為 String 表示。和之前一樣,解碼的 String 將被添加到傳出的 List 中,並轉發給下一個 ChannelInboundHandler

public class IntegerToStringDecoder extends MessageToMessageEncoder<Integer> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        //將 Integer 消息轉換為它的 String 表示,並將其添加到輸出的 List 中
        out.add(String.valueOf(msg));
    }
}
1.4 TooLongFrameException

由於 Netty 是一個異步框架,所以需要在字節可以解碼之前在內存中緩沖它們。因此,不能讓解碼器緩沖大量的數據以至於耗盡可用的內存。為了解除這個常見的顧慮,Netty 提供了 TooLongFrameException 類,其將由解碼器在幀超出指定的大小限制時拋出

為了避免這種情況,你可以設置一個最大字節數的閾值,如果超出該閾值,則會導致拋出一個 TooLongFrameException(隨后會被 ChannelHandler.exceptionCaught() 方法捕獲)。然后,如何處理該異常則完全取決於該解碼器的用戶。某些協議(如 HTTP)可能允許你返回一個特殊的響應。而在其他的情況下,唯一的選擇可能就是關閉對應的連接

下面的示例使用 TooLongFrameException 來通知 ChannelPipeline 中的其他 ChannelHandler 發生了幀大小溢出的。需要注意的是,如果你正在使用一個可變幀大小的協議,那么這種保護措施將是尤為重要的

public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
    public static final int MAX_FRAME_SIZE = 1024;
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int readable = in.readableBytes();
        // 檢查緩沖區是否有超過 MAX_FRAME_SIZE 個字節
        if (readable > MAX_FRAME_SIZE) {
            // 跳過所有的可讀字節,拋出 TooLongFrameException 並通知 ChannelHandler
            in.skipBytes(readable);
            throw new TooLongFrameException("Frame too big!");
        }
        //do something
    }
}

2. 編碼器

編碼器實現了 ChannelOutboundHandler,並將出站數據從一種格式轉換為另一種格式,和我們方才學習的解碼器的功能正好相反。Netty 提供了一組類,用於幫助你編寫具有以下功能的編碼器:

  • 將消息編碼為字節
  • 將消息編碼為消息
2.1 抽象類 MessageToByteEncoder

前面我們看到了如何使用 ByteToMessageDecoder 來將字節轉換為消息,現在我們使用 MessageToByteEncoder 來做逆向的事情

這個類只有一個方法,而解碼器有兩個。原因是解碼器通常需要在 Channel 關閉之后產生最后一個消息(因此也就有了 decodeLast() 方法。顯然這不適用於編碼器的場景 —— 在連接被關閉之后仍然產生一個消息是毫無意義的

下述代碼展示了 ShortToByteEncoder,其接受一個 Short 類型的實例作為消息,將它編碼為Short的原子類型值,並將它寫入 ByteBuf 中,其將隨后被轉發給 ChannelPipeline 中的 下一個 ChannelOutboundHandler。每個傳出的 Short 值都將會占用 ByteBuf 中的 2 字節。

public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
        // 將 Short 寫入 ByteBuf
        out.writeShort(msg);
    }
}
2.2 抽象類 MessageToMessageEncoder

MessageToMessageEncoder 類的 encode() 方法提供了將入站數據從一個消息格式解碼為另一種

下述代碼使用 IntegerToStringEncoder 擴展了 MessageToMessageEncoder,編碼器將每個出站 Integer 的 String 表示添加到了該 List 中

public class IntegerToStringEncoder extends MessageToMessageEncoder {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
        out.add(String.valueOf(msg));
    }
}

抽象的編解碼器類

雖然我們一直將解碼器和編碼器作為單獨的實體討論,但是你有時將會發現在同一個類中管理入站和出站數據和消息的轉換是很有用的。Netty 的抽象編解碼器類正好用於這個目的,因為它們每個都將捆綁一個解碼器/編碼器對,以處理我們一直在學習的這兩種類型的操作。正如同你可能已經猜想到的,這些類同時實現了 ChannelInboundHandler 和 ChannelOutboundHandler 接口

為什么我們並沒有一直優先於單獨的解碼器和編碼器使用這些復合類呢?因為通過盡可能地將這兩種功能分開,最大化了代碼的可重用性和可擴展性,這是 Netty 設計的一個基本原則

1. 抽象類 ByteToMessageCodec

讓我們來研究這樣的一個場景:我們需要將字節解碼為某種形式的消息,可能是 POJO,隨后再次對它進行編碼。ByteToMessageCodec 將為我們處理好這一切,因為它結合了 ByteToMessageDecoder 以及它的逆向 —— MessageToByteEncoder

任何的請求/響應協議都可以作為使用 ByteToMessageCodec 的理想選擇。例如,在某個 SMTP 的實現中,編解碼器將讀取傳入字節,並將它們解碼為一個自定義的消息類型,如 SmtpRequest。而在接收端,當一個響應被創建時,將會產生一個 SmtpResponse,其將被編碼回字節以便進行傳輸

2. 抽象類 MessageToMessageCodec

通過使用 MessageToMessageCodec,我們可以在一個單個的類中實現該轉換的往返過程。MessageToMessageCodec 是一個參數化的類,定義如下:

public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>

decode() 方法是將 INBOUND_IN 類型的消息轉換為 OUTBOUND_IN 類型的消息,而 encode() 方法則進行它的逆向操作。將 INBOUND_IN 類型的消息看作是通過網絡發送的類型, 而將 OUTBOUND_IN 類型的消息看作是應用程序所處理的類型,將可能有所裨益

WebSocket 協議

下面關於 MessageToMessageCodec 的示例引用了一個新出的 WebSocket 協議,這個協議能實現 Web 瀏覽器和服務器之間的全雙向通信

我們的 WebSocketConvertHandler 在參數化 MessageToMessageCodec 時將使用 INBOUND_IN 類型的 WebSocketFrame,以及 OUTBOUND_IN 類型的 MyWebSocketFrame,后者是 WebSocketConvertHandler 本身的一個靜態嵌套類

public class WebSocketConvertHandler 
        extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {


    @Override
    protected void encode(ChannelHandlerContext ctx, MyWebSocketFrame msg, List<Object> out) throws Exception {
        // 實例化一個指定子類型的 WebSocketFrame
        ByteBuf payload = msg.getData().duplicate().retain();
        switch (msg.getType()) {
            case BINARY:
                out.add(new BinaryWebSocketFrame(payload));
                break;
            case TEXT:
                out.add(new TextWebSocketFrame(payload));
                break;
            case CLOSE:
                out.add(new CloseWebSocketFrame(true, 0, payload));
                break;
            case CONTINUATION:
                out.add(new ContinuationWebSocketFrame(payload));
                break;
            case PONG:
                out.add(new PongWebSocketFrame(payload));
                break;
            case PING:
                out.add(new PingWebSocketFrame(payload));
                break;
            default:
                throw new IllegalStateException("Unsupported websocket msg " + msg);
        }
    }

    // 將 WebSocketFrame 解碼為 MyWebSocketFrame,並設置 FrameType
    @Override
    protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
        ByteBuf paload = msg.content().duplicate().retain();
        if (msg instanceof  BinaryWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, paload));
        } else
        if (msg instanceof  CloseWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, paload));
        } else
        if (msg instanceof  PingWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, paload));
        } else
        if (msg instanceof  PongWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, paload));
        } else
        if (msg instanceof  TextWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, paload));
        } else
        if (msg instanceof  ContinuationWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, paload));
        } else {
            throw new IllegalStateException("Unsupported websocket msg " + msg);
        }
    }

    public static final class MyWebSocketFrame {
        public enum FrameType {
            BINARY,
            CLOSE,
            PING,
            PONG,
            TEXT,
            CONTINUATION
        }
        private final FrameType type;
        private final ByteBuf data;

        public MyWebSocketFrame(FrameType type, ByteBuf data) {
            this.type = type;
            this.data = data;
        }

        public FrameType getType() {
            return type;
        }

        public ByteBuf getData() {
            return data;
        }
    }
}

3. CombinedChannelDuplexHandler 類

正如我們前面所提到的,結合一個解碼器和編碼器可能會對可重用性造成影響。但是,有一 種方法既能夠避免這種懲罰,又不會犧牲將一個解碼器和一個編碼器作為一個單獨的單元部署所 帶來的便利性。CombinedChannelDuplexHandler 提供了這個解決方案,其聲明為:

public class CombinedChannelDuplexHandler
	<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>

這個類充當了 ChannelInboundHandler 和 ChannelOutboundHandler(該類的類型參數 I 和 O)的容器。通過提供分別繼承了解碼器類和編碼器類的類型,我們可以實現一個編解碼器,而又不必直接擴展抽象的編解碼器類

首先,讓我們研究下述代碼,該實現擴展了 ByteToMessageDecoder,因為它要從 ByteBuf 讀取字符

public class ByteToCharDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        while (in.readableBytes() >= 2) {
            out.add(in.readChar());
        }
    }
}

這里的 decode() 方法一次將從 ByteBuf 中提取 2 字節,並將它們作為 char 寫入到 List 中,其將會被自動裝箱為 Character 對象

下述代碼將 Character 轉換回字節。這個類擴展了 MessageToByteEncoder,因為它需要將 char 消息編碼到 ByteBuf 中。這是通過直接寫入 ByteBuf 做到的

public class CharToByteEncoder extends MessageToByteEncoder<Character> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception {
        out.writeChar(msg);
    }
}

既然我們有了解碼器和編碼器,我們可以結合它們來構建一個編解碼器

// 通過該解碼器和編碼器實現參數化CombinedByteCharCodec
public class CombinedChannelDuplexHandler extends
        io.netty.channel.CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
    public CombinedChannelDuplexHandler() {
        // 將委托實例傳遞給父類
        super(new ByteToCharDecoder(), new CharToByteEncoder());
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM