一 處理基於流的傳輸(Scoket Buffer的一點附加說明)
對於例如TCP/IP這種基於流的傳輸協議實現,接收到的數據會被存儲在socket的接受緩沖區內。不幸的是,基於流的傳輸不是一個包隊列而是一個字節隊列。在一個開放的系統中,這意味着即使我們發送了兩條消息分別包含在兩個數據包里,接收方不會當作兩條消息來對待,而是將其放在同一個字節獨列中。因此,傳輸不能保證收到的消息與發送的消息一致。
對於時間客戶端的例子,一個32位的int數據量非常小,一般不會被分片(鏈路層限制一個package大小一般為1500字節),但是問題是它確實有可能被分成多片,分片的概率隨着網絡的繁忙而增加。最簡單的解決辦法就是增加一個內部的累加緩沖,等累計滿4個字節時再向上提交數據。
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { private ByteBuf buf; @Override public void handlerAdded(ChannelHandlerContext ctx) { buf = ctx.alloc().buffer(4); // (1) } @Override public void handlerRemoved(ChannelHandlerContext ctx) { buf.release(); // (1) buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; buf.writeBytes(m); // (2) m.release(); if (buf.readableBytes() >= 4) { // (3) long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
(1)ChannelHandler有兩個存活期方法:handlerAdded()和handlerRemoved(),這兩個方法允許我們自己構造一個初始化任務或結束任務。
(2)首先,所有的接受的數據先放到累計緩存里。
(3)然后,handler必須檢查是否有了足夠的data,如在本例中須足夠4個字節,然后執行實際的業務邏輯。若數據不足,當更多的數據到達時,netty會再次執行channelReade()方法直到累計到4個字節。
第二個解決辦法
當字段便多時,第一種解決方案會變得非常復雜且不可維護,可以通過向ChannelPipeline中增加多個ChannelHandler的方法,將一個大的ChannelHandler分解成多個模塊來降低應用的復雜性。例如,
package io.netty.example.time; public class TimeDecoder extends ByteToMessageDecoder { // (1) @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2) if (in.readableBytes() < 4) { return; // (3) } out.add(in.readBytes(4)); // (4) } }
(1)ByteToMessageDecoder是一ChannelInboundHandler的實現類,可以非常容易的處理分片問題。
(2)當新的數據到達時,ByteToMessageDecoder講數據存儲在一個內在的累積buffer中,調用decode()方法進行處理
(3)decode()根據接收到的字節大小進行判定,若滿4個字節則增加一個對象到list。
(4)如果decode()增加了個一個out的對象,意味着decoder編碼成功。ByteToMessageDecoder會丟棄累積buffer中已經讀過的部分。若out.add(null),decoder即停止。
由於ChannlePipeline中增加了一個handler,因此我們必須修改ChannelInitializer為:
b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); } });
如果你是個冒險者,你可能會嘗試ReplayingDecoder。該handler進一步簡化了decoder。
public class TimeDecoder extends ReplayingDecoder<VoidEnum> { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out, VoidEnum state) { out.add(in.readBytes(4)); } }
另外關於decode的例子可以參考一下兩個包
io.netty.example.factorialfor a binary protocol, andio.netty.example.telnetfor a text line-based protocol.
利用POJO代替ByteBuf
目前我們看到的例子均是使用ByteBuf作為協議消息的數據結構。在這一節,我們將使用POJO來代替ByteBuf來改善Time協議的客戶端和服務器。
使用POJO的有點是十分明顯的,通過分離出解析ByteBuf中數據的代碼,handler會變得更加可維護和重用。在Time的客戶端和服務器的例子中,我們只讀32字節的integer且這並不是一個主要的直接應用ByteBuf的案例。然而,你會發現當你實現一個真正的協議時,做這樣的分離是十分必要的。
首先,我們先定義一個新類型,UnixTime
package io.netty.example.time; import java.util.Date; public class UnixTime { private final int value; public UnixTime() { this((int) (System.currentTimeMillis() / 1000L + 2208988800L)); } public UnixTime(int value) { this.value = value; } public int value() { return value; } @Override public String toString() { return new Date((value() - 2208988800L) * 1000L).toString(); } }
我們重新編寫TimeDecoder來禪師一個UnixTIme。
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (in.readableBytes() < 4) { return; } out.add(new UnixTime(in.readInt())); }
然后我們更新decoder,TimeClientHandler就不再使用ByteBuf了
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { UnixTime m = (UnixTime) msg; System.out.println(m); ctx.close(); }
是不是非常的簡潔優雅。Server端同樣如此。首先更新ServerHandler
@Override public void channelActive(ChannelHandlerContext ctx) { ChannelFuture f = ctx.writeAndFlush(new UnixTime()); f.addListener(ChannelFutureListener.CLOSE); }
現在需要編寫解碼部分,encoder是ChannelOutbountHandler的實現類,將UnixTIme轉化成下層的ByteBuf,編寫encoder要比編寫decoder簡單的多,因為此時不必考慮tcp包分片的問題。
package io.netty.example.time; public class TimeEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { UnixTime m = (UnixTime) msg; ByteBuf encoded = ctx.alloc().buffer(4); encoded.writeInt(m.value()); ctx.write(encoded, promise); // (1) } }
(1)這一行中有些十分重要的內容
首先,我們將原始的ChannelPromis找原來的樣子傳輸,以保證Netty在寫入鏈路時能夠正確的標記成功或失敗。
其次,並不調用ctx.flush().handler有一個默認的flush方法,若想每次寫都flush則須:ctx.write(encoded,false,promise);或ctx.writeAndFlush(encode,promise);
