Netty 那些事兒 ——— 關於 “Netty 發送大數據包時 觸發寫空閑超時” 的一些思考


作者:tomas家的小撥浪鼓
鏈接:https://www.jianshu.com/p/8fe70d313d78
來源:簡書

 

本文是筆者和朋友(筆名:oojeek)一起討論該問題的一個記錄。文章以討論過程中的思路來展現(也是我們解決問題的思路路線),因此可能會有些亂。再者,如果對Netty寫數據流程不了解的朋友,可以先閱讀Netty 源碼解析 ——— writeAndFlush流程分析該篇文章,下面的討論中會涉及不少這篇文章提及的概念。

問題

起因是這樣的,朋友倒騰了個發送大數據包的demo,結果發現在發送大數據包時,寫空閑超時事件被觸發了。即便在設置了IdleStateHandler的observeOutput屬性為true的情況下,依舊會發送在寫一個大數據包的過程中,寫空閑超時事件被觸發。
先來簡單看看朋友的demo,我們來看幾個關鍵類

public class MyClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addFirst("idleStateHandler", new IdleStateHandler(true,9, 2, 11, TimeUnit.SECONDS)); pipeline.addLast(new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, Integer.MAX_VALUE, 0, 4, 0, 4, true)); pipeline.addLast(new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new MyClientHandler()); } } 

我們定義了一個IdleStateHandler,並且設置了observeOutput屬性為true(即,第一個參數),以及設置了寫空閑超時時間為2秒(即,第二個參數)。

public class MyClientHandler extends SimpleChannelInboundHandler<String> { private String tempString; public MyClientHandler() { StringBuilder builder = new StringBuilder(); for (int i = 0; i < 1024 * 1024; i++) { builder.append("abcdefghijklmnopqrstuvwxyz"); } tempString = builder.toString(); } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(LocalDateTime.now().toString() + "----" + ctx.channel().remoteAddress().toString() + "----" + msg.length()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { sendData(ctx); } private void sendData(ChannelHandlerContext ctx) { if (!ctx.channel().isActive()) { System.out.println("channel inactive..."); ctx.close(); return; } System.out.println("send a pack of data ..."); long tickCount = System.currentTimeMillis(); ChannelFuture future = ctx.writeAndFlush(tempString); ChannelPromise promise = (ChannelPromise)future; promise.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { System.out.println("send completed"); sendData(ctx); } }); System.out.println("Time elapse:" + (System.currentTimeMillis() - tickCount)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); //super.exceptionCaught(ctx, cause); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //System.out.println(LocalDateTime.now().toString()); if (evt == IdleStateEvent.READER_IDLE_STATE_EVENT) { System.out.println("READER_IDLE_STATE_EVENT"); } else if (evt == IdleStateEvent.WRITER_IDLE_STATE_EVENT){ // for heartbit System.out.println("WRITER_IDLE_STATE_EVENT----" + LocalDateTime.now().toString()); //ctx.writeAndFlush("ACK"); } else if (evt == IdleStateEvent.ALL_IDLE_STATE_EVENT) { //System.out.println("ALL_IDLE_STATE_EVENT"); } else if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT) { System.out.println("FIRST_READER_IDLE_STATE_EVENT"); } else if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) { //System.out.println("FIRST_WRITER_IDLE_STATE_EVENT"); } else if (evt == IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT) { //System.out.println("FIRST_ALL_IDLE_STATE_EVENT"); } //super.userEventTriggered(ctx, evt); } } 

這里,定義了一個27262976字節大小的tempString數據,用於發送。並實現了userEventTriggered方法,當寫空閑超時事件發送時,會打印一條『"WRITER_IDLE_STATE_EVENT----" + LocalDateTime.now().toString()』信息。

然后啟動程序,連接的服務端是朋友的騰訊雲,服務器做了帶寬限制,限制為1M,以重現問題。

運行程序的過程中,發現,當大數據包(即,27262976字節大小的tempString)在發送的過程中,寫空閑超時不斷的被觸發調用。並且我們自定義handler中只發送了一個數據包,但到了底層卻有兩個數據包發送出去了。

然后就此情況我們開始了討論。。。

尋找問題發送的根源

首先,IdleStateHandler的write操作確實確實只是將listener加到了write操作的listener集合中,write操作本身不會去修改lastWriteTime。

然后,我們曉得flush是一個出站操作,最終ChannelPipeline的head會對其進行處理。head底層會調用NioSocketChannel.doWrite()方法來將數據刷新到socket中。

doWrite()操作是一個寫循環操作。第一次循環:

 
 
nioBufferCnt為2;說明有2個待發送的ByteBuf。
expectedWrittenBytes:27262980。這個字段表示本次flush操作我們希望寫出去的數據大小,也就是之前我們write操作已經寫入的數據。即:
ChannelFuture future = ctx.writeAndFlush(tempString);

 

public MyClientHandler() { StringBuilder builder = new StringBuilder(); for (int i = 0; i < 1024 * 1024; i++) { builder.append("abcdefghijklmnopqrstuvwxyz"); } tempString = builder.toString(); } 

為什么是2個待發送的ByteBuf了?
這和我們定義了『pipeline.addLast(new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false));』有關:

 
 
所以,在經過LengthFieldPrepender編碼器處理后,傳入給下一個ChannelOutboundHandler的待處理數據已經是2個ByteBuf了。即,如下:
 
 
在encode()函數調用完,是會將編碼后的結果集合中的ByteBuf依次的調用ctx.write()操作來傳遞給下一個ChannelOutboundHandler。這里我們暫時只要知道兩次write操作最終會將兩個ByteBuf給添加到ChannelOutboundBuffer中。一個ByteBuf的capacity是4,里面記錄了我們要發送的消息的大小;另一個ByteBuf就是我們要發送的數據了。


知道了為什么有2個待發送的ByteBuf,我們繼續看doWrite()操作中寫數據的操作:
 
 
這里主要完成了一次寫操作,config().getWriteSpinCount()為16,也就是一次寫操作會最多執行16次的SocketChannel.write操作來將數據寫到網絡中。每次ch.write完都會進行相應的『expectedWrittenBytes -= localWrittenBytes;』操作。如果在最后expectedWrittenBytes依舊大於0,則說明在這16次的socket寫操作后依舊還有未寫完的數據等待被繼續寫,那么done就會為false,那么就會將flush操作封裝為一個task提交至NioEventLoop的任務隊列中,在下一次事件循環時繼續發送未發完的數據;否則若所有的數據都寫完了,done會被置為true。注意,ch.write操作會返回本次寫操作寫出的字節數,但該方法返回0時,即localWrittenBytes為0,則說明底層的寫緩沖區已經滿了(這里應該指的是linux底層的寫緩沖區滿了),這是就會將setOpWrite置為true,此時因為數據還沒寫完done還是false。那么這種情況下就會注冊當前SocketChannel的寫事件:
 
 
當底層緩沖區有空余空間時就會觸發這個寫事件,繼續將為寫完的數據發送出去。


最后,我們來看doWrite()操作中的『in.removeBytes(writtenBytes);』操作
 
 
『 if (readableBytes <= writtenBytes) 』這個if判斷表示:本次socket的write操作(這里是真的是網絡通信寫操作了)已經寫出去的字節數 大於 了當前ByteBuf包可讀取的字節數。 這說明,當前這個包中所有的可寫的數據都已經寫完了(SocketChannel.write(bytebuffer)👈這是將byteBuffer中的數據讀出來,然后寫入到socketChannel中),既然當前這個ByteBuf的數據都寫完了,那么久可以將其刪除了。即,調用『remove()』操作,這個操作就會將回調已經注冊到ByteBuf的promise上的所有listeners,這里包括了“IdleStateHandler 的 writeListener(該listener就會完成對lastWriteTime的更新)”。『remove()』操作還會將當前的ByteBuf指向下一個待處理的ByteBuf。

 

『目前,我們可以先理解為,write操作的數據最終都會放到ChannelOutboundBuffer中,其中有兩個屬性private Entry unflushedEntry、private Entry flushedEntry。它們都是用Entry對象通過next指針來維護的一個單向鏈表。
unflushedEntry表示還未刷新的ByteBuf的鏈表頭;flushedEntry表示調用flush()操作時將會進行刷新的ByteBuf的鏈表頭。
在write的時候會將ByteBuf封裝為一個Entry對象放到unflushedEntry的尾部。當調用flush時,就會將unflushedEntry賦值給flushedEntry,然后將unflushedEntry置null。
同時current()返回當前正在處理的Entry對象(Entry中封裝了ByteBuf)』

到此為止,第一個ByteBuf,即記錄着我們要發送消息長度大小的ByteBuf就發送出去了,並且觸發了一次“IdleStateHandler 的 writeListener”的調用。

 

 

那么,第二個ByteBuf就是我們的大數據包了。
 
 

通過上面的分析,我們知道大數據包走的是else流程。也就是說,本次真實寫出去的數據 比 當前這個ByteBuf的可讀取數據要小。也就說明,當前這個ByteBuf還沒有被完全的寫完。因此並不會通過調用『remove()』操作來觸發“IdleStateHandler 的 writeListener”的回調。直到整個大數據包所有的內容都寫出去了,那么這是if(readableBytes <= writtenBytes)才會為真,這是才會去觸發“IdleStateHandler 的 writeListener”的回調。
也就是說,只有在一個ByteBuf的數據全部都寫完了之后,才會去觸發所有注冊到這個write操作上的GenericFutureListener的回調。
netty其實有提供了一個ChannelProgressiveFuture來監控數據的發送過程,它可以實現在一個大數據發送的過程中回調注冊到其上的ChannelProgressiveFutureListener,比如:

        ChannelProgressivePromise progressivePromise = ctx.channel().newProgressivePromise(); progressivePromise.addListener(new ChannelProgressiveFutureListener(){ @Override public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception { System.out.println("數據正在發送中。。。"); } @Override public void operationComplete(ChannelProgressiveFuture future) throws Exception { System.out.println("數據已經發送完了!"); } }); ctx.writeAndFlush(tempString, progressivePromise); 



最后。說明下,當將大數據包拆成一個個小包發送時,為什么不會導致寫空閑超時的觸發。
因為當大數據包被拆分成一個個小包發送時,每個小數據包就是一個ByteBuf,每個ByteBuf待寫出的數據量就很小,比如本例中,我一個ByteBuf就是一個長度為26的英文字符串,那么每次寫操作完成后在removeBytes()操作:

 
 
總是進入if為true的語句塊中。所以會不同的觸發“IdleStateHandler 的 writeListener”以更新lastWriteTime。


到目前為止,我們已經知道導致寫空閑超時的原因所在了。這時我們可以想到的解決方案有:
① 用變量來記錄是否正在發送中,如果在發送中,即使寫空閑超時被觸發也不發送心跳
② 將打包拆分成小包的方式

 

更進一步

但是,我們還有一個疑惑未解決,那就是IdleStateHandler類中observeOutput屬性到底是干啥用的?
我們先來看看observeOutput屬性在IdleStateHandler中的使用:
首先在doc文檔中,對observeOutput屬性的描述是“在訪問寫空閑超時時,字節消費是否會被考慮進去,默認為false”,也就是說,當字節被消費時,寫空閑超時事件否非該被觸發。
從上文,我們已經得知,只有在每次真正寫完一個Bytebuf后,該ByteBuf的異步寫操作才算是完成,那么才會去觸發該異步寫操作上的listener,也就是這是才會修改IdleStateHandler的lastWriteTime屬性。
起初,我們以為如果將“observeOutput”屬性設置為true,那么即使ByteBuf包沒有被完全寫完,但是已經有字節數據在被寫出了,那么此時也不應該觸發寫空閑超時事件。但,結果卻是寫空閑超時事件依舊被觸發了。這是為什么了?

 

下面我們就來好好說說“observeOutput”屬性的作用,首先我們來看看IdleStateHandler中observeOutput的使用:
 
 
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) { if (observeOutput) { // We can take this shortcut if the ChannelPromises that got passed into write() // appear to complete. It indicates "change" on message level and we simply assume // that there's change happening on byte level. If the user doesn't observe channel // writability events then they'll eventually OOME and there's clearly a different // problem and idleness is least of their concerns. if (lastChangeCheckTimeStamp != lastWriteTime) { lastChangeCheckTimeStamp = lastWriteTime; // But this applies only if it's the non-first call. if (!first) { return true; } } Channel channel = ctx.channel(); Unsafe unsafe = channel.unsafe(); ChannelOutboundBuffer buf = unsafe.outboundBuffer(); if (buf != null) { int messageHashCode = System.identityHashCode(buf.current()); long pendingWriteBytes = buf.totalPendingWriteBytes(); if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) { lastMessageHashCode = messageHashCode; lastPendingWriteBytes = pendingWriteBytes; if (!first) { return true; } } } } return false; } 

這里“observeOutput”為true情況下,主要會根據三對數值的比較情況來覺得輸出是否有改變,① lastChangeCheckTimeStamp 與 lastWriteTime;② messageHashCode 與 lastMessageHashCode;③ pendingWriteBytes 與 lastPendingWriteBytes;
① 和 ② 都好理解,最讓我們困惑的是③,也就是說,pendingWriteBytes屬性並未像我們猜測的那樣隨着ByteBuf中的數據的寫出而改變。 這又是為什么了?

為了解決這個問題,我們通過反向思考來嘗試的解決。即,這個值(pendingWriteBytes)是在什么情況下會被修改?
ChannelOutboundBuffer:

 

totalPendingSize表示的是所有待發送的ByteBuf的總長度,接每次往ChannelOutboundBuffer添加一個ByteBuf的時候就會增加這個字段:
 
 
並且會在,每次發送完一個ByteBuf后,調用『decrementPendingOutboundBytes(long size)』來減少totalPendingSize的值,其中參數size為當前發送出去的ByteBuf的數據大小。
 
 
確實是在每次寫完一個Bytebuf后才會調用一次decrementPendingOutboundBytes(long size)。

好了,現在我們知道,其實pendingWriteBytes實際上也是在一個ByteBuf都寫出后才會被修改的。。。 那么問題又來了,既然是這樣,那么這個pendingWriteBytes又有什么用了?或者說observeOutput屬性的使用到底是在什么場景下??
這個問題其實在hasOutputChanged方法注解的github issues 6150中給出了討論。
目前能得到的結論是observeOutput屬性是為了issues 6150問題所提供的解決方案,而這個問題是在通過HTTP2協議進行數據發送時導致的,討論中提及netty在對HTTP2傳輸協議進行數據傳輸時可能會將多個數據包整合正一個包發送導致寫空閑超時事件被觸發了(因為,該問題與本文的問題並無關聯,所以不做具體說明)。但是通過github issues 6150討論中,我們得知了netty之所以不提供在寫一個大數據包的過程中修改pendingWriteBytes的原因(即,netty不支持某個ByteBuf中寫出部分數據就修改ChannelOutboundBuffer中totalPendingSize值。),這是為了防止ABA問題。

 

下面我們對hasOutputChanged進行更深一步的說明,來看看ABA問題可能出現的情況(也就是因為這些情況,netty不允許在一個ByteBuf未寫完的情況下就修改ChannelOutboundBuffer中totalPendingSize值):
 
 
① 因為IdelStateHandler是可以在非EventLoop現實上執行的,也就是說寫空閑超時任務是可以在非EventLoop線程上執行。這個代碼塊就適用於這種情況下的一個捷徑判斷。因為listener只會在NioEventLoop線程上執行,也就是說,lastWriteTime只會在NioEventLoop線程上被修改。而WriterIdleTimeoutTask則是放到IdelStateHandler所對應的Executor中的,當IdelStateHandler所對應的Executor和NioEventLoop不是同一個時,就可能出現『lastChangeCheckTimeStamp != lastWriteTime』的情況(該判斷在WriterIdleTimeoutTask中被執行,而lastWriteTime在NioEventLoop線程中被修改)。
② 因為如果不是以ByteBuf或者FileRegion為單位修改pending bytes的話,可能出現ABA問題。即,因為write操作可以由多個不同的線程來操作(非EventLoop線程),這可能導致EventLoop線程在進行該OutboundBuffer中ByteBuf的flush操作時,其他線程再往這個OutboundBuffer中加數據,這可能使得最終pending bytes的值並沒有改變,但實際上pending bytes是改變過的了,這樣就會使得判斷錯誤。(PS:目前NIO傳輸時,寫完一個ByteBuf就會觸發該ByteBuf的listener,那么lastWriteTime就會被修改,此時根本不會進入)
③ 而另一個ABA問題是,如果保持了ByteBuf的引用,如果使用池的ByteBuf的話(默認,Netty就是使用池的ByteBuf),如果我們存儲OutBoundBuffer中的當前的(鏈表頭)的那個ByteBuf對象的引用,在每次寫空閑超時事件中判斷這個ByteBuf對象的hashCode與上一次調用時的值做比較來得出是否是同一個ByteBuf。👈這種情況也可能出現ABA問題,正式因為ByteBuf是池的,那么就可能在寫空閑超時事件回調方法中存有的ByteBuf引用還是一樣的,但實際上是被回收后再次分配出去的,因此是邏輯上來說是不一樣的ByteBuf對象了。

 

  • “observeOutput” 字段的使用場景:
    當在寫一個大數據包的時候,且該在寫超時已不是第一次觸發的時候(即,first 為 false),這個大數據包還沒寫完。但在此時,我們已經有 ch.write(data)了其數據了,這會導致『pendingWriteBytes != lastPendingWriteBytes == true』(因為,channelOutboundBytes 只有在一個 ByteBuf 都寫出去后,即,寫到 socket 的寫緩沖后。才會減少其totalPendingWriteBytes 的值。這樣在👆這個場景中,在我們自此write一個data的時候,totalPendingWriteBytes的值會增加),因此來表示 outputChanged。也就是說,observeOutput 觀察的是,是否有新的寫數據操作,而非對已經操作的write的數據的觀察!!!

 

解決方案

好了,到目前為止,我們已經知道為什么我們使用“observeOutput”屬性無法達到我們預計的效果了。那么,關於發送大數據包我們到底可以做處理了。。
這里,我們覺得可以采用的一個方式是,使用“ChunkedWriteHandler”來實現大數據包的傳輸。
這個ChunkedWriteHandler又是怎么突然跑出來的。。是這樣的,其實之前我們也不曉得有這個類,或者說因為了解不深給忽略了它。正好在解決這個問題的間隙,將Netty的寫數據操作給過了邊,在這其中發現了Netty自身目前僅對ChunkedWriteHandler和HTTP2的提供了WriteBufferWaterMark的支持,其余的需要我們程序自行添加支持。而WriteBufferWaterMark通常就是為了控制有大量待寫出數據的情況下對寫出流量進行控制的一個方式,這看似和我們的大數據包寫出還是有些個關系的。因此,我們通過doc簡單了解了下ChunkedWriteHandler的使用,發現確實是個可行的方式。在經過測試后也如預期般達到了我們要的效果!下面,我們就來說說如果通過ChunkedWriteHandler來實現大數據包發送的發送。

這里對ChunkedWriteHandler做一個簡單的介紹:
ChunkedWriteHandler:一個handler,用於支持異步寫大數據流並且不需要消耗大量內存也不會導致內存溢出錯誤( OutOfMemoryError )。
ChunkedWriteHandler僅支持ChunkedInput類型的消息。也就是說,僅當消息類型是ChunkedInput時才能實現ChunkedWriteHandler提供的大數據包傳輸功能(ChunkedInput是一個不確定長度的數據流)。
ChunkedWriteHandler中維護了一個待發送的數據包消息隊列(Queue<PendingWrite> queue,其中PendingWrite封裝了你待發送的消息以及異步寫操作的promise)

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { queue.add(new PendingWrite(msg, promise)); } 

這樣使得你write的數據包在經過ChunkedWriteHandler的時候,會先被存儲到這個消息隊列中,並不會立即放入到ChannelOutboundBuffer里。

 

 

而當你執行flush操作時,ChunkedWriteHandler會依次取出消息隊列中的大數據包,然后拆分成一個個小數據包(ByteBuf)后發給下游的ChannelOutboundHandler,並且在每次發送完一個ByteBuf包后都會立即執行依次ctx.flush()操作將該ByteBuf發送到網絡中。
 
 

但也正是因為,ChunkedWriteHandler將一個大數據包拆分成了一個個小數據包放入底層的ChannelOutboundBuffer進行傳輸,這使得你對大數據包的異步寫操作注冊的listener在底層的ChannelOutboundBuffer已經無法得到並且回調了,這就需要我們通過程序來進行狀態的管理以保持我們原有邏輯的正確性。
ChunkedWriteHandler會為每個發送的小數據包注冊一個listener,這個listener會在小數據包成功發送完成后調用原始大數據包的GenericProgressiveFutureListener,上面我們已經說了通過GenericProgressiveFutureListener我們可以監控數據包的發送進度(通過回調operationProgressed方法實現),以及在大數據包發送完后得到一個通知(通過回調operationComplete方法實現)。因此,我們可以在operationComplete回調方法中對寫原始大數據包的異步操作上注冊的listener進行回調(通過表示寫異步操作promise完成來實現)。

值得一提的時,ChunkedWriteHandler對將大數據包拆分成小數據包發往下游進行的操作是受WriteBufferWaterMark控制的,當寫緩沖區中的數據數量超過了設置的高水位標志,那么Channel#isWritable()方法將開始返回false,那么此時ChunkedWriteHandler就不會繼續拆分大數據包。然后當寫緩沖區中的字節數量減少至小於了低水位標志,Channel#isWritable()方法會重新開始返回true,而此時ChunkedWriteHandler會繼續拆分未拆分完的大數據包,繼續數據的寫操作。

絮絮叨叨了這么多,來看看具體的實現:
首先修改了MyClientInitializer:

public class MyClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addFirst("idleStateHandler", new IdleStateHandler(true, 9, 2, 11, TimeUnit.SECONDS)); pipeline.addLast(new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, Integer.MAX_VALUE, 0, 4, 0, 4, true)); pipeline.addLast(new LengthFieldPrepender(ByteOrder.LITTLE_ENDIAN, 4, 0, false)); pipeline.addLast("chunkedWriteHandler", new ChunkedWriteHandler()); pipeline.addLast("myClientChunkHandler", new MyClientChunkHandler()); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new MyClientHandler()); } } 

在StringEncoder和LengthFieldPrepender兩個編碼器間添加了MyClientChunkHandler和ChunkedWriteHandler。
MyChunkedWriteHandler是一個出站處理器,它會完成將StringEncoder編碼后的大數據包類型轉換成ChannelInputStream類型,以使得其后的ChunkedWriteHandler能夠對該大數據包實現拆分分發的作用。

接下來是自定義的 MyClientChunkHandler,用於將我們的待發送的大數據包類型轉換成ChunkedInput類型,以使得ChunkedWriteHandler能夠發揮作用。

public class MyServerChunkHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if(msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf)msg; ByteInputStream in = new ByteInputStream(); byte[] data = null; if(buf.hasArray()) { System.out.println("+++ is array"); data = buf.array().clone(); } else { System.out.println("--- is direct"); data = new byte[buf.readableBytes()]; buf.writeBytes(data); } // System.out.println("===== data length : " + data.length); in.setBuf(data); ChunkedStream stream = new ChunkedStream(in); ReferenceCountUtil.release(msg); ctx.write(stream, promise); } else { super.write(ctx, msg, promise); } } } 

👆實現了將數據包類型轉換為ByteInputStream類型,傳遞個下一個ChannelOutboundHandler(也就是ChunkedWriteHandler)

后記

本次問題和朋友陸陸續續的討論了兩個晚上,印象還是比較深刻的。在第一次討論問題的時候,我們對Netty的寫數據流程也沒有比較清晰的概念。后面將這塊流程補上后,再重新回來看待問題,感覺又清晰了不少,再者對於IdleStateHandler的observeOutput屬性確實是比較容易讓人誤解。如果沒有去翻查github和源碼的話,不容易明白這個屬性真正的用意。但也正是因為對Netty寫數據流程的梳理,讓我們發現了一直忽略ChunkedWriteHandler,也讓這個問題有了現在的這個解決方案。當然,可能隨着后面進一步深入的學習,我們會發現更好的解決方案,那么到時候也會繼續分享的。
若文章有任何錯誤,望大家不吝指教:)





免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM