拋出異常 | 特殊值 | 阻塞 | 超時 | |
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
檢查 | element() | peek() | 不可用 | 不可用 |
1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則招聘異常
2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.
3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續.
4)poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null
5)take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的對象被加入為止
2、BlockingQueue的幾個注意點
【1】BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個remainingCapacity,超出此容量,便無法無阻塞地put 附加元素。沒有任何內部容量約束的BlockingQueue 總是報告Integer.MAX_VALUE 的剩余容量。
【2】BlockingQueue 實現主要用於生產者-使用者隊列,但它另外還支持Collection
接口。因此,舉例來說,使用remove(x) 從隊列中移除任意一個元素是有可能的。然而,這種操作通常不 會有效執行,只能有計划地偶爾使用,比如在取消排隊信息時。
【3】BlockingQueue 實現是線程安全的。所有排隊方法都可以使用內部鎖或其他形式的並發控制來自動達到它們的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和removeAll)沒有 必要自動執行,除非在實現中特別說明。因此,舉例來說,在只添加了c 中的一些元素后,addAll(c) 有可能失敗(拋出一個異常)。

1)ArrayBlockingQueue:規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的.
2)LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的
3)PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數的Comparator決定的順序.
4)SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的.
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock;//每個對象對應一個顯示的鎖 lock.lock();//請求鎖直到獲得鎖(不可以被interrupte) try { if (count == items.length)//如果隊列已經滿了 return false; else { insert(e); return true; } } finally { lock.unlock();// } } 看insert方法: private void insert(E x) { items[putIndex] = x; //增加全局index的值。 /* Inc方法體內部: final int inc(int i) { return (++i == items.length)? 0 : i; } 這里可以看出ArrayBlockingQueue采用從前到后向內部數組插入的方式插入新元素的。如果插完了,putIndex可能重新變為0(在已經執行了移除操作的前提下,否則在之前的判斷中隊列為滿) */ putIndex = inc(putIndex); ++count; notEmpty.signal();//wake up one waiting thread }
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly();//請求鎖直到得到鎖或者變為interrupted try { try { while (count == items.length)//如果滿了,當前線程進入noFull對應的等waiting狀態 notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } }
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { if (count != items.length) { insert(e); return true; } if (nanos <= 0) return false; try { //如果沒有被 signal/interruptes,需要等待nanos時間才返回 nanos = notFull.awaitNanos(nanos); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } }
public boolean add(E e) { return super.add(e); } 父類: public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
該類中有幾個實例變量:takeIndex/putIndex/count
用三個數字來維護這個隊列中的數據變更: /** items index for next take, poll or remove */ private int takeIndex; /** items index for next put, offer, or add. */ private int putIndex; /** Number of items in the queue */ private int count;