參考
修改點
1、按標准的啟動流程模板將Netty 3替換為Netty 4
2、Handler替換,需要考慮有Netty4的變化,將Handler的功能分析清楚並使用Netty 4的方式實現
3、Netty 3到Netty4的主要修改
-
ChannelBuffer -> ByteBuf
-
ChannelBuffers -> PooledByteBufferAllocator (需要注意使用完成后釋放buffer)或UnpooledByteBufferAllocator
-
解碼器 FremeDecoder -> ByteToMessageDecoder
-
編碼器 OneToOneEncoder -> MessageToByteEncoder
版本
netty 4的項目結構變化
二進制JAR已拆分為多個子模塊,因此用戶可以從類路徑中排除不必要的功能。當前結構如下:
Artifact ID 描述
netty-parent Maven父POM
netty-common 工具類和日志框架
netty-buffer 替代 java.nio.ByteBuffer的ByteBuf API
netty-transport Channel API和核心傳輸core transports
netty-transport-rxtx Rxtx傳輸
netty-transport-sctp SCTP傳輸
netty-transport-udt UDT傳輸
netty-handler 有用的ChannelHandler實現
netty-codec 有助於編寫編碼器和解碼器的編解碼器框架
netty-codec-http 與HTTP,Web套接字,SPDY和RTSP相關的編解碼器
netty-codec-socks 與SOCKS協議相關的編解碼器
netty-all 結合了以上所有模塊的多合一JAR
netty-tarball Tarball發行版本
netty-example 例子
netty-testsuite-* 集成測試的集合
netty-microbench 微基准
現在,所有Artifacts(除了netty-all.jar)都是OSGi捆綁包,可以在您喜歡的OSGi容器中使用。
目標版本及依賴的artifactId
升級為4.1.47.Final,沒有依賴軟件和安全漏洞,提供了更豐富的編解碼器(包括SMTP)
注意需要引入netty-all,不要單獨引用netty的分包
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.47.Final</version>
</dependency>
Netty中Channel、ChannelPipeline、ChannelHandler、ChannelHandlerContext之間的關系
Netty中Channel、ChannelPipeline、ChannelHandler、ChannelHandlerContext之間的關系
-
每一個Channel被創建,就會生成對應的一個ChannelPipeline和它綁定。
-
ChannelPipeline中包含了一個處理該Channel消息的ChannelHandler鏈。
-
當每一個ChannelHandler被注冊到該ChannelPipeline中就會生成一個對應的 ChannelHandlerContext,和該ChannelHandler進行綁定。
-
一個ChannelHandler可以從屬於(注冊到)多個ChannelPipeline。所以,一個ChannelHandler可以綁定多個ChannelHandlerContext。不過,這樣的ChannelHandler必須使用@Sharable注解標注,保證它的線程安全性,否則試圖將它注冊到多個ChannelHandlerPipeline中時將會拋出異常。
Netty 4修改項
新的 bootstrap API
bootstrap API已經被重寫,盡管它的目的還是一樣;它執行需要配置和運行服務器或客戶端程序的典型步驟,通常能在樣板代碼里找到。
新的bootstrap同樣采取了流式接口(fluent interface)。
核心修改
啟動服務器方式修改
public static void main(String[] args) throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.localAddress(8080)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(handler1, handler2, ...);
}
});
// Start the server.
ChannelFuture f = b.bind().sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
// Wait until all threads are terminated.
bossGroup.terminationFuture().sync();
workerGroup.terminationFuture().sync();
}
}
ChannelPipelineFactory → ChannelInitializer
就像你在在上面的例子注意到的一樣,ChannelPipelineFactory 不再存在了。而是由 ChannelInitializer來替換,它給予了Channel 和 ChannelPipeline 配置的更多控制。
ChannelPipeline 不再讓用戶創建。ChannelPipeline 由 Channel自動創建。
核心修改
設置childHandler時通過如下方法添加pipeline及handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(handler1, handler2, ...);
}
});
Inbound事件
繼承SimpleChannelInboundHandler<I>,覆寫channelRead0方法,Netty會自動釋放泛型資源資源。
核心修改
服務端需要繼承SimpleChannelInboundHandler,同時需要指定泛型類型為
ImapChannelInboundHandler->Imap->SimpleChannelInboundHandler<ImapMessage>
BasicChannelInboundHandler->POP3/SMTP->SimpleChannelInboundHandler< ByteBuf>
注意釋放msg
管道中的每個 inbound (a.k.a. upstream) handler 必須release接收到的消息. Netty不會自動release 它們.
核心修改
繼承ChannelInboundHandlerAdapter,當我們需要釋放ByteBuf相關內存的時候,也可以使用 ReferenceCountUtil#release()。如果繼承SimpleChannelInboundHandler,則其會自動釋放消息資源。
ChannelHandler 不需要 event object
4.0完全移除了event object,取而代之的是強類型的方法調用。 3.x 包含處理所有事件的handler method如 handleUpstream() 和 handleDownstream(), 但Netty 4.0中 每個 event 類型都有它自己的handler method:
// Before:
void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e);
void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e);
// After:
void channelRegistered(ChannelHandlerContext ctx);
void channelUnregistered(ChannelHandlerContext ctx);
void channelActive(ChannelHandlerContext ctx);
void channelInactive(ChannelHandlerContext ctx);
void channelRead(ChannelHandlerContext ctx, Object message);
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise);
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise);
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise);
void close(ChannelHandlerContext ctx, ChannelPromise promise);
void deregister(ChannelHandlerContext ctx, ChannelPromise promise);
void write(ChannelHandlerContext ctx, Object message, ChannelPromise promise);
void flush(ChannelHandlerContext ctx);
void read(ChannelHandlerContext ctx);
ChannelHandlerContext
也進行了更改也體現了上述變化:
// Before:
ctx.sendUpstream(evt);
// After:
ctx.fireChannelRead(receivedMessage);
核心修改
連接讀寫消息的獲取:msg.getMessage直接取msg
讀事件傳播:ctx.sendUpstream(evt); ->ctx.fireChannelRead(receivedMessage);
簡化channel 狀態模型
在3.x中,當一個新的Channel被創建並連接成功,至少三個ChannelStateEvent會被觸發:channelOpen、channelBound以及channelConnected.當一個Channel關閉時,也至少有三個事件會被觸發:channelDisconnected、channelUnbound以及channelClosed.
但是,觸發這么多事件的意義並不那么大。更有用的是當一個Channel進入可讀或可寫的狀態時通知用戶。
channelOpen, channelBound, 和 channelConnected 被合並到 channelActive. channelDisconnected, channelUnbound, 和 channelClosed 被合並到 channelInactive.
同樣Channel.isBound() 和 isConnected() 也被合並為isActive().
需要注意的是,channelRegistered and channelUnregistered 這兩個事件與channelOpen and channelClosed並不等。它們是在支持Channel的動態注冊、注銷以及再注冊時被引入的新的狀態。
核心修改
channelBound和channelConnected方法的處理修改為channelActive
channelClosed方法的處理修改為channelInactive
channel.setReadable(false); ->channel.config().setAutoRead(false); 設置是否允許通道讀 參考
線程模型變化-沒有 ExecutionHandler 了
它被放入核心代碼中。在你往ChannelPipeline增加ChannelHandler 時你可以指定一個EventExecutor, 這樣Pipeline總是使用這個EventExecutor來調用這個新增加的 ChannelHandler的handler方法。
核心修改
去掉顯式初始化的ExecutionHandler
write() 不會自動 flush
4.0 引入了新的操作 flush() 它可以顯示地將Channel輸出緩存輸出. write()操作並不會自動 flush. 你可以把它想象成java.io.BufferedOutputStream, 除了 它工作於消息級這一點.
由於這個改變, 你必須萬分小心,寫入數據后不要忘了調用 ctx.flush() . 當然你也可以使用一個更直接的方法 writeAndFlush().
核心修改
寫入數據后不要忘了調用 ctx.flush(),或直接調用writeAndFlush()
編解碼框架
編碼解碼器框架里有實質性的內部改變, 因為4.0需要一個handler來創建和管理它的buffer然而,從用戶角度來看這些變化並不大。
核心編解碼類移入到 io.netty.handler.codec 包下
解碼器的作用
以IMAP為例,在Netty 3.10.6.Final的程序中,IMAPServer#createPipelineFactory創建的pipeline最后先添加了解碼器,再添加了核心處理器
在解碼器ImapRequestFrameDecoder中重寫了decode方法,最終返回ImapMessage類型,之后消息在pipeline傳遞到核心處理器ImapChannelUpstreamHandler#messageReceived方法,在接收的消息事件MessageEvent中,通過getMessage方法獲取到解碼后的類型
ImapMessage message = (ImapMessage) e.getMessage();
如果去掉解碼器ImapRequestFrameDecoder,則傳遞的是默認的消息類型BigEndianHeapChannelBuffer(父類是ChannelBuffer)
核心修改
FrameDecoder 被重新命名為 ByteToMessageDecoder.
OneToOneEncoder和OneToOneDecoder被MessageToMessageEncoder 和 MessageToMessageDecoder 取代.
decode(), decodeLast(), encode() 的方法簽名有些許改變以便支持泛型, 也移除了一些冗余的參數。
需要重新編寫解碼器ImapRequestFrameDecoder,使用Netty 4的
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
進行解碼
-
in:需要解碼的二進制數據。
-
List<Object> out:解碼后的有效報文列表,我們需要將解碼后的報文添加到這個List中。之所以使用一個List表示,是因為考慮到粘包問題,因此入參的in中可能包含多個有效報文。當然,也有可能發生了拆包,in中包含的數據還不足以構成一個有效報文,此時不往List中添加元素即可
之后可以在核心處理器中獲取解碼后的Message
AttributeMap
為了響應用戶需求,您可以將任何對象附加到Channel和ChannelHandlerContext。增加了一個名為AttributeMap的新接口,該接口被Channel和ChannelHandlerContext實現。同時ChannelLocal和Channel.attachment被刪除了。當關聯Channel被垃圾收集時,這些屬性也將被垃圾收集。因此,可以沒有顯式去掉屬性的方法。(關閉channel)
每一個ChannelHandlerContext都有屬於自己的上下文,也就說每一個ChannelHandlerContext上如果有AttributeMap都是綁定上下文的,也就說如果A的ChannelHandlerContext中的AttributeMap,B的ChannelHandlerContext是無法讀取到的(Attribute<NettyChannel> attr = ctx.attr(NETTY_CHANNEL_KEY); )
但是Channel上的AttributeMap就是大家共享的,每一個ChannelHandler都能獲取到(Attribute<NettyChannel> attr = ctx.channel().attr(NETTY_CHANNEL_KEY); )
Netty團隊在4.1版本之后,在每一個Channel內部僅僅保留一個map,確保每個key之間的唯一性,因此每一個Channel不需要多個map,因此直接使用Channel的attr添加。
Attribute<NettyChannel> attr = ctx.channel().attr(NETTY_CHANNEL_KEY);
核心修改
private AttributeKey<ImapSession> getAttributeKey(ChannelHandlerContext ctx) {
return AttributeKey.valueOf(ctx.channel().id().asLongText() + "," + ctx.channel().remoteAddress().toString());
}
// 設置屬性
ctx.channel().attr(getAttributeKey(ctx)).set(imapsession);
NettyConstants的attributes屬性整改(ChannelLocal)
在NettyConstants中定義ChannelLocal<Object> attributes = new ChannelLocal<>();
屬性,通過分析可知,該屬性的作用是添加自定義屬性到attributes中,在pipleline的handler之間共享,可以使用Netty 4中的AttributeMap綁定屬性到channel上。
另,注意對於Netty服務器來說,新來一個連接即建立了一個Channel,每個Channel都新建了一個pipeline與之對應且唯一,在pipeline中是一組handler,且不同的Channel的pipeline和handler都是新創建的,互不干擾。每個handler都有一個上下文ChannelHandlerContext與之唯一對應(handler不是Sharable)。
Channel.attachment整改
attachment也是在channel間共享的數據,可以使用AttributeMap進行替代。
由於attchment原本是DefaultChannelHandlerContext的屬性,Netty 4已經去掉。可以通過將需要設置的對象的地址(object.toString())作為key,對象本身作為內容存入AttributeMap。
由於一個不是@Sharable的ChannelHandler唯一確定一個ChannelHandlerContext,因此可以使用該上下文的地址作為key標識attachment
ByteBuf
ByteBuf轉為String
buf.toString(CharsetUtil.UTF_8)
核心修改
ChannelBuffers. wrappedBuffer修改為Unpooled.wrappedBuffer
channel的關閉
使用如下方式確保緩沖區內容寫出后關閉channel,如果直接調用channel.close方法會立即關閉,可能會有數據在緩沖區未寫出。
private void bufferFlushOut(Channel channel) {
if (channel.isActive()) {
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
// 初始化一個空的ByteBuf
ByteBuf heapBuf = Unpooled.buffer(0);
channel.writeAndFlush(heapBuf).addListener(ChannelFutureListener.CLOSE);
}
}
遷移服務端的基本點
-
使用新的bootstrap API重寫 FactorialServer.run() 方法.
-
不再使用 ChannelFactory 。初始化一個 NioEventLoopGroup (一個用來接受連接,其它的用來處理接受后的連接.
-
重命名 FactorialServerPipelineFactory 為 FactorialServerInitializer. 讓它擴展 ChannelInitializer.
-
不創建一個 ChannelPipeline, 而是通過 Channel.pipeline()得到它.
-
讓 FactorialServerHandler 擴展 ChannelInboundHandlerAdapter.
-
用channelInactive()替換 channelDisconnected() .
-
不再使用handleUpstream().
-
將 messageReceived() 命名為 channelRead(), 並相應的調整方法簽名.
-
用 ctx.writeAndFlush()替換 ctx.write() .
-
讓 BigIntegerDecoder 擴展 ByteToMessageDecoder.
-
讓 NumberEncoder 擴展 MessageToByteEncoder.
-
encode() 不再返回一個buffer. 由ByteToMessageDecoder負責將編碼的數據填入到buffer中.
空閑檢測handler
ImapIdleStateHandler原來實現IdleStateAwareChannelHandler,Netty 4已經沒有該類,需重寫。ImapIdleStateHandler類主要作用是檢測連接的客戶端是否空閑,並執行相應的動作。
IdleStateAwareChannelHandlerChannelInboundHandlerAdapter
修改繼承的實現類cp.addLast("heartbeatHandler", new HeartbeatHandler());
public class ImapIdleStateHandler extends SimpleChannelInboundHandler<ByteBuf> implements NettyConstants {
userEventTriggered
channelRead0
}
類似的ImapHeartbeatHandler也這樣處理
IdleStateHandler
IdleStateAwareChannelHandler已經去除,但 IdleStateHandler類還存在,
cp.addLast("idleTimeoutHandler", new IdleStateHandler(getTimer(), getClientIdleTimeout().toMillis(), NO_WRITER_IDLE_TIMEOUT, NO_ALL_IDLE_TIMEOUT, TimeUnit.MILLISECONDS));
修改為
cp.addLast("idleTimeoutHandler", new IdleStateHandler( NO_WRITER_IDLE_TIMEOUT, NO_WRITER_IDLE_TIMEOUT, NO_ALL_IDLE_TIMEOUT, TimeUnit.MILLISECONDS));
編解碼器
ImapRequestFrameDecoder
注意點
非@Sharable的handler必須每次new
如果是非@Sharable的handler,每次添加到pipeline的時候必須new出來,否則會報錯
is not a @Sharable handler。
主要原因在於,每個連接(Channel)接入服務器的時候都會初始化一個pipeline,如果pipeline中的handler是類成員且在實例化的時候初始化,則只會有一個handler。
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
SimpleChannelInboundHandler 它會自動進行一次釋放(即引用計數減1).
繼承了SimpleChannelInboundHandler的Handler都會自動釋放消息資源
繼承了SimpleChannelInboundHandler,類會自動釋放資源
如果繼承的是ChannelInboundHandlerAdapter,則需要自己釋放(只能在不需要繼續傳遞的handler釋放),如果不釋放且繼續傳遞,則會在TailContext中釋放(io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object))
Netty 4服務端啟動
在Netty官方服務器標准樣例中,服務器啟動最后調用了 f.channel().closeFuture().sync();
該方法會阻塞主線程,導致只能啟動一個服務器,因此在代碼中不能添加這行代碼。
服務器的關閉的兩行代碼也要放到unbind方法中。
ChannelOption參數詳解
1、ChannelOption.SO_BACKLOG
ChannelOption.SO_BACKLOG對應的是tcp/ip協議listen函數中的backlog參數,函數listen(int socketfd,int backlog)用來初始化服務端可連接隊列,服務端處理客戶端連接請求是順序處理的,所以同一時間只能處理一個客戶端連接,多個客戶端來的時候,服務端將不能處理的客戶端連接請求放在隊列中等待處理,backlog參數指定了隊列的大小
2、ChannelOption.SO_REUSEADDR
ChanneOption.SO_REUSEADDR對應於套接字選項中的SO_REUSEADDR,這個參數表示允許重復使用本地地址和端口,比如,某個服務器進程占用了TCP的80端口進行監聽,此時再次監聽該端口就會返回錯誤,使用該參數就可以解決問題,該參數允許共用該端口,這個在服務器程序中比較常使用,比如某個進程非正常退出,該程序占用的端口可能要被占用一段時間才能允許其他進程使用,而且程序死掉以后,內核一需要一定的時間才能夠釋放此端口,不設置SO_REUSEADDR就無法正常使用該端口。
3、ChannelOption.SO_KEEPALIVE
Channeloption.SO_KEEPALIVE參數對應於套接字選項中的SO_KEEPALIVE,該參數用於設置TCP連接,當設置該選項以后,連接會測試鏈接的狀態,這個選項用於可能長時間沒有數據交流的連接。當設置該選項以后,如果在兩小時內沒有數據的通信時,TCP會自動發送一個活動探測數據報文
4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF參數對應於套接字選項中的SO_SNDBUF,ChannelOption.SO_RCVBUF參數對應於套接字選項中的SO_RCVBUF這兩個參數用於操作接收緩沖區和發送緩沖區的大小,接收緩沖區用於保存網絡協議站內收到的數據,直到應用程序讀取成功,發送緩沖區用於保存發送數據,直到發送成功。
5、ChannelOption.SO_LINGER
ChannelOption.SO_LINGER參數對應於套接字選項中的SO_LINGER,Linux內核默認的處理方式是當用戶調用close()方法的時候,函數返回,在可能的情況下,盡量發送數據,不一定保證會發生剩余的數據,造成了數據的不確定性,使用SO_LINGER可以阻塞close()的調用時間,直到數據完全發送
6、ChannelOption.TCP_NODELAY
ChannelOption.TCP_NODELAY參數對應於套接字選項中的TCP_NODELAY,該參數的使用與Nagle算法有關,Nagle算法是將小的數據包組裝為更大的幀然后進行發送,而不是輸入一次發送一次,因此在數據包不足的時候會等待其他數據的到了,組裝成大的數據包進行發送,雖然該方式有效提高網絡的有效負載,但是卻造成了延時,而該參數的作用就是禁止使用Nagle算法,使用於小數據即時傳輸,於TCP_NODELAY相對應的是TCP_CORK,該選項是需要等到發送的數據量最大的時候,一次性發送數據,適用於文件傳輸。
7、IP_TOS
IP參數,設置IP頭部的Type-of-Service字段,用於描述IP包的優先級和QoS選項。
8、ALLOW_HALF_CLOSURE
Netty參數,一個連接的遠端關閉時本地端是否關閉,默認值為False。值為False時,連接自動關閉;為True時,觸發ChannelInboundHandler的userEventTriggered()方法,事件為ChannelInputShutdownEvent。
Netty的future.channel().closeFuture().sync();到底有什么用?
主線程執行到這里就 wait 子線程結束,子線程才是真正監聽和接受請求的,closeFuture()是開啟了一個channel的監聽器,負責監聽channel是否關閉的狀態,如果監聽到channel關閉了,子線程才會釋放,syncUninterruptibly()讓主線程同步等待子線程結果
如果我們不想加f.channel().closeFuture().sync()又想保證程序正常運行怎么辦,去掉finally 里面關閉nettyserver的語句即可。
對javax.servlet-api的依賴
注意項目中依賴javax.servlet-api的scope是provided的,打包的時候不會將該包打到程序中,因此需要確保運行環境中有該包。