並發隊列:ArrayBlockingQueue實際運用場景和原理


ArrayBlockingQueue實際應用場景


之前在某公司做過一款情緒識別的系統,這套系統通過調用攝像頭接口采集人臉信息,將采集的人臉信息做人臉識別和情緒分析,最終經過一定的算法將個人情緒數據轉化具體行為指標值。其中采集圖片的部分就用到了並發隊列ArrayBlockingQueue。

image.png

如上圖所示:攝像頭有n個,單線程采集的效率會比較慢,所以在采集攝像頭的過程中是多線程的,另外采集到的圖片需要存儲到圖片服務器,對圖片服務器寫也有很高的要求,圖片服務器是集群的,也需要用到也多線程的。將圖片入庫后需要將圖片數據打到人臉分析服務器上去處理,這部分涉及到了分布式消息,所以是黑色虛線部分用kafka來傳遞消息。其中紅色虛線部分多線程圖片采集將信息傳遞到多線程圖片存儲用到了ArrayBlockingQueue,它是並發安全隊列

 

ArrayBlockingQueue簡化類圖結構


image.png

從類圖可以看出Queue接口提供了add,offer入隊列的方法,提供poll出隊列的方法!

BlockingQueue接口增加了put入隊列的方法,提供take出隊列的方法!

補充說明:UML類圖結構:

  • 繼承:實線空箭頭。
  • 實現:虛線虛箭頭。

 

 

並發隊列阻塞和非阻塞概念


從上面類圖名字可以看到Queue提供的方法是非阻塞的!而BlockingQueue提供的put,take方法是阻塞的!下面按老思路,我們用代碼說明阻塞非阻塞下!

非阻塞

import java.util.concurrent.ArrayBlockingQueue;

/**
 * @author :jiaolian
 * @date :Created in 2021-02-02 20:16
 * @description:ArrayBlockingQueue阻塞非阻塞測試
 * @modified By:
 * 公眾號:叫練
 */
public class ArrayBlockingQueueTest {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
        arrayBlockingQueue.offer("叫練");
        arrayBlockingQueue.offer("叫練");
        //輸出arrayBlockingQueue的長度
        System.out.println(arrayBlockingQueue.size());
    }
}

 

如上代碼:設置ArrayBlockingQueue長度為1,通過offer方法向隊列添加2個元素,最后打印arrayBlockingQueue的長度?答案是1,不會阻塞,因為offer方法丟棄了第二個元素“叫練”,我們說出隊和入隊能夠讓其繼續執行的隊列我們稱為非阻塞。如果換成add方法呢?就會報錯隊列溢出,如下圖所示!但是還不是阻塞的。下面我們看看什么阻塞!

image.png

 

阻塞

import java.util.concurrent.ArrayBlockingQueue;

/**
 * @author :jiaolian
 * @date :Created in 2021-02-02 20:16
 * @description:ArrayBlockingQueue阻塞非阻塞測試
 * @modified By:
 * 公眾號:叫練
 */
public class ArrayBlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
        arrayBlockingQueue.put("叫練");
        arrayBlockingQueue.put("叫練");
        //輸出arrayBlockingQueue的長度
        System.out.println(arrayBlockingQueue.size());
    }
}

 

如上代碼:ArrayBlockingQueue長度為1,通過put方法向隊列添加2個元素,最后輸出arrayBlockingQueue的長度是多少?答案是控制台一直運行,因為在添加第二個“叫練”時程序阻塞了。我們說出隊和入隊不能夠讓其繼續執行的隊列我們稱為阻塞,add方法,poll方法,take方法我們就不一一舉例了,大家可以寫代碼做下最簡單的測試!

 

好啦,我們對幾個方法做個總結吧!

  • 入隊:

offer:隊列滿了丟棄。

add :隊列滿了報錯。

put :阻塞。

  • 出隊:

poll :如果隊列為空則返回null。

take :阻塞。

 

ArrayBlockingQueue實現原理淺析


image.png

如上圖,ArrayBlockingQueue是用數組實現的,ReentrantLock獨占鎖控制數組的入隊和出隊。notEmpty,notFull是ReentrantLock的兩個條件隊列,用來控制隊列是否進入阻塞狀態,是生產者和消費者模型。下面我們看看take,put方法流程,其他的方法同理。

  • take方法:多個線程競爭獨占鎖獲取items[taskIndex]隊首元素,其中A線程成功獲取鎖,其他線程阻塞等待A線程執行完成釋放鎖,如果隊列不為空,A線程獲取items[taskIndex]元素返回移除並釋放鎖讓其他阻塞線程繼續競爭;如果隊列為空,A線程調用notEmpty.await方法進入條件隊列並釋放鎖讓其他阻塞線程繼續競爭,其他線程發現隊列為空也會進入notEmpty條件隊列,等待put線程入隊通知notEmpty阻塞線程。
  • put方法:多個線程競爭獨占鎖設置items[putIndex]隊尾元素,其中A線程成功獲取鎖,其他線程阻塞等待A線程執行完成釋放鎖,如果隊列不滿【隊列長度】,A線程添加items[putIndex]元素返回並釋放鎖讓其他阻塞線程繼續競爭;如果隊列滿了,A線程調用notFull.await方法進入條件隊列並釋放鎖讓其他阻塞線程繼續競爭,其他線程發現隊列為空也會進入notFull條件隊列,等待take線程出隊通知notFull阻塞線程

 

完全非阻塞隊列ConcurrentLinkedQueue


ConcurrentLinkedQueue也實現了Queue接口,提供offer,add,poll方法都是非阻塞的,另外從名字可以看出,底層是鏈表結構,入隊和出隊用的是自旋的cas。

 

 

List 多線程安全方案:LinkedBlockingQueue


image.png

LinkedBlockingQueue和ArrayBlockingQueue 類似,LinkedBlockingQueue是有界的,長度是Integer.MAX_VALUE實現上,LinkedBlockingQueue是鏈表,而且是雙鎖,如上圖所示,takeLock獨占鎖控制隊列頭部,putLock控制隊列尾部,互不影響,目的是提高LinkedBlockingQueue的並發度。

 

總結


今天我們介紹了並發隊列重要的幾個概念,整理出來希望能對你有幫助,寫的比不全,同時還有許多需要修正的地方,希望親們加以指正和點評,年前這段時間會繼續輸出線程池這些概念等。最后喜歡的請點贊加關注哦。點關注,不迷路,我是叫練【公眾號】,邊叫邊練。

8bcfc73380e185dcaf516ada1b44abd.jpg

 

參考書籍:《Java並發編程之美》

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM