高性能無鎖隊列 Mpsc Queue


JDK 原生並發隊列

JDK 並發隊列按照實現方式可以分為阻塞隊列和非阻塞隊列兩種類型,阻塞隊列是基於鎖實現的,非阻塞隊列是基於 CAS 操作實現的。JDK 中包含多種阻塞和非阻塞的隊列實現,如下圖所示。

隊列是一種 FIFO(先進先出)的數據結構,JDK 中定義了 java.util.Queue 的隊列接口,與 List、Set 接口類似,java.util.Queue 也繼承於 Collection 集合接口。此外,JDK 還提供了一種雙端隊列接口 java.util.Deque,我們最常用的 LinkedList 就是實現了 Deque 接口。下面我們簡單說說上圖中的每個隊列的特點,並給出一些對比和總結。

阻塞隊列

阻塞隊列在隊列為空或者隊列滿時,都會發生阻塞。阻塞隊列自身是線程安全的,使用者無需關心線程安全問題,降低了多線程開發難度。阻塞隊列主要分為以下幾種:

  • ArrayBlockingQueue:最基礎且開發中最常用的阻塞隊列,底層采用數組實現的有界隊列,初始化需要指定隊列的容量。ArrayBlockingQueue 是如何保證線程安全的呢?它內部是使用了一個重入鎖 ReentrantLock,並搭配 notEmpty、notFull 兩個條件變量 Condition 來控制並發訪問。從隊列讀取數據時,如果隊列為空,那么會阻塞等待,直到隊列有數據了才會被喚醒。如果隊列已經滿了,也同樣會進入阻塞狀態,直到隊列有空閑才會被喚醒。
  • LinkedBlockingQueue:內部采用的數據結構是鏈表,隊列的長度可以是有界或者無界的,初始化不需要指定隊列長度,默認是 Integer.MAX_VALUE。LinkedBlockingQueue 內部使用了 takeLock、putLock兩個重入鎖 ReentrantLock,以及 notEmpty、notFull 兩個條件變量 Condition 來控制並發訪問。采用讀鎖和寫鎖的好處是可以避免讀寫時相互競爭鎖的現象,所以相比於 ArrayBlockingQueue,LinkedBlockingQueue 的性能要更好。
  • PriorityBlockingQueue:采用最小堆實現的優先級隊列,隊列中的元素按照優先級進行排列,每次出隊都是返回優先級最高的元素。PriorityBlockingQueue 內部是使用了一個 ReentrantLock 以及一個條件變量 Condition notEmpty 來控制並發訪問,不需要 notFull 是因為 PriorityBlockingQueue 是無界隊列,所以每次 put 都不會發生阻塞。PriorityBlockingQueue 底層的最小堆是采用數組實現的,當元素個數大於等於最大容量時會觸發擴容,在擴容時會先釋放鎖,保證其他元素可以正常出隊,然后使用 CAS 操作確保只有一個線程可以執行擴容邏輯。
  • DelayQueue,一種支持延遲獲取元素的阻塞隊列,常用於緩存、定時任務調度等場景。DelayQueue 內部是采用優先級隊列 PriorityQueue 存儲對象。DelayQueue 中的每個對象都必須實現 Delayed 接口,並重寫 compareTo 和 getDelay 方法。向隊列中存放元素的時候必須指定延遲時間,只有延遲時間已滿的元素才能從隊列中取出。
  • SynchronizedQueue,又稱無緩沖隊列。比較特別的是 SynchronizedQueue 內部不會存儲元素。與 ArrayBlockingQueue、LinkedBlockingQueue 不同,SynchronizedQueue 直接使用 CAS 操作控制線程的安全訪問。其中 put 和 take 操作都是阻塞的,每一個 put 操作都必須阻塞等待一個 take 操作,反之亦然。所以 SynchronizedQueue 可以理解為生產者和消費者配對的場景,雙方必須互相等待,直至配對成功。在 JDK 的線程池 Executors.newCachedThreadPool 中就存在 SynchronousQueue 的運用,對於新提交的任務,如果有空閑線程,將重復利用空閑線程處理任務,否則將新建線程進行處理。
  • LinkedTransferQueue,一種特殊的無界阻塞隊列,可以看作 LinkedBlockingQueues、SynchronousQueue(公平模式)、ConcurrentLinkedQueue 的合體。與 SynchronousQueue 不同的是,LinkedTransferQueue 內部可以存儲實際的數據,當執行 put 操作時,如果有等待線程,那么直接將數據交給對方,否則放入隊列中。與 LinkedBlockingQueues 相比,LinkedTransferQueue 使用 CAS 無鎖操作進一步提升了性能。

非阻塞隊列

說完阻塞隊列,我們再來看下非阻塞隊列。非阻塞隊列不需要通過加鎖的方式對線程阻塞,並發性能更好。JDK 中常用的非阻塞隊列有以下幾種:

  • ConcurrentLinkedQueue,它是一個采用雙向鏈表實現的無界並發非阻塞隊列,它屬於 LinkedQueue 的安全版本。ConcurrentLinkedQueue 內部采用 CAS 操作保證線程安全,這是非阻塞隊列實現的基礎,相比 ArrayBlockingQueue、LinkedBlockingQueue 具備較高的性能。
  • ConcurrentLinkedDeque,也是一種采用雙向鏈表結構的無界並發非阻塞隊列。與 ConcurrentLinkedQueue 不同的是,ConcurrentLinkedDeque 屬於雙端隊列,它同時支持 FIFO 和 FILO 兩種模式,可以從隊列的頭部插入和刪除數據,也可以從隊列尾部插入和刪除數據,適用於多生產者和多消費者的場景。

至此,常見的隊列類型我們已經介紹完了。我們在平時開發中使用頻率最高的是 BlockingQueue。

我們可以通過下面一張表格,對上述 BlockingQueue 接口的具體行為進行歸類。

JDK 提供的並發隊列已經能夠滿足我們大部分的需求,但是在大規模流量的高並發系統中,如果對性能要求嚴苛,JDK 的非阻塞並發隊列可選擇面較少且性能並不夠出色。如果需要一個數組 + CAS 操作實現的無鎖安全隊列就需要第三方框架提供的高性能無鎖隊列,其中非常出名的有 Disruptor 和 JCTools。

Disruptor 是 LMAX 公司開發的一款高性能無鎖隊列,我們平時常稱它為 RingBuffer,其設計初衷是為了解決內存隊列的延遲問題。Disruptor 內部采用環形數組和 CAS 操作實現,性能非常優越。為什么 Disruptor 的性能會比 JDK 原生的無鎖隊列要好呢?環形數組可以復用內存,減少分配內存和釋放內存帶來的性能損耗。而且數組可以設置長度為 2 的次冪,直接通過位運算加快數組下標的定位速度。此外,Disruptor 還解決了偽共享問題,對 CPU Cache 更加友好。Disruptor 已經開源,詳細可查閱 Github 地址 https://github.com/LMAX-Exchange/disruptor

JCTools 也是一個開源項目,Github 地址為 https://github.com/JCTools/JCTools。JCTools 是適用於 JVM 並發開發的工具,主要提供了一些 JDK 確實的並發數據結構,例如非阻塞 Map、非阻塞 Queue 等。其中非阻塞隊列可以分為四種類型,可以根據不同的場景選擇使用。

  • Spsc 單生產者單消費者;
  • Mpsc 多生產者單消費者;
  • Spmc 單生產者多消費者;
  • Mpmc 多生產者多消費者。

Mpsc Queue 基礎知識

Mpsc 的全稱是 Multi Producer Single Consumer,多生產者單消費者。Mpsc Queue 可以保證多個生產者同時訪問隊列是線程安全的,而且同一時刻只允許一個消費者從隊列中讀取數據。Netty Reactor 線程中任務隊列 taskQueue 必須滿足多個生產者可以同時提交任務,所以 JCTools 提供的 Mpsc Queue 非常適合 Netty Reactor 線程模型。

Mpsc Queue 有多種的實現類,例如 MpscArrayQueue、MpscUnboundedArrayQueue、MpscChunkedArrayQueue 等。

首先我們看下 MpscArrayQueue 的繼承關系

 

 

 除了頂層 JDK 原生的 AbstractCollection、AbstractQueue,MpscArrayQueue 還繼承了很多類似於 MpscXxxPad 以及 MpscXxxField 的類。我們可以發現一個很有意思的規律,每個有包含屬性的類后面都會被 MpscXxxPad 類隔開。MpscXxxPad 到底起到什么作用呢?我們自頂向下,將所有類的字段合並在一起,看下 MpscArrayQueue 的整體結構。

// ConcurrentCircularArrayQueueL0Pad
    byte b000,b001,b002,b003,b004,b005,b006,b007;//  8b
    byte b010,b011,b012,b013,b014,b015,b016,b017;// 16b
    byte b020,b021,b022,b023,b024,b025,b026,b027;// 24b
    byte b030,b031,b032,b033,b034,b035,b036,b037;// 32b
    byte b040,b041,b042,b043,b044,b045,b046,b047;// 40b
    byte b050,b051,b052,b053,b054,b055,b056,b057;// 48b
    byte b060,b061,b062,b063,b064,b065,b066,b067;// 56b
    byte b070,b071,b072,b073,b074,b075,b076,b077;// 64b
    byte b100,b101,b102,b103,b104,b105,b106,b107;// 72b
    byte b110,b111,b112,b113,b114,b115,b116,b117;// 80b
    byte b120,b121,b122,b123,b124,b125,b126,b127;// 88b
    byte b130,b131,b132,b133,b134,b135,b136,b137;// 96b
    byte b140,b141,b142,b143,b144,b145,b146,b147;//104b
    byte b150,b151,b152,b153,b154,b155,b156,b157;//112b
    byte b160,b161,b162,b163,b164,b165,b166,b167;//120b
    byte b170,b171,b172,b173,b174,b175,b176,b177;//128b
// ConcurrentCircularArrayQueue
    protected final long mask;
    protected final E[] buffer;
// MpmcArrayQueueL1Pad
    byte b000,b001,b002,b003,b004,b005,b006,b007;//  8b
    byte b010,b011,b012,b013,b014,b015,b016,b017;// 16b
    byte b020,b021,b022,b023,b024,b025,b026,b027;// 24b
    byte b030,b031,b032,b033,b034,b035,b036,b037;// 32b
    byte b040,b041,b042,b043,b044,b045,b046,b047;// 40b
    byte b050,b051,b052,b053,b054,b055,b056,b057;// 48b
    byte b060,b061,b062,b063,b064,b065,b066,b067;// 56b
    byte b070,b071,b072,b073,b074,b075,b076,b077;// 64b
    byte b100,b101,b102,b103,b104,b105,b106,b107;// 72b
    byte b110,b111,b112,b113,b114,b115,b116,b117;// 80b
    byte b120,b121,b122,b123,b124,b125,b126,b127;// 88b
    byte b130,b131,b132,b133,b134,b135,b136,b137;// 96b
    byte b140,b141,b142,b143,b144,b145,b146,b147;//104b
    byte b150,b151,b152,b153,b154,b155,b156,b157;//112b
    byte b160,b161,b162,b163,b164,b165,b166,b167;//120b
// MpmcArrayQueueProducerIndexField
    private volatile long producerIndex;
// MpscArrayQueueMidPad
    byte b000,b001,b002,b003,b004,b005,b006,b007;//  8b
    byte b010,b011,b012,b013,b014,b015,b016,b017;// 16b
    byte b020,b021,b022,b023,b024,b025,b026,b027;// 24b
    byte b030,b031,b032,b033,b034,b035,b036,b037;// 32b
    byte b040,b041,b042,b043,b044,b045,b046,b047;// 40b
    byte b050,b051,b052,b053,b054,b055,b056,b057;// 48b
    byte b060,b061,b062,b063,b064,b065,b066,b067;// 56b
    byte b070,b071,b072,b073,b074,b075,b076,b077;// 64b
    byte b100,b101,b102,b103,b104,b105,b106,b107;// 72b
    byte b110,b111,b112,b113,b114,b115,b116,b117;// 80b
    byte b120,b121,b122,b123,b124,b125,b126,b127;// 88b
    byte b130,b131,b132,b133,b134,b135,b136,b137;// 96b
    byte b140,b141,b142,b143,b144,b145,b146,b147;//104b
    byte b150,b151,b152,b153,b154,b155,b156,b157;//112b
    byte b160,b161,b162,b163,b164,b165,b166,b167;//120b
    byte b170,b171,b172,b173,b174,b175,b176,b177;//128b
// MpscArrayQueueProducerLimitField
    private volatile long producerLimit;
// MpscArrayQueueL2Pad
    byte b000,b001,b002,b003,b004,b005,b006,b007;//  8b
    byte b010,b011,b012,b013,b014,b015,b016,b017;// 16b
    byte b020,b021,b022,b023,b024,b025,b026,b027;// 24b
    byte b030,b031,b032,b033,b034,b035,b036,b037;// 32b
    byte b040,b041,b042,b043,b044,b045,b046,b047;// 40b
    byte b050,b051,b052,b053,b054,b055,b056,b057;// 48b
    byte b060,b061,b062,b063,b064,b065,b066,b067;// 56b
    byte b070,b071,b072,b073,b074,b075,b076,b077;// 64b
    byte b100,b101,b102,b103,b104,b105,b106,b107;// 72b
    byte b110,b111,b112,b113,b114,b115,b116,b117;// 80b
    byte b120,b121,b122,b123,b124,b125,b126,b127;// 88b
    byte b130,b131,b132,b133,b134,b135,b136,b137;// 96b
    byte b140,b141,b142,b143,b144,b145,b146,b147;//104b
    byte b150,b151,b152,b153,b154,b155,b156,b157;//112b
    byte b160,b161,b162,b163,b164,b165,b166,b167;//120b
// MpscArrayQueueConsumerIndexField
    private volatile long consumerIndex;
// MpscArrayQueueL3Pad
    byte b000,b001,b002,b003,b004,b005,b006,b007;//  8b
    byte b010,b011,b012,b013,b014,b015,b016,b017;// 16b
    byte b020,b021,b022,b023,b024,b025,b026,b027;// 24b
    byte b030,b031,b032,b033,b034,b035,b036,b037;// 32b
    byte b040,b041,b042,b043,b044,b045,b046,b047;// 40b
    byte b050,b051,b052,b053,b054,b055,b056,b057;// 48b
    byte b060,b061,b062,b063,b064,b065,b066,b067;// 56b
    byte b070,b071,b072,b073,b074,b075,b076,b077;// 64b
    byte b100,b101,b102,b103,b104,b105,b106,b107;// 72b
    byte b110,b111,b112,b113,b114,b115,b116,b117;// 80b
    byte b120,b121,b122,b123,b124,b125,b126,b127;// 88b
    byte b130,b131,b132,b133,b134,b135,b136,b137;// 96b
    byte b140,b141,b142,b143,b144,b145,b146,b147;//104b
    byte b150,b151,b152,b153,b154,b155,b156,b157;//112b
    byte b160,b161,b162,b163,b164,b165,b166,b167;//120b
    byte b170,b171,b172,b173,b174,b175,b176,b177;//128b

可以看出,MpscXxxPad 類中使用了大量 byte 類型的變量,其命名沒有什么特殊的含義,只是起到填充的作用。Disruptor 也使用了類似的填充方法。Mpsc Queue 和 Disruptor 之所以填充這些無意義的變量,是為了解決偽共享(false sharing)問題。

什么是偽共享呢?在計算機組成中,CPU 的運算速度比內存高出幾個數量級,為了 CPU 能夠更高效地與內存進行交互,在 CPU 和內存之間設計了多層緩存機制,如下圖所示。

一般來說,CPU 會分為三級緩存,分別為L1 一級緩存、L2 二級緩存和L3 三級緩存。越靠近 CPU 的緩存,速度越快,但是緩存的容量也越小。所以從性能上來說,L1 > L2 > L3,容量方面 L1 < L2 < L3。CPU 讀取數據時,首先會從 L1 查找,如果未命中則繼續查找 L2,如果還未能命中則繼續查找 L3,最后還沒命中的話只能從內存中查找,讀取完成后再將數據逐級放入緩存中。此外,多線程之間共享一份數據的時候,需要其中一個線程將數據寫回主存,其他線程訪問主存數據。

由此可見,引入多級緩存是為了能夠讓 CPU 利用率最大化。如果你在做頻繁的 CPU 運算時,需要盡可能將數據保持在緩存中。那么 CPU 從內存中加載數據的時候,是如何提高緩存的利用率的呢?這就涉及緩存行(Cache Line)的概念,Cache Line 是 CPU 緩存可操作的最小單位,CPU 緩存由若干個 Cache Line 組成。Cache Line 的大小與 CPU 架構有關,在目前主流的 64 位架構下,Cache Line 的大小通常為 64 Byte。CPU 在加載內存數據時,會將相鄰的數據一同讀取到 Cache Line 中,因為相鄰的數據未來被訪問的可能性最大,這樣就可以避免 CPU 頻繁與內存進行交互了。

偽共享問題是如何發生的呢?它又會造成什么影響呢?我們使用下面這幅圖進行講解。

假設變量 A、B、C、D 被加載到同一個 Cache Line,它們會被高頻地修改。當線程 1 在 CPU Core1 中中對變量 A 進行修改,修改完成后 CPU Core1 會通知其他 CPU Core 該緩存行已經失效。然后線程 2 在 CPU Core2 中對變量 C 進行修改時,發現 Cache line 已經失效,此時 CPU Core1 會將數據重新寫回內存,CPU Core2 再從內存中讀取數據加載到當前 Cache line 中。

由此可見,如果同一個 Cache line 被越多的線程修改,那么造成的寫競爭就會越激烈,數據會頻繁寫入內存,導致性能浪費。

對於偽共享問題,我們應該如何解決呢?Disruptor 和 Mpsc Queue 都采取了空間換時間的策略,讓不同線程共享的對象加載到不同的緩存行即可。下面我們通過一個簡單的例子進行說明。

public class FalseSharingPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
    protected volatile long value = 0;
    protected long p9, p10, p11, p12, p13, p14, p15;
}

從上述代碼中可以看出,變量 value 前后都填充了 7 個 long 類型的變量。這樣不論在什么情況下,都可以保證在多線程訪問 value 變量時,value 與其他不相關的變量處於不同的 Cache Line,如下圖所示。

偽共享問題一般是非常隱蔽的,在實際開發的過程中,並不是項目中所有地方都需要花費大量的精力去優化偽共享問題。CPU Cache 的填充本身也是比較珍貴的,我們應該把精力聚焦在一些高性能的數據結構設計上,把資源用在刀刃上,使系統性能收益最大化。

至此,我們知道 Mpsc Queue 為了解決偽共享問題填充了大量的 byte 類型變量,造成源碼不易閱讀。因為變量填充只是為了提升 Mpsc Queue 的性能,與 Mpsc Queue 的主體功能無關。接下來我們先忽略填充變量,開始分析 Mpsc Queue 的基本實現原理。

Mpsc Queue 源碼分析

MpscArrayQueue 如何使用,示例代碼如下:

public class MpscArrayQueueTest {
    public static final MpscArrayQueue<String> MPSC_ARRAY_QUEUE = new MpscArrayQueue<>(2);
    public static void main(String[] args) {
        for (int i = 1; i <= 2; i++) {
            int index = i;
            new Thread(() -> MPSC_ARRAY_QUEUE.offer("data" + index), "thread" + index).start();
        }
        try {
            Thread.sleep(1000L);
            MPSC_ARRAY_QUEUE.add("data3"); // 入隊操作,隊列滿則拋出異常
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("隊列大小:" + MPSC_ARRAY_QUEUE.size() + ", 隊列容量:" + MPSC_ARRAY_QUEUE.capacity());
        System.out.println("出隊:" + MPSC_ARRAY_QUEUE.remove()); // 出隊操作,隊列為空則拋出異常
        System.out.println("出隊:" + MPSC_ARRAY_QUEUE.poll()); // 出隊操作,隊列為空則返回 NULL
    }
}

程序輸出結果如下:

java.lang.IllegalStateException: Queue full
    at java.util.AbstractQueue.add(AbstractQueue.java:98)
    at com.xiaojie.netty.MpscArrayQueueTest.main(MpscArrayQueueTest.java:14)
隊列大小:2, 隊列容量:2
出隊:data2
出隊:data1

MpscArrayQueue 終究還是個隊列,基本用法與 ArrayBlockingQueue 都是類似的:入隊 offer()和出隊 poll()。

入隊 offer

首先我們先回顧下 MpscArrayQueue 的重要屬性:

// ConcurrentCircularArrayQueue
protected final long mask; // 計算數組下標的掩碼
protected final E[] buffer; // 存放隊列數據的數組
// MpmcArrayQueueProducerIndexField
private volatile long producerIndex; // 生產者的索引
// MpscArrayQueueProducerLimitField
private volatile long producerLimit; // 生產者索引的最大值
// MpscArrayQueueConsumerIndexField
protected long consumerIndex; // 消費者索引

看到 mask 變量,你現在是不是條件反射想到隊列中數組的容量大小肯定是 2 的次冪。因為 Mpsc 是多生產者單消費者隊列,所以 producerIndex、producerLimit 都是用 volatile 進行修飾的,其中一個生產者線程的修改需要對其他生產者線程可見。

offer() 方法:

    public boolean offer(final E e)
    {
        if (null == e)
        {
            throw new NullPointerException();
        }

        // use a cached view on consumer index (potentially updated in loop)
        final long mask = this.mask;
        long producerLimit = lvProducerLimit();// 獲取生產者索引最大限制
        long pIndex;
        do
        {
            pIndex = lvProducerIndex();// 獲取生產者索引
            if (pIndex >= producerLimit)
            {
                final long cIndex = lvConsumerIndex();// 獲取消費者索引
                producerLimit = cIndex + mask + 1;

                if (pIndex >= producerLimit)
                {
                    return false; // FULL :(
                }
                else
                {
                    // update producer limit to the next index that we must recheck the consumer index
                    // this is racy, but the race is benign
                    soProducerLimit(producerLimit);// 更新 producerLimit
                }
            }
        }
        while (!casProducerIndex(pIndex, pIndex + 1));// CAS 更新生產者索引,更新成功則退出,說明當前生產者已經占領索引值

        /*
         * NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on
         * the index visibility to poll() we would need to handle the case where the element is not visible.
         */

        // Won CAS, move on to storing
        final long offset = calcCircularRefElementOffset(pIndex, mask);// 計算生產者索引在數組中下標
        soRefElement(buffer, offset, e);// 向數組中放入數據
        return true; // AWESOME :)
    }

lvProducerLimit() 方法

    MpscArrayQueueProducerLimitField(int capacity)
    {
        super(capacity);
        this.producerLimit = capacity;
    }

    final long lvProducerLimit()
    {
        return producerLimit;
    }

在初始化狀態,producerLimit 與隊列的容量是相等的,對應到 MpscArrayQueueTest 代碼示例中,producerLimit = capacity = 2,而 producerIndex = consumerIndex = 0。接下來 Thread1 和 Thread2 並發向 MpscArrayQueue 中存放數據,如下圖所示。

 兩個線程此時拿到的 producerIndex 都是 0,是小於 producerLimit 的。此時兩個線程都會嘗試使用 CAS 操作更新 producerIndex,其中必然有一個是成功的,另外一個是失敗的。假設 Thread1 執行 CAS 操作成功,那么 Thread2 失敗后就會重新更新 producerIndex。Thread1 更新后 producerIndex 的值為 1,由於 producerIndex 是 volatile 修飾的,更新后立刻對 Thread2 可見。這里有一點需要注意的是,當前線程更新后的值是被其他線程使用,當 Thread1 和 Thread2 都通過 CAS 搶占成功后,它們拿到的 pIndex 分別是 0 和 1。接下來就是根據 pIndex 進行位運算計算得到數組對應的下標,然后通過 UNSAFE.putOrderedObject() 方法將數據寫入到數組中,源碼如下所示。

public static <E> void soRefElement(E[] buffer, long offset, E e)
{
UNSAFE.putOrderedObject(buffer, offset, e);
}

putOrderedObject() 和 putObject() 都可以用於更新對象的值,但是 putOrderedObject() 並不會立刻將數據更新到內存中,並把其他 Cache Line 置為失效。putOrderedObject() 使用的是 LazySet 延遲更新機制,所以性能方面 putOrderedObject() 要比 putObject() 高很多。

Java 中有四種類型的內存屏障,分別為 LoadLoad、StoreStore、LoadStore 和 StoreLoad。putOrderedObject() 使用了 StoreStore Barrier,對於 Store1,StoreStore,Store2 這樣的操作序列,在 Store2 進行寫入之前,會保證 Store1 的寫操作對其他處理器可見。

LazySet 機制是有代價的,就是寫操作結果有納秒級的延遲,不會立刻被其他線程以及自身線程可見。因為在 Mpsc Queue 的使用場景中,多個生產者只負責寫入數據,並沒有寫入之后立刻讀取的需求,所以使用 LazySet 機制是沒有問題的,只要 StoreStore Barrier 保證多線程寫入的順序即可。

至此,offer() 的核心操作我們已經講完了。現在我們繼續把目光聚焦在 do-while 循環內的邏輯,為什么需要兩次 if(pIndex >= producerLimit) 判斷呢?說明當生產者索引大於 producerLimit 閾值時,可能存在兩種情況:producerLimit 緩存值過期了或者隊列已經滿了。所以此時我們需要讀取最新的消費者索引 consumerIndex,之前讀取過的數據位置都可以被重復使用,重新做一次 producerLimit 計算,然后再做一次 if(pIndex >= producerLimit) 判斷,如果生產者索引還是大於 producerLimit 閾值,說明隊列的真的滿了。

因為生產者有多個線程,所以 MpscArrayQueue 采用了 UNSAFE.getLongVolatile() 方法保證獲取消費者索引 consumerIndex 的准確性。getLongVolatile() 使用了 StoreLoad Barrier,對於 Store1,StoreLoad,Load2 的操作序列,在 Load2 以及后續的讀取操作之前,都會保證 Store1 的寫入操作對其他處理器可見。StoreLoad 是四種內存屏障開銷最大的,現在你是不是可以體會到引入 producerLimit 的好處了呢?假設我們的消費速度和生產速度比較均衡的情況下,差不多走完一圈數組才需要獲取一次消費者索引 consumerIndex,從而大幅度減少了 getLongVolatile() 操作的執行次數,性能提升是顯著的。

出隊 poll

poll() 方法的作用是移除隊列的首個元素並返回,如果隊列為空則返回 NULL。我們看下 poll() 源碼是如何實現的。

    public E poll()
    {
        final long cIndex = lpConsumerIndex();// 直接返回消費者索引 consumerIndex
        final long offset = calcCircularRefElementOffset(cIndex, mask);// 計算數組對應的偏移量
        // Copy field to avoid re-reading after volatile load
        final E[] buffer = this.buffer;

        // If we can't see the next available element we can't poll
        E e = lvRefElement(buffer, offset);// 取出數組中 offset 對應的元素
        if (null == e)
        {
            /*
             * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
             * winning the CAS on offer but before storing the element in the queue. Other producers may go on
             * to fill up the queue after this element.
             */
            if (cIndex != lvProducerIndex())
            {
                do
                {
                    e = lvRefElement(buffer, offset);
                }
                while (e == null);// 等待生產者填充元素
            }
            else// 隊列已滿
            {
                return null;
            }
        }

        spRefElement(buffer, offset, null);// 消費成功后將當前位置置為 NULL
        soConsumerIndex(cIndex + 1);// 更新 consumerIndex 到下一個位置
        return e;
    }

因為只有一個消費者線程,所以整個 poll() 的過程沒有 CAS 操作。poll() 方法核心思路是獲取消費者索引 consumerIndex,然后根據 consumerIndex 計算得出數組對應的偏移量,然后將數組對應位置的元素取出並返回,最后將 consumerIndex 移動到環形數組下一個位置。

獲取消費者索引以及計算數組對應的偏移量的邏輯與 offer() 類似,在這里就不贅述了。下面直接看下如何取出數組中 offset 對應的元素,跟進 lvRefElement() 方法的源碼。

    public static <E> E lvRefElement(E[] buffer, long offset)
    {
        return (E) UNSAFE.getObjectVolatile(buffer, offset);
    }

獲取數組元素的時候同樣使用了 UNSAFE 系列方法,getObjectVolatile() 方法則使用的是 LoadLoad Barrier,對於 Load1,LoadLoad,Load2 操作序列,在 Load2 以及后續讀取操作之前,會保證 Load1 的讀取操作執行完畢,所以 getObjectVolatile() 方法可以保證每次讀取數據都可以從內存中拿到最新值。

與 offer() 相反,poll() 比較關注隊列為空的情況。當調用 lvElement() 方法獲取到的元素為 NULL 時,有兩種可能的情況:隊列為空或者生產者填充的元素還沒有對消費者可見。如果消費者索引 consumerIndex 等於生產者 producerIndex,說明隊列為空。只要兩者不相等,消費者需要等待生產者填充數據完畢。

當成功消費數組中的元素之后,需要把當前消費者索引 consumerIndex 的位置置為 NULL,然后把 consumerIndex 移動到數組下一個位置。邏輯比較簡單,下面我們把 spRefElement() 和 soConsumerIndex() 方法放在一起看。

public static <E> void spRefElement(E[] buffer, long offset, E e)
{
    UNSAFE.putObject(buffer, offset, e);
}
final void soConsumerIndex(long newValue)
{
UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
}

最后的更新操作我們又看到了 UNSAFE put 系列方法的運用,其中 putObject() 不會使用任何內存屏障,它會直接更新對象對應偏移量的值。而 putOrderedLong 與 putOrderedObject() 是一樣的,都使用了 StoreStore Barrier,也是延遲更新 LazySet 機制,我們就不再贅述了。

到此為止,MpscArrayQueue 入隊和出隊的核心源碼已經分析完了。因為 JCTools 是服務於 JVM 的並發工具類,其中包含了很多黑科技的技巧,例如填充法解決偽共享問題、Unsafe 直接操作內存等,讓我們對底層知識的掌握又更進一步。此外 JCTools 還提供了 MpscUnboundedArrayQueue、MpscChunkedArrayQueue 等其他具有特色功能的隊列,有興趣的話你可以課后自行研究。

總結

MpscArrayQueue 還只是 Jctools 中的冰山一角,其中蘊藏着豐富的技術細節,我們對 MpscArrayQueue 的知識點做一個簡單的總結。

  • 通過大量填充 byte 類型變量解決偽共享問題。
  • 環形數組的容量設置為 2 的次冪,可以通過位運算快速定位到數組對應下標。
  • 入隊 offer() 操作中 producerLimit 的巧妙設計,大幅度減少了主動獲取消費者索引 consumerIndex 的次數,性能提升顯著。
  • 入隊和出隊操作中都大量使用了 UNSAFE 系列方法,針對生產者和消費者的場景不同,使用的 UNSAFE 方法也是不一樣的。Jctools 在底層操作的運用上也是有的放矢,把性能發揮到極致。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM