我最近在項目中,遇到一個問題,ctx.close() 和 ctx.channel().close() 到底有何區別?
即調用 ChannelHandlerContext#close() 和 Channel#close() 有何不同?
從現象來看
建議先看一下下面這篇文章:
[翻譯]Netty4中 Ctx.close 與 Ctx.channel.close 的區別 跳轉 click here
假如我們有這樣一個 雙向處理器 SomeHandler
:
import io.netty.channel.*;
@ChannelHandler.Sharable
public class SomeHandler extends ChannelDuplexHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(ctx.name() + " channelRead: " + msg);
String result = (String) msg;
if (("ctx.close." + ctx.name()).equals(result)) {
ctx.close();
} else if (("ctx.channel.close." + ctx.name()).equals(result)) {
ctx.channel().close();
} else {
ctx.fireChannelRead(msg);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
ctx.fireChannelInactive();
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println(ctx.name() + " close");
ctx.close(promise);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
System.out.println(ctx.name() + " write");
ctx.write(String.format("[%s]%s", ctx.name(), msg), promise);
}
}
假如,我們的服務器端構建的管道:
ChannelPipeline p = ...;
p.addLast("A", new SomeHandler());
p.addLast("B", new SomeHandler());
p.addLast("C", new SomeHandler());
...
完整的 服務端代碼 點擊此處
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
StringDecoder stringDecoder = new StringDecoder();
StringEncoder stringEncoder = new StringEncoder();
SomeHandler aHandler = new SomeHandler();
SomeHandler bHandler = new SomeHandler();
SomeHandler cHandler = new SomeHandler();
ServerBootstrap bootstrap = new ServerBootstrap()
.channel(NioServerSocketChannel.class)
.group(boss, worker)
.childHandler(new ChannelInitializer
() { @Override protected void initChannel(NioSocketChannel channel) { channel.pipeline() .addLast("decoder", stringDecoder) .addLast("encoder", stringEncoder) .addLast("A", aHandler) .addLast("B", bHandler) .addLast("C", cHandler); } }); bootstrap.bind(8098).sync(); } }
如果,客戶端 發送給服務端的數據是 ctx.close.B
,輸出日志將是:
A channelRead: ctx.close.B
B channelRead: ctx.close.B
A close
這里,你沒有看到輸出 B close ,但是不用驚訝,因為,你調用的是上下文 ctx.close() 方法,它是不會再去調用當前處理器對象的 close 方法的。
如果,客戶端 發送給服務端的數據是 channel.close.B
,輸出的日志將是:
A channelRead: ctx.channel.close.B
B channelRead: ctx.channel.close.B
C close
B close
A close
從源碼來說
ctx.close()
通常,ctx 的類是 DefaultChannelHandlerContext
,在調用 close
或者 writeAndFlush
這類出站方法時,最終會調用 AbstractChannelHandlerContext
的方法:
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelFuture close() {
return close(newPromise());
}
@Override
public ChannelFuture close(final ChannelPromise promise) {
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
// 這個方法尋找下一個能夠處理 CLOSE 事件的出站處理器
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeClose(promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeClose(promise);
}
}, promise, null, false);
}
return promise;
}
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
// 這個方法尋找下一個能夠處理相應事件的處理器
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
// We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
//
// See https://github.com/netty/netty/issues/8343.
task.cancel();
}
}
}
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
// 向前尋找出站處理器
ctx = ctx.prev;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
return ctx;
}
}
注意哦,出站方法是向前尋找處理器。
ctx.channel().close()
通常,ctx.channel()
返回的類可以是 NioSocketChannel
,在調用 close
或者 writeAndFlush
這類出站方法時,
會調用 AbstractChannel
的 close
或者 writeAndFlush
方法,
再接着就是調用 DefaultChannelPipeline
的 close
或者 writeAndFlush
方法:
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelFuture close(ChannelPromise promise) {
return tail.close(promise);
}
@Override
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
}
你會發現,都是從管道的 TailContext
開始調用,因此所有符合要求的出站處理器都將被執行。
但是,你還要注意,處理器的寫法
public class SomeHandler extends ChannelOutboundHandlerAdapter {
/**
* 如果不覆寫這個 close 方法,直接運行父類的方法,也是正常的。
*/
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
// 如果出站處理器中,不加下面這段話,可能會導致 Channel 無法被正常關閉!
ctx.close(promise);
}
/**
* 如果不覆寫這個 write 方法,直接運行父類的方法,也是正常的。
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// 如果出站處理器中,不加下面這段話,可能會導致 Channel 無法正常發出消息!
ctx.write(msg, promise);
}
}