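In practice, an RPC framework server built on Netty 4 usually validates the protocol of incoming connections. The simplest approach is protocol probing: checking whether the magic number is correct. If the server cannot recognize the protocol, it throws an exception immediately and actively closes the connection. The client then receives a read-ready signal and, on finding that it represents a connection close, closes its local connection.
What the user can observe in this process is the channelInactive callback on an InboundHandler, where custom logging or notification logic can be added.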
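As a rough illustration of the protocol probe described above, here is a minimal sketch that uses only the JDK. The class name MagicProbe, the 4-byte layout and the magic value 0xCAFEBABE are assumptions made for this example; a real RPC protocol defines its own header, and a Netty server would run the equivalent check inside a decoder.

```java
import java.nio.ByteBuffer;

/** Hypothetical protocol probe: the first 4 bytes of a frame must equal MAGIC. */
final class MagicProbe {
    // Assumed magic number, for illustration only.
    static final int MAGIC = 0xCAFEBABE;

    enum Result { NEED_MORE_DATA, MATCH, MISMATCH }

    /** Inspects the buffer without consuming any bytes. */
    static Result probe(ByteBuffer in) {
        if (in.remaining() < Integer.BYTES) {
            return Result.NEED_MORE_DATA;
        }
        // Absolute read: the buffer position is left untouched.
        int candidate = in.getInt(in.position());
        return candidate == MAGIC ? Result.MATCH : Result.MISMATCH;
    }

    public static void main(String[] args) {
        ByteBuffer good = ByteBuffer.allocate(8);
        good.putInt(MAGIC).putInt(42);
        good.flip();
        ByteBuffer bad = ByteBuffer.wrap(new byte[] {'G', 'E', 'T', ' '});
        System.out.println(probe(good));
        System.out.println(probe(bad));
    }
}
```

On MISMATCH the server side would close the connection, which is what produces the channelInactive event on the client.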
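The question I want to settle today:
Suppose the consumer-side InboundHandler receives a channelInactive event. Can it end the request immediately and report failure, instead of waiting for the timeout?
In the plain protocol-probe failure scenario it can, because the server will not send any response data.
What about other scenarios? Can the channelInactive method of the first InboundHandler report the operation as failed directly?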
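To make the "fail immediately instead of waiting for the timeout" idea concrete, here is a minimal sketch of a consumer-side registry of in-flight calls, written against the JDK only. PendingRequests and its methods are hypothetical names, not part of Netty; the assumption is that the handler's channelRead would call complete(...) and its channelInactive would call failAll(...).

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry of in-flight RPC calls for one connection. */
final class PendingRequests {
    private final Map<Long, CompletableFuture<String>> inFlight = new ConcurrentHashMap<>();

    /** Called when a request is written to the channel. */
    CompletableFuture<String> register(long requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        inFlight.put(requestId, future);
        return future;
    }

    /** Would be called from channelRead when a response arrives. */
    void complete(long requestId, String response) {
        CompletableFuture<String> future = inFlight.remove(requestId);
        if (future != null) {
            future.complete(response);
        }
    }

    /** Would be called from channelInactive: fail every call that is still waiting. */
    int failAll(Throwable cause) {
        int failed = 0;
        for (Long id : inFlight.keySet()) {
            CompletableFuture<String> future = inFlight.remove(id);
            if (future != null && future.completeExceptionally(cause)) {
                failed++;
            }
        }
        return failed;
    }
}
```

Whether calling failAll from channelInactive is safe depends on whether a response could still be processed after that point, which is what the rest of this post examines.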
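How Netty handles a connection closed by the server
In Netty 4 every channel is bound to a specific I/O EventLoop thread. When that thread receives a SelectionKey.OP_READ readiness signal and allocHandle.lastBytesRead() returns -1, the peer has closed the connection (EOF).
The read handling logic lives in the read() method of AbstractNioByteChannel (in its inner class NioByteUnsafe):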
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0; // -1 means the peer closed the connection (EOF)
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline); // run the connection close logic
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
close = allocHandle.lastBytesRead() < 0;
Because the last read returned -1 bytes, close is true and the connection is treated as closed.
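The same EOF convention can be seen with a plain JDK channel. The sketch below is not Netty code: it drains an in-memory ReadableByteChannel (standing in for a socket whose peer sent some bytes and then closed) with a loop shaped like the one above, and records the order of events. EofReadLoop and the event names are made up for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Simplified read loop: dispatch data first, detect EOF (-1) last. */
final class EofReadLoop {
    static List<String> drain(byte[] bytesSentBeforeClose) {
        List<String> events = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.allocate(64);
        try (ReadableByteChannel ch =
                 Channels.newChannel(new ByteArrayInputStream(bytesSentBeforeClose))) {
            boolean close = false;
            while (true) {
                buf.clear();
                int n = ch.read(buf);
                if (n <= 0) {
                    close = n < 0; // -1 signals end of stream
                    break;
                }
                buf.flip();
                events.add("read:" + StandardCharsets.UTF_8.decode(buf));
            }
            events.add("readComplete");
            if (close) {
                events.add("inactive");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(drain("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The point of the sketch is the ordering: every byte that arrived before the close is handed to the "read" step before the loop ever observes -1.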
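Next comes the close flow.
Netty checks whether the connection was active before the close and whether it is still active now, then enters fireChannelInactiveAndDeregister, which deregisters the channel.
The channel is not deregistered immediately. Instead, the work is submitted to the EventLoop as a task to run later. A comment in the source explains why: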
private void invokeLater(Runnable task) {
try {
// This method is used by outbound operation implementations to trigger an inbound event later.
// They do not trigger an inbound event immediately because an outbound operation might have been
// triggered by another inbound event handler method. If fired immediately, the call stack
// will look like this for example:
//
// handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
// -> handlerA.ctx.close()
// -> channel.unsafe.close()
// -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
//
// which means the execution of two inbound handler methods of the same handler overlap undesirably.
eventLoop().execute(task);
} catch (RejectedExecutionException e) {
logger.warn("Can't invoke task later as EventLoop rejected it", e);
}
}
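Roughly: outbound operation implementations use this method to trigger an inbound event later. The scenario it targets is an inbound handler method that itself triggers an outbound operation, such as actively closing the connection. If the resulting inbound event (channelInactive) were fired immediately, it would run while the first handler method was still on the call stack, so two inbound methods of the same handler would overlap.
If the client never received any data and the server closed the connection directly, this concern does not apply, because no InboundHandler is in the middle of processing data.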
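The overlap that the comment warns about can be reproduced with a toy task queue. This is a sketch under simplifying assumptions, not Netty's implementation: an ArrayDeque stands in for the EventLoop, and DeferredInactiveDemo and the trace strings are invented for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Shows why firing channelInactive later avoids overlapping handler methods. */
final class DeferredInactiveDemo {
    static List<String> run(boolean defer) {
        List<String> trace = new ArrayList<>();
        Deque<Runnable> taskQueue = new ArrayDeque<>(); // stand-in for the EventLoop queue

        Runnable fireInactive = () -> trace.add("handlerA.channelInactive");

        // An inbound handler method that closes the connection while it is running.
        Runnable channelRead = () -> {
            trace.add("handlerA.channelRead:enter");
            if (defer) {
                taskQueue.add(fireInactive); // like invokeLater: run after this method returns
            } else {
                fireInactive.run();          // fired immediately: nested inside channelRead
            }
            trace.add("handlerA.channelRead:exit");
        };

        taskQueue.add(channelRead);
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println("deferred:  " + run(true));
        System.out.println("immediate: " + run(false));
    }
}
```

With deferral, channelInactive is recorded only after channelRead has exited; without it, channelInactive lands between the enter and exit entries of the same handler.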
invokeLater(new Runnable() {
@Override
public void run() {
try {
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
pipeline.fireChannelInactive();
}
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered, so check
// if it was registered.
if (registered) {
registered = false;
pipeline.fireChannelUnregistered();
}
safeSetSuccess(promise);
}
}
});
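After doDeregister() removes the channel from the EventLoop, execution enters the finally block. If the connection was previously active, fireChannelInactive is true.
The flow then proceeds to pipeline.fireChannelInactive().
DefaultChannelPipeline holds a linked list of AbstractChannelHandlerContext objects, added in the order the user configured them. The list head is Netty's own HeadContext, so the head context runs first: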
@Override
public final ChannelPipeline fireChannelInactive() {
AbstractChannelHandlerContext.invokeChannelInactive(head);
return this;
}
Each AbstractChannelHandlerContext holds a ChannelHandler object, which here is actually a ChannelInboundHandler:
private void invokeChannelInactive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelInactive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelInactive();
}
}
The event first enters Netty's built-in DefaultChannelPipeline$HeadContext and then reaches the first InboundHandler we configured.
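The head-first propagation can be modeled with a few lines of plain Java. This is a deliberately reduced sketch, not Netty's classes: MiniPipeline, Context and InboundHandler are hypothetical names, only channelInactive is modeled, and the forward flag stands in for whether a handler calls ctx.fireChannelInactive().

```java
import java.util.ArrayList;
import java.util.List;

/** Toy pipeline: a linked list of contexts, with the event entering at the head. */
final class MiniPipeline {
    interface InboundHandler {
        void channelInactive(Context ctx);
    }

    static final class Context {
        final String name;
        final InboundHandler handler;
        Context next;

        Context(String name, InboundHandler handler) {
            this.name = name;
            this.handler = handler;
        }

        /** Passes the event to the next context, if there is one. */
        void fireChannelInactive() {
            if (next != null) {
                next.handler.channelInactive(next);
            }
        }
    }

    final List<String> trace = new ArrayList<>();
    private final Context head;
    private Context tail;

    MiniPipeline() {
        // The built-in head context always forwards the event.
        head = new Context("head", ctx -> {
            trace.add(ctx.name);
            ctx.fireChannelInactive();
        });
        tail = head;
    }

    MiniPipeline addLast(String name, boolean forward) {
        Context ctx = new Context(name, c -> {
            trace.add(c.name);
            if (forward) {
                c.fireChannelInactive();
            }
        });
        tail.next = ctx;
        tail = ctx;
        return this;
    }

    MiniPipeline fireChannelInactive() {
        head.handler.channelInactive(head);
        return this;
    }
}
```

A handler that does not forward the event stops it there, which is why the first configured handler is the one place guaranteed to see channelInactive.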
When that handler is a byte decoder, the channelInactive implementation that runs is the one in ByteToMessageDecoder:
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
channelInputClosed(ctx, true);
}
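Netty is very tolerant when a connection closes: if the decoder still holds accumulated bytes that have not been decoded, it tries to decode them before passing the close event on.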
private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception {
CodecOutputList out = CodecOutputList.newInstance();
try {
channelInputClosed(ctx, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
if (cumulation != null) {
cumulation.release();
cumulation = null;
}
int size = out.size();
fireChannelRead(ctx, out, size);
if (size > 0) {
// Something was read, call fireChannelReadComplete()
ctx.fireChannelReadComplete();
}
if (callChannelInactive) {
ctx.fireChannelInactive();
}
} finally {
// Recycle in all cases
out.recycle();
}
}
}
/**
* Called when the input of the channel was closed which may be because it changed to inactive or because of
* {@link ChannelInputShutdownEvent}.
*/
void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
if (cumulation != null) {
callDecode(ctx, cumulation, out);
decodeLast(ctx, cumulation, out);
} else {
decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
}
}
/**
* Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
*/
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
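callDecode tries to decode data that has already arrived on the channel but has not been fully consumed. If it produces any messages, size in the outer method is greater than 0 and the full fireChannelRead flow runs.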
/**
* A {@link Channel} received a message.
*
* This will result in having the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}
* method called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
* {@link Channel}.
*/
ChannelInboundInvoker fireChannelRead(Object msg);
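fireChannelRead invokes channelRead on the next ChannelInboundHandler in the ChannelPipeline. When the commonly used MessageToMessageDecoder handles channelRead, it calls decode, which is the method we implement to process the request.
Then channelReadComplete is fired.
Finally, ctx.fireChannelInactive() passes the event to the next handler.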
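The sequence just described (decode what is left, fire the reads, fire readComplete, then fire inactive) can be sketched without Netty. Everything here is an assumption for illustration: LeftoverDecoder is a made-up class, the wire format is a 1-byte length followed by an ASCII payload, and channelRead deliberately decodes at most one frame per call so that complete frames can be left in the cumulation buffer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy decoder that flushes leftover frames when the channel goes inactive. */
final class LeftoverDecoder {
    private byte[] cumulation = new byte[0];
    final List<String> events = new ArrayList<>();

    /** Appends new bytes and decodes at most one frame (a simplification). */
    void channelRead(byte[] chunk) {
        byte[] merged = Arrays.copyOf(cumulation, cumulation.length + chunk.length);
        System.arraycopy(chunk, 0, merged, cumulation.length, chunk.length);
        cumulation = merged;
        String frame = decodeOne();
        if (frame != null) {
            events.add("read:" + frame);
        }
    }

    /** Decodes whatever complete frames remain, then reports the close. */
    void channelInactive() {
        int decoded = 0;
        String frame;
        while ((frame = decodeOne()) != null) {
            events.add("read:" + frame);
            decoded++;
        }
        if (decoded > 0) {
            events.add("readComplete");
        }
        cumulation = new byte[0]; // an incomplete trailing frame is dropped
        events.add("inactive");
    }

    /** Returns one frame, or null if the buffer does not hold a complete one. */
    private String decodeOne() {
        if (cumulation.length < 1) {
            return null;
        }
        int len = cumulation[0] & 0xFF;
        if (cumulation.length < 1 + len) {
            return null;
        }
        String frame = new String(cumulation, 1, len, StandardCharsets.US_ASCII);
        cumulation = Arrays.copyOfRange(cumulation, 1 + len, cumulation.length);
        return frame;
    }
}
```

Downstream handlers in this model always see the leftover reads before they see inactive, mirroring the order of the calls in channelInputClosed.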
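In Netty 4 each I/O EventLoop runs on a single thread, so processing is thread-safe from the point of view of one channel, and data on a channel necessarily arrives before the close signal. So by the time we receive the close signal, has every previously sent byte already been at least offered to the handlers? If so, is it still necessary to run the callDecode logic and fireChannelRead again?
Back to the original code block,
the read() method of AbstractNioByteChannel: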
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf); // a normal read fires fireChannelRead immediately
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
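As the code shows, an ordinary read triggers fireChannelRead immediately. From the earlier analysis, that call ends up in invokeChannelRead, and since the current executor is already the EventLoop, channelRead runs right away and eventually reaches the InboundHandlers we configured.
The close signal necessarily arrives after the normal data stream, so the data has always been processed by the InboundHandlers before the close signal is received.
Conclusion:
Considering only the case where the server actively closes the connection: once the close signal is received, it is safe for the channelInactive method of the first InboundHandler to notify the business task of failure directly, because any response that was fully received has already been processed.
If a client-side InboundHandler actively closes the channel during processing, that is also safe: the close runs later and tries to decode the latest inbound data once more. If there is decodable data, it passes through all the InboundHandlers (the channel is already closed, so writing a response will fail); otherwise processing simply ends.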