BlockingQueue原理


概念

BlockingQueue 翻譯成中文阻塞隊列,顧名思義就是線程使用隊列時會阻塞當前線程;

BlockingQueue 繼承了Collection,具有一般集合所具有的數據存取功能

BlockingQueue 是線程安全的隊列,多線程訪問時不會出現同一個數據集中的數據被多次取出,或者覆蓋存放的事件

 

使用場景

可用於一個快速反饋的消息隊列,無消息時阻塞線程讓出CPU,有數據存入時通知線程取出數據,取完后繼續阻塞,

比如用戶下單后立刻在大屏上顯示有客戶下單,比較簡單的做法是開啟一個定時任務,定期掃訂單表;或者接入消息中間件,下單時發送消息,大屏服務監聽消息;或者借用reddis隊列 解決方式有很多種不一一列舉

示例模擬數據的存取 設置隊列的容量為1 是為了更好展示 存取的阻塞特性

public static void main(String[] args) throws InterruptedException {
    ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(1);

    //模擬存入數據線程
    new Thread(()->{
        int i=0;
        while (true){
            try {
                //每次循環+1
                i++;
                queue.put(i);
                System.out.println("存入數據"+i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "存入數據線程").start();

    //模擬取出數據線程 1秒鍾取一個
    new Thread(()->{
        while (true){
            try {
                //一秒鍾取一個數據
                Thread.sleep(1000);
                Integer result = queue.take();
                System.out.println("取出數據"+result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "取數據線程").start();

}

打印結果:
存入數據1
取出數據1
存入數據2
取出數據2
存入數據3
取出數據3
存入數據4
取出數據4
存入數據5
取出數據5
存入數據6
取出數據6
存入數據7
取出數據7

 

方法示例

阻塞隊列的使用非常簡單,基本上和普通集合一樣對數據進行存和取

public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(2);

        //存入一個數據 如果隊列滿了則一直阻塞到有數據取出
        queue.put(1);
        //取出一個數據 如果隊列空了則一直阻塞到有數據存入
        queue.take();

        //存入一個數據 如果隊列滿了則阻塞若干時長(示例為10秒),超時則返回offerResult=false
        boolean offerResult = queue.offer(1, 10, TimeUnit.SECONDS);
        //取出一個數據 如果隊列空了則阻塞若干時長(示例為10秒),超時則返回pollResult=null
        Integer pollResult = queue.poll(10, TimeUnit.SECONDS);
}

 

源碼分析

1、接口繼承結構 

 

2、接口代碼

public interface BlockingQueue<E> extends Queue<E> {
    //向隊列中添加元素, 若超過給定隊列長度拋出異常
    boolean add(E e);

    //向隊列中添加元素, 若超過給定隊列長度拋出異常
    boolean offer(E e);

    //向隊列中添加元素, 若超過隊列長度則等待隊列有剩余容量再加入元素
    void put(E e) throws InterruptedException;

    //向隊列中添加元素, 若超過給定隊列長度則等待給定時長
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //獲取隊列頭部元素,並從隊列頭部移除,若隊列為空,則阻塞當前獲取線程,並等待新元素加入
    E take() throws InterruptedException;

    //獲取隊列頭部元素,並從隊列頭部移除,若隊列為空,則阻塞當前獲取線程,並等待元素給定時長
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    //返回隊列剩余容量
    int remainingCapacity();

    //移除指定元素
    boolean remove(Object o);

    //返回是否存在指定元素
    public boolean contains(Object o);

    
    //將隊列中的元素全部移除到給定的集合c中
    int drainTo(Collection<? super E> c);

    //將隊列中的元素全部移除到給定的集合c中(最多不超過maxElements個)
    int drainTo(Collection<? super E> c, int maxElements);
}

 

3、實現類 ArrayBlockingQueue 分析

 

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    //數據集 用於存放元素 初始化固定數組長度 不再擴容
    final Object[] items;

    //數據集下一次取數據的下標
    //具體操作為 每次take加1 若take+1==items.length即take最后一個元素
    //則takeIndex重置為0 如此往復
    int takeIndex;

    //數據集下一次存數據下標
    int putIndex;

    //數據集中 存放元素的個數 即items[i]!=null的個數
    int count;

    //重入鎖 可選公平與非公平 非本文重點
    final ReentrantLock lock;

    //Condition 
    //使用流程 1取數據為空(count==0) 則阻塞等待數據集存入數據 執行等待notEmpty.await 
    //         2存數據數據集肯定不為空(count!=0), 則通知取數據線程繼續取數據 執行通知notEmpty.signal
    private final Condition notEmpty;

    //Condition 
    //使用流程 1存數據數據集存滿(count==items.length)則等待消耗后重新存入 執行等待notFull.await 
    //         2取數據后則數據集未滿肯定不滿(count<items.length) 則通知存入數據 執行通知notFull.signal 
    private final Condition notFull;

    //用戶維護ArrayBlockingQueue 作為集合的迭代(Iterator)功能
    //調用ArrayBlockingQueue.iterator()是初始化此屬性 非本文重點
    transient Itrs itrs = null;



    //--------------------重點方法------------------------
    

    /**
     * 從隊列中取一個元素
     * @return [description]
     * @throws InterruptedException [description]
     */
    public E take() throws InterruptedException {
        //對操作進行加鎖 多線程時輪流取元素
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //如果隊列中沒有對象 則阻塞線程等待
            while (count == 0)
                //重點:等待存數據的線程通知
                notEmpty.await();
            //代碼運行到此處說明count!=null 執行從隊列中取元素
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        final Object[] items = this.items;
        //從數據集數組items 取出下標takeIndex的數據
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        //取完數據之后 將數組對應下標應用置為空(GC對象)
        items[takeIndex] = null;
        //takeIndex+1等於數組長度表示當前下標為數組最后一個對象
        //則takeIndex重新歸0
        if (++takeIndex == items.length)
            takeIndex = 0;
        //每次取數據 數據總量減1
        count--;
        //迭代器維護操作
        if (itrs != null)
            itrs.elementDequeued();
        //重點:通知存數據的線程 可以執行數據存放
        notFull.signal();
        return x;
    }


    /**
     * 存入一個數據
     * @param  e                    [description]
     * @throws InterruptedException [description]
     */
    public void put(E e) throws InterruptedException {
        //校驗數據非空
        checkNotNull(e);
        //加鎖
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //若數據集數組items滿了 則阻塞線程等待
            while (count == items.length)
                //重點:等待取出數據的線程通知
                notFull.await();
            //存入數據
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        final Object[] items = this.items;
        //存入數據到下標putIndex
        items[putIndex] = x;
        //如果存數據的下標已經到數據最后一個下標 則putIndex重新歸0
        if (++putIndex == items.length)
            putIndex = 0;
        //數據總量加1
        count++;
        //重點:存入數據后通知等待取數據的線程
        notEmpty.signal();
    }




}

 

 總結:

BlockingQueue 重點關注

1、阻塞方式

Condition notFull 和 Condition notEmpty 的使用,存通知取,取通知存;

從而達到存滿阻塞,取完阻塞,存入通知取,取出通知存的功能

2、存取游標

takeIndex 和 putIndex的使用,每次取數據takeIndex加1,到了數據末尾則重新回到數組開始下標0,存數據原理相似逐次加1,到末尾歸0

對於LinkedBlockingQueue實現方式則略有不同,鏈表式集合多線程取數據時只需要排隊從頭部節點獲取,從末尾存數據,有個小優化,創建LinkedBlockingQueue

時創建一個虛擬頭部節點,不做深究

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM