阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。
這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。
阻塞隊列常用於生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。
阻塞隊列提供了四種處理方法:
方法\處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
對於 BlockingQueue,我們的關注點應該在 put(e) 和 take() 這兩個方法,因為這兩個方法是帶阻塞的。
- 拋出異常:是指當阻塞隊列滿時候,再往隊列里插入元素,會拋出IllegalStateException(“Queue full”)異常。當隊列為空時,從隊列里獲取元素時會拋出NoSuchElementException異常 。
- 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列里拿出一個元素,如果沒有則返回null
- 一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列里take元素,隊列也會阻塞消費者線程,直到隊列可用。
- 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。
- ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
- LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
- PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
- DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。
- LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
- LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
一、應用
先使用Object.wait()和Object.notify()、非阻塞隊列實現生產者-消費者模式:
public class Test { private int queueSize = 10; private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread { @Override public void run() { consume(); } private void consume() { while (true) { synchronized (queue) { while (queue.size() == 0) { try { System.out.println("隊列空,等待數據"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.poll(); // 每次移走隊首元素 queue.notify(); System.out.println("從隊列取走一個元素,隊列剩余" + queue.size() + "個元素"); } } } } class Producer extends Thread { @Override public void run() { produce(); } private void produce() { while (true) { synchronized (queue) { while (queue.size() == queueSize) { try { System.out.println("隊列滿,等待有空余空間"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.offer(1); // 每次插入一個元素 queue.notify(); System.out.println("向隊列取中插入一個元素,隊列剩余空間:" + (queueSize - queue.size())); } } } } }
使用阻塞隊列實現的生產者-消費者模式:
public class Test { private int queueSize = 10; private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread{ @Override public void run() { consume(); } private void consume() { while(true){ try { queue.take(); System.out.println("從隊列取走一個元素,隊列剩余"+queue.size()+"個元素"); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread{ @Override public void run() { produce(); } private void produce() { while(true){ try { queue.put(1); System.out.println("向隊列取中插入一個元素,隊列剩余空間:"+(queueSize-queue.size())); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
Java線程(十三):BlockingQueue-線程的阻塞隊列 BlockingQueue(阻塞隊列)詳解 中都有應用舉例可以參考
二、ArrayBlockingQueue
ArrayBlockingQueue,一個由數組實現的有界阻塞隊列。該隊列采用FIFO的原則對元素進行排序添加的。
ArrayBlockingQueue 實現並發同步的原理:
讀操作和寫操作都需要獲取到同一個 AQS 獨占鎖才能進行操作。
如果隊列為空,這個時候讀操作的線程進入到讀線程隊列排隊,等待寫線程寫入新的元素,然后喚醒讀線程隊列的第一個等待線程。
如果隊列已滿,這個時候寫操作的線程進入到寫線程隊列排隊,等待讀線程將隊列元素移除騰出空間,然后喚醒寫線程隊列的第一個等待線程。
源碼分析:
// 屬性 // 用於存放元素的數組 final Object[] items; // 下一次讀取操作的位置 int takeIndex; // 下一次寫入操作的位置 int putIndex; // 隊列中的元素數量 int count; // 以下幾個就是控制並發用的同步器 final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;
put:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // 自旋 隊列滿時,掛起寫線程 notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal();// 成功插入元素后,喚醒讀線程 }
take:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 自旋 隊列為空,掛起讀線程 notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal();// 成功讀出一個元素之后,喚醒寫線程 return x; }
三、LinkedBlockingQueue
LinkedBlockingQueue底層基於單向鏈表實現的阻塞隊列,可以當做無界隊列也可以當做有界隊列來使用。
// 無界隊列 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } // 有界隊列
//注意,這里會初始化一個空的頭結點,那么第一個元素入隊的時候,隊列中就會有兩個元素。讀取元素時,也總是獲取頭節點后面的一個節點。count 的計數值不包括這個頭節點。 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } // 隊列容量 private final int capacity; // 隊列中的元素數量 private final AtomicInteger count = new AtomicInteger(0); // 隊頭 private transient Node<E> head; // 隊尾 private transient Node<E> last; // take, poll, peek 等讀操作的方法需要獲取到這個鎖 private final ReentrantLock takeLock = new ReentrantLock(); // 如果讀操作的時候隊列是空的,那么等待 notEmpty 條件 private final Condition notEmpty = takeLock.newCondition(); // put, offer 等寫操作的方法需要獲取到這個鎖 private final ReentrantLock putLock = new ReentrantLock(); // 如果寫操作的時候隊列是滿的,那么等待 notFull 條件 private final Condition notFull = putLock.newCondition();
原理:
這里用了兩個鎖,兩個 Condition
takeLock 和 notEmpty 怎么搭配:如果要獲取(take)一個元素,需要獲取 takeLock 鎖,但是獲取了鎖還不夠,如果隊列此時為空,還需要隊列不為空(notEmpty)這個條件(Condition)。
putLock 需要和 notFull 搭配:如果要插入(put)一個元素,需要獲取 putLock 鎖,但是獲取了鎖還不夠,如果隊列此時已滿,還需要隊列不是滿的(notFull)這個條件(Condition)。
put():
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // 如果你糾結這里為什么是 -1,可以看看 offer 方法。這就是個標識成功、失敗的標志而已。 int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 必須要獲取到 putLock 才可以進行插入操作 putLock.lockInterruptibly(); try { // 如果隊列滿,等待 notFull 的條件滿足。 while (count.get() == capacity) { notFull.await(); } // 入隊 enqueue(node); // count 原子加 1,c 還是加 1 前的值 c = count.getAndIncrement(); // 如果這個元素入隊后,還有至少一個槽可以使用,調用 notFull.signal() 喚醒等待線程。 // 哪些線程會等待在 notFull 這個 Condition 上呢? if (c + 1 < capacity) notFull.signal(); } finally { // 入隊后,釋放掉 putLock putLock.unlock(); } // 如果 c == 0,那么代表隊列在這個元素入隊前是空的(不包括head空節點), // 那么所有的讀線程都在等待 notEmpty 這個條件,等待喚醒,這里做一次喚醒操作 if (c == 0) signalNotEmpty(); } // 入隊的代碼非常簡單,就是將 last 屬性指向這個新元素,並且讓原隊尾的 next 指向這個元素 // 這里入隊沒有並發問題,因為只有獲取到 putLock 獨占鎖以后,才可以進行此操作 private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } // 元素入隊后,如果需要,調用這個方法喚醒讀線程來讀 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
take():
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 首先,需要獲取到 takeLock 才能進行出隊操作 takeLock.lockInterruptibly(); try { // 如果隊列為空,等待 notEmpty 這個條件滿足再繼續執行 while (count.get() == 0) { notEmpty.await(); } // 出隊 x = dequeue(); // count 進行原子減 1 c = count.getAndDecrement(); // 如果這次出隊后,隊列中至少還有一個元素,那么調用 notEmpty.signal() 喚醒其他的讀線程 if (c > 1) notEmpty.signal(); } finally { // 出隊后釋放掉 takeLock takeLock.unlock(); } // 如果 c == capacity,那么說明在這個 take 方法發生的時候,隊列是滿的 // 既然出隊了一個,那么意味着隊列不滿了,喚醒寫線程去寫 if (c == capacity) signalNotFull(); return x; } // 取隊頭,出隊 private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; // 之前說了,頭結點是空的 Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC // 設置這個為新的頭結點 head = first; E x = first.item; first.item = null; return x; } // 元素出隊后,如果需要,調用這個方法喚醒寫線程來寫 private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
四、PriorityBlockingQueue
PriorityBlockingQueue是一個支持優先級的無界阻塞隊列。默認情況下元素采用自然順序升序排序,當然我們也可以通過構造函數來指定Comparator來對元素進行排序。需要注意的是PriorityBlockingQueue不能保證同優先級元素的順序。
PriorityBlockingQueue為無界隊列(ArrayBlockingQueue 是有界隊列,LinkedBlockingQueue 也可以通過在構造函數中傳入 capacity 指定隊列最大的容量,但是 PriorityBlockingQueue 只能指定初始的隊列大小,后面插入元素的時候,如果空間不夠的話會自動擴容)。
需要注意的是PriorityBlockingQueue並不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者。因此使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。
PriorityBlockingQueue底層采用二叉堆來實現。
關於二叉堆: 二叉堆的實現 二叉堆(一)之 圖文解析 和 C語言的實現
屬性:
// 構造方法中,如果不指定大小的話,默認大小為 11 private static final int DEFAULT_INITIAL_CAPACITY = 11; // 數組的最大容量 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // 這個就是存放數據的數組 private transient Object[] queue; // 隊列當前大小 private transient int size; // 大小比較器,如果按照自然序排序,那么此屬性可設置為 null private transient Comparator<? super E> comparator; // 並發控制所用的鎖,所有的 public 且涉及到線程安全的方法,都必須先獲取到這個鎖 private final ReentrantLock lock; // 這個很好理解,其實例由上面的 lock 屬性創建 private final Condition notEmpty; // 這個也是用於鎖,用於數組擴容的時候,需要先獲取到這個鎖,才能進行擴容操作 // 其使用 CAS 操作 private transient volatile int allocationSpinLock; // 用於序列化和反序列化的時候用,對於 PriorityBlockingQueue 我們應該比較少使用到序列化 private PriorityQueue q;
put():
public void put(E e) { offer(e); // never need to block } public boolean offer(E e) { // 不能為null if (e == null) throw new NullPointerException(); // 獲取鎖 final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; // 擴容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; // 根據比較器是否為null,做不同的處理 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; // 喚醒正在等待的消費者線程 notEmpty.signal(); } finally { lock.unlock(); } return true; }
take():
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // 沒有元素 返回null int n = size - 1; if (n < 0) return null; else { Object[] array = queue; // 出對元素 E result = (E) array[0]; // 最后一個元素(也就是插入到空穴中的元素) E x = (E) array[n]; array[n] = null; // 根據比較器釋放為null,來執行不同的處理 Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
五、DelayQueue
DelayQueue是一個支持延時獲取元素的無界阻塞隊列。
隊列使用PriorityQueue來實現。
隊列中的元素必須實現Delayed接口,在創建元素時可以指定多久才能從隊列中獲取當前元素。
里面的元素全部都是“可延期”的元素,列頭的元素是最先“到期”的元素,如果隊列里面沒有元素到期,是不能從列頭獲取元素的,哪怕有元素也不行。也就是說只有在延遲期到時才能夠從隊列中取元素。
DelayQueue應用場景:
- 緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
- 定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。
隊列中的Delayed必須實現compareTo來指定元素的順序。比如讓延時時間最長的放在隊列的末尾。實現代碼如下:
public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask x = (ScheduledFutureTask)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); }
六、SynchronousQueue
SynchronousQueue是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作,否則不能繼續添加元素。
SynchronousQueue 的隊列其實是虛的,其不提供任何空間(一個都沒有)來存儲元素。數據必須從某個寫線程交給某個讀線程,而不是寫到某個隊列中等待被消費。
當一個線程往隊列中寫入一個元素時,寫入操作不會立即返回,需要等待另一個線程來將這個元素拿走;同理,當一個讀線程做讀操作的時候,同樣需要一個相匹配的寫線程的寫操作。這里的 Synchronous 指的就是讀線程和寫線程需要同步,一個讀線程匹配一個寫線程。
SynchronousQueue可以看成是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列本身並不存儲任何元素,非常適合於傳遞性場景,比如在一個線程中使用的數據,傳遞給另外一個線程使用。
// 構造時,我們可以指定公平模式還是非公平模式,區別之后再說 public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack(); } abstract static class Transferer { // 從方法名上大概就知道,這個方法用於轉移元素,從生產者手上轉到消費者手上 // 也可以被動地,消費者調用這個方法來從生產者手上取元素 // 第一個參數 e 如果不是 null,代表場景為:將元素從生產者轉移給消費者 // 如果是 null,代表消費者等待生產者提供元素,然后返回值就是相應的生產者提供的元素 // 第二個參數代表是否設置超時,如果設置超時,超時時間是第三個參數的值 // 返回值如果是 null,代表超時,或者中斷。具體是哪個,可以通過檢測中斷狀態得到。 abstract Object transfer(Object e, boolean timed, long nanos); }
我們來看看 transfer 的設計思路,其基本算法如下:
- 當調用這個方法時,如果隊列是空的,或者隊列中的節點和當前的線程操作類型一致(如當前操作是 put 操作,而隊列中的元素也都是寫線程)。這種情況下,將當前線程加入到等待隊列即可。
- 如果隊列中有等待節點,而且與當前操作可以匹配(如隊列中都是讀操作線程,當前線程是寫操作線程,反之亦然)。這種情況下,匹配等待隊列的隊頭,出隊,返回相應數據。
其實這里有個隱含的條件被滿足了,隊列如果不為空,肯定都是同種類型的節點,要么都是讀操作,要么都是寫操作。這個就要看到底是讀線程積壓了,還是寫線程積壓了。
put 方法和 take 方法:
// 寫入值 public void put(E o) throws InterruptedException { if (o == null) throw new NullPointerException(); if (transferer.transfer(o, false, 0) == null) { // 1 Thread.interrupted(); throw new InterruptedException(); } } // 讀取值並移除 public E take() throws InterruptedException { Object e = transferer.transfer(null, false, 0); // 2 if (e != null) return (E)e; Thread.interrupted(); throw new InterruptedException(); }
節點:
static final class QNode { volatile QNode next; // 可以看出來,等待隊列是單向鏈表 volatile Object item; // CAS'ed to or from null volatile Thread waiter; // 將線程對象保存在這里,用於掛起和喚醒 final boolean isData; // 用於判斷是寫線程節點(isData == true),還是讀線程節點 QNode(Object item, boolean isData) { this.item = item; this.isData = isData; } ......
transfer 方法:
Object transfer(Object e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin // 隊列空,或隊列中節點類型和當前節點一致, // 即我們說的第一種情況,將節點入隊即可。讀者要想着這塊 if 里面方法其實就是入隊 if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; // t != tail 說明剛剛有節點入隊,continue 即可 if (t != tail) // inconsistent read continue; // 有其他節點入隊,但是 tail 還是指向原來的,此時設置 tail 即可 if (tn != null) { // lagging tail // 這個方法就是:如果 tail 此時為 t 的話,設置為 tn advanceTail(t, tn); continue; } // if (timed && nanos <= 0) // can't wait return null; if (s == null) s = new QNode(e, isData); // 將當前節點,插入到 tail 的后面 if (!t.casNext(null, s)) // failed to link in continue; // 將當前節點設置為新的 tail advanceTail(t, s); // swing tail and wait // 看到這里,請讀者先往下滑到這個方法,看完了以后再回來這里,思路也就不會斷了 Object x = awaitFulfill(s, e, timed, nanos); // 到這里,說明之前入隊的線程被喚醒了,准備往下執行 if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? x : e; // 這里的 else 分支就是上面說的第二種情況,有相應的讀或寫相匹配的情況 } else { // complementary-mode QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); return (x != null) ? x : e; } } } void advanceTail(QNode t, QNode nt) { if (tail == t) UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
// 自旋或阻塞,直到滿足條件,這個方法返回 Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) { long lastTime = timed ? System.nanoTime() : 0; Thread w = Thread.currentThread(); // 判斷需要自旋的次數, int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 如果被中斷了,那么取消這個節點 if (w.isInterrupted()) // 就是將當前節點 s 中的 item 屬性設置為 this s.tryCancel(e); Object x = s.item; // 這里是這個方法的唯一的出口 if (x != e) return x; // 如果需要,檢測是否超時 if (timed) { long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; if (nanos <= 0) { s.tryCancel(e); continue; } } if (spins > 0) --spins; // 如果自旋達到了最大的次數,那么檢測 else if (s.waiter == null) s.waiter = w; // 如果自旋到了最大的次數,那么線程掛起,等待喚醒 else if (!timed) LockSupport.park(this); // spinForTimeoutThreshold 這個之前講 AQS 的時候其實也說過,剩余時間小於這個閾值的時候,就 // 不要進行掛起了,自旋的性能會比較好 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
七、LinkedTransferQueue
BlockingQueue對讀或者寫都是鎖上整個隊列,在並發量大的時候,各種鎖是比較耗資源和耗時間的,而前面的SynchronousQueue雖然不會鎖住整個隊列,但它是一個沒有容量的“隊列”。
LinkedTransferQueue是ConcurrentLinkedQueue、SynchronousQueue (公平模式下)、無界的LinkedBlockingQueues等的超集。即可以像其他的BlockingQueue一樣有容量又可以像SynchronousQueue一樣不會鎖住整個隊列
LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對於LinkedTransferQueue多了tryTransfer和transfer方法。
transfer方法:如果當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法可以把生產者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等待接收元素,transfer方法會將元素存放在隊列的tail節點,並等到該元素被消費者消費了才返回。transfer方法的關鍵代碼如下:
Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);
第一行代碼是試圖把存放當前元素的s節點作為tail節點。第二行代碼是讓CPU自旋等待消費者消費元素。因為自旋會消耗CPU,所以自旋一定的次數后使用Thread.yield()方法來暫停當前正在執行的線程,並執行其他線程。
tryTransfer方法:則是用來試探下生產者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則返回false。和transfer方法的區別是tryTransfer方法無論消費者是否接收,方法立即返回。而transfer方法是必須等到消費者消費了才返回。
源碼分析:【死磕Java並發】—–J.U.C之阻塞隊列:LinkedTransferQueue
八、LinkedBlockingDeque
LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的你可以從隊列的兩端插入和移出元素。
雙端隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。相比其他的阻塞隊列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法。
在初始化LinkedBlockingDeque時可以初始化隊列的容量,用來防止其再擴容時過渡膨脹。另外雙向阻塞隊列可以運用在“工作竊取”模式中。
源碼分析:【死磕Java並發】—–J.U.C之阻塞隊列:LinkedBlockingDeque
參考資料 / 相關推薦:
【死磕Java並發】—–J.U.C之阻塞隊列:ArrayBlockingQueue
Java線程(十三):BlockingQueue-線程的阻塞隊列