ChannelHandler


ChannelHandler功能介紹

ChannelHandler類似於Servlet的Filter過濾器,負責對I/O事件或者I/O操作進行攔截和處理,它可以選擇性地攔截和處理自己感興趣的事件,也可以透傳和終止事件的傳遞。基於ChannelHandler接口,用戶可以方便地進行業務邏輯定制,例如打印日志、統一封裝異常信息、性能統計和消息編解碼等。

ChannelHandler支持注解,目前支持的注解有兩種。

  1. Sharable:多個ChannelPipeline共用同一個ChannelHandler;
  2. Skip:被Skip注解的方法不會被調用,直接被忽略。

ChannelHandlerAdapter功能說明

對於大多數的ChannelHandler會選擇性地攔截和處理某個或者某些事件,其他的事件會忽略,由下一個ChannelHandler進行攔截和處理。這就會導致一個問題:用戶ChannelHandler必須要實現ChannelHandler的所有接口,包括它不關心的那些事件處理接口,這會導致用戶代碼的冗余和臃腫,代碼的可維護性也會變差。

為了解決這個問題,Netty提供了ChannelHandlerAdapter基類,它的所有接口實現都是事件透傳,如果用戶ChannelHandler關心某個事件,只需要覆蓋ChannelHandlerAdapter對應的方法即可,對於不關心的,可以直接繼承使用父類的方法,這樣子類的代碼就會非常簡潔和清晰。

    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    @Skip
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    @Skip
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

這些透傳方法被@Skip注解了,這些方法在執行的過程中會被忽略,直接跳到下一個ChannelHandler中執行對應的方法。

ByteToMessageDecoder功能說明

利用NIO進行網絡編程時,往往需要將讀取到的字節數組或者字節緩沖區解碼為業務可以使用的POJO對象。為了方便業務將ByteBuf解碼成業務POJO對象,Netty提供了ByteToMessageDecoder抽象工具解碼類。

用戶的解碼器繼承ByteToMessageDecoder,只需要實現void decode(ChannelHandler Context ctx, ByteBuf in, List<Object> out)抽象方法即可完成ByteBuf到POJO對象的解碼。

由於ByteToMessageDecoder並沒有考慮TCP粘包和組包等場景,讀半包需要用戶解碼器自己負責處理。正因為如此,對於大多數場景不會直接繼承ByteToMessageDecoder,而是繼承另外一些更高級的解碼器來屏蔽半包的處理。

MessageToMessageDecoder功能說明

MessageToMessageDecoder實際上是Netty的二次解碼器,它的職責是將一個對象二次解碼為其他對象。

為什么稱它為二次解碼器呢?從SocketChannel讀取到的TCP數據報是ByteBuffer,實際就是字節數組,我們首先需要將ByteBuffer緩沖區中的數據報讀取出來,並將其解碼為Java對象;然后對Java對象根據某些規則做二次解碼,將其解碼為另一個POJO對象。因為MessageToMessageDecoder在ByteToMessageDecoder之后,所以稱之為二次解碼器。

二次解碼器在實際的商業項目中非常有用,以HTTP+XML協議棧為例,第一次解碼往往是將字節數組解碼成HttpRequest對象然后對HttpRequest消息中的消息體字符串進行二次解碼,將XML格式的字符串解碼為POJO對象,這就用到了二次解碼器。類似這樣的場景還有很多,不再一一枚舉。

事實上,做一個超級復雜的解碼器將多個解碼器組合成一個大而全的MessageToMessageDecoder解碼器似乎也能解決多次解碼的問題,但是采用這種方式的代碼可維護性會非常差。例如,如果我們打算在HTTP+XML協議棧中增加一個打印碼流的功能,即首次解碼獲取HttpRequest對象之后打印XML格式的碼流。如果采用多個解碼器組合,在中間插入一個打印消息體的Handler即可,不需要修改原有的代碼;如果做一個大而全的解碼器,就需要在解碼的方法中增加打印碼流的代碼,可擴展性和可維護性都會變差。用戶的解碼器只需要實現void decode(ChannelHandlerContext ctx, I msg, List<Object> out)抽象方法即可,由於它是將一個POJO解碼為另一個POJO,所以一般不會涉及到半包的處理,相對於ByteToMessageDecoder更加簡單些。

LengthFieldBasedFrameDecoder功能說明

如何區分一個整包消息,通常有如下4種做法。

  1. 固定長度,例如每120個字節代表一個整包消息,不足的前面補零。解碼器在處理這類定常消息的時候比較簡單,每次讀到指定長度的字節后再進行解碼。DelimiterBased FrameDecoder
  2. 通過回車換行符區分消息,例如FTP協議。這類區分消息的方式多用於文本協議。LineBasedFrameDecoder
  3. 通過分隔符區分整包消息。 DelimiterBased FrameDecoder
  4. 通過指定長度來標識整包消息。LengthFieldBasedFrameDecoder

如果消息是通過長度進行區分的,LengthFieldBasedFrameDecoder都可以自動處理粘包和半包問題,只需要傳入正確的參數,即可輕松搞定“讀半包”問題。

下面我們看看如何通過參數組合的不同來實現不同的“半包”讀取策略。

第一種常用的方式是消息的第一個字段是長度字段,后面是消息體,消息頭中只包含一個長度字段。它的消息結構定義如圖17-16所示。

使用以下參數組合進行解碼。

lengthFieldOffset = 0;

lengthFieldLength = 2;

lengthAdjustment = 0;

initialBytesToStrip = 0。

解碼后的字節緩沖區內容:

 

因為通過ByteBuf.readableBytes()方法我們可以獲取當前消息的長度,所以解碼后的字節緩沖區可以不攜帶長度字段,由於長度字段在起始位置並且長度為2,所以將initialBytesToStrip設置為2,參數組合修改為:

lengthFieldOffset = 0;

lengthFieldLength = 2;

lengthAdjustment = 0;

initialBytesToStrip = 2。

解碼后的字節緩沖區內容如圖:

從圖17-18的解碼結果看,解碼后的字節緩沖區丟棄了長度字段,僅僅包含消息體,不過通過ByteBuf.readableBytes()方法仍然能夠獲取到長度字段的值。

在大多數的應用場景中,長度僅用來標識消息體的長度,這類協議通常由消息長度字段+消息體組成,如圖17-18所示的例子。但是,對於一些協議,長度還包含了消息頭的長度。在這種應用場景中,往往需要使用lengthAdjustment進行修正,修正后的參數組合方式如下。由於整個消息的長度往往都大於消息體的長度,所以,lengthAdjustment為負數,圖17-19展示了通過指定lengthAdjustment字段來包含消息頭的長度

lengthFieldOffset = 0;

lengthFieldLength = 2;

lengthAdjustment = -2;

initialBytesToStrip = 0。

第二種常用的方式當標識消息長度的字段位於消息頭的中間或者尾部時,需要使用lengthFieldOffset字段進行標識,下面的參數組合給出了如何解決消息長度字段不在首位的問題。

lengthFieldOffset = 2;

lengthFieldLength = 3;

lengthAdjustment = 0;

initialBytesToStrip = 0。

 

由於消息頭1的長度為2,所以長度字段的偏移量為2;消息長度字段Length為3,所以lengthFieldLength值為3。由於長度字段僅僅標識消息體的長度,所以lengthAdjustment和initialBytesToStrip都為0。

第三種常用的方式長度字段夾在兩個消息頭之間或者長度字段位於消息頭的中間,前后都有其他消息頭字段,在這種場景下如果想忽略長度字段以及其前面的其他消息頭字段,則可以通過initialBytesToStrip參數來跳過要忽略的字節長度,它的組合效果如下。

lengthFieldOffset = 1;

lengthFieldLength = 2;

lengthAdjustment = 1;

initialBytesToStrip = 3。

首先,由於HDR1的長度為1,所以長度字段的偏移量lengthFieldOffset為1;長度字段為2個字節,所以lengthFieldLength為2。由於長度字段是消息體的長度,解碼后如果攜帶消息頭中的字段,則需要使用lengthAdjustment進行調整,此處它的值為1,表示的是HDR2的長度,最后由於解碼后的緩沖區要忽略長度字段和HDR1部分,所以initialBytesToStrip為3。解碼后的結果為13個字節,HDR1和Length字段被忽略。

事實上,通過4個參數的不同組合,可以達到不同的解碼效果,用戶在使用過程中可以根據業務的實際情況進行靈活調整。

MessageToByteEncoder功能說明

MessageToByteEncoder負責將POJO對象編碼成ByteBuf,用戶的編碼器繼承MessageToByteEncoder,實現void encode(ChannelHandlerContext ctx, I msg, ByteBuf out)接口接口,示例代碼如下。

public class IntegerEncoder extends MessageToByteEncoder {
@Override
    public void encode(ChannelHandlerContext ctx, Integer msg,ByteBuf out) throws Exception {
        out.writeInt(msg);
    }
}

MessageToMessageEncoder功能說明

用戶的編碼器繼承MessageToMessageEncoder解碼器,實現void encode(Channel HandlerContext ctx, I msg, List out)方法即可。注意,它與MessageToByteEncoder的區別是輸出是對象列表而不是ByteBuf

LengthFieldPrepender功能說明

如果協議中的第一個字段為長度字段,Netty提供了LengthFieldPrepender編碼器,它可以計算當前待發送消息的二進制字節長度,將該長度添加到ByteBuf的緩沖區頭中。

通過LengthFieldPrepender可以將待發送消息的長度寫入到ByteBuf的前2個字節,編碼后的消息組成為長度字段+原消息的方式。

通過設置LengthFieldPrepender為true,消息長度將包含長度本身占用的字節數,打開LengthFieldPrepender后(0x000E):

ChannelHandler源碼分析

相對於ByteBuf和Channel,ChannelHandler的類繼承關系稍微簡單些,但是它的子類非常多。由於ChannelHandler是Netty框架和用戶代碼的主要擴展和定制點,所以它的子類種類繁多、功能各異,系統ChannelHandler主要分類如下:

  1. ChannelPipeline的系統ChannelHandler,用於I/O操作和對事件進行預處理,對於用戶不可見,這類ChannelHandler主要包括HeadHandler和TailHandler;
  2. 編解碼ChannelHandler,包括ByteToMessageCodec、MessageToMessageDecoder等,這些編解碼類本身又包含多種子類。
  3. 其他系統功能性ChannelHandler,包括流量整型Handler、讀寫超時Handler、日志Handler等。

ByteToMessageDecoder源碼分析

顧名思義,ByteToMessageDecoder解碼器用於將ByteBuf解碼成POJO對象。

首先看channelRead方法的源碼:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //首先判斷需要解碼的msg對象是否是ByteBuf,如果是ByteBuf才需要進行解碼,否則直接透傳。
        if (msg instanceof ByteBuf) {
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                //通過cumulation是否為空判斷解碼器是否緩存了沒有解碼完成的半包消息
                first = cumulation == null;
                if (first) {
                    //如果為空,說明是首次解碼或者最近一次已經處理完了半包消息,沒有緩存的半包消息需要處理,直接將需要解碼的ByteBuf賦值給cumulation;
                    cumulation = data;
                } else {
                    //如果cumulation緩存有上次沒有解碼完成的ByteBuf,則進行復制操作,將需要解碼的ByteBuf復制到cumulation中
                    //在復制之前需要對cumulation的可寫緩沖區進行判斷,如果不足則需要動態擴展
                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
                        //擴展的代碼很簡單,利用字節緩沖區分配器重新分配一個新的ByteBuf,將老的cumulation復制到新的ByteBuf中,釋放cumulation。
                        //此處內存擴展沒有采用倍增或者步進的方式,分配的緩沖區恰恰夠用,此處的算法可以優化下,以防止連續半包導致的頻繁緩沖區擴張和內存復制。
                        expandCumulation(ctx, data.readableBytes());
                    }
                    //半包解碼前:(半包消息1= cumulation.readableBytes())
                    //半包解碼后:(半包消息2= data.readableBytes()=半包消息1+msg.readableBytes())
                    cumulation.writeBytes(data);
                    data.release();
                }
                //復制操作完成之后釋放需要解碼的ByteBuf對象,調用callDecode方法進行解碼
                //對ByteBuf進行循環解碼,循環的條件是解碼緩沖區對象中有可讀的字節
                //調用抽象decode方法,由用戶的子類解碼器進行解碼
                //解碼后需要對當前的pipeline狀態和解碼結果進行判斷
                //如果當前的ChannelHandlerContext已經被移除,則不能繼續進行解碼,直接退出循環;
                //如果輸出的out列表長度沒變化,說明解碼沒有成功,需要針對以下不同場景進行判斷。
                //A.如果用戶解碼器沒有消費ByteBuf,則說明是個半包消息,需要由I/O線程繼續讀取后續的數據報,在這種場景下要退出循環。
                //B.如果用戶解碼器消費了ByteBuf,說明可以解碼可以繼續進行。業務解碼器需要遵守Netty的某些契約,解碼器才能正常工作,否則可能會導致功能錯誤
                //最重要的契約就是:如果業務解碼器認為當前的字節緩沖區無法完成業務層的解碼,需要將readIndex復位,告訴Netty解碼條件不滿足應當退出解碼,繼續讀取數據報。
                //如果用戶解碼器沒有消費ByteBuf,oldInputLength == in.readableBytes(),但是卻解碼出了一個或者多個對象,這種行為被認為是非法的,需要拋出DecoderException異常。
                //最后通過isSingleDecode進行判斷,如果是單條消息解碼器,第一次解碼完成之后就退出循環。
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    cumulation.release();
                    cumulation = null;
                }
                int size = out.size();
                decodeWasNull = size == 0;

                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i));
                }
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

MessageToMessageDecoder源碼分析

MessageToMessageDecoder負責將一個POJO對象解碼成另一個POJO對象。

首先看channelRead方法的源碼:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //先通過RecyclableArrayList創建一個新的可循環利用的RecyclableArrayList,
        RecyclableArrayList out = RecyclableArrayList.newInstance();
        try {
            //對解碼的消息類型進行判斷,通過類型參數校驗器看是否是可接收的類型,如果是則校驗通過
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    //直接調用decode抽象方法,由具體實現子類進行消息解碼
                    decode(ctx, cast, out);
                } finally {
                    //解碼完成之后,調用ReferenceCountUtil的release方法來釋放被解碼的msg對象。
                    ReferenceCountUtil.release(cast);
                }
            } else {
                //如果需要解碼的對象不是當前解碼器可以接收和處理的類型,則將它加入到RecyclableArrayList中不進行解碼。
                out.add(msg);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            int size = out.size();
            //最后,對RecyclableArrayList進行遍歷,循環調用ChannelHandlerContext的fireChannelRead方法,
            //通知后續的ChannelHandler繼續進行處理。循環通知完成之后,通過recycle方法釋放RecyclableArrayList對象。
            for (int i = 0; i < size; i ++) {
                ctx.fireChannelRead(out.get(i));
            }
            out.recycle();
        }
    }

LengthFieldBasedFrameDecoder源碼分析 

最通用和重要的解碼器——基於消息長度的半包解碼器。

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        //調用內部的decode(ChannelHandlerContext ctx, ByteBuf in)方法,如果解碼成功,將其加入到輸出的List<Object> out列表中。
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }


    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        //判斷discardingTooLongFrame標識,看是否需要丟棄當前可讀的字節緩沖區,如果為真,則執行丟棄操作
        if (discardingTooLongFrame) {
            long bytesToDiscard = this.bytesToDiscard;
            //判斷需要丟棄的字節長度,由於丟棄的字節數不能大於當前緩沖區可讀的字節數,所以需要通過Math.min(bytesToDiscard, in.readableBytes())函數進行選擇,
            //取bytesToDiscard和緩沖區可讀字節數之中的最小值。
            int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
            //計算獲取需要丟棄的字節數之后,調用ByteBuf的skipBytes方法跳過需要忽略的字節長度,
            in.skipBytes(localBytesToDiscard);
            //然后bytesToDiscard減去已經忽略的字節長度。
            bytesToDiscard -= localBytesToDiscard;
            this.bytesToDiscard = bytesToDiscard;
            //最后判斷是否已經達到需要忽略的字節數,達到的話對discardingTooLongFrame等進行置位
            failIfNecessary(false);
        }
        //對當前緩沖區的可讀字節數和長度偏移量進行對比,如果小於長度偏移量,則說明當前緩沖區的數據報不夠,需要返回空,由I/O線程繼續讀取后續的數據報。
        if (in.readableBytes() < lengthFieldEndOffset) {
            return null;
        }

        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
        //通過讀索引和lengthFieldOffset計算獲取實際的長度字段索引,然后通過索引值獲取消息報文的長度字段
        //根據長度字段自身的字節長度進行判斷,共有以下6種可能的取值。
        //長度所占字節為1,通過ByteBuf的getUnsignedByte方法獲取長度值;
        //長度所占字節為2,通過ByteBuf的getUnsignedShort方法獲取長度值;
        //長度所占字節為3,通過ByteBuf的getUnsignedMedium方法獲取長度值;
        //長度所占字節為4,通過ByteBuf的getUnsignedInt方法獲取長度值;
        //長度所占字節為8,通過ByteBuf的getLong方法獲取長度值;
        //其他長度不支持,拋出DecoderException異常。
        long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
        //如果長度小於0,說明報文非法,跳過lengthFieldEndOffset個字節,拋出Corrupted FrameException異常。
        if (frameLength < 0) {
            in.skipBytes(lengthFieldEndOffset);
            throw new CorruptedFrameException(
                    "negative pre-adjustment length field: " + frameLength);
        }
        //根據lengthFieldEndOffset和lengthAdjustment字段進行長度修正
        frameLength += lengthAdjustment + lengthFieldEndOffset;
        //如果修正后的報文長度小於lengthFieldEndOffset,則說明是非法數據報,需要拋出CorruptedFrameException異常。
        if (frameLength < lengthFieldEndOffset) {
            in.skipBytes(lengthFieldEndOffset);
            throw new CorruptedFrameException(
                    "Adjusted frame length (" + frameLength + ") is less " +
                    "than lengthFieldEndOffset: " + lengthFieldEndOffset);
        }
        //如果修正后的報文長度大於ByteBuf的最大容量,說明接收到的消息長度大於系統允許的最大長度上限,
        //需要設置discardingTooLongFrame,計算需要丟棄的字節數,根據情況選擇是否需要拋出解碼異常。
        if (frameLength > maxFrameLength) {
            //丟棄的策略如下:frameLength減去ByteBuf的可讀字節數就是需要丟棄的字節長度,
            //如果需要丟棄的字節數discard小於緩沖區可讀的字節數,則直接丟棄整包消息。
            //如果需要丟棄的字節數大於當前可讀字節數,說明即便當前所有可讀的字節數全部丟棄,也無法完成任務,則設置discardingTooLongFrame為true,下次解碼的時候繼續丟棄。
            //丟棄操作完成之后,調用failIfNecessary方法根據實際情況拋出異常。
            long discard = frameLength - in.readableBytes();
            tooLongFrameLength = frameLength;

            if (discard < 0) {
                // buffer contains more bytes then the frameLength so we can discard all now
                in.skipBytes((int) frameLength);
            } else {
                // Enter the discard mode and discard everything received so far.
                discardingTooLongFrame = true;
                bytesToDiscard = discard;
                in.skipBytes(in.readableBytes());
            }
            failIfNecessary(true);
            return null;
        }
        //如果當前的可讀字節數小於frameLength,說明是個半包消息,需要返回空,由I/O線程繼續讀取后續的數據報,等待下次解碼。
        // never overflows because it's less than maxFrameLength
        int frameLengthInt = (int) frameLength;
        if (in.readableBytes() < frameLengthInt) {
            return null;
        }
        //對需要忽略的消息頭字段進行判斷,如果大於消息長度frameLength,說明碼流非法,需要忽略當前的數據報,拋出CorruptedFrameException異常。
        if (initialBytesToStrip > frameLengthInt) {
            in.skipBytes(frameLengthInt);
            throw new CorruptedFrameException(
                    "Adjusted frame length (" + frameLength + ") is less " +
                    "than initialBytesToStrip: " + initialBytesToStrip);
        }
        //通過ByteBuf的skipBytes方法忽略消息頭中不需要的字段,得到整包ByteBuf。
        in.skipBytes(initialBytesToStrip);

        int readerIndex = in.readerIndex();
        int actualFrameLength = frameLengthInt - initialBytesToStrip;
        //通過extractFrame方法獲取解碼后的整包消息緩沖區
        //根據消息的實際長度分配一個新的ByteBuf對象,將需要解碼的ByteBuf可寫緩沖區復制到新創建的ByteBuf中並返回,
        //返回之后更新原解碼緩沖區ByteBuf為原讀索引+消息報文的實際長度(actualFrameLength)。
        ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
        in.readerIndex(readerIndex + actualFrameLength);
        return frame;
    }

至此,基於長度的半包解碼器介紹完畢,對於使用者而言,實際不需要對LengthFieldBasedFrameDecoder進行定制。只需要了解每個參數的用法,再結合用戶的業務場景進行參數設置,即可實現半包消息的自動解碼,后面的業務解碼器得到的是個完整的整包消息,不用再額外考慮如何處理半包。這極大地降低了開發難度,提升了開發效率。

MessageToByteEncoder源碼分析

MessageToByteEncoder負責將用戶的POJO對象編碼成ByteBuf,以便通過網絡進行傳輸。

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            //首先判斷當前編碼器是否支持需要發送的消息,如果不支持則直接透傳;
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                //對於直接內存分配ioBuffer(堆外內存),對於堆內存通過heapBuffer方法分配。
                if (preferDirect) {
                    buf = ctx.alloc().ioBuffer();
                } else {
                    buf = ctx.alloc().heapBuffer();
                }
                try {
                    //編碼使用的緩沖區分配完成之后,調用encode抽象方法進行編碼,
                    encode(ctx, cast, buf);
                } finally {
                    //編碼完成之后,調用ReferenceCountUtil的release方法釋放編碼對象msg。
                    ReferenceCountUtil.release(cast);
                }
                //對編碼后的ByteBuf進行以下判斷。
                //如果緩沖區包含可發送的字節,則調用ChannelHandlerContext的write方法發送ByteBuf;
                //如果緩沖區沒有包含可寫的字節,則需要釋放編碼后的ByteBuf,寫入一個空的ByteBuf到ChannelHandlerContext中。
                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                //發送操作完成之后,在方法退出之前釋放編碼緩沖區ByteBuf對象。
                buf = null;
            } else {
                //如果不支持則直接透傳;
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            //發送操作完成之后,在方法退出之前釋放編碼緩沖區ByteBuf對象。
            if (buf != null) {
                buf.release();
            }
        }
    }

MessageToMessageEncoder源碼分析

MessageToMessageEncoder負責將一個POJO對象編碼成另一個POJO對象,例如將XML Document對象編碼成XML格式的字符串。 

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        RecyclableArrayList out = null;
        try {
            //創建RecyclableArrayList對象,判斷當前需要編碼的對象是否是編碼器可處理的類型,如果不是,則忽略,執行下一個ChannelHandler的write方法。
            if (acceptOutboundMessage(msg)) {
                out = RecyclableArrayList.newInstance();
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    //具體的編碼方法實現由用戶子類編碼器負責完成
                    encode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }
                //如果編碼后的RecyclableArrayList為空,說明編碼沒有成功,釋放RecyclableArrayList引用。
                if (out.isEmpty()) {
                    out.recycle();
                    out = null;

                    throw new EncoderException(
                            StringUtil.simpleClassName(this) + " must produce at least one message.");
                }
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new EncoderException(t);
        } finally {
            //如果編碼成功,則通過遍歷RecyclableArrayList,循環發送編碼后的POJO對象
            if (out != null) {
                final int sizeMinusOne = out.size() - 1;
                if (sizeMinusOne >= 0) {
                    for (int i = 0; i < sizeMinusOne; i ++) {
                        ctx.write(out.get(i));
                    }
                    ctx.write(out.get(sizeMinusOne), promise);
                }
                out.recycle();
            }
        }
    }

LengthFieldPrepender源碼分析

LengthFieldPrepender負責在待發送的ByteBuf消息頭中增加一個長度字段來標識消息的長度,它簡化了用戶的編碼器開發,使用戶不需要額外去設置這個長度字段。

     @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        int length = msg.readableBytes() + lengthAdjustment;
        //首先對長度字段進行設置,如果需要包含消息長度自身,則在原來長度的基礎之上再加上lengthFieldLength的長度。
        if (lengthIncludesLengthFieldLength) {
            length += lengthFieldLength;
        }
        //如果調整后的消息長度小於0,則拋出參數非法異常。
        if (length < 0) {
            throw new IllegalArgumentException(
                    "Adjusted frame length (" + length + ") is less than zero");
        }
        //對消息長度自身所占的字節數進行判斷,以便采用正確的方法將長度字段寫入到ByteBuf中,共有以下6種可能。
        switch (lengthFieldLength) {
        case 1:
            //長度字段所占字節為1:如果使用1個Byte字節代表消息長度,則最大長度需要小於256個字節。對長度進行校驗,如果校驗失敗,則拋出參數非法異常;
            //若校驗通過,則創建新的ByteBuf並通過writeByte將長度值寫入到ByteBuf中。
            if (length >= 256) {
                throw new IllegalArgumentException(
                        "length does not fit into a byte: " + length);
            }
            out.add(ctx.alloc().buffer(1).writeByte((byte) length));
            break;
        case 2:
            //長度字段所占字節為2:如果使用2個Byte字節代表消息長度,則最大長度需要小於65536個字節,對長度進行校驗,如果校驗失敗,則拋出參數非法異常;
            //若校驗通過,則創建新的ByteBuf並通過writeShort將長度值寫入到ByteBuf中。
            if (length >= 65536) {
                throw new IllegalArgumentException(
                        "length does not fit into a short integer: " + length);
            }
            out.add(ctx.alloc().buffer(2).writeShort((short) length));
            break;
        case 3:
            //長度字段所占字節為3:如果使用3個Byte字節代表消息長度,則最大長度需要小於16777216個字節,對長度進行校驗,如果校驗失敗,則拋出參數非法異常;
            //若校驗通過,則創建新的ByteBuf並通過writeMedium將長度值寫入到ByteBuf中。
            if (length >= 16777216) {
                throw new IllegalArgumentException(
                        "length does not fit into a medium integer: " + length);
            }
            out.add(ctx.alloc().buffer(3).writeMedium(length));
            break;
        case 4:
            //長度字段所占字節為4:創建新的ByteBuf,並通過writeInt將長度值寫入到ByteBuf中。
            out.add(ctx.alloc().buffer(4).writeInt(length));
            break;
        case 8:
            //長度字段所占字節為8:創建新的ByteBuf,並通過writeLong將長度值寫入到ByteBuf中。
            out.add(ctx.alloc().buffer(8).writeLong(length));
            break;
        default:
            //其他長度值:直接拋出Error。
            throw new Error("should not reach here");
        }
        //最后將原需要發送的ByteBuf復制到List<Object> out中,完成編碼。
        out.add(msg.retain());
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM