ctx.close() 和 ctx.channel().close() 到底有何區別?


我最近在項目中,遇到一個問題,ctx.close() 和 ctx.channel().close() 到底有何區別?

即調用 ChannelHandlerContext#close() 和 Channel#close() 有何不同?

從現象來看

建議先看一下下面這篇文章:

[翻譯]Netty4中 Ctx.close 與 Ctx.channel.close 的區別 跳轉 click here

假如我們有這樣一個 雙向處理器 SomeHandler

import io.netty.channel.*;

@ChannelHandler.Sharable
public class SomeHandler extends ChannelDuplexHandler {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.fireChannelActive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(ctx.name() + " channelRead: " + msg);
        String result = (String) msg;
        if (("ctx.close." + ctx.name()).equals(result)) {
            ctx.close();
        } else if (("ctx.channel.close." + ctx.name()).equals(result)) {
            ctx.channel().close();
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        ctx.fireChannelInactive();
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        System.out.println(ctx.name() + " close");
        ctx.close(promise);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println(ctx.name() + " write");
        ctx.write(String.format("[%s]%s", ctx.name(), msg), promise);
    }
}

假如,我們的服務器端構建的管道:

ChannelPipeline p = ...;
p.addLast("A", new SomeHandler());
p.addLast("B", new SomeHandler());
p.addLast("C", new SomeHandler());
...
完整的 服務端代碼 點擊此處

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        StringDecoder stringDecoder = new StringDecoder();
        StringEncoder stringEncoder = new StringEncoder();
        SomeHandler aHandler = new SomeHandler();
        SomeHandler bHandler = new SomeHandler();
        SomeHandler cHandler = new SomeHandler();
        ServerBootstrap bootstrap = new ServerBootstrap()
                .channel(NioServerSocketChannel.class)
                .group(boss, worker)
                .childHandler(new ChannelInitializer
   
   
   
           
             () { @Override protected void initChannel(NioSocketChannel channel) { channel.pipeline() .addLast("decoder", stringDecoder) .addLast("encoder", stringEncoder) .addLast("A", aHandler) .addLast("B", bHandler) .addLast("C", cHandler); } }); bootstrap.bind(8098).sync(); } } 
           

如果,客戶端 發送給服務端的數據是 ctx.close.B,輸出日志將是:

A channelRead: ctx.close.B
B channelRead: ctx.close.B
A close

這里,你沒有看到輸出 B close ,但是不用驚訝,因為,你調用的是上下文 ctx.close() 方法,它是不會再去調用當前處理器對象的 close 方法的。

如果,客戶端 發送給服務端的數據是 channel.close.B,輸出的日志將是:

A channelRead: ctx.channel.close.B
B channelRead: ctx.channel.close.B
C close
B close
A close

從源碼來說

ctx.close()

通常,ctx 的類是 DefaultChannelHandlerContext,在調用 close 或者 writeAndFlush 這類出站方法時,最終會調用 AbstractChannelHandlerContext 的方法:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    @Override
    public ChannelFuture close() {
        return close(newPromise());
    }
    
    @Override
    public ChannelFuture close(final ChannelPromise promise) {
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }
        
        // 這個方法尋找下一個能夠處理 CLOSE 事件的出站處理器
        final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeClose(promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeClose(promise);
                }
            }, promise, null, false);
        }

        return promise;
    }

    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        return writeAndFlush(msg, newPromise());
    }

    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        write(msg, true, promise);
        return promise;
    }
    
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");
        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
        
        // 這個方法尋找下一個能夠處理相應事件的處理器
        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
            if (!safeExecute(executor, task, promise, m, !flush)) {
                // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
                // and put it back in the Recycler for re-use later.
                //
                // See https://github.com/netty/netty/issues/8343.
                task.cancel();
            }
        }
    }    


    private AbstractChannelHandlerContext findContextOutbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            // 向前尋找出站處理器
            ctx = ctx.prev;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
        return ctx;
    }
}

注意哦,出站方法是向前尋找處理器。

ctx.channel().close()

通常,ctx.channel() 返回的類可以是 NioSocketChannel,在調用 close 或者 writeAndFlush 這類出站方法時,
會調用 AbstractChannelclose 或者 writeAndFlush 方法,
再接着就是調用 DefaultChannelPipelineclose 或者 writeAndFlush 方法:

public class DefaultChannelPipeline implements ChannelPipeline {
    @Override
    public final ChannelFuture close(ChannelPromise promise) {
        return tail.close(promise);
    }

    @Override
    public final ChannelFuture writeAndFlush(Object msg) {
        return tail.writeAndFlush(msg);
    }
}

你會發現,都是從管道的 TailContext 開始調用,因此所有符合要求的出站處理器都將被執行。

但是,你還要注意,處理器的寫法

public class SomeHandler extends ChannelOutboundHandlerAdapter {
    /**
     * 如果不覆寫這個 close 方法,直接運行父類的方法,也是正常的。
     */
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        // 如果出站處理器中,不加下面這段話,可能會導致 Channel 無法被正常關閉!
        ctx.close(promise);
    }
    /**
     * 如果不覆寫這個 write 方法,直接運行父類的方法,也是正常的。
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        // 如果出站處理器中,不加下面這段話,可能會導致 Channel 無法正常發出消息!
        ctx.write(msg, promise);
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM