概述
ArrayBlockingQueue是一個阻塞隊列,其實底層就是一個數組,說到底層是數組,ArrayList底層也是數組,那它其實也可以作為隊列,但是是非阻塞的,那阻塞和非阻塞的區別是什么?區別在於當隊列中沒有元素的時候就阻塞等待,直到隊列中有數據再消費,而如果隊列滿了之后(隊列有界),生產者就要阻塞。下面就總結一下ArrayBlockingQueue的特性。
- 是一個有界的隊列,初始化隊列的時候傳入隊列大小
- 采用ReentrantLock + Condition實現線程安全和阻塞
- 底層采用數組存儲
- 生產者和消費者共用一把鎖,所以效率一般
總結來說就是效率一般,容量有限,那既然這么差還要搞一個這個對象,原因就是這個實現起來簡單。ArrayBlockingQueue的插入和刪除操作都比較簡單,但是里面有一個東西其實還挺復雜的,就是Itrs,迭代器,我打算寫兩篇博客,本篇介紹插入和刪除等一般的方法,下一篇介紹迭代器。
繼承結構圖
這幅圖畫出了Java中常用隊列的繼承結構圖,可以看出所有的隊列都實現了AbstartQueue,這個抽象類實現了Queue接口,提供了Queue接口中方法的默認實現,繼承它可以少寫一些不必要的代碼,但是Queue接口中沒有提供大多數的阻塞方法,所以有了BlockingQueue接口,這個接口中提供很多的阻塞的插入刪除方法,而最底層的實現者都是阻塞隊列,所以都會實現這個接口。
屬性分析
/** 隊列中元素保存的地方 */ final Object[] items; /** items index for next take, poll, peek or remove,這個英文注釋很詳細 */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes,處理消費者線程的 */ private final Condition notEmpty; /** Condition for waiting puts,處理生產者線程的 */ private final Condition notFull; /** * Shared state for currently active iterators, or null if there * are known not to be any. Allows queue operations to update * iterator state.
* 迭代器,由於一個隊列可以有多個迭代器,所以在該隊列中,迭代器通過鏈表連接起來,itrs屬性就是鏈表頭
* 通過這個頭,就可以找到所有的迭代器,下一篇文章會具體分析 */
transient Itrs itrs = null;
構造方法
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); //初始化數組,指定容量,不會擴容,固定 this.items = new Object[capacity]; //默認是非公平鎖,下面會解釋一下這個 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
關於這里的公平鎖,在網上看資料時,發現有的胖友寫的文章中說,這里如果是公平鎖,如果隊列滿了,生產者阻塞了非常多,那等待最久的生產者線程會先競爭到鎖,別的阻塞線程不能跟他競爭鎖,其實這么解釋的結論是對的,就是如果是公平鎖,那等最久的確實先競爭到鎖,但是原因並不是上面說的原因,即便是非公平鎖,如果沒有新的生產者加入競爭鎖,也是等最久先競爭到鎖,這里公平鎖的作用是說,當多個消費者線程消費多個元素之后,喚醒多個生產者,這時候如果有新的生產者加入,這些生產者需要等待剛剛喚醒的生產者執行完之后才可以競爭鎖。
Queue
public interface Queue<E> extends Collection<E> { //添加元素 boolean add(E e); //添加元素 boolean offer(E e); //刪除 E remove(); //消費元素,其實也是刪除 E poll(); //當前元素 E element(); //當前消費要消費的元素,不會移除隊列,只是獲取 E peek(); }
BlockingQueue
//其實這個接口繼承於Queue,只不過多添加了幾個方法 public interface BlockingQueue<E> extends Queue<E> { //添加元素 boolean add(E e); //添加元素 boolean offer(E e); //添加元素 void put(E e) throws InterruptedException; //添加元素 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //消費元素 E take() throws InterruptedException; //消費元素 E poll(long timeout, TimeUnit unit) throws InterruptedException; int remainingCapacity(); //刪除元素 boolean remove(Object o); public boolean contains(Object o); int drainTo(Collection<? super E> c); public boolean contains(Object o); }
搞不懂有些方法在Queue中已經定義了,這里還要重復定義。
ArrayBlockedQueue
ArrayBlockedQueue實現了上面的隊列,下面就對其實現的方法做一個對比。
生產者方法 |
消費者方法 |
add(E e):直接調用offer,和offer方法返回不同,如果插入成功,返回true,否則拋出異常 |
|
offer(E e):向隊尾插入數據,如果隊列滿了,就返回插入失敗 |
poll():從隊頭獲取元素,如果隊列為空,返回失敗 |
offer(E e, long timeout, TimeUnit unit):向隊尾插入數據, 如果隊列滿了,阻塞等待一段時間,超時返回插入失敗 |
poll(long timeout, TimeUnit unit):從隊頭獲取元素,如果隊列為空,等待一段時間,如果超時返回失敗 |
put(E e):向隊尾插入元素,如果隊列滿了就一直等待,直到隊列中有空閑空間 |
take():從隊頭獲取元素,如果隊列為空,阻塞等待,直到隊列中有元素再消費 |
remove(Object o):刪除隊列中的元素,可以是隊列中的任意元素 |
從上面的對比可知,中間三個方法生產者和消費者是相對的。
add(E e)
public boolean add(E e) { return super.add(e); } //AbstractQueue的實現,直接調用offer方法,不過和offer不同的是,如果插入隊列失敗 //直接拋出異常 public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
offer(E e)
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; //先加鎖,防止多線程出問題 lock.lock(); try { //如果數組滿了,返回false if (count == items.length) return false; else { //下面分析這個方法 enqueue(e); return true; } } finally { lock.unlock(); } }
進入#enqueue(E e)
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; //隊尾插入 items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; //元素數量自增 count++; //通知阻塞的消費者線程 notEmpty.signal(); }
這個方法實現很簡單,不過生產者的幾個方法基本都是調用這個方法。
offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); //獲取傳入的超時時間,轉為納秒 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; //獲取可中斷鎖 lock.lockInterruptibly(); try { //如果隊列滿了,進入循環 while (count == items.length) { //下面那個返回負數,這里直接return一個false結束 if (nanos <= 0) return false; //阻塞固定的時間,如果超時之后會返回一個0或者負數 nanos = notFull.awaitNanos(nanos); } //還是調用這個方法,就不分析了 enqueue(e); return true; } finally { lock.unlock(); } }
這個方法和offer(E e)有兩點不同:
- 使用的鎖是可中斷鎖,就是說如果在等待過程中,線程被中斷了會拋出一個異常
- 使用了超時等待,如果超時才會返回false
put(E e)
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; //可中斷鎖 lock.lockInterruptibly(); try { //隊列滿了 while (count == items.length) //等待,不設置超時時間 notFull.await(); enqueue(e); } finally { lock.unlock(); } }
ok,到此就把生產者的方法分析完了,其實都是調用的enqueue方法,很簡單。
用最帥的陳永仁做分割線。。。
poll(E e)
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //如果隊列為空返回null,否則之后后面的方法 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
進入dequeue()方法
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") //從隊列中獲取 E x = (E) items[takeIndex]; //刪除這個元素 items[takeIndex] = null; //判斷是不是到隊尾了,如果到隊尾了就從頭開始 if (++takeIndex == items.length) takeIndex = 0; count--; //如果該隊列存在迭代器,更新里面一些信息,后面會稍微說明一下 if (itrs != null) itrs.elementDequeued(); //通知生產者線程 notFull.signal(); return x; }
上面的過程都非常簡單,這里提一下itrs這部分代碼,舉個例子,假設通過如下代碼獲取了一個迭代器
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(10); Iterator<Integer> iterator = queue.iterator();
在迭代器初始化時候,其第一個要迭代的元素和消費者要消費的元素是同一個,在上面的代碼中如果消費者消費元素,把隊列中所有的元素消費完了,但是迭代器還沒有運行,這個時候就需要更新迭代器中的一些參數,不讓它迭代了,因為隊列已經為空了,這里只是提一下,下一篇文章會詳細介紹。
poll(long timeout, TimeUnit unit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }
take(E e)
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
remove(Object o)
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { //遍歷隊列,尋找要刪除的元素 if (o.equals(items[i])) { //執行刪除,之后移動數組中的元素,把刪除的元素的空位置給擠占掉 removeAt(i); return true; } if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } }
進入removeAt(i)
void removeAt(final int removeIndex) { // assert lock.getHoldCount() == 1; // assert items[removeIndex] != null; // assert removeIndex >= 0 && removeIndex < items.length; final Object[] items = this.items; //如果刪除的就是下一個要消費的元素 if (removeIndex == takeIndex) { // removing front item; just advance items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); } else { // an "interior" remove // slide over all others up through putIndex. final int putIndex = this.putIndex; //移動元素 for (int i = removeIndex;;) { int next = i + 1; if (next == items.length) next = 0; if (next != putIndex) { items[i] = items[next]; i = next; } else { items[i] = null; this.putIndex = i; break; } } count--; if (itrs != null) //下一篇文章分析 itrs.removedAt(removeIndex); } //通知生產者線程 notFull.signal(); }
以上就是ArrayBlockingQueue常用方法,還有幾個比如contains,toString,toArray都很簡單,就不貼代碼了。
總結
ArrayBlockingQueue總的來說在上面列的結構圖中算是最簡單的一個,在概述中也說了,其實這里面復雜的是迭代,隊列其實很簡單,下一篇文章分析迭代過程。