SynchronousQueue詳解


前言

SynchronousQueue是一個比較特別的隊列,由於在線程池方面有所應用,為了更好的理解線程池的實現原理,筆者花了些時間學習了一下該隊列源碼(JDK1.8),此隊列源碼中充斥着大量的CAS語句,理解起來是有些難度的,為了方便日后回顧,本篇文章會以簡潔的圖形化方式展示該隊列底層的實現原理。

 

SynchronousQueue簡單使用

經典的生產者-消費者模式,操作流程是這樣的:

有多個生產者,可以並發生產產品,把產品置入隊列中,如果隊列滿了,生產者就會阻塞;

有多個消費者,並發從隊列中獲取產品,如果隊列空了,消費者就會阻塞;

如下面的示意圖所示:

 

 

SynchronousQueue
也是一個隊列來的,但它的特別之處在於它內部沒有容器,一個生產線程,當它生產產品(即put的時候),如果當前沒有人想要消費產品(即當前沒有線程執行take),此生產線程必須阻塞,等待一個消費線程調用take操作,take操作將會喚醒該生產線程,同時消費線程會獲取生產線程的產品(即數據傳遞),這樣的一個過程稱為一次配對過程(當然也可以先take后put,原理是一樣的)。

我們用一個簡單的代碼來驗證一下,如下所示:

package com.concurrent;

import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();

        Thread putThread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("put thread start");
                try {
                    queue.put(1);
                } catch (InterruptedException e) {
                }
                System.out.println("put thread end");
            }
        });

        Thread takeThread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("take thread start");
                try {
                    System.out.println("take from putThread: " + queue.take());
                } catch (InterruptedException e) {
                }
                System.out.println("take thread end");
            }
        });

        putThread.start();
        Thread.sleep(1000);
        takeThread.start();
    }
}

 

一種輸出結果如下:

put thread start
take thread start
take from putThread: 1
put thread end
take thread end

 

從結果可以看出,put線程執行queue.put(1) 后就被阻塞了,只有take線程進行了消費,put線程才可以返回。可以認為這是一種線程與線程間一對一傳遞消息的模型。

 

SynchronousQueue實現原理

不像ArrayBlockingQueue、LinkedBlockingDeque之類的阻塞隊列依賴AQS實現並發操作,SynchronousQueue直接使用CAS實現線程的安全訪問。由於源碼中充斥着大量的CAS代碼,不易於理解,所以按照筆者的風格,接下來會使用簡單的示例來描述背后的實現模型。

隊列的實現策略通常分為公平模式和非公平模式,接下來將分別進行說明。

 

公平模式下的模型:

公平模式下,底層實現使用的是TransferQueue這個內部隊列,它有一個head和tail指針,用於指向當前正在等待匹配的線程節點。

初始化時,TransferQueue的狀態如下:

 

 

接着我們進行一些操作:

1、線程put1執行 put(1)操作,由於當前沒有配對的消費線程,所以put1線程入隊列,自旋一小會后睡眠等待,這時隊列狀態如下:

 

 

2、接着,線程put2執行了put(2)操作,跟前面一樣,put2線程入隊列,自旋一小會后睡眠等待,這時隊列狀態如下:

 

 

3、這時候,來了一個線程take1,執行了
take操作,由於tail指向put2線程,put2線程跟take1線程配對了(一put一take),這時take1線程不需要入隊,但是請注意了,這時候,要喚醒的線程並不是put2,而是put1。為何?

大家應該知道我們現在講的是公平策略,所謂公平就是誰先入隊了,誰就優先被喚醒,我們的例子明顯是put1應該優先被喚醒。至於讀者可能會有一個疑問,明明是take1線程跟put2線程匹配上了,結果是put1線程被喚醒消費,怎么確保take1線程一定可以和次首節點(head.next)也是匹配的呢?其實大家可以拿個紙畫一畫,就會發現真的就是這樣的。


公平策略總結下來就是:隊尾匹配隊頭出隊。

執行后put1線程被喚醒,take1線程的 take()方法返回了1(put1線程的數據),這樣就實現了線程間的一對一通信,這時候內部狀態如下:

 

 

4、最后,再來一個線程take2,執行take操作,這時候只有put2線程在等候,而且兩個線程匹配上了,線程put2被喚醒,

take2線程take操作返回了2(線程put2的數據),這時候隊列又回到了起點,如下所示:

 

 

以上便是公平模式下,SynchronousQueue的實現模型。總結下來就是:隊尾匹配隊頭出隊,先進先出,體現公平原則。

 

非公平模式下的模型:

我們還是使用跟公平模式下一樣的操作流程,對比兩種策略下有何不同。非公平模式底層的實現使用的是TransferStack,

一個棧,實現中用head指針指向棧頂,接着我們看看它的實現模型:

1、線程put1執行 put(1)操作,由於當前沒有配對的消費線程,所以put1線程入棧,自旋一小會后睡眠等待,這時棧狀態如下:

 

 

2、接着,線程put2再次執行了put(2)操作,跟前面一樣,put2線程入棧,自旋一小會后睡眠等待,這時棧狀態如下:

 

 

3、這時候,來了一個線程take1,執行了take操作,這時候發現棧頂為put2線程,匹配成功,但是實現會先把take1線程入棧,然后take1線程循環執行匹配put2線程邏輯,一旦發現沒有並發沖突,就會把棧頂指針直接指向 put1線程

 

 

4、最后,再來一個線程take2,執行take操作,這跟步驟3的邏輯基本是一致的,take2線程入棧,然后在循環中匹配put1線程,最終全部匹配完畢,棧變為空,恢復初始狀態,如下圖所示:

 

 

可以從上面流程看出,雖然put1線程先入棧了,但是卻是后匹配,這就是非公平的由來。

總結

SynchronousQueue由於其獨有的線程一一配對通信機制,在大部分平常開發中,可能都不太會用到,但線程池技術中會有所使用,由於內部沒有使用AQS,而是直接使用CAS,所以代碼理解起來會比較困難,但這並不妨礙我們理解底層的實現模型,在理解了模型的基礎上,有興趣的話再查閱源碼,就會有方向感,看起來也會比較容易,希望本文有所借鑒意義。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM