在Java的java.util.concurrent包中定義了和多線程並發相關的操作,有許多好用的工具類,今天就來看下阻塞隊列。阻塞隊列很好的解決了多線程中數據的安全傳輸問題,其中最典型的例子就是客園很好的解決“生產者--消費者”問題。下面來看其中一個實現類ArrayBlockingQueue。看到這個名字,就很好理解這個隊列肯定是使用數組實現的隊列,即使用數組實現的“先進先出”的隊列,下面看其具體的實現。(均為JDK8)
一、構造方法
在ArrayBlockingQueue類中有下面的3個構造方法,
1、ArrayBlockingQueue(int)
接收一個整型的參數,這個整型參數指的是隊列的長度,其定義如下,
public ArrayBlockingQueue(int capacity) { this(capacity, false); }
可以看到這個方法調用的是ArrayBlockingQueue(int,boolean)方法,那么看下這個方法,
2、ArrayBlockingQueue(int,boolean)
接收兩個參數,一個整型,一個boolean類型,前邊已經知道整型參數是隊列的長度,那么boolean類型參數代表什么意思那,其定義如下,
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
可以看到在這個構造方法中進行了相關邏輯實現,對items進行了數組初始化,boolean類型的參數是作為可重入鎖的參數進行初始化,規定可重入鎖是公平還是不公平,默認為false,另外初始化了notEmpty、notFull兩個信號量。
3、ArrayBlockingQueue(int,boolean,Collection<? extends E>)
接收兩三個參數,第一個整型,第二個boolean類型,第三個集合類型,此構造方法不常用,其定義如下,
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
通過上面的三個構造方法可以構造一個ArrayBlockingQueue的隊列,在構造方法中初始化了實列變量,下面是一些實例變量,
private static final long serialVersionUID = -817911632652898426L; /** The queued items */ //保存隊列元素的數組 final Object[] items; /** items index for next take, poll, peek or remove */ //取出元素的位置 int takeIndex; /** items index for next put, offer, or add */ //添加元素的位置 int putIndex; /** Number of elements in the queue */ //隊列中元素的數量 int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ //鎖對象 final ReentrantLock lock; /** Condition for waiting takes */ //不空的信號量 private final Condition notEmpty; /** Condition for waiting puts */ //不滿的信號量 private final Condition notFull; /** * Shared state for currently active iterators, or null if there * are known not to be any. Allows queue operations to update * iterator state. */ transient Itrs itrs = null;
二、隊列的操作
需要使用阻塞隊列,那么就需要向隊列中添加或取出元素,在ArrayBlocking中已經實現了相關操作,對於添加/取出均是成對出現,提供的方法中有拋出異常、返回false、線程阻塞等幾種情形。
1、add/peek
add/peek是一對互斥的操作,add向隊列種放入元素,peek取出元素。
1.1、add
add的定義如下
public boolean add(E e) { return super.add(e); }
從上面可以看出add方法調用了其父類的實現,父類實現如下
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
父類實現種調用的offer方法,在offer方法返回true時,add方法返回true,其他則拋出“Queue full”的異常。offer方法下面會講到。
1.2、peek
peek方法定義如下
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } }
在peek方法種使用可重入鎖,返回takeIndex處的元素,前面注釋中寫道,此變量代表的是待取出元素的索引。itemAt方法定義如下,
@SuppressWarnings("unchecked") final E itemAt(int i) { return (E) items[i]; }
此方法未進行任何的判斷直接返回takeIndex出的數組元素。
2、put/take
put/take是一對互斥的操作,put向隊列種放入元素,take取出元素,其實現方式和add/peek不一樣。
2.1、put
put的定義如下
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
此方法會判斷元素是否null,然后判斷當前隊列中的元素數量和隊列的長度,如果二者相等則阻塞當前線程;如果不相等則執行enqueue(e)方法,其定義如下,
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
把待插入的元素放在數組的putIndex位置,如果執行完插入后putIndex等於數組的長度,說明隊列已經滿了,那么把putIndex的值置為0,即下次插入的位置為0,下次要插入成功的必要條件是取出了一個元素,取出的位置為takeIndex。
2.2、take
take方法是取出一個元素,其定義如下,
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
此方法中首先判斷當前隊列的元素數量如果為0,則當前線程進行等待,等待notEmpty.singal(),如果不為空則執行dequeue()方法,其定義如下,
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
此方法取出takeIndex位置的元素,把數組中此位置的引用置為null,判斷takeIndex和數組的長度,如果相等證明,已經取到了最后一個元素,下次再取元素需要從位置0開始,為此把takeIndex置為0。
3、offer(E)/poll
offer/poll是一對互斥的操作,offer向隊列種放入元素,poll取出元素,
3.1、offer
offer的定義如下
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
此方法判斷如果隊列中的元素數量和隊列長度相等,則直接返回false,否則執行enqueue方法,put方法會將線程掛起,直到被中斷或插入成功。
3.2、poll
poll方法如下
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
此方法判斷當前隊列中的元素個數如果為0返回null,否則執行dequeue操作,take方法會將線程掛起,直到被中斷或取出成功。
4、offer(E,long,TimeUnit)/poll(long,TimeUnnit)
這兩個方法是普通offer/poll方法的加強版,在隊列滿時指定了重試的時間,如果超過指定的時間后還是無法添加或取出則返回false。
4.1、offer
offer方法如下
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e);
//超時時間 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false;//超過規定時間,返回false nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }
此方法會在指定的規定時間內一直重試,如果規定時間內無法退出循環即添加元素,則返回false。
4.2、poll
poll方法如下,
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }
此方法同樣是在取出元素時進行規定時間的重試,超過規定時間則返回null。
三、方法比較
1、添加方法比較
序號 | 方法名 | 隊列滿時處理方式 | 方法返回值 |
1 | add(E e) | 拋出“Queue full”異常 | boolean |
2 | offer(E e) | 返回false | boolean |
3 | put(E e) | 線程阻塞,直到中斷或被喚醒 | void |
4 | offer(E e, long timeout, TimeUnit unit) | 在規定時間內重試,超過規定時間返回false | boolean |
2、取出方法比較
序號 | 方法名 | 隊列空時處理方式 | 方法返回值 |
1 | peek() | 返回null | E |
2 | poll() | 返回null | E |
3 | take() | 線程阻塞,指定中斷或被喚醒 | E |
4 | poll(long timeout, TimeUnit unit) | 在規定時間內重試,超過規定時間返回null | E |
四、總結
以上時關於ArrayBlockingQueue這個阻塞隊列的相關實現及方法介紹,此隊列以數組為載體,配合可重入鎖實現生產線程和消費線程共享數據,ArrayBlockingQueue作為共享池,實現了並發條件下的添加及取出等方法。
有不正之處歡迎指正,感謝!