netty 入門二 (傳輸bytebuf 或者pojo)


基於流的數據傳輸:
在基於流的傳輸(如TCP / IP)中,接收的數據被存儲到套接字接收緩沖器中。 不幸的是,基於流的傳輸的緩沖區不是數據包的隊列,而是字節隊列。 這意味着,即使您將兩個消息作為兩個獨立數據包發送,操作系統也不會把它們視為兩個消息,而只是一堆字節。 因此,您無法保證您所讀取的內容正是您遠程發送信息時的正確切分。 例如,假設操作系統的TCP / IP堆棧已經收到三個數據包:



由於基於流的協議,讀取的數據分片信息可能如下:
 

因此,無論服務器端或客戶端如何,接收部分都應將接收到的數據進行碎片整理,以將其應用到邏輯上容易理解的一個或多個有意義的幀中。 在上述示例的情況下,接收到的數據應該如下所示:

 


解決方案一:

現在讓我們回到TIME時間客戶端的例子。 我們在這里也有同樣的問題。 一個32位整數是非常少量的數據,它不可能經常被分段。 然而,問題是可以分散,碎片化的可能性會隨着流量的增加而增加。
簡單的解決方案是創建內部累積緩沖區,並等待所有4個字節都被接收到內部緩沖區。 以下是修改的TimeClientHandler實現,可以解決問題:

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

ChannelHandler有兩個生命周期偵聽器方法:handlerAdded()和handlerRemoved()。 只要阻塞時間不長,您可以執行任意的初始化任務。
首先,所有收到的數據應該被累積到buf。然后,處理程序必須檢查buf是否有足夠的數據,在此示例中為4個字節,然后繼續執行業務邏輯。 否則,Netty會在更多數據到達時再次調用channelRead()方法,最終所有4個字節都將被累積。

解決方案二:

你會發現第一種方式缺少靈活性,無法應對可變的長度字段,ChannelInboundHandler實現將很快變得不可靠。
您可能已經注意到,您可以向ChannelPipeline添加多個ChannelHandler,因此,您可以將一個單一的ChannelHandler拆分成多個模塊化通道,以減少應用程序的復雜性。 例如,您可以將TimeClientHandler拆分為兩個處理程序:
處理碎片問題的TimeDecoder,
初始簡單版本的TimeClientHandler。
幸運的是,Netty提供了一個可擴展的類,可幫助您編寫開箱即用的第一個:

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes(4)); // (4)
    }
}

1,ByteToMessageDecoder是ChannelInboundHandler的一個實現,它可以輕松應對碎片問題。

2,當接收到新數據時,ByteToMessageDecoder會在內部維護的累積緩沖區中調用decode()方法。
decode()累積到足夠的數據到緩存區還會刷出數據。 當接收到更多數據時,ByteToMessageDecoder將持續調用decode()。
3,如果decode()將對象刷出去,則表示解碼器成功解碼了消息,ByteToMessageDecoder將丟棄累積緩沖區的已經刷出的讀取部分。
4,ByteToMessageDecoder將繼續調用decode()方法,直到它沒有讀取到更多的數據。

現在我們有另一個處理程序插入ChannelPipeline,我們應該修改TimeClient中的ChannelInitializer實現:

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

如果你是一個冒險的人,你可能想嘗試ReplayingDecoder,這更簡單的解碼器。 但是,您需要參考API參考資料。

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

此外,Netty還提供開箱即用的解碼器,使您能夠輕松實現大多數協議,並幫助您避免使用單一的不可維護的處理程序實現。 有關更多詳細示例,請參閱以下軟件包:

io.netty.example.factorial為二進制協議
io.netty.example.telnet用於基於文本行的協議。

==========================================================================================
讀取實體(pojo)代替二進制流


我們迄今為止審查的所有示例都使用ByteBuf作為協議消息的主要數據結構。 在本節中,我們將改進TIME協議客戶端和服務器示例,以使用POJO而不是ByteBuf。

在您的ChannelHandlers中使用POJO的優勢是顯而易見的 您的處理程序變得更加可維護,並且可以通過將從ByteBuf中提取信息的代碼從處理程序中分離出來來重新使用。 在TIME客戶端和服務器示例中,我們只讀取一個32位整數,直接使用ByteBuf不是一個主要問題。 但是,您將發現在實現真實世界協議時需要進行分離。
首先,讓我們定義一個名為UnixTime的新類型。

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

我們現在可以修改TimeDecoder來生成UnixTime而不是ByteBuf。

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readUnsignedInt()));
}

使用更新的解碼器,TimeClientHandler不再使用ByteBuf:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

更簡單和優雅,對吧? 同樣的技術可以在服務器端應用。 這次我們先來更新TimeServerHandler:

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

現在,唯一缺少的部分是一個編碼器,它是一個ChannelOutboundHandler的實現,將UnixTime轉換成一個ByteBuf。 它比編寫解碼器要簡單得多,因為在編碼消息時不需要處理數據包碎片和匯編。

package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}

這一行中有很多重要的事情。
首先,我們按原樣傳遞原來的ChannelPromise,以便Netty將編碼數據實際寫入電線時將其標記為成功或失敗。
其次,我們沒有調用ctx.flush()。 有一個單獨的處理方法void flush(ChannelHandlerContext ctx),用於覆蓋flush()操作。
為了進一步簡化,您可以使用MessageToByteEncoder:

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}

最后一個任務是將TimeEncoder插入到TimeServerHandler之前的服務器端的ChannelPipeline中,這是一個簡單的練習。

===============================================================================

關閉你的應用

關閉Netty應用程序通常就像關閉通過shutdownGracefully()創建的所有EventLoopGroups一樣簡單。 它返回一個將EventLoopGroup完全終止並且屬於該組的所有通道已關閉的通知您的未來。

總結

在本章中,我們快速瀏覽了Netty,並展示了如何在Netty上編寫完整的網絡應用程序。
有關Netty的更多詳細信息在即將到來的章節。 我們還鼓勵您查看io.netty.example包中的Netty示例。
還請注意,社區一直在等待您的問題和想法,以幫助您,並根據您的反饋不斷改進Netty及其文檔。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM