Java阻塞隊列


💛原文地址為https://www.cnblogs.com/haixiang/p/12354520.html,轉載請注明出處!

什么是阻塞隊列

原文地址為,轉載請注明出處!
阻塞隊列是一個支持阻塞的插入和移除的隊列。

  • 支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
  • 支持阻塞的移除方法:意思是隊列為空時,獲取元素(同時移除元素)的線程會被阻塞,等到隊列變為非空。

阻塞隊列用法

阻塞隊列常用於生產者和消費者的場景,生產者是向隊列里添加元素的線程,消費者是從隊列里獲取元素的線程。

當阻塞隊列不可用時,會有四種相應的處理方式:

處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
插入操作 add(e) offer(e) put(e) offer(e, time, unit)
移除操作 remove() poll() take() poll(time, unit)
獲取操作 element() peek() 不可用 不可用
  • 返回特殊值:插入元素時,會返回是否插入成功,成功返回true。如果是移除方法,則是從隊列中取出一個元素,沒有則返回null。
  • 一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里面put元素,則生產者線程會被阻塞,知道隊列不滿或者響應中斷退出。當隊列為空時,如果消費者線程從隊列里take元素。
  • 超時退出:當阻塞隊列滿時,如果生產者線程往隊列里插入元素,隊列會阻塞生產者線程一段時間,如果超過了指定時間,生產者線程就會退出。

如果是無界阻塞隊列,隊列則不會出現滿的情況。

阻塞隊列

  • ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列

  • LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列

  • PriorityBlockingQueue:一個支持優先級排序無界阻塞隊列

  • DelayQueue:一個使用優先級隊列實現的無界阻塞隊列

  • SynchronousQueue:一個不存儲元素的阻塞隊列

  • LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列

    • LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列

1.ArrayBlockingQueue

此隊列按照先進先出(FIFO)的原則對元素進行排序

默認情況下不保證線程公平地訪問隊列(所謂公平是指當隊列可用時,先被阻塞的線程先訪問隊列)

為了保證公平性通常會降低吞吐量。

公平鎖是利用了可重入鎖的公平鎖來實現。

   public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

2.ArrayBlockingQueue

此隊列按照先進先出(FIFO)的原則對元素進行排序

默認長度為Integer.MAX_VALUE

3.PriorityBlockingQueue

默認情況下元素采取自然順序升序排列

可以自定義Comparator或者自定義類實現compareTo()方法來指定排序規則

不支持同優先級元素排序

4.DelayQueue

隊列使用PriorityQueue來實現,隊列中的元素必須實現Delayed接口

只有在延時期滿才能從隊列中提取元素

阻塞隊列原理

如果隊列是空的,消費者會一直等待,當生產者添加元素時,消費者是如何知道當前隊列有元素的呢?

使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素后,會通知生產者當前隊列可用。

ArrayBlockingQueue為例子

    /** items 存放隊列中的元素*/
    final Object[] items;

    /** take 操作的索引 */
    int takeIndex;

    /** put 操作的索引 */
    int putIndex;

    /** 隊列中元素個數 */
    int count;

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** 控制生產者 takes 操作的 Condition */
    private final Condition notEmpty;

    /** 控制消費者 put 操作的 Condition */
    private final Condition notFull;
    

put操作

    public void put(E e) throws InterruptedException {
        checkNotNull(e); //判斷 e == null
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //獲取鎖,與lock不同,可以嘗試中斷阻塞
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

入隊操作,入隊之后喚醒消費者線程。

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

notFull.await();中其實調用了park方法,先使用setBlocker保存一下將要阻塞的線程,然后調用本地方法UNSAFE.park(boolean isAbsolute, long time)進行阻塞。

    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM