Netty 源碼分析之ByteBuf


Netty 源碼分析之ByteBuf


ByteBuf基礎

Java Nio 的Buffer

在進行數據傳輸的過程中,我們經常會用到緩沖區。
在Java NIO 為我們提供了原生的七種緩沖區實現,對應着Java 的七種基本類型。一般使用ByteBuffer較多。原生的Buffer雖然能滿足我們的日常使用,但是要進行復雜的應用的時候,確有點力不從心了,原生Buffer存在着以下缺點。因此Netty對其進行了封裝,提供了更為友好的接口供我們使用。

  • 當我們調用對應Buffer類的allocate方法來創建緩沖區實例的時候,會分配指定的空間,同時緩沖區的長度就會被固定,不能進行動態的增長或者收縮。如果我們寫入的數據大於緩沖區的capacity的時候,就會發生數組越界錯誤。
  • Buffer只有一個位置標志位屬性Position,我們只能flip或者rewind方法來對position進行修改來處理數據的存取位置,一不小心就可能會導致錯誤。
    
  • Buffer只提供了存取、翻轉、釋放、標志、比較、批量移動等緩沖區的基本操作,我們想使用高級的功能,就得自己手動進行封裝及維護,使用非常不方便。
    

ByteBuf工作原理

ByteBuf也是通過字節數組作為緩沖區來存取數據,通過外觀模式聚合了JDK NIO元素的ByteBuffer,進行封裝。
ByteBuf是通過readerIndex跟writerIndex兩個位置指針來協助緩沖區的讀寫操作的。
在對象初始化的時候,readerIndex和writerIndex的值為0,隨着讀操作和寫操作的進行,writerIndex和readerIndex都會增加,不過readerIndex不能超過writerIndex,在進行讀取操作之后,0到readerIndex之間的空間會被視為discard,調用ByteBuf的discardReadBytes方法,可以對這部分空間進行釋放重用,類似於ByteBuffer的compact操作,對緩沖區進行壓縮。readerIndex到writerIndex的空間,相當於ByteBuffer的position到limit的空間,可以對其進行讀取,WriterIndex到capacity的空間,則相當於ByteBuffer的limit到capacity的空間,是可以繼續寫入的。
readerIndex跟writerIndex讓讀寫操作的位置指針分離,不需要對同一個位置指針進行調整,簡化了緩沖區的讀寫操作。
同樣,ByteBuf對讀寫操作進行了封裝,提供了動態擴展的能力,當我們對緩沖區進行寫操作的時候,需要對剩余的可用空間進行校驗,如果可用空間不足,同時要寫入的字節數小於可寫的最大字節數,會對緩沖區進行動態擴展,它會重新創建一個緩沖區,然后將以前的數據復制到新創建的緩沖區中,

ByteBuf基本功能

  • 順序讀
    在進行讀操作之前,首先對緩沖區可用的空間進行校驗。如果要讀取的字節長度小於0,就會拋出IllegalArgumentException異常,如果要讀取的字節長度大於已寫入的字節長度,會拋出IndexOutOfBoundsException異常。通過校驗之后,調用getBytes方法,從當前的readerIndex開始,讀取length長度的字節數據到目標dst中,由於不同的子類實現不一樣,getBytes是個抽象方法,由對應的子類去實現。如果讀取數據成功,readerIndex將會增加相應的length。
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
    checkReadableBytes(length);
    getBytes(readerIndex, dst, dstIndex, length);
    readerIndex += length;
    return this;
}
protected final void checkReadableBytes(int minimumReadableBytes) {
    ensureAccessible();
    if (minimumReadableBytes < 0) {
        throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
    }
    if (readerIndex > writerIndex - minimumReadableBytes) {
        throw new IndexOutOfBoundsException(String.format(
                "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
                readerIndex, minimumReadableBytes, writerIndex, this));
    }
}
  • 順序寫
    讀操作是將源字節數組從srcIndex開始,length長度的數據寫入到當前的ByteBuf中的。
    一開始需要對寫入數組的字節數進行校驗,如果寫入長度小於0,將會拋出IllegalArgumentException異常,如果寫入字節數小於當前ByteBuf的可寫入字節數,則通過檢驗。如果寫入字節數大於緩沖區最大可動態擴展的容量maxCapacity,就會拋出
    IndexOutOfBoundsException異常,否則的話,就會通過動態擴展來滿足寫入需要的字節數。首先通過calculateNewCapacity計算出重新擴展后的容量,然后調用capacity方法進行擴展,不同的子類有不同實現,所以也是一個抽象方法。
    • 計算擴展容量,首先設置門閥值為4m,如果要擴展的容量等於閥值就使用閥值作為緩沖區新的容量,如果大於閥值就以4M作為步長,每次增加4M,如果擴展期間,要擴展的容量比最大可擴展容量還大的話,就以最大可擴展容量maxCapacity為新的容量。否則的話,就從64開始倍增,直到倍增之后的結果大於要擴展的容量,再把結果作為緩沖區的新容量。
    • 通過先倍增再步長來擴展容量,如果我們只是writerIndex+length的值作為緩沖區的新容量,那么再以后進行寫操作的時候,每次都需要進行容量擴展,容量擴展的過程需要進行內存復制,過多內存復制會導致系統的性能下降,之所以是倍增再部長,在最初空間比較小的時候,倍增操作並不會帶來太多的內存浪費,但是內存增長到一定的時候,再進行倍增的時候,就會對內存造成浪費,因此,需要設定一個閥值,到達閥值之后就通過步長的方法進行平滑的增長。
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
    ensureWritable(length);
    setBytes(writerIndex, src, srcIndex, length);
    writerIndex += length;
    return this;
}
public ByteBuf ensureWritable(int minWritableBytes) {
    if (minWritableBytes < 0) {
        throw new IllegalArgumentException(String.format(
                "minWritableBytes: %d (expected: >= 0)", minWritableBytes));
    }

    if (minWritableBytes <= writableBytes()) {
        return this;
    }

    if (minWritableBytes > maxCapacity - writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    }

    // Normalize the current capacity to the power of 2.
    int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);

    // Adjust to the new capacity.
    capacity(newCapacity);
    return this;
}
private int calculateNewCapacity(int minNewCapacity) {
    final int maxCapacity = this.maxCapacity;
    final int threshold = 1048576 * 4; // 4 MiB page

    if (minNewCapacity == threshold) {
        return threshold;
    }

    // If over threshold, do not double but just increase by threshold.
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }

    // Not over threshold. Double up to 4 MiB, starting from 64.
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}
//UnpooledHeapByteBuf的capacity實現
public ByteBuf capacity(int newCapacity) {
    ensureAccessible();
    if (newCapacity < 0 || newCapacity > maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    int oldCapacity = array.length;
    if (newCapacity > oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        System.arraycopy(array, 0, newArray, 0, array.length);
        setArray(newArray);
    } else if (newCapacity < oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        int readerIndex = readerIndex();
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
        } else {
            setIndex(newCapacity, newCapacity);
        }
        setArray(newArray);
    }
    return this;
}
  • Clear操作
    clear操作只是把readerIndex和writerIndex設置為0,不會對存儲的數據進行修改。
public ByteBuf clear() {
    readerIndex = writerIndex = 0;
    return this;
}
  • 索引操作

    • 讀寫位置索引設置:主要是對邊界條件進行校驗,設置readerIndex的時候,newReaderIndex不能小於0跟大於writerIndex;設置writerIndex的時候,newWriterIndex必須大於readerIndex和小於當前的capacity。如果不能通過校驗的話,就會拋出IndexOutOfBoundsException異常。
    • mark和reset操作:由於有readerIndex和writerIndex,因此進行mark或者reset需要指定相應的操作位置索引,mark操作會把當前的readerIndex或者writerIndex設置為markedReaderIndex或者markedWriterIndex;reset操作的話,它是參入對應的mark值調用對應readerIndex()或者writerIndex();
  • 緩沖區重用
    可以通過discardReadByte方法去重用已經讀取過的緩沖區。
    首先對readerIndex進行判斷:

    • 如果readerIndex等於0,就說明沒有讀取數據,沒有可以用來重用的空間,直接返回;
    • 如果readerIndex大於0且不等於writerIndex的話,說明有進行數據讀取被丟棄的緩沖區,也有還沒有被讀取的緩沖區。調用setBytes方法進行字節數組的復制,將沒被讀取的數據移動到緩沖區的起始位置,重新去設置readerIndex和writerIndex,readerIndex為0,writerIndex為原writerIndex-readerIndex;同時,也需要對mark進行重新設置。
      • 首先對markedReaderIndex進行備份然后跟decrement進行比較,如果markedReaderIndex比decrement小的話,markedReaderIndex設置為0,再用markedWriterIndex跟decrement比較,如果小於的話,markedWriterIndex也設置為0,否則的話markedWriterIndex較少decrement;
      • 如果markedReaderIndex比decrement大的話,markedReaderIndex和markedReaderIndex都減去decrement就可以了。
    • 如果readerIndex等於writerIndex的話,說明沒有可以進行重用的緩沖區,直接對mark重新設置就可以了,不需要內存復制。
public ByteBuf discardReadBytes() {
    ensureAccessible();
    if (readerIndex == 0) {
        return this;
    }

    if (readerIndex != writerIndex) {
        setBytes(0, this, readerIndex, writerIndex - readerIndex);
        writerIndex -= readerIndex;
        adjustMarkers(readerIndex);
        readerIndex = 0;
    } else {
        adjustMarkers(readerIndex);
        writerIndex = readerIndex = 0;
    }
    return this;
}
protected final void adjustMarkers(int decrement) {
    int markedReaderIndex = this.markedReaderIndex;
    if (markedReaderIndex <= decrement) {
        this.markedReaderIndex = 0;
        int markedWriterIndex = this.markedWriterIndex;
        if (markedWriterIndex <= decrement) {
            this.markedWriterIndex = 0;
        } else {
            this.markedWriterIndex = markedWriterIndex - decrement;
        }
    } else {
        this.markedReaderIndex = markedReaderIndex - decrement;
        markedWriterIndex -= decrement;
    }
}
  • skipBytes

當我們需要跳過某些不需要的字節的時候,可以調用skipBytes方法來跳過指定長度的字節來讀取后面的數據。
首先對跳躍長度進行判斷,如果跳躍長度小於0的話,會拋出IllegalArgumentException異常,或者跳躍長度大於當前緩沖區可讀長度的話,會拋出IndexOutOfBoundsException異常。如果校驗通過,新的readerindex為原readerIndex+length,如果新的readerIndex大於writerIndex的話,會拋出IndexOutOfBoundsException異常,否則就更新readerIndex。

public ByteBuf skipBytes(int length) {
    checkReadableBytes(length);
    int newReaderIndex = readerIndex + length;
    if (newReaderIndex > writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "length: %d (expected: readerIndex(%d) + length <= writerIndex(%d))",
                length, readerIndex, writerIndex));
    }
    readerIndex = newReaderIndex;
    return this;
}

ByteBuf源碼分析

ByteBuf

AbstractReferenceCountedByteBuf

AbstractReferenceCountedByteBuf是ByteBuf實現對引用進行計數的基類,用來跟蹤對象的分配和銷毀,實現自動內存回收。

  • 成員變量
    • refCntUpdater refCntUpdater是一個AtomicIntegerFieldUpdater類型的成員變量,它可以對成員變量進行原子性更新操作,達到線程安全。
    • REFCNT_FIELD_OFFSET REFCNT_FIELD_OFFSET是標識refCnt字段在AbstractReferenceCountedByteBuf的內存地址,在UnpooledDirectByteBuf和PooledDirectByteBuf兩個子類中都會使用到這個偏移量。
    • refCnt volatile修飾保證變量的線程可見性,用來跟蹤對象的引用次數
  • 對象引用計數器
    每調用retain方法一次,引用計數器就會加一。retain方法通過自旋對引用計數器進行加一操作,引用計數器的初始值為1,只要程序是正確執行的話,它的最小值應該為1,當申請和釋放次數相等的時候,對應的ByteBuf就會被回收。當次數為0時,表明對象被錯誤的引用,就會拋出IllegalReferenceCountException異常,如果次數等於Integer類型的最大值,就會拋出
    IllegalReferenceCountException異常。retain通過refCntUpdater的compareAndSet方法進行原子操作更新,compareAndSet會使用獲取的值與期望值進行比較,如果在比較器件,有其他線程對變量進行修改,那么比較失敗,會再次自旋,獲取引用計數器的值再次進行比較,否則的話,就會進行加一操作,退出自旋。
    release方法的話與retain方法類似,也是通過自旋循環進行判斷和更新,不過當refCnt的值等於1的時候,表明引用計數器的申請跟釋放次數一樣,對象引用已經不可達了,對象應該要被垃圾收集回收掉了,調用deallocate方法釋放ByteBuf對象
public ByteBuf retain() {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt == 0) {
            throw new IllegalReferenceCountException(0, 1);
        }
        if (refCnt == Integer.MAX_VALUE) {
            throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
        }
        if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
            break;
        }
    }
    return this;
}
    
public final boolean release() {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt == 0) {
            throw new IllegalReferenceCountException(0, -1);
        }

        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
            if (refCnt == 1) {
                deallocate();
                return true;
            }
            return false;
        }
    }
}

UnpooledHeapByteBuf

UnpooledHeapByteBuf是一個非線程池實現的在堆內存進行內存分配的字節緩沖區,在每次IO操作的都會去創建一個UnpooledHeapByteBuf對象,如果頻繁地對內存進行分配或者釋放會對性能造成影響。

  • 成員變量
    • ByteBufAllocator 用於內存分配
    • array 字節數組作為緩沖區,用於存儲字節數據
    • ByteBuffer 用來實現Netty ByteBuf 到Nio ByteBuffer的變換
  • 動態擴展緩沖區
    調用capacity方法動態擴展緩沖區,首先要對擴展容量進行校驗,如果新容量的大小小於0或者大於最大可擴展容量maxCapacity的話,拋出IllegalArgumentException異常。
    通過校驗之后,如果新擴展容量比原來大的話,則創建一個新的容量為新擴展容量的字節數組緩沖區,然后調用System.arraycopy進行內存復制,將舊的數據復制到新數組中去,然后用setArray進行數組替換。動態擴展之后需要原來的視圖tmpNioBuffer設置為控。
    如果新的容量小於當前緩沖區容量的話,不需要進行動態擴展,但是需要截取部分數據作為子緩沖區。
    • 首先對當前的readerIndex是否小於newCapacity,如果小於的話繼續對writerIndex跟newCapacity進行比較,如果writerIndex大於newCapacity的話,就將writerIndex設置為newCapacity,更新完索引之后就通過System.arrayCopy內存復制將當前可讀的數據復制到新的緩沖區字節數組中。
    • 如果newCapacity小於readerIndex的話,說明沒有新的可讀數據要復制到新的字節數組緩沖區中,只需要把writerIndex跟readerIndex都更新為newCapacity既可,最后調用setArray更換字節數組。
 public ByteBuf capacity(int newCapacity) {
    ensureAccessible();
    if (newCapacity < 0 || newCapacity > maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    int oldCapacity = array.length;
    if (newCapacity > oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        System.arraycopy(array, 0, newArray, 0, array.length);
        setArray(newArray);
    } else if (newCapacity < oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        int readerIndex = readerIndex();
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
        } else {
            setIndex(newCapacity, newCapacity);
        }
        setArray(newArray);
    }
    return this;
}

  • setBytes
    字節數組復制,首先對數據進行合法性檢驗,如果srcIndex或者index的值小於0,就會拋出IllegalArgumentException,如果index+length的值大於capacity的值或者srcIndex+length的值大於src.length的話,就會拋出IndexOutOfBoundsException異常。通過校驗之后,就調用System.arraycopy進行字節數組復制。
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
    checkSrcIndex(index, length, srcIndex, src.length);
    System.arraycopy(src, srcIndex, array, index, length);
    return this;
}
protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) {
    checkIndex(index, length);
    if (srcIndex < 0 || srcIndex > srcCapacity - length) {
        throw new IndexOutOfBoundsException(String.format(
                "srcIndex: %d, length: %d (expected: range(0, %d))", srcIndex, length, srcCapacity));
    }
}

  • Netty ByteBuf與Nio ByteBuffer轉換
    要將Netty的ByteBuf轉化為Nio ByteBuffer,在ByteBuffer中有wrap靜態方法,只需要傳入對應的字節數組即可創建轉化為ByteBuffer,在nioBuffer方法還調用了slice方法,它可以創建一個從原ByteBuffer的position開始緩沖區,與原緩沖區共享同一段數據元素。nioBuffer方法不會重用緩沖區,只能保證writerIndex跟readerIndex的獨立性。
public ByteBuffer nioBuffer(int index, int length) {
    ensureAccessible();
    return ByteBuffer.wrap(array, index, length).slice();
}

PooledByteBuf

在Netty4之后加入內存池管理,通過內存池管理比之前ByteBuf的創建性能得到了極大提高。

  • PoolChunk
    • Page 可以用來分配的最小內存塊單位
    • Chunk page的集合

PoolChunk主要負責內存塊的分配及釋放,chunk中的page會構建成一顆二叉樹,默認情況下page的大小是8K,chunk的大小是2^11 page,即16M,構成了11層的二叉樹,最下面一層的葉子節點有8192個,與page的數目一樣,每一次內存的分配必須保證連續性,方便內存操作。每個節點會記錄自己在Memory Area的偏移地址,當一個節點表示的內存區域被分配之后,那么該節點會被標志為已分配,該節點的所有子節點的內存請求都會忽略。每次內存分配的都是8k(2^n)大小的內存塊,當需要分配大小為chunkSize/(2^k)的內存端時,為了找到可用的內存段,會從第K層左邊開始尋找可用節點。

  • PoolArena

在內存分配中,為了能夠集中管理內存的分配及釋放,同時提供分配和釋放內存的性能,一般都是會先預先分配一大塊連續的內存,不需要重復頻繁地進行內存操作,那一大塊連續的內存就叫做memory Arena,而PoolArena是Netty的內存池實現類。
在Netty中,PoolArena是由多個Chunk組成的,而每個Chunk則由多個Page組成。PoolArena是由Chunk和Page共同組織和管理的。

  • PoolSubpage

當對於小於一個Page的內存分配的時候,每個Page會被划分為大小相等的內存塊,它的大小是根據第一次申請內存分配的內存塊大小來決定的。一個Page只能分配與第一次內存內存的內存塊的大小相等的內存塊,如果想要想要申請大小不想等的內存塊,只能在新的Page上申請內存分配了。
Page中的存儲區域的使用情況是通過一個long數組bitmap來維護的,每一位表示一個區域的占用情況。

PooledDirectByteBuf

  • 創建字節緩沖區
    由於內存池實現,每次創建字節緩沖區的時候,不是直接new,而是從內存池中去獲取,然后設置引用計數器跟讀寫Index,跟緩沖區最大容量返回。
static PooledHeapByteBuf newInstance(int maxCapacity) {
    PooledHeapByteBuf buf = RECYCLER.get();
    buf.reuse(maxCapacity);
    return buf;
}
final void reuse(int maxCapacity) {
    maxCapacity(maxCapacity);
    setRefCnt(1);
    setIndex0(0, 0);
    discardMarks();
}
  • 復制字節緩沖區實例
    copy方法可以復制一個字節緩沖區實例,與原緩沖區獨立。
    首先要對index和length進行合法性判斷,然后調用PooledByteBufAllocator的directBuffer方法分配一個新的緩沖區。newDirectBuffer方法是一個抽象方法,對於不同的子類有不同的實現。如果是unpooled的話,會直接創建一個新的緩沖區,如果是pooled的話,它會從內存池中獲取一個可用的緩沖區。
public ByteBuf copy(int index, int length) {
    checkIndex(index, length);
    ByteBuf copy = alloc().directBuffer(length, maxCapacity());
    copy.writeBytes(this, index, length);
    return copy;
}
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newDirectBuffer(initialCapacity, maxCapacity);
}
// PooledByteBufAllocator 
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        if (PlatformDependent.hasUnsafe()) {
            buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }

    return toLeakAwareBuffer(buf);
}
//UnpooledByteBufAllocator
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}

ByteBuf輔助類分析

ByteBufHolder

ByteBufHolder是ByteBuf的一個容器,它可以更方便地訪問ByteBuf中的數據,在使用不同的協議進行數據傳輸的時候,不同的協議消息體包含的數據格式和字段不一樣,所以抽象一個ByteBufHolder對ByteBuf進行包裝,不同的子類有不同的實現,使用者可以根據自己的需要進行實現。Netty提供了一個默認實現DefaultByteBufHolder。

ByteBufAllocator

ByteBufAllocator是字節緩沖區分配器,根據Netty字節緩沖區的實現不同,分為兩種不同的分配器PooledByteBufAllocator和UnpooledByteBufAllocator。他們提供了不同ByteBuf的分配方法。

CompositeByteBuf

CompositeByteBuf是一個虛擬的Buffer,它可以將多個ByteBuf組裝為一個ByteBuf視圖。
在Java NIO中,我們有兩種實現的方法

  • 將其他ByteBuffer的數據復制到一個ByteBuffer中,或者重新創建一個新的ByteBuffer,將其他的ByteBuffer復制到新建的ByteBuffer中。
  • 通過容器將多個ByteBuffer存儲在一起,進行統一的管理和維護。

在Netty中,CompositeByByteBuf中維護了一個Component類型的集合。Component是ByteBuf的包裝類,它聚合了ByteBuf.維護在集合中的位置偏移量等信息。一般情況下,我們應該使用ByteBufAllocator.compositeBuffer()和Unpooled.wrappedBuffer(ByteBuf...)方法來創建CompositeByteBuf,而不是直接通過構造函數去實例化一個CompositeByteBuf對象。

private int addComponent0(int cIndex, ByteBuf buffer) {
    checkComponentIndex(cIndex);
    if (buffer == null) {
        throw new NullPointerException("buffer");
    }

    int readableBytes = buffer.readableBytes();

    // No need to consolidate - just add a component to the list.
    Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
    if (cIndex == components.size()) {
        components.add(c);
        if (cIndex == 0) {
            c.endOffset = readableBytes;
        } else {
            Component prev = components.get(cIndex - 1);
            c.offset = prev.endOffset;
            c.endOffset = c.offset + readableBytes;
        }
    } else {
        components.add(cIndex, c);
        if (readableBytes != 0) {
            updateComponentOffsets(cIndex);
        }
    }
    return cIndex;
}
private void consolidateIfNeeded() {
    final int numComponents = components.size();
    if (numComponents > maxNumComponents) {
        final int capacity = components.get(numComponents - 1).endOffset;
    
        ByteBuf consolidated = allocBuffer(capacity);
    
        for (int i = 0; i < numComponents; i ++) {
            Component c = components.get(i);
            ByteBuf b = c.buf;
            consolidated.writeBytes(b);
            c.freeIfNecessary();
        }
        Component c = new Component(consolidated);
        c.endOffset = c.length;
        components.clear();
        components.add(c);
    }
}

public CompositeByteBuf removeComponent(int cIndex) {
    checkComponentIndex(cIndex);
    Component comp = components.remove(cIndex);
    comp.freeIfNecessary();
    if (comp.length > 0) {
        updateComponentOffsets(cIndex);
    }
    return this;
}

private static final class Component {
    final ByteBuf buf;
    final int length;
    int offset;
    int endOffset;

    Component(ByteBuf buf) {
        this.buf = buf;
        length = buf.readableBytes();
    }

    void freeIfNecessary() {
        buf.release(); // We should not get a NPE here. If so, it must be a bug.
    }
}

ByteBufUtil

ByteBufUtil是ByteBuf的工具類,它提供了一系列的靜態方法來操作ByteBuf。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM