Java多線程系列--“JUC集合”07之 ArrayBlockingQueue


 

概要

本章對Java.util.concurrent包中的ArrayBlockingQueue類進行詳細的介紹。內容包括:
ArrayBlockingQueue介紹
ArrayBlockingQueue原理和數據結構
ArrayBlockingQueue函數列表
ArrayBlockingQueue源碼分析(JDK1.7.0_40版本)
ArrayBlockingQueue示例

轉載請注明出處:http://www.cnblogs.com/skywang12345/p/3498652.html

 

ArrayBlockingQueue介紹

ArrayBlockingQueue是數組實現的線程安全的有界的阻塞隊列。
線程安全是指,ArrayBlockingQueue內部通過“互斥鎖”保護競爭資源,實現了多線程對競爭資源的互斥訪問。而有界,則是指ArrayBlockingQueue對應的數組是有界限的。 阻塞隊列,是指多線程訪問競爭資源時,當競爭資源已被某線程獲取時,其它要獲取該資源的線程需要阻塞等待;而且,ArrayBlockingQueue是按 FIFO(先進先出)原則對元素進行排序,元素都是從尾部插入到隊列,從頭部開始返回。

注意:ArrayBlockingQueue不同於ConcurrentLinkedQueue,ArrayBlockingQueue是數組實現的,並且是有界限的;而ConcurrentLinkedQueue是鏈表實現的,是無界限的。

 

ArrayBlockingQueue原理和數據結構

ArrayBlockingQueue的數據結構,如下圖所示:

說明
    1. ArrayBlockingQueue繼承於AbstractQueue,並且它實現了BlockingQueue接口。
    2. ArrayBlockingQueue內部是通過Object[]數組保存數據的,也就是說ArrayBlockingQueue本質上是通過數組實現的。ArrayBlockingQueue的大小,即數組的容量是創建ArrayBlockingQueue時指定的。
    3. ArrayBlockingQueue與ReentrantLock是組合關系,ArrayBlockingQueue中包含一個ReentrantLock對象(lock)。ReentrantLock是可重入的互斥鎖,ArrayBlockingQueue就是根據該互斥鎖實現“多線程對競爭資源的互斥訪問”。而且,ReentrantLock分為公平鎖和非公平鎖,關於具體使用公平鎖還是非公平鎖,在創建ArrayBlockingQueue時可以指定;而且,ArrayBlockingQueue默認會使用非公平鎖。
    4. ArrayBlockingQueue與Condition是組合關系,ArrayBlockingQueue中包含兩個Condition對象(notEmpty和notFull)。而且,Condition又依賴於ArrayBlockingQueue而存在,通過Condition可以實現對ArrayBlockingQueue的更精確的訪問 -- (01)若某線程(線程A)要取數據時,數組正好為空,則該線程會執行notEmpty.await()進行等待;當其它某個線程(線程B)向數組中插入了數據之后,會調用notEmpty.signal()喚醒“notEmpty上的等待線程”。此時,線程A會被喚醒從而得以繼續運行。(02)若某線程(線程H)要插入數據時,數組已滿,則該線程會它執行notFull.await()進行等待;當其它某個線程(線程I)取出數據之后,會調用notFull.signal()喚醒“notFull上的等待線程”。此時,線程H就會被喚醒從而得以繼續運行。
    關於ReentrantLock,公平鎖,非公平鎖,以及Condition等更多的內容,可以參考:

    (01) Java多線程系列--“JUC鎖”02之 互斥鎖ReentrantLock
    (02) Java多線程系列--“JUC鎖”03之 公平鎖(一)
    (03) Java多線程系列--“JUC鎖”04之 公平鎖(二)
    (04) Java多線程系列--“JUC鎖”05之 非公平鎖
    (05) Java多線程系列--“JUC鎖”06之 Condition條件

 

ArrayBlockingQueue函數列表

// 創建一個帶有給定的(固定)容量和默認訪問策略的 ArrayBlockingQueue。
ArrayBlockingQueue(int capacity)
// 創建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue。
ArrayBlockingQueue(int capacity, boolean fair)
// 創建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue,它最初包含給定 collection 的元素,並以 collection 迭代器的遍歷順序添加元素。
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)

// 將指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量),在成功時返回 true,如果此隊列已滿,則拋出 IllegalStateException。
boolean add(E e)
// 自動移除此隊列中的所有元素。
void clear()
// 如果此隊列包含指定的元素,則返回 true。
boolean contains(Object o)
// 移除此隊列中所有可用的元素,並將它們添加到給定 collection 中。
int drainTo(Collection<? super E> c)
// 最多從此隊列中移除給定數量的可用元素,並將這些元素添加到給定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 返回在此隊列中的元素上按適當順序進行迭代的迭代器。
Iterator<E> iterator()
// 將指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量),在成功時返回 true,如果此隊列已滿,則返回 false。
boolean offer(E e)
// 將指定的元素插入此隊列的尾部,如果該隊列已滿,則在到達指定的等待時間之前等待可用的空間。
boolean offer(E e, long timeout, TimeUnit unit)
// 獲取但不移除此隊列的頭;如果此隊列為空,則返回 null。
E peek()
// 獲取並移除此隊列的頭,如果此隊列為空,則返回 null。
E poll()
// 獲取並移除此隊列的頭部,在指定的等待時間前等待可用的元素(如果有必要)。
E poll(long timeout, TimeUnit unit)
// 將指定的元素插入此隊列的尾部,如果該隊列已滿,則等待可用的空間。
void put(E e)
// 返回在無阻塞的理想情況下(不存在內存或資源約束)此隊列能接受的其他元素數量。
int remainingCapacity()
// 從此隊列中移除指定元素的單個實例(如果存在)。
boolean remove(Object o)
// 返回此隊列中元素的數量。
int size()
// 獲取並移除此隊列的頭部,在元素變得可用之前一直等待(如果有必要)。
E take()
// 返回一個按適當順序包含此隊列中所有元素的數組。
Object[] toArray()
// 返回一個按適當順序包含此隊列中所有元素的數組;返回數組的運行時類型是指定數組的運行時類型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()

 

ArrayBlockingQueue源碼分析(JDK1.7.0_40版本)

ArrayBlockingQueue.java的完整源碼如下:

  1 /*
  2  * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  3  *
  4  *
  5  *
  6  *
  7  *
  8  *
  9  *
 10  *
 11  *
 12  *
 13  *
 14  *
 15  *
 16  *
 17  *
 18  *
 19  *
 20  *
 21  *
 22  *
 23  */
 24 
 25 /*
 26  *
 27  *
 28  *
 29  *
 30  *
 31  * Written by Doug Lea with assistance from members of JCP JSR-166
 32  * Expert Group and released to the public domain, as explained at
 33  * http://creativecommons.org/publicdomain/zero/1.0/
 34  */
 35 
 36 package java.util.concurrent;
 37 import java.util.concurrent.locks.*;
 38 import java.util.*;
 39 
 40 /**
 41  * A bounded {@linkplain BlockingQueue blocking queue} backed by an
 42  * array.  This queue orders elements FIFO (first-in-first-out).  The
 43  * <em>head</em> of the queue is that element that has been on the
 44  * queue the longest time.  The <em>tail</em> of the queue is that
 45  * element that has been on the queue the shortest time. New elements
 46  * are inserted at the tail of the queue, and the queue retrieval
 47  * operations obtain elements at the head of the queue.
 48  *
 49  * <p>This is a classic &quot;bounded buffer&quot;, in which a
 50  * fixed-sized array holds elements inserted by producers and
 51  * extracted by consumers.  Once created, the capacity cannot be
 52  * changed.  Attempts to {@code put} an element into a full queue
 53  * will result in the operation blocking; attempts to {@code take} an
 54  * element from an empty queue will similarly block.
 55  *
 56  * <p>This class supports an optional fairness policy for ordering
 57  * waiting producer and consumer threads.  By default, this ordering
 58  * is not guaranteed. However, a queue constructed with fairness set
 59  * to {@code true} grants threads access in FIFO order. Fairness
 60  * generally decreases throughput but reduces variability and avoids
 61  * starvation.
 62  *
 63  * <p>This class and its iterator implement all of the
 64  * <em>optional</em> methods of the {@link Collection} and {@link
 65  * Iterator} interfaces.
 66  *
 67  * <p>This class is a member of the
 68  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 69  * Java Collections Framework</a>.
 70  *
 71  * @since 1.5
 72  * @author Doug Lea
 73  * @param <E> the type of elements held in this collection
 74  */
 75 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
 76         implements BlockingQueue<E>, java.io.Serializable {
 77 
 78     /**
 79      * Serialization ID. This class relies on default serialization
 80      * even for the items array, which is default-serialized, even if
 81      * it is empty. Otherwise it could not be declared final, which is
 82      * necessary here.
 83      */
 84     private static final long serialVersionUID = -817911632652898426L;
 85 
 86     /** The queued items */
 87     final Object[] items;
 88 
 89     /** items index for next take, poll, peek or remove */
 90     int takeIndex;
 91 
 92     /** items index for next put, offer, or add */
 93     int putIndex;
 94 
 95     /** Number of elements in the queue */
 96     int count;
 97 
 98     /*
 99      * Concurrency control uses the classic two-condition algorithm
100      * found in any textbook.
101      */
102 
103     /** Main lock guarding all access */
104     final ReentrantLock lock;
105     /** Condition for waiting takes */
106     private final Condition notEmpty;
107     /** Condition for waiting puts */
108     private final Condition notFull;
109 
110     // Internal helper methods
111 
112     /**
113      * Circularly increment i.
114      */
115     final int inc(int i) {
116         return (++i == items.length) ? 0 : i;
117     }
118 
119     /**
120      * Circularly decrement i.
121      */
122     final int dec(int i) {
123         return ((i == 0) ? items.length : i) - 1;
124     }
125 
126     @SuppressWarnings("unchecked")
127     static <E> E cast(Object item) {
128         return (E) item;
129     }
130 
131     /**
132      * Returns item at index i.
133      */
134     final E itemAt(int i) {
135         return this.<E>cast(items[i]);
136     }
137 
138     /**
139      * Throws NullPointerException if argument is null.
140      *
141      * @param v the element
142      */
143     private static void checkNotNull(Object v) {
144         if (v == null)
145             throw new NullPointerException();
146     }
147 
148     /**
149      * Inserts element at current put position, advances, and signals.
150      * Call only when holding lock.
151      */
152     private void insert(E x) {
153         items[putIndex] = x;
154         putIndex = inc(putIndex);
155         ++count;
156         notEmpty.signal();
157     }
158 
159     /**
160      * Extracts element at current take position, advances, and signals.
161      * Call only when holding lock.
162      */
163     private E extract() {
164         final Object[] items = this.items;
165         E x = this.<E>cast(items[takeIndex]);
166         items[takeIndex] = null;
167         takeIndex = inc(takeIndex);
168         --count;
169         notFull.signal();
170         return x;
171     }
172 
173     /**
174      * Deletes item at position i.
175      * Utility for remove and iterator.remove.
176      * Call only when holding lock.
177      */
178     void removeAt(int i) {
179         final Object[] items = this.items;
180         // if removing front item, just advance
181         if (i == takeIndex) {
182             items[takeIndex] = null;
183             takeIndex = inc(takeIndex);
184         } else {
185             // slide over all others up through putIndex.
186             for (;;) {
187                 int nexti = inc(i);
188                 if (nexti != putIndex) {
189                     items[i] = items[nexti];
190                     i = nexti;
191                 } else {
192                     items[i] = null;
193                     putIndex = i;
194                     break;
195                 }
196             }
197         }
198         --count;
199         notFull.signal();
200     }
201 
202     /**
203      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
204      * capacity and default access policy.
205      *
206      * @param capacity the capacity of this queue
207      * @throws IllegalArgumentException if {@code capacity < 1}
208      */
209     public ArrayBlockingQueue(int capacity) {
210         this(capacity, false);
211     }
212 
213     /**
214      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
215      * capacity and the specified access policy.
216      *
217      * @param capacity the capacity of this queue
218      * @param fair if {@code true} then queue accesses for threads blocked
219      *        on insertion or removal, are processed in FIFO order;
220      *        if {@code false} the access order is unspecified.
221      * @throws IllegalArgumentException if {@code capacity < 1}
222      */
223     public ArrayBlockingQueue(int capacity, boolean fair) {
224         if (capacity <= 0)
225             throw new IllegalArgumentException();
226         this.items = new Object[capacity];
227         lock = new ReentrantLock(fair);
228         notEmpty = lock.newCondition();
229         notFull =  lock.newCondition();
230     }
231 
232     /**
233      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
234      * capacity, the specified access policy and initially containing the
235      * elements of the given collection,
236      * added in traversal order of the collection's iterator.
237      *
238      * @param capacity the capacity of this queue
239      * @param fair if {@code true} then queue accesses for threads blocked
240      *        on insertion or removal, are processed in FIFO order;
241      *        if {@code false} the access order is unspecified.
242      * @param c the collection of elements to initially contain
243      * @throws IllegalArgumentException if {@code capacity} is less than
244      *         {@code c.size()}, or less than 1.
245      * @throws NullPointerException if the specified collection or any
246      *         of its elements are null
247      */
248     public ArrayBlockingQueue(int capacity, boolean fair,
249                               Collection<? extends E> c) {
250         this(capacity, fair);
251 
252         final ReentrantLock lock = this.lock;
253         lock.lock(); // Lock only for visibility, not mutual exclusion
254         try {
255             int i = 0;
256             try {
257                 for (E e : c) {
258                     checkNotNull(e);
259                     items[i++] = e;
260                 }
261             } catch (ArrayIndexOutOfBoundsException ex) {
262                 throw new IllegalArgumentException();
263             }
264             count = i;
265             putIndex = (i == capacity) ? 0 : i;
266         } finally {
267             lock.unlock();
268         }
269     }
270 
271     /**
272      * Inserts the specified element at the tail of this queue if it is
273      * possible to do so immediately without exceeding the queue's capacity,
274      * returning {@code true} upon success and throwing an
275      * {@code IllegalStateException} if this queue is full.
276      *
277      * @param e the element to add
278      * @return {@code true} (as specified by {@link Collection#add})
279      * @throws IllegalStateException if this queue is full
280      * @throws NullPointerException if the specified element is null
281      */
282     public boolean add(E e) {
283         return super.add(e);
284     }
285 
286     /**
287      * Inserts the specified element at the tail of this queue if it is
288      * possible to do so immediately without exceeding the queue's capacity,
289      * returning {@code true} upon success and {@code false} if this queue
290      * is full.  This method is generally preferable to method {@link #add},
291      * which can fail to insert an element only by throwing an exception.
292      *
293      * @throws NullPointerException if the specified element is null
294      */
295     public boolean offer(E e) {
296         checkNotNull(e);
297         final ReentrantLock lock = this.lock;
298         lock.lock();
299         try {
300             if (count == items.length)
301                 return false;
302             else {
303                 insert(e);
304                 return true;
305             }
306         } finally {
307             lock.unlock();
308         }
309     }
310 
311     /**
312      * Inserts the specified element at the tail of this queue, waiting
313      * for space to become available if the queue is full.
314      *
315      * @throws InterruptedException {@inheritDoc}
316      * @throws NullPointerException {@inheritDoc}
317      */
318     public void put(E e) throws InterruptedException {
319         checkNotNull(e);
320         final ReentrantLock lock = this.lock;
321         lock.lockInterruptibly();
322         try {
323             while (count == items.length)
324                 notFull.await();
325             insert(e);
326         } finally {
327             lock.unlock();
328         }
329     }
330 
331     /**
332      * Inserts the specified element at the tail of this queue, waiting
333      * up to the specified wait time for space to become available if
334      * the queue is full.
335      *
336      * @throws InterruptedException {@inheritDoc}
337      * @throws NullPointerException {@inheritDoc}
338      */
339     public boolean offer(E e, long timeout, TimeUnit unit)
340         throws InterruptedException {
341 
342         checkNotNull(e);
343         long nanos = unit.toNanos(timeout);
344         final ReentrantLock lock = this.lock;
345         lock.lockInterruptibly();
346         try {
347             while (count == items.length) {
348                 if (nanos <= 0)
349                     return false;
350                 nanos = notFull.awaitNanos(nanos);
351             }
352             insert(e);
353             return true;
354         } finally {
355             lock.unlock();
356         }
357     }
358 
359     public E poll() {
360         final ReentrantLock lock = this.lock;
361         lock.lock();
362         try {
363             return (count == 0) ? null : extract();
364         } finally {
365             lock.unlock();
366         }
367     }
368 
369     public E take() throws InterruptedException {
370         final ReentrantLock lock = this.lock;
371         lock.lockInterruptibly();
372         try {
373             while (count == 0)
374                 notEmpty.await();
375             return extract();
376         } finally {
377             lock.unlock();
378         }
379     }
380 
381     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
382         long nanos = unit.toNanos(timeout);
383         final ReentrantLock lock = this.lock;
384         lock.lockInterruptibly();
385         try {
386             while (count == 0) {
387                 if (nanos <= 0)
388                     return null;
389                 nanos = notEmpty.awaitNanos(nanos);
390             }
391             return extract();
392         } finally {
393             lock.unlock();
394         }
395     }
396 
397     public E peek() {
398         final ReentrantLock lock = this.lock;
399         lock.lock();
400         try {
401             return (count == 0) ? null : itemAt(takeIndex);
402         } finally {
403             lock.unlock();
404         }
405     }
406 
407     // this doc comment is overridden to remove the reference to collections
408     // greater in size than Integer.MAX_VALUE
409     /**
410      * Returns the number of elements in this queue.
411      *
412      * @return the number of elements in this queue
413      */
414     public int size() {
415         final ReentrantLock lock = this.lock;
416         lock.lock();
417         try {
418             return count;
419         } finally {
420             lock.unlock();
421         }
422     }
423 
424     // this doc comment is a modified copy of the inherited doc comment,
425     // without the reference to unlimited queues.
426     /**
427      * Returns the number of additional elements that this queue can ideally
428      * (in the absence of memory or resource constraints) accept without
429      * blocking. This is always equal to the initial capacity of this queue
430      * less the current {@code size} of this queue.
431      *
432      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
433      * an element will succeed by inspecting {@code remainingCapacity}
434      * because it may be the case that another thread is about to
435      * insert or remove an element.
436      */
437     public int remainingCapacity() {
438         final ReentrantLock lock = this.lock;
439         lock.lock();
440         try {
441             return items.length - count;
442         } finally {
443             lock.unlock();
444         }
445     }
446 
447     /**
448      * Removes a single instance of the specified element from this queue,
449      * if it is present.  More formally, removes an element {@code e} such
450      * that {@code o.equals(e)}, if this queue contains one or more such
451      * elements.
452      * Returns {@code true} if this queue contained the specified element
453      * (or equivalently, if this queue changed as a result of the call).
454      *
455      * <p>Removal of interior elements in circular array based queues
456      * is an intrinsically slow and disruptive operation, so should
457      * be undertaken only in exceptional circumstances, ideally
458      * only when the queue is known not to be accessible by other
459      * threads.
460      *
461      * @param o element to be removed from this queue, if present
462      * @return {@code true} if this queue changed as a result of the call
463      */
464     public boolean remove(Object o) {
465         if (o == null) return false;
466         final Object[] items = this.items;
467         final ReentrantLock lock = this.lock;
468         lock.lock();
469         try {
470             for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
471                 if (o.equals(items[i])) {
472                     removeAt(i);
473                     return true;
474                 }
475             }
476             return false;
477         } finally {
478             lock.unlock();
479         }
480     }
481 
482     /**
483      * Returns {@code true} if this queue contains the specified element.
484      * More formally, returns {@code true} if and only if this queue contains
485      * at least one element {@code e} such that {@code o.equals(e)}.
486      *
487      * @param o object to be checked for containment in this queue
488      * @return {@code true} if this queue contains the specified element
489      */
490     public boolean contains(Object o) {
491         if (o == null) return false;
492         final Object[] items = this.items;
493         final ReentrantLock lock = this.lock;
494         lock.lock();
495         try {
496             for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
497                 if (o.equals(items[i]))
498                     return true;
499             return false;
500         } finally {
501             lock.unlock();
502         }
503     }
504 
505     /**
506      * Returns an array containing all of the elements in this queue, in
507      * proper sequence.
508      *
509      * <p>The returned array will be "safe" in that no references to it are
510      * maintained by this queue.  (In other words, this method must allocate
511      * a new array).  The caller is thus free to modify the returned array.
512      *
513      * <p>This method acts as bridge between array-based and collection-based
514      * APIs.
515      *
516      * @return an array containing all of the elements in this queue
517      */
518     public Object[] toArray() {
519         final Object[] items = this.items;
520         final ReentrantLock lock = this.lock;
521         lock.lock();
522         try {
523             final int count = this.count;
524             Object[] a = new Object[count];
525             for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
526                 a[k] = items[i];
527             return a;
528         } finally {
529             lock.unlock();
530         }
531     }
532 
533     /**
534      * Returns an array containing all of the elements in this queue, in
535      * proper sequence; the runtime type of the returned array is that of
536      * the specified array.  If the queue fits in the specified array, it
537      * is returned therein.  Otherwise, a new array is allocated with the
538      * runtime type of the specified array and the size of this queue.
539      *
540      * <p>If this queue fits in the specified array with room to spare
541      * (i.e., the array has more elements than this queue), the element in
542      * the array immediately following the end of the queue is set to
543      * {@code null}.
544      *
545      * <p>Like the {@link #toArray()} method, this method acts as bridge between
546      * array-based and collection-based APIs.  Further, this method allows
547      * precise control over the runtime type of the output array, and may,
548      * under certain circumstances, be used to save allocation costs.
549      *
550      * <p>Suppose {@code x} is a queue known to contain only strings.
551      * The following code can be used to dump the queue into a newly
552      * allocated array of {@code String}:
553      *
554      * <pre>
555      *     String[] y = x.toArray(new String[0]);</pre>
556      *
557      * Note that {@code toArray(new Object[0])} is identical in function to
558      * {@code toArray()}.
559      *
560      * @param a the array into which the elements of the queue are to
561      *          be stored, if it is big enough; otherwise, a new array of the
562      *          same runtime type is allocated for this purpose
563      * @return an array containing all of the elements in this queue
564      * @throws ArrayStoreException if the runtime type of the specified array
565      *         is not a supertype of the runtime type of every element in
566      *         this queue
567      * @throws NullPointerException if the specified array is null
568      */
569     @SuppressWarnings("unchecked")
570     public <T> T[] toArray(T[] a) {
571         final Object[] items = this.items;
572         final ReentrantLock lock = this.lock;
573         lock.lock();
574         try {
575             final int count = this.count;
576             final int len = a.length;
577             if (len < count)
578                 a = (T[])java.lang.reflect.Array.newInstance(
579                     a.getClass().getComponentType(), count);
580             for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
581                 a[k] = (T) items[i];
582             if (len > count)
583                 a[count] = null;
584             return a;
585         } finally {
586             lock.unlock();
587         }
588     }
589 
590     public String toString() {
591         final ReentrantLock lock = this.lock;
592         lock.lock();
593         try {
594             int k = count;
595             if (k == 0)
596                 return "[]";
597 
598             StringBuilder sb = new StringBuilder();
599             sb.append('[');
600             for (int i = takeIndex; ; i = inc(i)) {
601                 Object e = items[i];
602                 sb.append(e == this ? "(this Collection)" : e);
603                 if (--k == 0)
604                     return sb.append(']').toString();
605                 sb.append(',').append(' ');
606             }
607         } finally {
608             lock.unlock();
609         }
610     }
611 
612     /**
613      * Atomically removes all of the elements from this queue.
614      * The queue will be empty after this call returns.
615      */
616     public void clear() {
617         final Object[] items = this.items;
618         final ReentrantLock lock = this.lock;
619         lock.lock();
620         try {
621             for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
622                 items[i] = null;
623             count = 0;
624             putIndex = 0;
625             takeIndex = 0;
626             notFull.signalAll();
627         } finally {
628             lock.unlock();
629         }
630     }
631 
632     /**
633      * @throws UnsupportedOperationException {@inheritDoc}
634      * @throws ClassCastException            {@inheritDoc}
635      * @throws NullPointerException          {@inheritDoc}
636      * @throws IllegalArgumentException      {@inheritDoc}
637      */
638     public int drainTo(Collection<? super E> c) {
639         checkNotNull(c);
640         if (c == this)
641             throw new IllegalArgumentException();
642         final Object[] items = this.items;
643         final ReentrantLock lock = this.lock;
644         lock.lock();
645         try {
646             int i = takeIndex;
647             int n = 0;
648             int max = count;
649             while (n < max) {
650                 c.add(this.<E>cast(items[i]));
651                 items[i] = null;
652                 i = inc(i);
653                 ++n;
654             }
655             if (n > 0) {
656                 count = 0;
657                 putIndex = 0;
658                 takeIndex = 0;
659                 notFull.signalAll();
660             }
661             return n;
662         } finally {
663             lock.unlock();
664         }
665     }
666 
667     /**
668      * @throws UnsupportedOperationException {@inheritDoc}
669      * @throws ClassCastException            {@inheritDoc}
670      * @throws NullPointerException          {@inheritDoc}
671      * @throws IllegalArgumentException      {@inheritDoc}
672      */
673     public int drainTo(Collection<? super E> c, int maxElements) {
674         checkNotNull(c);
675         if (c == this)
676             throw new IllegalArgumentException();
677         if (maxElements <= 0)
678             return 0;
679         final Object[] items = this.items;
680         final ReentrantLock lock = this.lock;
681         lock.lock();
682         try {
683             int i = takeIndex;
684             int n = 0;
685             int max = (maxElements < count) ? maxElements : count;
686             while (n < max) {
687                 c.add(this.<E>cast(items[i]));
688                 items[i] = null;
689                 i = inc(i);
690                 ++n;
691             }
692             if (n > 0) {
693                 count -= n;
694                 takeIndex = i;
695                 notFull.signalAll();
696             }
697             return n;
698         } finally {
699             lock.unlock();
700         }
701     }
702 
703     /**
704      * Returns an iterator over the elements in this queue in proper sequence.
705      * The elements will be returned in order from first (head) to last (tail).
706      *
707      * <p>The returned {@code Iterator} is a "weakly consistent" iterator that
708      * will never throw {@link java.util.ConcurrentModificationException
709      * ConcurrentModificationException},
710      * and guarantees to traverse elements as they existed upon
711      * construction of the iterator, and may (but is not guaranteed to)
712      * reflect any modifications subsequent to construction.
713      *
714      * @return an iterator over the elements in this queue in proper sequence
715      */
716     public Iterator<E> iterator() {
717         return new Itr();
718     }
719 
720     /**
721      * Iterator for ArrayBlockingQueue. To maintain weak consistency
722      * with respect to puts and takes, we (1) read ahead one slot, so
723      * as to not report hasNext true but then not have an element to
724      * return -- however we later recheck this slot to use the most
725      * current value; (2) ensure that each array slot is traversed at
726      * most once (by tracking "remaining" elements); (3) skip over
727      * null slots, which can occur if takes race ahead of iterators.
728      * However, for circular array-based queues, we cannot rely on any
729      * well established definition of what it means to be weakly
730      * consistent with respect to interior removes since these may
731      * require slot overwrites in the process of sliding elements to
732      * cover gaps. So we settle for resiliency, operating on
733      * established apparent nexts, which may miss some elements that
734      * have moved between calls to next.
735      */
736     private class Itr implements Iterator<E> {
737         private int remaining; // Number of elements yet to be returned
738         private int nextIndex; // Index of element to be returned by next
739         private E nextItem;    // Element to be returned by next call to next
740         private E lastItem;    // Element returned by last call to next
741         private int lastRet;   // Index of last element returned, or -1 if none
742 
743         Itr() {
744             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
745             lock.lock();
746             try {
747                 lastRet = -1;
748                 if ((remaining = count) > 0)
749                     nextItem = itemAt(nextIndex = takeIndex);
750             } finally {
751                 lock.unlock();
752             }
753         }
754 
755         public boolean hasNext() {
756             return remaining > 0;
757         }
758 
759         public E next() {
760             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
761             lock.lock();
762             try {
763                 if (remaining <= 0)
764                     throw new NoSuchElementException();
765                 lastRet = nextIndex;
766                 E x = itemAt(nextIndex);  // check for fresher value
767                 if (x == null) {
768                     x = nextItem;         // we are forced to report old value
769                     lastItem = null;      // but ensure remove fails
770                 }
771                 else
772                     lastItem = x;
773                 while (--remaining > 0 && // skip over nulls
774                        (nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
775                     ;
776                 return x;
777             } finally {
778                 lock.unlock();
779             }
780         }
781 
782         public void remove() {
783             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
784             lock.lock();
785             try {
786                 int i = lastRet;
787                 if (i == -1)
788                     throw new IllegalStateException();
789                 lastRet = -1;
790                 E x = lastItem;
791                 lastItem = null;
792                 // only remove if item still at index
793                 if (x != null && x == items[i]) {
794                     boolean removingHead = (i == takeIndex);
795                     removeAt(i);
796                     if (!removingHead)
797                         nextIndex = dec(nextIndex);
798                 }
799             } finally {
800                 lock.unlock();
801             }
802         }
803     }
804 
805 }
View Code

 

下面從ArrayBlockingQueue的創建,添加,取出,遍歷這幾個方面對ArrayBlockingQueue進行分析。

1. 創建

下面以ArrayBlockingQueue(int capacity, boolean fair)來進行說明。

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

說明
(01) items是保存“阻塞隊列”數據的數組。它的定義如下:

final Object[] items;

(02) fair是“可重入的獨占鎖(ReentrantLock)”的類型。fair為true,表示是公平鎖;fair為false,表示是非公平鎖。
notEmpty和notFull是鎖的兩個Condition條件。它們的定義如下:

final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

簡單對Condition和Lock的用法進行說明,更多內容請參考“Java多線程系列--“JUC鎖”06之 Condition條件”。
Lock的作用是提供獨占鎖機制,來保護競爭資源;而Condition是為了更加精細的對鎖進行控制,它依賴於Lock,通過某個條件對多線程進行控制。
notEmpty表示“鎖的非空條件”。當某線程想從隊列中取數據時,而此時又沒有數據,則該線程通過notEmpty.await()進行等待;當其它線程向隊列中插入了元素之后,就調用notEmpty.signal()喚醒“之前通過notEmpty.await()進入等待狀態的線程”。
同理,notFull表示“鎖的滿條件”。當某線程想向隊列中插入元素,而此時隊列已滿時,該線程等待;當其它線程從隊列中取出元素之后,就喚醒該等待的線程。

 

2. 添加

下面以offer(E e)為例,對ArrayBlockingQueue的添加方法進行說明。

public boolean offer(E e) {
    // 創建插入的元素是否為null,是的話拋出NullPointerException異常
    checkNotNull(e);
    // 獲取“該阻塞隊列的獨占鎖”
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 如果隊列已滿,則返回false。
        if (count == items.length)
            return false;
        else {
        // 如果隊列未滿,則插入e,並返回true。
            insert(e);
            return true;
        }
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}

說明:offer(E e)的作用是將e插入阻塞隊列的尾部。如果隊列已滿,則返回false,表示插入失敗;否則,插入元素,並返回true。
(01) count表示”隊列中的元素個數“。除此之外,隊列中還有另外兩個遍歷takeIndex和putIndex。takeIndex表示下一個被取出元素的索引,putIndex表示下一個被添加元素的索引。它們的定義如下:

// 隊列中的元素個數
int takeIndex;
// 下一個被取出元素的索引
int putIndex;
// 下一個被添加元素的索引
int count;

(02) insert()的源碼如下

private void insert(E x) {
    // 將x添加到”隊列“中
    items[putIndex] = x;
    // 設置”下一個被取出元素的索引“
    putIndex = inc(putIndex);
    // 將”隊列中的元素個數”+1
    ++count;
    // 喚醒notEmpty上的等待線程
    notEmpty.signal();
}

insert()在插入元素之后,會喚醒notEmpty上面的等待線程。

inc()的源碼如下:

final int inc(int i) {
    return (++i == items.length) ? 0 : i;
}

若i+1的值等於“隊列的長度”,即添加元素之后,隊列滿;則設置“下一個被添加元素的索引”為0。

 

3. 取出

下面以take()為例,對ArrayBlockingQueue的取出方法進行說明。

public E take() throws InterruptedException {
    // 獲取“隊列的獨占鎖”
    final ReentrantLock lock = this.lock;
    // 獲取“鎖”,若當前線程是中斷狀態,則拋出InterruptedException異常
    lock.lockInterruptibly();
    try {
        // 若“隊列為空”,則一直等待。
        while (count == 0)
            notEmpty.await();
        // 取出元素
        return extract();
    } finally {
        // 釋放“鎖”
        lock.unlock();
    }
}

說明:take()的作用是取出並返回隊列的頭。若隊列為空,則一直等待。

extract()的源碼如下:

private E extract() {
    final Object[] items = this.items;
    // 強制將元素轉換為“泛型E”
    E x = this.<E>cast(items[takeIndex]);
    // 將第takeIndex元素設為null,即刪除。同時,幫助GC回收。
    items[takeIndex] = null;
    // 設置“下一個被取出元素的索引”
    takeIndex = inc(takeIndex);
    // 將“隊列中元素數量”-1
    --count;
    // 喚醒notFull上的等待線程。
    notFull.signal();
    return x;
}

說明:extract()在刪除元素之后,會喚醒notFull上的等待線程。

 

4. 遍歷

下面對ArrayBlockingQueue的遍歷方法進行說明。

public Iterator<E> iterator() {
    return new Itr();
}

 

Itr是實現了Iterator接口的類,它的源碼如下:

private class Itr implements Iterator<E> {
    // 隊列中剩余元素的個數
    private int remaining; // Number of elements yet to be returned
    // 下一次調用next()返回的元素的索引
    private int nextIndex; // Index of element to be returned by next
    // 下一次調用next()返回的元素
    private E nextItem;    // Element to be returned by next call to next
    // 上一次調用next()返回的元素
    private E lastItem;    // Element returned by last call to next
    // 上一次調用next()返回的元素的索引
    private int lastRet;   // Index of last element returned, or -1 if none

    Itr() {
        // 獲取“阻塞隊列”的鎖
        final ReentrantLock lock = ArrayBlockingQueue.this.lock;
        lock.lock();
        try {
            lastRet = -1;
            if ((remaining = count) > 0)
                nextItem = itemAt(nextIndex = takeIndex);
        } finally {
            // 釋放“鎖”
            lock.unlock();
        }
    }

    public boolean hasNext() {
        return remaining > 0;
    }

    public E next() {
        // 獲取“阻塞隊列”的鎖
        final ReentrantLock lock = ArrayBlockingQueue.this.lock;
        lock.lock();
        try {
            // 若“剩余元素<=0”,則拋出異常。
            if (remaining <= 0)
                throw new NoSuchElementException();
            lastRet = nextIndex;
            // 獲取第nextIndex位置的元素
            E x = itemAt(nextIndex);  // check for fresher value
            if (x == null) {
                x = nextItem;         // we are forced to report old value
                lastItem = null;      // but ensure remove fails
            }
            else
                lastItem = x;
            while (--remaining > 0 && // skip over nulls
                   (nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
                ;
            return x;
        } finally {
            lock.unlock();
        }
    }

    public void remove() {
        final ReentrantLock lock = ArrayBlockingQueue.this.lock;
        lock.lock();
        try {
            int i = lastRet;
            if (i == -1)
                throw new IllegalStateException();
            lastRet = -1;
            E x = lastItem;
            lastItem = null;
            // only remove if item still at index
            if (x != null && x == items[i]) {
                boolean removingHead = (i == takeIndex);
                removeAt(i);
                if (!removingHead)
                    nextIndex = dec(nextIndex);
            }
        } finally {
            lock.unlock();
        }
    }
}

 

ArrayBlockingQueue示例

import java.util.*;
import java.util.concurrent.*;

/*
 *   ArrayBlockingQueue是“線程安全”的隊列,而LinkedList是非線程安全的。
 *
 *   下面是“多個線程同時操作並且遍歷queue”的示例
 *   (01) 當queue是ArrayBlockingQueue對象時,程序能正常運行。
 *   (02) 當queue是LinkedList對象時,程序會產生ConcurrentModificationException異常。
 *
 * @author skywang
 */
public class ArrayBlockingQueueDemo1{

    // TODO: queue是LinkedList對象時,程序會出錯。
    //private static Queue<String> queue = new LinkedList<String>();
    private static Queue<String> queue = new ArrayBlockingQueue<String>(20);
    public static void main(String[] args) {
    
        // 同時啟動兩個線程對queue進行操作!
        new MyThread("ta").start();
        new MyThread("tb").start();
    }

    private static void printAll() {
        String value;
        Iterator iter = queue.iterator();
        while(iter.hasNext()) {
            value = (String)iter.next();
            System.out.print(value+", ");
        }
        System.out.println();
    }

    private static class MyThread extends Thread {
        MyThread(String name) {
            super(name);
        }
        @Override
        public void run() {
                int i = 0;
            while (i++ < 6) {
                // “線程名” + "-" + "序號"
                String val = Thread.currentThread().getName()+i;
                queue.add(val);
                // 通過“Iterator”遍歷queue。
                printAll();
            }
        }
    }
}

(某一次)運行結果

ta1, ta1, 
tb1, ta1, 
tb1, ta1, ta2, 
tb1, ta1, ta2, tb1, tb2, 
ta2, ta1, tb2, tb1, ta3, 
ta2, ta1, tb2, tb1, ta3, ta2, tb3, 
tb2, ta1, ta3, tb1, tb3, ta2, ta4, 
tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, 
ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, 
ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, 
tb3, ta1, ta4, tb1, tb4, ta2, ta5, tb2, tb5, ta3, ta6, 
tb3, ta4, tb4, ta5, tb5, ta6, tb6, 

結果說明如果將源碼中的queue改成LinkedList對象時,程序會產生ConcurrentModificationException異常。

 


更多內容

1. Java多線程系列--“JUC集合”01之 框架

2. Java多線程系列目錄(共xx篇) 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM