Netty 3升級Netty4實踐


參考

修改點

1、按標准的啟動流程模板將Netty 3替換為Netty 4

2、Handler替換,需要考慮有Netty4的變化,將Handler的功能分析清楚並使用Netty 4的方式實現

3、Netty 3到Netty4的主要修改

  • ChannelBuffer -> ByteBuf

  • ChannelBuffers -> PooledByteBufferAllocator (需要注意使用完成后釋放buffer)或UnpooledByteBufferAllocator

  • 解碼器 FremeDecoder -> ByteToMessageDecoder

  • 編碼器 OneToOneEncoder -> MessageToByteEncoder

版本

netty 4的項目結構變化

二進制JAR已拆分為多個子模塊,因此用戶可以從類路徑中排除不必要的功能。當前結構如下:

Artifact ID 描述


netty-parent Maven父POM
netty-common 工具類和日志框架
netty-buffer 替代 java.nio.ByteBuffer的ByteBuf API
netty-transport Channel API和核心傳輸core transports
netty-transport-rxtx Rxtx傳輸
netty-transport-sctp SCTP傳輸
netty-transport-udt UDT傳輸
netty-handler 有用的ChannelHandler實現
netty-codec 有助於編寫編碼器和解碼器的編解碼器框架
netty-codec-http 與HTTP,Web套接字,SPDY和RTSP相關的編解碼器
netty-codec-socks 與SOCKS協議相關的編解碼器
netty-all 結合了以上所有模塊的多合一JAR
netty-tarball Tarball發行版本
netty-example 例子
netty-testsuite-* 集成測試的集合
netty-microbench 微基准

現在,所有Artifacts(除了netty-all.jar)都是OSGi捆綁包,可以在您喜歡的OSGi容器中使用。

目標版本及依賴的artifactId

升級為4.1.47.Final,沒有依賴軟件和安全漏洞,提供了更豐富的編解碼器(包括SMTP)

注意需要引入netty-all,不要單獨引用netty的分包

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.47.Final</version>
</dependency>

Netty中Channel、ChannelPipeline、ChannelHandler、ChannelHandlerContext之間的關系

Netty中Channel、ChannelPipeline、ChannelHandler、ChannelHandlerContext之間的關系

  • 每一個Channel被創建,就會生成對應的一個ChannelPipeline和它綁定。

  • ChannelPipeline中包含了一個處理該Channel消息的ChannelHandler鏈。

  • 當每一個ChannelHandler被注冊到該ChannelPipeline中就會生成一個對應的 ChannelHandlerContext,和該ChannelHandler進行綁定。

  • 一個ChannelHandler可以從屬於(注冊到)多個ChannelPipeline。所以,一個ChannelHandler可以綁定多個ChannelHandlerContext。不過,這樣的ChannelHandler必須使用@Sharable注解標注,保證它的線程安全性,否則試圖將它注冊到多個ChannelHandlerPipeline中時將會拋出異常。

Netty 4修改項

新的 bootstrap API

bootstrap API已經被重寫,盡管它的目的還是一樣;它執行需要配置和運行服務器或客戶端程序的典型步驟,通常能在樣板代碼里找到。

新的bootstrap同樣采取了流式接口(fluent interface)。

核心修改

啟動服務器方式修改

    public static void main(String[] args) throws Exception {
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .localAddress(8080)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(handler1, handler2, ...);
                        }
                    });
            // Start the server.
            ChannelFuture f = b.bind().sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            // Wait until all threads are terminated.
            bossGroup.terminationFuture().sync();
            workerGroup.terminationFuture().sync();
        }
    }

ChannelPipelineFactory → ChannelInitializer

就像你在在上面的例子注意到的一樣,ChannelPipelineFactory 不再存在了。而是由 ChannelInitializer來替換,它給予了Channel 和 ChannelPipeline 配置的更多控制。

ChannelPipeline 不再讓用戶創建。ChannelPipeline 由 Channel自動創建。

核心修改

設置childHandler時通過如下方法添加pipeline及handler

.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(handler1, handler2, ...);

}
});

Inbound事件

繼承SimpleChannelInboundHandler<I>,覆寫channelRead0方法,Netty會自動釋放泛型資源資源。

核心修改

服務端需要繼承SimpleChannelInboundHandler,同時需要指定泛型類型為

ImapChannelInboundHandler->Imap->SimpleChannelInboundHandler<ImapMessage>

BasicChannelInboundHandler->POP3/SMTP->SimpleChannelInboundHandler< ByteBuf>

注意釋放msg

管道中的每個 inbound (a.k.a. upstream) handler 必須release接收到的消息. Netty不會自動release 它們.

核心修改

繼承ChannelInboundHandlerAdapter,當我們需要釋放ByteBuf相關內存的時候,也可以使用 ReferenceCountUtil#release()。如果繼承SimpleChannelInboundHandler,則其會自動釋放消息資源。

ChannelHandler 不需要 event object

4.0完全移除了event object,取而代之的是強類型的方法調用。 3.x 包含處理所有事件的handler method如 handleUpstream() 和 handleDownstream(), 但Netty 4.0中 每個 event 類型都有它自己的handler method:

// Before:
void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e);
void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e);
 
// After:
void channelRegistered(ChannelHandlerContext ctx);
void channelUnregistered(ChannelHandlerContext ctx);
void channelActive(ChannelHandlerContext ctx);
void channelInactive(ChannelHandlerContext ctx);
void channelRead(ChannelHandlerContext ctx, Object message);
 
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise);
void connect(
        ChannelHandlerContext ctx, SocketAddress remoteAddress,
        SocketAddress localAddress, ChannelPromise promise);
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise);
void close(ChannelHandlerContext ctx, ChannelPromise promise);
void deregister(ChannelHandlerContext ctx, ChannelPromise promise);
void write(ChannelHandlerContext ctx, Object message, ChannelPromise promise);
void flush(ChannelHandlerContext ctx);
void read(ChannelHandlerContext ctx);

ChannelHandlerContext 也進行了更改也體現了上述變化:

// Before:
ctx.sendUpstream(evt);
 
// After:
ctx.fireChannelRead(receivedMessage);
核心修改

連接讀寫消息的獲取:msg.getMessage直接取msg

讀事件傳播:ctx.sendUpstream(evt); ->ctx.fireChannelRead(receivedMessage);

簡化channel 狀態模型

在3.x中,當一個新的Channel被創建並連接成功,至少三個ChannelStateEvent會被觸發:channelOpen、channelBound以及channelConnected.當一個Channel關閉時,也至少有三個事件會被觸發:channelDisconnected、channelUnbound以及channelClosed.

但是,觸發這么多事件的意義並不那么大。更有用的是當一個Channel進入可讀或可寫的狀態時通知用戶。

channelOpen, channelBound, 和 channelConnected 被合並到 channelActive. channelDisconnected, channelUnbound, 和 channelClosed 被合並到 channelInactive.

同樣Channel.isBound() 和 isConnected() 也被合並為isActive().

需要注意的是,channelRegistered and channelUnregistered 這兩個事件與channelOpen and channelClosed並不等。它們是在支持Channel的動態注冊、注銷以及再注冊時被引入的新的狀態。

核心修改

channelBound和channelConnected方法的處理修改為channelActive

channelClosed方法的處理修改為channelInactive

channel.setReadable(false); ->channel.config().setAutoRead(false); 設置是否允許通道讀 參考

線程模型變化-沒有 ExecutionHandler 了

它被放入核心代碼中。在你往ChannelPipeline增加ChannelHandler 時你可以指定一個EventExecutor, 這樣Pipeline總是使用這個EventExecutor來調用這個新增加的 ChannelHandler的handler方法。

核心修改

去掉顯式初始化的ExecutionHandler

write() 不會自動 flush

4.0 引入了新的操作 flush() 它可以顯示地將Channel輸出緩存輸出. write()操作並不會自動 flush. 你可以把它想象成java.io.BufferedOutputStream, 除了 它工作於消息級這一點.

由於這個改變, 你必須萬分小心,寫入數據后不要忘了調用 ctx.flush() . 當然你也可以使用一個更直接的方法 writeAndFlush().

核心修改

寫入數據后不要忘了調用 ctx.flush(),或直接調用writeAndFlush()

編解碼框架

編碼解碼器框架里有實質性的內部改變, 因為4.0需要一個handler來創建和管理它的buffer然而,從用戶角度來看這些變化並不大。

核心編解碼類移入到 io.netty.handler.codec 包下

解碼器的作用

以IMAP為例,在Netty 3.10.6.Final的程序中,IMAPServer#createPipelineFactory創建的pipeline最后先添加了解碼器,再添加了核心處理器

在解碼器ImapRequestFrameDecoder中重寫了decode方法,最終返回ImapMessage類型,之后消息在pipeline傳遞到核心處理器ImapChannelUpstreamHandler#messageReceived方法,在接收的消息事件MessageEvent中,通過getMessage方法獲取到解碼后的類型

ImapMessage message = (ImapMessage) e.getMessage();

如果去掉解碼器ImapRequestFrameDecoder,則傳遞的是默認的消息類型BigEndianHeapChannelBuffer(父類是ChannelBuffer)

核心修改

FrameDecoder 被重新命名為 ByteToMessageDecoder.

OneToOneEncoder和OneToOneDecoder被MessageToMessageEncoder 和 MessageToMessageDecoder 取代.

decode(), decodeLast(), encode() 的方法簽名有些許改變以便支持泛型, 也移除了一些冗余的參數。

需要重新編寫解碼器ImapRequestFrameDecoder,使用Netty 4的

protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

進行解碼

  • in:需要解碼的二進制數據。

  • List<Object> out:解碼后的有效報文列表,我們需要將解碼后的報文添加到這個List中。之所以使用一個List表示,是因為考慮到粘包問題,因此入參的in中可能包含多個有效報文。當然,也有可能發生了拆包,in中包含的數據還不足以構成一個有效報文,此時不往List中添加元素即可

參考

之后可以在核心處理器中獲取解碼后的Message

AttributeMap

為了響應用戶需求,您可以將任何對象附加到Channel和ChannelHandlerContext。增加了一個名為AttributeMap的新接口,該接口被Channel和ChannelHandlerContext實現。同時ChannelLocalChannel.attachment被刪除了。當關聯Channel被垃圾收集時,這些屬性也將被垃圾收集。因此,可以沒有顯式去掉屬性的方法。(關閉channel

參考

每一個ChannelHandlerContext都有屬於自己的上下文,也就說每一個ChannelHandlerContext上如果有AttributeMap都是綁定上下文的,也就說如果A的ChannelHandlerContext中的AttributeMap,B的ChannelHandlerContext是無法讀取到的(Attribute<NettyChannel> attr = ctx.attr(NETTY_CHANNEL_KEY); )

但是Channel上的AttributeMap就是大家共享的,每一個ChannelHandler都能獲取到(Attribute<NettyChannel> attr = ctx.channel().attr(NETTY_CHANNEL_KEY); )

Netty團隊在4.1版本之后,在每一個Channel內部僅僅保留一個map,確保每個key之間的唯一性,因此每一個Channel不需要多個map,因此直接使用Channel的attr添加。

Attribute<NettyChannel> attr = ctx.channel().attr(NETTY_CHANNEL_KEY);

核心修改
private AttributeKey<ImapSession> getAttributeKey(ChannelHandlerContext ctx) {
    return AttributeKey.valueOf(ctx.channel().id().asLongText() + "," + ctx.channel().remoteAddress().toString());
}
// 設置屬性
ctx.channel().attr(getAttributeKey(ctx)).set(imapsession);
NettyConstants的attributes屬性整改(ChannelLocal)

在NettyConstants中定義ChannelLocal<Object> attributes = new ChannelLocal<>();

屬性,通過分析可知,該屬性的作用是添加自定義屬性到attributes中,在pipleline的handler之間共享,可以使用Netty 4中的AttributeMap綁定屬性到channel上。

另,注意對於Netty服務器來說,新來一個連接即建立了一個Channel,每個Channel都新建了一個pipeline與之對應且唯一,在pipeline中是一組handler,且不同的Channel的pipeline和handler都是新創建的,互不干擾。每個handler都有一個上下文ChannelHandlerContext與之唯一對應(handler不是Sharable)。

Channel.attachment整改

attachment也是在channel間共享的數據,可以使用AttributeMap進行替代。

由於attchment原本是DefaultChannelHandlerContext的屬性,Netty 4已經去掉。可以通過將需要設置的對象的地址(object.toString())作為key,對象本身作為內容存入AttributeMap。

由於一個不是@Sharable的ChannelHandler唯一確定一個ChannelHandlerContext,因此可以使用該上下文的地址作為key標識attachment

ByteBuf

Netty 4.x -- ByteBuf

ByteBuf轉為String

buf.toString(CharsetUtil.UTF_8)

核心修改

ChannelBuffers. wrappedBuffer修改為Unpooled.wrappedBuffer

channel的關閉

使用如下方式確保緩沖區內容寫出后關閉channel,如果直接調用channel.close方法會立即關閉,可能會有數據在緩沖區未寫出。

private void bufferFlushOut(Channel channel) {
    if (channel.isActive()) {
        CompositeByteBuf compBuf = Unpooled.compositeBuffer();
        // 初始化一個空的ByteBuf
        ByteBuf heapBuf = Unpooled.buffer(0);
        channel.writeAndFlush(heapBuf).addListener(ChannelFutureListener.CLOSE);
    }
}

遷移服務端的基本點

  • 使用新的bootstrap API重寫 FactorialServer.run() 方法.

  • 不再使用 ChannelFactory 。初始化一個 NioEventLoopGroup (一個用來接受連接,其它的用來處理接受后的連接.

  • 重命名 FactorialServerPipelineFactory 為 FactorialServerInitializer. 讓它擴展 ChannelInitializer.

  • 不創建一個 ChannelPipeline, 而是通過 Channel.pipeline()得到它.

  • 讓 FactorialServerHandler 擴展 ChannelInboundHandlerAdapter.

  • 用channelInactive()替換 channelDisconnected() .

  • 不再使用handleUpstream().

  • 將 messageReceived() 命名為 channelRead(), 並相應的調整方法簽名.

  • 用 ctx.writeAndFlush()替換 ctx.write() .

  • 讓 BigIntegerDecoder 擴展 ByteToMessageDecoder.

  • 讓 NumberEncoder 擴展 MessageToByteEncoder.

  • encode() 不再返回一個buffer. 由ByteToMessageDecoder負責將編碼的數據填入到buffer中.

空閑檢測handler

ImapIdleStateHandler原來實現IdleStateAwareChannelHandler,Netty 4已經沒有該類,需重寫。ImapIdleStateHandler類主要作用是檢測連接的客戶端是否空閑,並執行相應的動作。

Netty 3和Netty 5的心跳機制

玩轉Netty -- 從Netty3升級到Netty4

IdleStateAwareChannelHandlerChannelInboundHandlerAdapter

修改繼承的實現類cp.addLast("heartbeatHandler", new HeartbeatHandler());


public class ImapIdleStateHandler extends SimpleChannelInboundHandler<ByteBuf> implements NettyConstants {
userEventTriggered
channelRead0
}

類似的ImapHeartbeatHandler也這樣處理

IdleStateHandler

IdleStateAwareChannelHandler已經去除,但 IdleStateHandler類還存在,

cp.addLast("idleTimeoutHandler", new IdleStateHandler(getTimer(), getClientIdleTimeout().toMillis(), NO_WRITER_IDLE_TIMEOUT, NO_ALL_IDLE_TIMEOUT, TimeUnit.MILLISECONDS));
修改為
cp.addLast("idleTimeoutHandler", new IdleStateHandler( NO_WRITER_IDLE_TIMEOUT, NO_WRITER_IDLE_TIMEOUT, NO_ALL_IDLE_TIMEOUT, TimeUnit.MILLISECONDS));

編解碼器

ImapRequestFrameDecoder

注意點

非@Sharable的handler必須每次new

如果是非@Sharable的handler,每次添加到pipeline的時候必須new出來,否則會報錯

is not a @Sharable handler。

主要原因在於,每個連接(Channel)接入服務器的時候都會初始化一個pipeline,如果pipeline中的handler是類成員且在實例化的時候初始化,則只會有一個handler。

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1

SimpleChannelInboundHandler 它會自動進行一次釋放(即引用計數減1).

參考

繼承了SimpleChannelInboundHandler的Handler都會自動釋放消息資源

繼承了SimpleChannelInboundHandler,類會自動釋放資源

如果繼承的是ChannelInboundHandlerAdapter,則需要自己釋放(只能在不需要繼續傳遞的handler釋放),如果不釋放且繼續傳遞,則會在TailContext中釋放(io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object))

Netty 4服務端啟動

Netty官方服務器標准樣例中,服務器啟動最后調用了 f.channel().closeFuture().sync();

該方法會阻塞主線程,導致只能啟動一個服務器,因此在代碼中不能添加這行代碼。

服務器的關閉的兩行代碼也要放到unbind方法中。

ChannelOption參數詳解

1、ChannelOption.SO_BACKLOG

ChannelOption.SO_BACKLOG對應的是tcp/ip協議listen函數中的backlog參數,函數listen(int socketfd,int backlog)用來初始化服務端可連接隊列,服務端處理客戶端連接請求是順序處理的,所以同一時間只能處理一個客戶端連接,多個客戶端來的時候,服務端將不能處理的客戶端連接請求放在隊列中等待處理,backlog參數指定了隊列的大小

2、ChannelOption.SO_REUSEADDR

ChanneOption.SO_REUSEADDR對應於套接字選項中的SO_REUSEADDR,這個參數表示允許重復使用本地地址和端口,比如,某個服務器進程占用了TCP的80端口進行監聽,此時再次監聽該端口就會返回錯誤,使用該參數就可以解決問題,該參數允許共用該端口,這個在服務器程序中比較常使用,比如某個進程非正常退出,該程序占用的端口可能要被占用一段時間才能允許其他進程使用,而且程序死掉以后,內核一需要一定的時間才能夠釋放此端口,不設置SO_REUSEADDR就無法正常使用該端口。

3、ChannelOption.SO_KEEPALIVE

Channeloption.SO_KEEPALIVE參數對應於套接字選項中的SO_KEEPALIVE,該參數用於設置TCP連接,當設置該選項以后,連接會測試鏈接的狀態,這個選項用於可能長時間沒有數據交流的連接。當設置該選項以后,如果在兩小時內沒有數據的通信時,TCP會自動發送一個活動探測數據報文

4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF

ChannelOption.SO_SNDBUF參數對應於套接字選項中的SO_SNDBUF,ChannelOption.SO_RCVBUF參數對應於套接字選項中的SO_RCVBUF這兩個參數用於操作接收緩沖區和發送緩沖區的大小,接收緩沖區用於保存網絡協議站內收到的數據,直到應用程序讀取成功,發送緩沖區用於保存發送數據,直到發送成功。

5、ChannelOption.SO_LINGER

ChannelOption.SO_LINGER參數對應於套接字選項中的SO_LINGER,Linux內核默認的處理方式是當用戶調用close()方法的時候,函數返回,在可能的情況下,盡量發送數據,不一定保證會發生剩余的數據,造成了數據的不確定性,使用SO_LINGER可以阻塞close()的調用時間,直到數據完全發送

6、ChannelOption.TCP_NODELAY

ChannelOption.TCP_NODELAY參數對應於套接字選項中的TCP_NODELAY,該參數的使用與Nagle算法有關,Nagle算法是將小的數據包組裝為更大的幀然后進行發送,而不是輸入一次發送一次,因此在數據包不足的時候會等待其他數據的到了,組裝成大的數據包進行發送,雖然該方式有效提高網絡的有效負載,但是卻造成了延時,而該參數的作用就是禁止使用Nagle算法,使用於小數據即時傳輸,於TCP_NODELAY相對應的是TCP_CORK,該選項是需要等到發送的數據量最大的時候,一次性發送數據,適用於文件傳輸。

7、IP_TOS

IP參數,設置IP頭部的Type-of-Service字段,用於描述IP包的優先級和QoS選項。

8、ALLOW_HALF_CLOSURE

Netty參數,一個連接的遠端關閉時本地端是否關閉,默認值為False。值為False時,連接自動關閉;為True時,觸發ChannelInboundHandler的userEventTriggered()方法,事件為ChannelInputShutdownEvent。

Netty的future.channel().closeFuture().sync();到底有什么用?

主線程執行到這里就 wait 子線程結束,子線程才是真正監聽和接受請求的,closeFuture()是開啟了一個channel的監聽器,負責監聽channel是否關閉的狀態,如果監聽到channel關閉了,子線程才會釋放,syncUninterruptibly()讓主線程同步等待子線程結果

參考1

如果我們不想加f.channel().closeFuture().sync()又想保證程序正常運行怎么辦,去掉finally 里面關閉nettyserver的語句即可。

參考2

對javax.servlet-api的依賴

注意項目中依賴javax.servlet-api的scope是provided的,打包的時候不會將該包打到程序中,因此需要確保運行環境中有該包。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM