Netty源碼分析 (七)----- read過程 源碼分析


在上一篇文章中,我們分析了processSelectedKey這個方法中的accept過程,本文將分析一下work線程中的read過程。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    //檢查該SelectionKey是否有效,如果無效,則關閉channel
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // 如果准備好READ或ACCEPT則觸發unsafe.read() ,檢查是否為0,如上面的源碼英文注釋所說:解決JDK可能會產生死循環的一個bug。
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {//如果已經關閉,則直接返回即可,不需要再處理該channel的其他事件
                // Connection already closed - no need to handle write.
                return;
            }
        }
        // 如果准備好了WRITE則將緩沖區中的數據發送出去,如果緩沖區中數據都發送完成,則清除之前關注的OP_WRITE標記
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        // 如果是OP_CONNECT,則需要移除OP_CONNECT否則Selector.select(timeout)將立即返回不會有任何阻塞,這樣可能會出現cpu 100%
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

該方法主要是對SelectionKey k進行了檢查,有如下幾種不同的情況

1)OP_ACCEPT,接受客戶端連接

2)OP_READ, 可讀事件, 即 Channel 中收到了新數據可供上層讀取。

3)OP_WRITE, 可寫事件, 即上層可以向 Channel 寫入數據。

4)OP_CONNECT, 連接建立事件, 即 TCP 連接已經建立, Channel 處於 active 狀態。

本篇博文主要來看下當work 線程 selector檢測到OP_READ事件時,內部干了些什么。

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {//如果已經關閉,則直接返回即可,不需要再處理該channel的其他事件
        // Connection already closed - no need to handle write.
        return;
    }
} 

從代碼中可以看到,當selectionKey發生的事件是SelectionKey.OP_READ,執行unsafe的read方法。注意這里的unsafe是NioByteUnsafe的實例

為什么說這里的unsafe是NioByteUnsafe的實例呢?在上篇博文Netty源碼分析:accept中我們知道Boss NioEventLoopGroup中的NioEventLoop只負責accpt客戶端連接,然后將該客戶端注冊到Work NioEventLoopGroup中的NioEventLoop中,即最終是由work線程對應的selector來進行read等時間的監聽,即work線程中的channel為SocketChannel,SocketChannel的unsafe就是NioByteUnsafe的實例

下面來看下NioByteUnsafe中的read方法

@Override
    public void read() {
        final ChannelConfig config = config();
        if (!config.isAutoRead() && !isReadPending()) {
            // ChannelConfig.setAutoRead(false) was called in the meantime
            removeReadOp();
            return;
        }

        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
        if (allocHandle == null) {
            this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
        }

        ByteBuf byteBuf = null;
        int messages = 0;
        boolean close = false;
        try {
            int totalReadAmount = 0;
            boolean readPendingReset = false;
            do {
                //1、分配緩存
                byteBuf = allocHandle.allocate(allocator); int writable = byteBuf.writableBytes();//可寫的字節容量
                //2、將socketChannel數據寫入緩存
                int localReadAmount = doReadBytes(byteBuf); if (localReadAmount <= 0) {
                    // not was read release the buffer
                    byteBuf.release();
                    close = localReadAmount < 0;
                    break;
                }
                if (!readPendingReset) {
                    readPendingReset = true;
                    setReadPending(false);
                }
                //3、觸發pipeline的ChannelRead事件來對byteBuf進行后續處理
 pipeline.fireChannelRead(byteBuf);
                byteBuf = null;

                if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                    // Avoid overflow.
                    totalReadAmount = Integer.MAX_VALUE;
                    break;
                }

                totalReadAmount += localReadAmount;

                // stop reading
                if (!config.isAutoRead()) {
                    break;
                }

                if (localReadAmount < writable) {
                    // Read less than what the buffer can hold,
                    // which might mean we drained the recv buffer completely.
                    break;
                }
            } while (++ messages < maxMessagesPerRead);

        pipeline.fireChannelReadComplete();
            allocHandle.record(totalReadAmount);

            if (close) {
                closeOnRead(pipeline);
                close = false;
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close);
        } finally {
            if (!config.isAutoRead() && !isReadPending()) {
                removeReadOp();
            }
        }
    }
} 

下面一一介紹比較重要的代碼

allocHandler的實例化過程

allocHandle負責自適應調整當前緩存分配的大小,以防止緩存分配過多或過少,先看allocHandler的實例化過程

RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
    this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}

其中, config.getRecvByteBufAllocator()得到的是一個 AdaptiveRecvByteBufAllocator實例DEFAULT。

public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();

而AdaptiveRecvByteBufAllocator中的newHandler()方法的代碼如下:

@Override
public Handle newHandle() {
    return new HandleImpl(minIndex, maxIndex, initial);
}

HandleImpl(int minIndex, int maxIndex, int initial) {
    this.minIndex = minIndex;
    this.maxIndex = maxIndex;

    index = getSizeTableIndex(initial);
    nextReceiveBufferSize = SIZE_TABLE[index];
}

其中,上面方法中所用到參數:minIndex maxIndex initial是什么意思呢?含義如下:minIndex是最小緩存在SIZE_TABLE中對應的下標。maxIndex是最大緩存在SIZE_TABLE中對應的下標,initial為初始化緩存大小。

AdaptiveRecvByteBufAllocator的相關常量字段

public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator {

        static final int DEFAULT_MINIMUM = 64;
        static final int DEFAULT_INITIAL = 1024;
        static final int DEFAULT_MAXIMUM = 65536;

        private static final int INDEX_INCREMENT = 4;
        private static final int INDEX_DECREMENT = 1;

        private static final int[] SIZE_TABLE; 

上面這些字段的具體含義說明如下:

1)、SIZE_TABLE:按照從小到大的順序預先存儲可以分配的緩存大小。 
從16開始,每次累加16,直到496,接着從512開始,每次增大一倍,直到溢出。SIZE_TABLE初始化過程如下。

static {
    List<Integer> sizeTable = new ArrayList<Integer>();
    for (int i = 16; i < 512; i += 16) {
        sizeTable.add(i);
    }

    for (int i = 512; i > 0; i <<= 1) {
        sizeTable.add(i);
    }

    SIZE_TABLE = new int[sizeTable.size()];
    for (int i = 0; i < SIZE_TABLE.length; i ++) {
        SIZE_TABLE[i] = sizeTable.get(i);
    }
}

2)、DEFAULT_MINIMUM:最小緩存(64),在SIZE_TABLE中對應的下標為3。

3)、DEFAULT_MAXIMUM :最大緩存(65536),在SIZE_TABLE中對應的下標為38。

4)、DEFAULT_INITIAL :初始化緩存大小,第一次分配緩存時,由於沒有上一次實際收到的字節數做參考,需要給一個默認初始值。

5)、INDEX_INCREMENT:上次預估緩存偏小,下次index的遞增值。

6)、INDEX_DECREMENT :上次預估緩存偏大,下次index的遞減值。

構造函數:

private AdaptiveRecvByteBufAllocator() {
    this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}

public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
    if (minimum <= 0) {
        throw new IllegalArgumentException("minimum: " + minimum);
    }
    if (initial < minimum) {
        throw new IllegalArgumentException("initial: " + initial);
    }
    if (maximum < initial) {
        throw new IllegalArgumentException("maximum: " + maximum);
    }

    int minIndex = getSizeTableIndex(minimum);
    if (SIZE_TABLE[minIndex] < minimum) {
        this.minIndex = minIndex + 1;
    } else {
        this.minIndex = minIndex;
    }

    int maxIndex = getSizeTableIndex(maximum);
    if (SIZE_TABLE[maxIndex] > maximum) {
        this.maxIndex = maxIndex - 1;
    } else {
        this.maxIndex = maxIndex;
    }

    this.initial = initial;
}

該構造函數對參數進行了有效性檢查,然后初始化了如下3個字段,這3個字段就是上面用於產生allocHandle對象所要用到的參數。

private final int minIndex;
private final int maxIndex;
private final int initial;

其中,getSizeTableIndex函數的代碼如下,該函數的功能為:找到SIZE_TABLE中的元素剛好大於或等於size的位置。

private static int getSizeTableIndex(final int size) {
    for (int low = 0, high = SIZE_TABLE.length - 1;;) {
        if (high < low) {
            return low;
        }
        if (high == low) {
            return high;
        }

        int mid = low + high >>> 1;
        int a = SIZE_TABLE[mid];
        int b = SIZE_TABLE[mid + 1];
        if (size > b) {
            low = mid + 1;
        } else if (size < a) {
            high = mid - 1;
        } else if (size == a) {
            return mid;
        } else { //這里的情況就是 a < size <= b 的情況
            return mid + 1;
        }
    }
}

byteBuf = allocHandle.allocate(allocator);

申請一塊指定大小的內存

AdaptiveRecvByteBufAllocator#HandlerImpl

@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
    return alloc.ioBuffer(nextReceiveBufferSize);
}

直接調用了ioBuffer方法,繼續看

AbstractByteBufAllocator.java

@Override
public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}

ioBuffer函數中主要邏輯為:看平台是否支持unsafe,選擇使用直接物理內存還是堆上內存。先看 heapBuffer

AbstractByteBufAllocator.java 

@Override
public ByteBuf heapBuffer(int initialCapacity) {
    return heapBuffer(initialCapacity, Integer.MAX_VALUE);
}

@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newHeapBuffer(initialCapacity, maxCapacity);
} 

這里的newHeapBuffer有兩種實現:至於具體用哪一種,取決於我們對系統屬性io.netty.allocator.type的設置,如果設置為: “pooled”,則緩存分配器就為:PooledByteBufAllocator,進而利用對象池技術進行內存分配。如果不設置或者設置為其他,則緩存分配器為:UnPooledByteBufAllocator,則直接返回一個UnpooledHeapByteBuf對象。

UnpooledByteBufAllocator.java

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

PooledByteBufAllocator.java

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<byte[]> heapArena = cache.heapArena;

    ByteBuf buf;
    if (heapArena != null) {
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}

再看directBuffer

AbstractByteBufAllocator.java

@Override
public ByteBuf directBuffer(int initialCapacity) {
    return directBuffer(initialCapacity, Integer.MAX_VALUE);
}  

@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);//參數的有效性檢查
    return newDirectBuffer(initialCapacity, maxCapacity);
}

與newHeapBuffer一樣,這里的newDirectBuffer方法也有兩種實現:至於具體用哪一種,取決於我們對系統屬性io.netty.allocator.type的設置,如果設置為: “pooled”,則緩存分配器就為:PooledByteBufAllocator,進而利用對象池技術進行內存分配。如果不設置或者設置為其他,則緩存分配器為:UnPooledByteBufAllocator。這里主要看下UnpooledByteBufAllocator. newDirectBuffer的內部實現

UnpooledByteBufAllocator.java

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}

UnpooledUnsafeDirectByteBuf是如何實現緩存管理的?對Nio的ByteBuffer進行了封裝,通過ByteBuffer的allocateDirect方法實現緩存的申請。

protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    //省略了部分參數檢查的代碼
    this.alloc = alloc;
    setByteBuffer(allocateDirect(initialCapacity));
}
protected ByteBuffer allocateDirect(int initialCapacity) {
    return ByteBuffer.allocateDirect(initialCapacity);
}

private void setByteBuffer(ByteBuffer buffer) {
    ByteBuffer oldBuffer = this.buffer;
    if (oldBuffer != null) {
        if (doNotFree) {
            doNotFree = false;
        } else {
            freeDirect(oldBuffer);
        }
    }

    this.buffer = buffer;
    memoryAddress = PlatformDependent.directBufferAddress(buffer);
    tmpNioBuf = null;
    capacity = buffer.remaining();
}

上面代碼的主要邏輯為:

1、先利用ByteBuffer的allocateDirect方法分配了大小為initialCapacity的緩存

2、然后判斷將舊緩存給free掉

3、最后將新緩存賦給字段buffer上

其中:memoryAddress = PlatformDependent.directBufferAddress(buffer) 獲取buffer的address字段值,指向緩存地址。
capacity = buffer.remaining() 獲取緩存容量。

接下來看toLeakAwareBuffer(buf)方法

protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
    ResourceLeak leak;
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE:
            leak = AbstractByteBuf.leakDetector.open(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID:
            leak = AbstractByteBuf.leakDetector.open(buf);
            if (leak != null) {
                buf = new AdvancedLeakAwareByteBuf(buf, leak);
            }
            break;
    }
    return buf;
}

方法toLeakAwareBuffer(buf)對申請的buf又進行了一次包裝。

上面一長串的分析,得到了緩存后,回到AbstractNioByteChannel.read方法,繼續看。

doReadBytes方法

下面看下doReadBytes方法:將socketChannel數據寫入緩存。

NioSocketChannel.java

@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
}

將Channel中的數據讀入緩存byteBuf中。繼續看

WrappedByteBuf.java

@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    return buf.writeBytes(in, length);
} 

AbstractByteBuf.java

@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length); if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}

這里的setBytes方法有不同的實現,這里看下UnpooledUnsafeDirectByteBuf的setBytes的實現。

UnpooledUnsafeDirectByteBuf.java

@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ByteBuffer tmpBuf = internalNioBuffer();
    tmpBuf.clear().position(index).limit(index + length);
    try {
        return in.read(tmpBuf);
    } catch (ClosedChannelException ignored) {
        return -1;//當Channel 已經關閉,則返回-1.    
    }
} 

private ByteBuffer internalNioBuffer() {
    ByteBuffer tmpNioBuf = this.tmpNioBuf;
    if (tmpNioBuf == null) {
        this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
    }
    return tmpNioBuf;
}

最終底層采用ByteBuffer實現read操作,無論是PooledByteBuf、還是UnpooledXXXBuf,里面都將底層數據結構BufBuffer/array轉換為ByteBuffer 來實現read操作。即無論是UnPooledXXXBuf還是PooledXXXBuf里面都有一個ByteBuffer tmpNioBuf,這個tmpNioBuf才是真正用來存儲從管道Channel中讀取出的內容的。到這里就完成了將channel的數據讀入到了緩存Buf中。

我們具體來看看 in.read(tmpBuf); FileChannel和SocketChannel的read最后都是依賴的IOUtil來實現,代碼如下

public int read(ByteBuffer dst) throws IOException {
    ensureOpen();
    if (!readable)
        throw new NonReadableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
             n = IOUtil.read(fd, dst, -1, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}

最后目的就是將SocketChannel中的數據讀出存放到ByteBuffer dst我們看看 IOUtil.read(fd, dst, -1, nd)

static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
    if (var1.isReadOnly()) {
        throw new IllegalArgumentException("Read-only buffer");
    //如果最終承載數據的buffer是DirectBuffer,則直接將數據讀入到堆外內存中
    } else if (var1 instanceof DirectBuffer) {
        return readIntoNativeBuffer(var0, var1, var2, var4);
    } else {
        // 分配臨時的堆外內存
        ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());

        int var7;
        try {
            // Socket I/O 操作會將數據讀入到堆外內存中
            int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
            var5.flip();
            if (var6 > 0) {
                // 將堆外內存的數據拷貝到堆內存中(用戶定義的緩存,在jvm中分配內存)
                var1.put(var5);
            }

            var7 = var6;
        } finally {
            // 里面會調用DirectBuffer.cleaner().clean()來釋放臨時的堆外內存
            Util.offerFirstTemporaryDirectBuffer(var5);
        }

        return var7;
    }
}
通過上述實現可以看出,基於channel的數據讀取步驟如下:
1、如果緩存內存是DirectBuffer,就直接將Channel中的數據讀取到堆外內存
2、如果緩存內存是堆內存,則先申請一塊和緩存同大小的臨時 DirectByteBuffer var5。
3、將內核緩存中的數據讀到堆外緩存var5,底層由NativeDispatcher的read實現。
4、把堆外緩存var5的數據拷貝到堆內存var1(用戶定義的緩存,在jvm中分配內存)。
5、會調用DirectBuffer.cleaner().clean()來釋放創建的臨時的堆外內存
如果AbstractNioByteChannel.read中第一步創建的是堆外內存,則會直接將數據讀入到堆外內存,並不會先創建臨時堆外內存,再將數據讀入到堆外內存,最后將堆外內存拷貝到堆內存
簡單的說,如果使用堆外內存,則只會復制一次數據,如果使用堆內存,則會復制兩次數據
我們來看看readIntoNativeBuffer
private static int readIntoNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, long l, NativeDispatcher nativedispatcher, Object obj)  throws IOException  {  
    int i = bytebuffer.position();  
    int j = bytebuffer.limit();  
    //如果斷言開啟,buffer的position大於limit,則拋出斷言錯誤  
    if(!$assertionsDisabled && i > j)  
        throw new AssertionError();  
    //獲取需要讀的字節數  
    int k = i > j ? 0 : j - i;  
    if(k == 0)  
        return 0;  
    int i1 = 0;  
    //從輸入流讀取k個字節到buffer  
    if(l != -1L)  
        i1 = nativedispatcher.pread(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, l, obj);  
    else  
        i1 = nativedispatcher.read(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k);  
    //重新定位buffer的position  
    if(i1 > 0)  
        bytebuffer.position(i + i1);  
    return i1;  
}  
這個函數就是將內核緩沖區中的數據讀取到堆外緩存DirectBuffer

回到AbstractNioByteChannel.read方法,繼續看。

@Override
public void read() {
        //...
        try {
            int totalReadAmount = 0;
            boolean readPendingReset = false;
            do {
                byteBuf = allocHandle.allocate(allocator);
                int writable = byteBuf.writableBytes();
                int localReadAmount = doReadBytes(byteBuf);
                if (localReadAmount <= 0) {
                    // not was read release the buffer
                    byteBuf.release();
                    close = localReadAmount < 0;
                    break;
                }
                if (!readPendingReset) {
                    readPendingReset = true;
                    setReadPending(false);
                }
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;

                if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                    // Avoid overflow.
                    totalReadAmount = Integer.MAX_VALUE;
                    break;
                }

                totalReadAmount += localReadAmount;

                // stop reading
                if (!config.isAutoRead()) {
                    break;
                }

                if (localReadAmount < writable) {
                    // Read less than what the buffer can hold,
                    // which might mean we drained the recv buffer completely.
                    break;
                }
            } while (++ messages < maxMessagesPerRead);

            pipeline.fireChannelReadComplete();
            allocHandle.record(totalReadAmount);

            if (close) {
                closeOnRead(pipeline);
                close = false;
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close);
        } finally {
            if (!config.isAutoRead() && !isReadPending()) {
                removeReadOp();
            }
        }
    }
}

int localReadAmount = doReadBytes(byteBuf);
1、如果返回0,則表示沒有讀取到數據,則退出循環。
2、如果返回-1,表示對端已經關閉連接,則退出循環。
3、否則,表示讀取到了數據,數據讀入緩存后,觸發pipeline的ChannelRead事件,byteBuf作為參數進行后續處理,這時自定義Inbound類型的handler就可以進行業務處理了。Pipeline的事件處理在我之前的博文中有詳細的介紹。處理完成之后,再一次從Channel讀取數據,直至退出循環。

4、循環次數超過maxMessagesPerRead時,即只能在管道中讀取maxMessagesPerRead次數據,既是還沒有讀完也要退出。在上篇博文中,Boss線程接受客戶端連接也用到了此變量,即當boss線程 selector檢測到OP_ACCEPT事件后一次只能接受maxMessagesPerRead個客戶端連接

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM