Unsafe是托委訪問socket,那么Channel是直接提供給開發者使用的
Channel 主要有兩個實現 NioServerSocketChannel同NioSocketChannel 致於其它不常用不在研究范圍內
NioServerSocketChannel 是給server用的,程序由始至終只有一個NioServerSocketChannel
NioSocketChannel 是給客戶端用的,每個連接生成一個NioSocketChannel 對象
NioSocketChannel同NioSocketChannel的繼承關系
NioSocketChannel -> AbstractNioByteChannel -> AbstractNioChannel -> AbstractChannel
NioServerSocketChannel -> AbstractNioMessageChannel-> AbstractNioChannel -> AbstractChannel
小提示:如果看文字不夠直觀可以在eclipse里按快捷鍵 選擇類 ctrl+t
channel有unsafe相應的實現類,反之亦是。其實功能是很簡單的,划分太多對象目的是對某部分功能重用,有時也可能因過渡設計造成
對於channel我們主要分析 I/O read/write操作
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel { private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); //構造時就綁定SelectorProvider,然后注冊OP_ACCEPT public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } /** server read操作對應的為readMessages 參數是個數組,是C語言書寫風格,如果需要返回多種類型數據,那么傳個對象進去外部就能獲取到 這里比較重要,當有接收到socket時,生成NioSocketChannel對象
讀者如果還有印象的話在講NioEventLoop 有提示netty read 操作是不分 OP_ACCEPT、OP_READ的,可以在這方法打上斷點觀察 */ @Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { //生成NioSocketChannel buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { ch.close(); } return 0; } //server 應該沒有write操作才對,因為server是一對多處理,不知道發給那一個clinet @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception {} }
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); } //////////////////////////////這部分是unsafe底層調用上層的實現////////////////////////////////////////////// @Override protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //這里設置byteBuf寫入數據坐標 allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); } @Override protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); } @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { int size = in.size(); //沒有數據退出 if (size == 0) { clearOpWrite(); break; } long writtenBytes = 0; //記錄寫數據size boolean done = false; //是否完成 boolean setOpWrite = false; ByteBuffer[] nioBuffers = in.nioBuffers(); int nioBufferCnt = in.nioBufferCount(); long expectedWrittenBytes = in.nioBufferSize(); SocketChannel ch = javaChannel(); //這里有三種分支處理 //如果沒有ByteBuffer 有可能只發送幾個byte //1跟default邏輯其實是一樣的 switch (nioBufferCnt) { case 0: //調用父類 AbstractNioByteChannel doWrite,邏輯基本相同,不同的是AbstractNioByteChannel處理的是byte 實現調用的是 doWriteBytes(ByteBuf buf)方法。。。 super.doWrite(in); return; case 1: //這里只循環16次,可以看出是復制下面代碼的哈。。。 ByteBuffer nioBuffer = nioBuffers[0]; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final int localWrittenBytes = ch.write(nioBuffer); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; default: //多個ByteBuffer時跟上面邏輯一樣 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; } // Release the fully written buffers, and update the indexes of the partially written buffer. in.removeBytes(writtenBytes); if (!done) { // Did not write all buffers completely. incompleteWrite(setOpWrite); break; } } } }
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { //生成NioSocketChannel時就綁定 unsafe pipeline protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } } protected abstract class AbstractUnsafe implements Unsafe { private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); // doRegister 是調用 AbstractNioChannel selectionKey = javaChannel().register(eventLoop().selector, 0, this); neverRegistered = false; registered = true; //這里是添加 Handler 每個Handler會生成一個Context pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); //通知Handler Registered pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { //通知Handler Active pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { //....... } } }
小結:看似很復雜的Channel實現其實沒想象難,大多數讀寫坐標記錄交給ByteBuf處理掉了
1.server每個client連接轉換成NioSocketChannel對象
2.構建NioSocketChannel時就已經生成 unsafe、pipeline