[編織消息框架][netty源碼分析]8 Channel 實現類NioSocketChannel職責與實現


Unsafe是托委訪問socket,那么Channel是直接提供給開發者使用的

Channel 主要有兩個實現 NioServerSocketChannel同NioSocketChannel 致於其它不常用不在研究范圍內

NioServerSocketChannel 是給server用的,程序由始至終只有一個NioServerSocketChannel

NioSocketChannel 是給客戶端用的,每個連接生成一個NioSocketChannel 對象

 

NioSocketChannel同NioSocketChannel的繼承關系

NioSocketChannel -> AbstractNioByteChannel -> AbstractNioChannel -> AbstractChannel

NioServerSocketChannel -> AbstractNioMessageChannel-> AbstractNioChannel -> AbstractChannel

小提示:如果看文字不夠直觀可以在eclipse里按快捷鍵 選擇類 ctrl+t 

channel有unsafe相應的實現類,反之亦是。其實功能是很簡單的,划分太多對象目的是對某部分功能重用,有時也可能因過渡設計造成

對於channel我們主要分析 I/O read/write操作

public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    //構造時就綁定SelectorProvider,然后注冊OP_ACCEPT
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    
 
    /**
    server read操作對應的為readMessages
    參數是個數組,是C語言書寫風格,如果需要返回多種類型數據,那么傳個對象進去外部就能獲取到
    這里比較重要,當有接收到socket時,生成NioSocketChannel對象
   讀者如果還有印象的話在講NioEventLoop 有提示netty read 操作是不分 OP_ACCEPT、OP_READ的,可以在這方法打上斷點觀察
*/ @Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { //生成NioSocketChannel buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { ch.close(); } return 0; } //server 應該沒有write操作才對,因為server是一對多處理,不知道發給那一個clinet @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception {} }

 

 

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }
    
    //////////////////////////////這部分是unsafe底層調用上層的實現//////////////////////////////////////////////
    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        //這里設置byteBuf寫入數據坐標
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }

    @Override
    protected int doWriteBytes(ByteBuf buf) throws Exception {
        final int expectedWrittenBytes = buf.readableBytes();
        return buf.readBytes(javaChannel(), expectedWrittenBytes);
    }

    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            int size = in.size();
            //沒有數據退出
            if (size == 0) {
                clearOpWrite();
                break;
            }
            
            long writtenBytes = 0;    //記錄寫數據size
            boolean done = false;    //是否完成
            boolean setOpWrite = false;


            ByteBuffer[] nioBuffers = in.nioBuffers();
            int nioBufferCnt = in.nioBufferCount();
            long expectedWrittenBytes = in.nioBufferSize();
            SocketChannel ch = javaChannel();

            //這里有三種分支處理
            //如果沒有ByteBuffer 有可能只發送幾個byte
            //1跟default邏輯其實是一樣的
            switch (nioBufferCnt) {
                case 0:
                    //調用父類 AbstractNioByteChannel doWrite,邏輯基本相同,不同的是AbstractNioByteChannel處理的是byte 實現調用的是 doWriteBytes(ByteBuf buf)方法。。。
                    super.doWrite(in);
                    return;
                case 1:
                    //這里只循環16次,可以看出是復制下面代碼的哈。。。
                    ByteBuffer nioBuffer = nioBuffers[0];
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final int localWrittenBytes = ch.write(nioBuffer);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
                default:
                    //多個ByteBuffer時跟上面邏輯一樣
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
            }

            // Release the fully written buffers, and update the indexes of the partially written buffer.
            in.removeBytes(writtenBytes);

            if (!done) {
                // Did not write all buffers completely.
                incompleteWrite(setOpWrite);
                break;
            }
        }
    }
}

 

 

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    //生成NioSocketChannel時就綁定 unsafe pipeline
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
}
protected abstract class AbstractUnsafe implements Unsafe {
    private void register0(ChannelPromise promise) {
        try {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister();
            // doRegister 是調用 AbstractNioChannel selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            neverRegistered = false;
            registered = true;
            //這里是添加 Handler 每個Handler會生成一個Context
            pipeline.invokeHandlerAddedIfNeeded();

            safeSetSuccess(promise);
            //通知Handler Registered
            pipeline.fireChannelRegistered();
            if (isActive()) {
                if (firstRegistration) {
                    //通知Handler Active
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            //.......
        }
    }
}

小結:看似很復雜的Channel實現其實沒想象難,大多數讀寫坐標記錄交給ByteBuf處理掉了

1.server每個client連接轉換成NioSocketChannel對象

2.構建NioSocketChannel時就已經生成 unsafe、pipeline


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM