前言
在上一講網絡編程-關閉連接(2)-Java的NIO在關閉socket時,究竟用了哪個系統調用函數?中,我們做了個實驗,研究了java nio的close函數究竟調用了哪個系統調用,答案是close,但在真實的測試代碼中,其實我犯了一個小錯誤,在close之后並沒有return,所以在測試close之后,還做了writeAndFlush操作發送了一條數據,並且執行過程並沒有報錯。這件事讓我關注起了close和之后的writeAndFlush之間的關系。為什么在close之后”看起來“還可以繼續寫入呢?
原始代碼如下:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//寫入本地文件測試字符,然后關閉channel
FileWriter fileWriter = new FileWriter("/root/test.txt");
fileWriter.write("test test hold on");
fileWriter.flush();
fileWriter.close();
//調用同步方法關閉
ChannelFuture sync = ctx.channel().close().sync();
if(sync.isSuccess()){
System.out.println("關閉成功!");
}else{
System.out.println("關閉失敗!");
}
//這里開始,是誤執行的語句
this.ctx = ctx;
//發送心跳指令
if (count.intValue() > 150) {
count.set(1);
}
Command0C04 command0C04 = new Command0C04(count.intValue());
byte[] encode = command0C04.encode();
logger.info("心跳指令:" + HexStringUtils.toHexString(encode));
ctx.channel().writeAndFlush(encode).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("success:"+channelFuture.isSuccess());
System.out.println("cancelled:"+channelFuture.isCancelled());
System.out.println("done:"+channelFuture.isDone());
System.out.println("isCancellable:"+channelFuture.isCancellable());
}
});
count.getAndIncrement();
}
我們知道,close系統調用會關閉讀和寫兩個方向的操作,那么writeAndFlush在close之后具體是如何執行的?netty是怎么確保不會寫入到發送緩沖區中呢?
想研究清楚這個問題,需要先看writeAndFlush操作做了什么,涉及到什么底層的數據結構。
writeAndFlush原理
簡言之,writeAndFlush,在底層會做兩個操作
- write操作
- flush操作
首先分析write操作。
write操作
netty底層會維護一個重要的數據結構,ChannelOutboundBuffer,這是一個單向鏈表。我們調用寫的方法其實會把數據先緩存到這個數據結構中,等調用flush之后,就會真正的把數據寫入到發送緩沖區當中。
ChannelOutBoundBuffer中有以下幾個重要的指針:
- Entry代表了我們發送的數據
- flushedEntry代表需要寫入到發送緩沖區的第一個Entry
- unflushedEntry代表第一個等待寫入發送緩沖區的Entry
當第一次調用addMessage方法往ChannelOutBoundBuffer中添加數據時
第二次調用addMessage方法時,數據指針如下
如果不調用Flush,那么flushedEntry指針一直為null,數據會一直寫入到后面的鏈表中。
Flush操作
當調用Flush操作后,指針情況如圖:
之后的代碼,就是遍歷這段節點數據,寫入到發送緩沖區中,並且寫入后釋放節點內存。
判斷緩沖區是否可寫(小知識)
在實際flush之前,netty調用isFlushPending判斷,這個channel是否注冊了可寫事件,如果有可寫事件就等會再發送。如果沒有,就會調用父類的flush0方法直接寫。
- 注:如果到達發送緩沖區的水位線了,發送緩沖區本身就不可寫了,這個時候會(XX會)注冊一個可寫事件到selector中,netty就是使用這個可寫判斷是否可以真正的發送。
protected final void flush0() {
if (!isFlushPending()) {
super.flush0();
}
}
private boolean isFlushPending() {
SelectionKey selectionKey = selectionKey();
return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
OOM?
如果接收端消費速度很慢,接收緩沖區滿了以后,會導致發送緩沖區無法繼續發送數據,在一直發送數據的前提下,ChannelOutboundBuffer會一直上漲,可能會引起OOM問題。
Netty官方提供了兩個ChannelOutBoundBuffer配置參數、一個Channel屬性和一個用戶回調方法來幫助我們識別和解決這件事。
兩個ChannelOutBoundBuffer配置參數:
-
Channel.config().setWriteBufferHighWaterMark:高水位,默認64 kb
-
Channel.config().setWriteBufferLowWaterMark :低水位:默認32 kb
一個Channel屬性:isWritable
一個用戶回調方法:fireChannelWritabilityChanged
內部邏輯如下:
- 當本次需要添加到ChannelOutBoundBuffer的數據量超過了高水位,會改變isWritable對應的屬性值從0變為1,並且觸發一個ChannelWritabilityChanged事件。
- 當flush或者remove后,如果數據恢復到最低水位下了,會改變isWritable對應的屬性值從1變為0,並且觸發一個ChannelWritabilityChanged事件。
用戶可以通過屬性和回調方法來檢查是否可寫,做相關的業務處理。
writeAndFlush總結
在調用寫入方法后,netty並不會直接把數據寫入到發送緩沖區中,而是存儲在了ChannelOutboundBuffer中,等到調用flush操作后,再把數據真正寫入Socket的發送緩沖區中。
close以后是否還能寫入數據?
跟蹤close源碼,最后會跟蹤到io.netty.channel.AbstractChannel 的內部類 AbstractUnsafe中的close方法,方法代碼如下(部分代碼省略,只保留這個問題相關的核心代碼):
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
}
可以看到,這里有一句this.outboundBuffer = null; 相當於把上文分析的ChannelOutboundBuffer置空。
結合同在AbstractUnsafe中的write代碼中的這一部分來看(同樣省略了非問題關注的代碼)
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, newWriteException(initialCloseCause));
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
}
在write之前,會做判斷,如果如果ChannelOutboundBuffer為空為空,那么釋放內存,不發送數據並返回。
總結
首先我們了解了,在發送過程中比較重要的數據結構ChannelOutboundBuffer,然后我們了解了在close的時候,會把如果ChannelOutboundBuffer置空,並且在write的時候,會判斷該buffer是否為空,為空則不發送,並設置失敗,到此我們的問題就研究明白了。