基於流的數據傳輸:
在基於流的傳輸(如TCP / IP)中,接收的數據被存儲到套接字接收緩沖器中。 不幸的是,基於流的傳輸的緩沖區不是數據包的隊列,而是字節隊列。 這意味着,即使您將兩個消息作為兩個獨立數據包發送,操作系統也不會把它們視為兩個消息,而只是一堆字節。 因此,您無法保證您所讀取的內容正是您遠程發送信息時的正確切分。 例如,假設操作系統的TCP / IP堆棧已經收到三個數據包:

由於基於流的協議,讀取的數據分片信息可能如下:

因此,無論服務器端或客戶端如何,接收部分都應將接收到的數據進行碎片整理,以將其應用到邏輯上容易理解的一個或多個有意義的幀中。 在上述示例的情況下,接收到的數據應該如下所示:

解決方案一:
現在讓我們回到TIME時間客戶端的例子。 我們在這里也有同樣的問題。 一個32位整數是非常少量的數據,它不可能經常被分段。 然而,問題是可以分散,碎片化的可能性會隨着流量的增加而增加。
簡單的解決方案是創建內部累積緩沖區,並等待所有4個字節都被接收到內部緩沖區。 以下是修改的TimeClientHandler實現,可以解決問題:
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { private ByteBuf buf; @Override public void handlerAdded(ChannelHandlerContext ctx) { buf = ctx.alloc().buffer(4); // (1) } @Override public void handlerRemoved(ChannelHandlerContext ctx) { buf.release(); // (1) buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; buf.writeBytes(m); // (2) m.release(); if (buf.readableBytes() >= 4) { // (3) long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
ChannelHandler有兩個生命周期偵聽器方法:handlerAdded()和handlerRemoved()。 只要阻塞時間不長,您可以執行任意的初始化任務。
首先,所有收到的數據應該被累積到buf。然后,處理程序必須檢查buf是否有足夠的數據,在此示例中為4個字節,然后繼續執行業務邏輯。 否則,Netty會在更多數據到達時再次調用channelRead()方法,最終所有4個字節都將被累積。
解決方案二:
你會發現第一種方式缺少靈活性,無法應對可變的長度字段,ChannelInboundHandler實現將很快變得不可靠。
您可能已經注意到,您可以向ChannelPipeline添加多個ChannelHandler,因此,您可以將一個單一的ChannelHandler拆分成多個模塊化通道,以減少應用程序的復雜性。 例如,您可以將TimeClientHandler拆分為兩個處理程序:
處理碎片問題的TimeDecoder,
初始簡單版本的TimeClientHandler。
幸運的是,Netty提供了一個可擴展的類,可幫助您編寫開箱即用的第一個:
package io.netty.example.time; public class TimeDecoder extends ByteToMessageDecoder { // (1) @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2) if (in.readableBytes() < 4) { return; // (3) } out.add(in.readBytes(4)); // (4) } }
1,ByteToMessageDecoder是ChannelInboundHandler的一個實現,它可以輕松應對碎片問題。
2,當接收到新數據時,ByteToMessageDecoder會在內部維護的累積緩沖區中調用decode()方法。
decode()累積到足夠的數據到緩存區還會刷出數據。 當接收到更多數據時,ByteToMessageDecoder將持續調用decode()。
3,如果decode()將對象刷出去,則表示解碼器成功解碼了消息,ByteToMessageDecoder將丟棄累積緩沖區的已經刷出的讀取部分。
4,ByteToMessageDecoder將繼續調用decode()方法,直到它沒有讀取到更多的數據。
現在我們有另一個處理程序插入ChannelPipeline,我們應該修改TimeClient中的ChannelInitializer實現:
b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); } });
如果你是一個冒險的人,你可能想嘗試ReplayingDecoder,這更簡單的解碼器。 但是,您需要參考API參考資料。
public class TimeDecoder extends ReplayingDecoder<Void> { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { out.add(in.readBytes(4)); } }
此外,Netty還提供開箱即用的解碼器,使您能夠輕松實現大多數協議,並幫助您避免使用單一的不可維護的處理程序實現。 有關更多詳細示例,請參閱以下軟件包:
io.netty.example.factorial為二進制協議
io.netty.example.telnet用於基於文本行的協議。
==========================================================================================
讀取實體(pojo)代替二進制流
我們迄今為止審查的所有示例都使用ByteBuf作為協議消息的主要數據結構。 在本節中,我們將改進TIME協議客戶端和服務器示例,以使用POJO而不是ByteBuf。
在您的ChannelHandlers中使用POJO的優勢是顯而易見的 您的處理程序變得更加可維護,並且可以通過將從ByteBuf中提取信息的代碼從處理程序中分離出來來重新使用。 在TIME客戶端和服務器示例中,我們只讀取一個32位整數,直接使用ByteBuf不是一個主要問題。 但是,您將發現在實現真實世界協議時需要進行分離。
首先,讓我們定義一個名為UnixTime的新類型。
package io.netty.example.time; import java.util.Date; public class UnixTime { private final long value; public UnixTime() { this(System.currentTimeMillis() / 1000L + 2208988800L); } public UnixTime(long value) { this.value = value; } public long value() { return value; } @Override public String toString() { return new Date((value() - 2208988800L) * 1000L).toString(); } }
我們現在可以修改TimeDecoder來生成UnixTime而不是ByteBuf。
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (in.readableBytes() < 4) { return; } out.add(new UnixTime(in.readUnsignedInt())); }
使用更新的解碼器,TimeClientHandler不再使用ByteBuf:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { UnixTime m = (UnixTime) msg; System.out.println(m); ctx.close(); }
更簡單和優雅,對吧? 同樣的技術可以在服務器端應用。 這次我們先來更新TimeServerHandler:
@Override public void channelActive(ChannelHandlerContext ctx) { ChannelFuture f = ctx.writeAndFlush(new UnixTime()); f.addListener(ChannelFutureListener.CLOSE); }
現在,唯一缺少的部分是一個編碼器,它是一個ChannelOutboundHandler的實現,將UnixTime轉換成一個ByteBuf。 它比編寫解碼器要簡單得多,因為在編碼消息時不需要處理數據包碎片和匯編。
package io.netty.example.time; public class TimeEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { UnixTime m = (UnixTime) msg; ByteBuf encoded = ctx.alloc().buffer(4); encoded.writeInt((int)m.value()); ctx.write(encoded, promise); // (1) } }
這一行中有很多重要的事情。
首先,我們按原樣傳遞原來的ChannelPromise,以便Netty將編碼數據實際寫入電線時將其標記為成功或失敗。
其次,我們沒有調用ctx.flush()。 有一個單獨的處理方法void flush(ChannelHandlerContext ctx),用於覆蓋flush()操作。
為了進一步簡化,您可以使用MessageToByteEncoder:
public class TimeEncoder extends MessageToByteEncoder<UnixTime> { @Override protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) { out.writeInt((int)msg.value()); } }
最后一個任務是將TimeEncoder插入到TimeServerHandler之前的服務器端的ChannelPipeline中,這是一個簡單的練習。
===============================================================================
關閉你的應用
關閉Netty應用程序通常就像關閉通過shutdownGracefully()創建的所有EventLoopGroups一樣簡單。 它返回一個將EventLoopGroup完全終止並且屬於該組的所有通道已關閉的通知您的未來。
總結
在本章中,我們快速瀏覽了Netty,並展示了如何在Netty上編寫完整的網絡應用程序。
有關Netty的更多詳細信息在即將到來的章節。 我們還鼓勵您查看io.netty.example包中的Netty示例。
還請注意,社區一直在等待您的問題和想法,以幫助您,並根據您的反饋不斷改進Netty及其文檔。
