Netty 4.0 中文文檔


一 處理基於流的傳輸(Scoket Buffer的一點附加說明)

對於例如TCP/IP這種基於流的傳輸協議實現,接收到的數據會被存儲在socket的接受緩沖區內。不幸的是,基於流的傳輸不是一個包隊列而是一個字節隊列。在一個開放的系統中,這意味着即使我們發送了兩條消息分別包含在兩個數據包里,接收方不會當作兩條消息來對待,而是將其放在同一個字節獨列中。因此,傳輸不能保證收到的消息與發送的消息一致。

對於時間客戶端的例子,一個32位的int數據量非常小,一般不會被分片(鏈路層限制一個package大小一般為1500字節),但是問題是它確實有可能被分成多片,分片的概率隨着網絡的繁忙而增加。最簡單的解決辦法就是增加一個內部的累加緩沖,等累計滿4個字節時再向上提交數據。

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();

        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

(1)ChannelHandler有兩個存活期方法:handlerAdded()和handlerRemoved(),這兩個方法允許我們自己構造一個初始化任務或結束任務。

(2)首先,所有的接受的數據先放到累計緩存里。

(3)然后,handler必須檢查是否有了足夠的data,如在本例中須足夠4個字節,然后執行實際的業務邏輯。若數據不足,當更多的數據到達時,netty會再次執行channelReade()方法直到累計到4個字節。

 

第二個解決辦法

當字段便多時,第一種解決方案會變得非常復雜且不可維護,可以通過向ChannelPipeline中增加多個ChannelHandler的方法,將一個大的ChannelHandler分解成多個模塊來降低應用的復雜性。例如,

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }

        out.add(in.readBytes(4)); // (4)
    }
}

(1)ByteToMessageDecoder是一ChannelInboundHandler的實現類,可以非常容易的處理分片問題。

(2)當新的數據到達時,ByteToMessageDecoder講數據存儲在一個內在的累積buffer中,調用decode()方法進行處理

(3)decode()根據接收到的字節大小進行判定,若滿4個字節則增加一個對象到list。

(4)如果decode()增加了個一個out的對象,意味着decoder編碼成功。ByteToMessageDecoder會丟棄累積buffer中已經讀過的部分。若out.add(null),decoder即停止。

由於ChannlePipeline中增加了一個handler,因此我們必須修改ChannelInitializer為:

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

如果你是個冒險者,你可能會嘗試ReplayingDecoder。該handler進一步簡化了decoder。

public class TimeDecoder extends ReplayingDecoder<VoidEnum> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out, VoidEnum state) {
        out.add(in.readBytes(4));
    }
}

另外關於decode的例子可以參考一下兩個包

利用POJO代替ByteBuf

  目前我們看到的例子均是使用ByteBuf作為協議消息的數據結構。在這一節,我們將使用POJO來代替ByteBuf來改善Time協議的客戶端和服務器。

  使用POJO的有點是十分明顯的,通過分離出解析ByteBuf中數據的代碼,handler會變得更加可維護和重用。在Time的客戶端和服務器的例子中,我們只讀32字節的integer且這並不是一個主要的直接應用ByteBuf的案例。然而,你會發現當你實現一個真正的協議時,做這樣的分離是十分必要的。

首先,我們先定義一個新類型,UnixTime

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final int value;

    public UnixTime() {
        this((int) (System.currentTimeMillis() / 1000L + 2208988800L));
    }

    public UnixTime(int value) {
        this.value = value;
    }

    public int value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

我們重新編寫TimeDecoder來禪師一個UnixTIme。

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readInt()));
}

然后我們更新decoder,TimeClientHandler就不再使用ByteBuf了

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

是不是非常的簡潔優雅。Server端同樣如此。首先更新ServerHandler

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

現在需要編寫解碼部分,encoder是ChannelOutbountHandler的實現類,將UnixTIme轉化成下層的ByteBuf,編寫encoder要比編寫decoder簡單的多,因為此時不必考慮tcp包分片的問題。

 

package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt(m.value());
        ctx.write(encoded, promise); // (1)
    }
}

(1)這一行中有些十分重要的內容

  首先,我們將原始的ChannelPromis找原來的樣子傳輸,以保證Netty在寫入鏈路時能夠正確的標記成功或失敗。

  其次,並不調用ctx.flush().handler有一個默認的flush方法,若想每次寫都flush則須:ctx.write(encoded,false,promise);或ctx.writeAndFlush(encode,promise);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM