Java多線程系列--“JUC集合”09之 LinkedBlockingDeque


 

概要

本章介紹JUC包中的LinkedBlockingDeque。內容包括:
LinkedBlockingDeque介紹
LinkedBlockingDeque原理和數據結構
LinkedBlockingDeque函數列表
LinkedBlockingDeque源碼分析(JDK1.7.0_40版本)
LinkedBlockingDeque示例

轉載請注明出處:http://www.cnblogs.com/skywang12345/p/3503480.html

 

LinkedBlockingDeque介紹

LinkedBlockingDeque是雙向鏈表實現的雙向並發阻塞隊列。該阻塞隊列同時支持FIFO和FILO兩種操作方式,即可以從隊列的頭和尾同時操作(插入/刪除);並且,該阻塞隊列是支持線程安全。

此外,LinkedBlockingDeque還是可選容量的(防止過度膨脹),即可以指定隊列的容量。如果不指定,默認容量大小等於Integer.MAX_VALUE。

 

LinkedBlockingDeque原理和數據結構

LinkedBlockingDeque的數據結構,如下圖所示:

說明
1. LinkedBlockingDeque繼承於AbstractQueue,它本質上是一個支持FIFO和FILO的雙向的隊列。
2. LinkedBlockingDeque實現了BlockingDeque接口,它支持多線程並發。當多線程競爭同一個資源時,某線程獲取到該資源之后,其它線程需要阻塞等待。
3. LinkedBlockingDeque是通過雙向鏈表實現的。
3.1 first是雙向鏈表的表頭。
3.2 last是雙向鏈表的表尾。
3.3 count是LinkedBlockingDeque的實際大小,即雙向鏈表中當前節點個數。
3.4 capacity是LinkedBlockingDeque的容量,它是在創建LinkedBlockingDeque時指定的。
3.5 lock是控制對LinkedBlockingDeque的互斥鎖,當多個線程競爭同時訪問LinkedBlockingDeque時,某線程獲取到了互斥鎖lock,其它線程則需要阻塞等待,直到該線程釋放lock,其它線程才有機會獲取lock從而獲取cpu執行權。
3.6 notEmptynotFull分別是“非空條件”和“未滿條件”。通過它們能夠更加細膩進行並發控制。

     -- 若某線程(線程A)要取出數據時,隊列正好為空,則該線程會執行notEmpty.await()進行等待;當其它某個線程(線程B)向隊列中插入了數據之后,會調用notEmpty.signal()喚醒“notEmpty上的等待線程”。此時,線程A會被喚醒從而得以繼續運行。 此外,線程A在執行取操作前,會獲取takeLock,在取操作執行完畢再釋放takeLock。 -- 若某線程(線程H)要插入數據時,隊列已滿,則該線程會它執行notFull.await()進行等待;當其它某個線程(線程I)取出數據之后,會調用notFull.signal()喚醒“notFull上的等待線程”。此時,線程H就會被喚醒從而得以繼續運行。 此外,線程H在執行插入操作前,會獲取putLock,在插入操作執行完畢才釋放putLock。

關於ReentrantLock 和 Condition等更多的內容,可以參考:
    (01) Java多線程系列--“JUC鎖”02之 互斥鎖ReentrantLock
    (02) Java多線程系列--“JUC鎖”03之 公平鎖(一)
    (03) Java多線程系列--“JUC鎖”04之 公平鎖(二)
    (04) Java多線程系列--“JUC鎖”05之 非公平鎖
    (05) Java多線程系列--“JUC鎖”06之 Condition條件

 

LinkedBlockingDeque函數列表

// 創建一個容量為 Integer.MAX_VALUE 的 LinkedBlockingDeque。
LinkedBlockingDeque()
// 創建一個容量為 Integer.MAX_VALUE 的 LinkedBlockingDeque,最初包含給定 collection 的元素,以該 collection 迭代器的遍歷順序添加。
LinkedBlockingDeque(Collection<? extends E> c)
// 創建一個具有給定(固定)容量的 LinkedBlockingDeque。
LinkedBlockingDeque(int capacity)

// 在不違反容量限制的情況下,將指定的元素插入此雙端隊列的末尾。
boolean add(E e)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列的開頭;如果當前沒有空間可用,則拋出 IllegalStateException。
void addFirst(E e)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列的末尾;如果當前沒有空間可用,則拋出 IllegalStateException。
void addLast(E e)
// 以原子方式 (atomically) 從此雙端隊列移除所有元素。
void clear()
// 如果此雙端隊列包含指定的元素,則返回 true。
boolean contains(Object o)
// 返回在此雙端隊列的元素上以逆向連續順序進行迭代的迭代器。
Iterator<E> descendingIterator()
// 移除此隊列中所有可用的元素,並將它們添加到給定 collection 中。
int drainTo(Collection<? super E> c)
// 最多從此隊列中移除給定數量的可用元素,並將這些元素添加到給定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 獲取但不移除此雙端隊列表示的隊列的頭部。
E element()
// 獲取,但不移除此雙端隊列的第一個元素。
E getFirst()
// 獲取,但不移除此雙端隊列的最后一個元素。
E getLast()
// 返回在此雙端隊列元素上以恰當順序進行迭代的迭代器。
Iterator<E> iterator()
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列表示的隊列中(即此雙端隊列的尾部),並在成功時返回 true;如果當前沒有空間可用,則返回 false。
boolean offer(E e)
// 將指定的元素插入此雙端隊列表示的隊列中(即此雙端隊列的尾部),必要時將在指定的等待時間內一直等待可用空間。
boolean offer(E e, long timeout, TimeUnit unit)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列的開頭,並在成功時返回 true;如果當前沒有空間可用,則返回 false。
boolean offerFirst(E e)
// 將指定的元素插入此雙端隊列的開頭,必要時將在指定的等待時間內等待可用空間。
boolean offerFirst(E e, long timeout, TimeUnit unit)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列的末尾,並在成功時返回 true;如果當前沒有空間可用,則返回 false。
boolean offerLast(E e)
// 將指定的元素插入此雙端隊列的末尾,必要時將在指定的等待時間內等待可用空間。
boolean offerLast(E e, long timeout, TimeUnit unit)
// 獲取但不移除此雙端隊列表示的隊列的頭部(即此雙端隊列的第一個元素);如果此雙端隊列為空,則返回 null。
E peek()
// 獲取,但不移除此雙端隊列的第一個元素;如果此雙端隊列為空,則返回 null。
E peekFirst()
// 獲取,但不移除此雙端隊列的最后一個元素;如果此雙端隊列為空,則返回 null。
E peekLast()
// 獲取並移除此雙端隊列表示的隊列的頭部(即此雙端隊列的第一個元素);如果此雙端隊列為空,則返回 null。
E poll()
// 獲取並移除此雙端隊列表示的隊列的頭部(即此雙端隊列的第一個元素),如有必要將在指定的等待時間內等待可用元素。
E poll(long timeout, TimeUnit unit)
// 獲取並移除此雙端隊列的第一個元素;如果此雙端隊列為空,則返回 null。
E pollFirst()
// 獲取並移除此雙端隊列的第一個元素,必要時將在指定的等待時間等待可用元素。
E pollFirst(long timeout, TimeUnit unit)
// 獲取並移除此雙端隊列的最后一個元素;如果此雙端隊列為空,則返回 null。
E pollLast()
// 獲取並移除此雙端隊列的最后一個元素,必要時將在指定的等待時間內等待可用元素。
E pollLast(long timeout, TimeUnit unit)
// 從此雙端隊列所表示的堆棧中彈出一個元素。
E pop()
// 將元素推入此雙端隊列表示的棧。
void push(E e)
// 將指定的元素插入此雙端隊列表示的隊列中(即此雙端隊列的尾部),必要時將一直等待可用空間。
void put(E e)
// 將指定的元素插入此雙端隊列的開頭,必要時將一直等待可用空間。
void putFirst(E e)
// 將指定的元素插入此雙端隊列的末尾,必要時將一直等待可用空間。
void putLast(E e)
// 返回理想情況下(沒有內存和資源約束)此雙端隊列可不受阻塞地接受的額外元素數。
int remainingCapacity()
// 獲取並移除此雙端隊列表示的隊列的頭部。
E remove()
// 從此雙端隊列移除第一次出現的指定元素。
boolean remove(Object o)
// 獲取並移除此雙端隊列第一個元素。
E removeFirst()
// 從此雙端隊列移除第一次出現的指定元素。
boolean removeFirstOccurrence(Object o)
// 獲取並移除此雙端隊列的最后一個元素。
E removeLast()
// 從此雙端隊列移除最后一次出現的指定元素。
boolean removeLastOccurrence(Object o)
// 返回此雙端隊列中的元素數。
int size()
// 獲取並移除此雙端隊列表示的隊列的頭部(即此雙端隊列的第一個元素),必要時將一直等待可用元素。
E take()
// 獲取並移除此雙端隊列的第一個元素,必要時將一直等待可用元素。
E takeFirst()
// 獲取並移除此雙端隊列的最后一個元素,必要時將一直等待可用元素。
E takeLast()
// 返回以恰當順序(從第一個元素到最后一個元素)包含此雙端隊列所有元素的數組。
Object[] toArray()
// 返回以恰當順序包含此雙端隊列所有元素的數組;返回數組的運行時類型是指定數組的運行時類型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()

 

LinkedBlockingDeque源碼分析(JDK1.7.0_40版本)

LinkedBlockingDeque.java的完整源碼如下:

   1 /*
   2  * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
   3  *
   4  *
   5  *
   6  *
   7  *
   8  *
   9  *
  10  *
  11  *
  12  *
  13  *
  14  *
  15  *
  16  *
  17  *
  18  *
  19  *
  20  *
  21  *
  22  *
  23  */
  24 
  25 /*
  26  *
  27  *
  28  *
  29  *
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.AbstractQueue;
  39 import java.util.Collection;
  40 import java.util.Iterator;
  41 import java.util.NoSuchElementException;
  42 import java.util.concurrent.locks.Condition;
  43 import java.util.concurrent.locks.ReentrantLock;
  44 
  45 /**
  46  * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
  47  * linked nodes.
  48  *
  49  * <p> The optional capacity bound constructor argument serves as a
  50  * way to prevent excessive expansion. The capacity, if unspecified,
  51  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
  52  * dynamically created upon each insertion unless this would bring the
  53  * deque above capacity.
  54  *
  55  * <p>Most operations run in constant time (ignoring time spent
  56  * blocking).  Exceptions include {@link #remove(Object) remove},
  57  * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
  58  * #removeLastOccurrence removeLastOccurrence}, {@link #contains
  59  * contains}, {@link #iterator iterator.remove()}, and the bulk
  60  * operations, all of which run in linear time.
  61  *
  62  * <p>This class and its iterator implement all of the
  63  * <em>optional</em> methods of the {@link Collection} and {@link
  64  * Iterator} interfaces.
  65  *
  66  * <p>This class is a member of the
  67  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  68  * Java Collections Framework</a>.
  69  *
  70  * @since 1.6
  71  * @author  Doug Lea
  72  * @param <E> the type of elements held in this collection
  73  */
  74 public class LinkedBlockingDeque<E>
  75     extends AbstractQueue<E>
  76     implements BlockingDeque<E>,  java.io.Serializable {
  77 
  78     /*
  79      * Implemented as a simple doubly-linked list protected by a
  80      * single lock and using conditions to manage blocking.
  81      *
  82      * To implement weakly consistent iterators, it appears we need to
  83      * keep all Nodes GC-reachable from a predecessor dequeued Node.
  84      * That would cause two problems:
  85      * - allow a rogue Iterator to cause unbounded memory retention
  86      * - cause cross-generational linking of old Nodes to new Nodes if
  87      *   a Node was tenured while live, which generational GCs have a
  88      *   hard time dealing with, causing repeated major collections.
  89      * However, only non-deleted Nodes need to be reachable from
  90      * dequeued Nodes, and reachability does not necessarily have to
  91      * be of the kind understood by the GC.  We use the trick of
  92      * linking a Node that has just been dequeued to itself.  Such a
  93      * self-link implicitly means to jump to "first" (for next links)
  94      * or "last" (for prev links).
  95      */
  96 
  97     /*
  98      * We have "diamond" multiple interface/abstract class inheritance
  99      * here, and that introduces ambiguities. Often we want the
 100      * BlockingDeque javadoc combined with the AbstractQueue
 101      * implementation, so a lot of method specs are duplicated here.
 102      */
 103 
 104     private static final long serialVersionUID = -387911632671998426L;
 105 
 106     /** Doubly-linked list node class */
 107     static final class Node<E> {
 108         /**
 109          * The item, or null if this node has been removed.
 110          */
 111         E item;
 112 
 113         /**
 114          * One of:
 115          * - the real predecessor Node
 116          * - this Node, meaning the predecessor is tail
 117          * - null, meaning there is no predecessor
 118          */
 119         Node<E> prev;
 120 
 121         /**
 122          * One of:
 123          * - the real successor Node
 124          * - this Node, meaning the successor is head
 125          * - null, meaning there is no successor
 126          */
 127         Node<E> next;
 128 
 129         Node(E x) {
 130             item = x;
 131         }
 132     }
 133 
 134     /**
 135      * Pointer to first node.
 136      * Invariant: (first == null && last == null) ||
 137      *            (first.prev == null && first.item != null)
 138      */
 139     transient Node<E> first;
 140 
 141     /**
 142      * Pointer to last node.
 143      * Invariant: (first == null && last == null) ||
 144      *            (last.next == null && last.item != null)
 145      */
 146     transient Node<E> last;
 147 
 148     /** Number of items in the deque */
 149     private transient int count;
 150 
 151     /** Maximum number of items in the deque */
 152     private final int capacity;
 153 
 154     /** Main lock guarding all access */
 155     final ReentrantLock lock = new ReentrantLock();
 156 
 157     /** Condition for waiting takes */
 158     private final Condition notEmpty = lock.newCondition();
 159 
 160     /** Condition for waiting puts */
 161     private final Condition notFull = lock.newCondition();
 162 
 163     /**
 164      * Creates a {@code LinkedBlockingDeque} with a capacity of
 165      * {@link Integer#MAX_VALUE}.
 166      */
 167     public LinkedBlockingDeque() {
 168         this(Integer.MAX_VALUE);
 169     }
 170 
 171     /**
 172      * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
 173      *
 174      * @param capacity the capacity of this deque
 175      * @throws IllegalArgumentException if {@code capacity} is less than 1
 176      */
 177     public LinkedBlockingDeque(int capacity) {
 178         if (capacity <= 0) throw new IllegalArgumentException();
 179         this.capacity = capacity;
 180     }
 181 
 182     /**
 183      * Creates a {@code LinkedBlockingDeque} with a capacity of
 184      * {@link Integer#MAX_VALUE}, initially containing the elements of
 185      * the given collection, added in traversal order of the
 186      * collection's iterator.
 187      *
 188      * @param c the collection of elements to initially contain
 189      * @throws NullPointerException if the specified collection or any
 190      *         of its elements are null
 191      */
 192     public LinkedBlockingDeque(Collection<? extends E> c) {
 193         this(Integer.MAX_VALUE);
 194         final ReentrantLock lock = this.lock;
 195         lock.lock(); // Never contended, but necessary for visibility
 196         try {
 197             for (E e : c) {
 198                 if (e == null)
 199                     throw new NullPointerException();
 200                 if (!linkLast(new Node<E>(e)))
 201                     throw new IllegalStateException("Deque full");
 202             }
 203         } finally {
 204             lock.unlock();
 205         }
 206     }
 207 
 208 
 209     // Basic linking and unlinking operations, called only while holding lock
 210 
 211     /**
 212      * Links node as first element, or returns false if full.
 213      */
 214     private boolean linkFirst(Node<E> node) {
 215         // assert lock.isHeldByCurrentThread();
 216         if (count >= capacity)
 217             return false;
 218         Node<E> f = first;
 219         node.next = f;
 220         first = node;
 221         if (last == null)
 222             last = node;
 223         else
 224             f.prev = node;
 225         ++count;
 226         notEmpty.signal();
 227         return true;
 228     }
 229 
 230     /**
 231      * Links node as last element, or returns false if full.
 232      */
 233     private boolean linkLast(Node<E> node) {
 234         // assert lock.isHeldByCurrentThread();
 235         if (count >= capacity)
 236             return false;
 237         Node<E> l = last;
 238         node.prev = l;
 239         last = node;
 240         if (first == null)
 241             first = node;
 242         else
 243             l.next = node;
 244         ++count;
 245         notEmpty.signal();
 246         return true;
 247     }
 248 
 249     /**
 250      * Removes and returns first element, or null if empty.
 251      */
 252     private E unlinkFirst() {
 253         // assert lock.isHeldByCurrentThread();
 254         Node<E> f = first;
 255         if (f == null)
 256             return null;
 257         Node<E> n = f.next;
 258         E item = f.item;
 259         f.item = null;
 260         f.next = f; // help GC
 261         first = n;
 262         if (n == null)
 263             last = null;
 264         else
 265             n.prev = null;
 266         --count;
 267         notFull.signal();
 268         return item;
 269     }
 270 
 271     /**
 272      * Removes and returns last element, or null if empty.
 273      */
 274     private E unlinkLast() {
 275         // assert lock.isHeldByCurrentThread();
 276         Node<E> l = last;
 277         if (l == null)
 278             return null;
 279         Node<E> p = l.prev;
 280         E item = l.item;
 281         l.item = null;
 282         l.prev = l; // help GC
 283         last = p;
 284         if (p == null)
 285             first = null;
 286         else
 287             p.next = null;
 288         --count;
 289         notFull.signal();
 290         return item;
 291     }
 292 
 293     /**
 294      * Unlinks x.
 295      */
 296     void unlink(Node<E> x) {
 297         // assert lock.isHeldByCurrentThread();
 298         Node<E> p = x.prev;
 299         Node<E> n = x.next;
 300         if (p == null) {
 301             unlinkFirst();
 302         } else if (n == null) {
 303             unlinkLast();
 304         } else {
 305             p.next = n;
 306             n.prev = p;
 307             x.item = null;
 308             // Don't mess with x's links.  They may still be in use by
 309             // an iterator.
 310             --count;
 311             notFull.signal();
 312         }
 313     }
 314 
 315     // BlockingDeque methods
 316 
 317     /**
 318      * @throws IllegalStateException {@inheritDoc}
 319      * @throws NullPointerException  {@inheritDoc}
 320      */
 321     public void addFirst(E e) {
 322         if (!offerFirst(e))
 323             throw new IllegalStateException("Deque full");
 324     }
 325 
 326     /**
 327      * @throws IllegalStateException {@inheritDoc}
 328      * @throws NullPointerException  {@inheritDoc}
 329      */
 330     public void addLast(E e) {
 331         if (!offerLast(e))
 332             throw new IllegalStateException("Deque full");
 333     }
 334 
 335     /**
 336      * @throws NullPointerException {@inheritDoc}
 337      */
 338     public boolean offerFirst(E e) {
 339         if (e == null) throw new NullPointerException();
 340         Node<E> node = new Node<E>(e);
 341         final ReentrantLock lock = this.lock;
 342         lock.lock();
 343         try {
 344             return linkFirst(node);
 345         } finally {
 346             lock.unlock();
 347         }
 348     }
 349 
 350     /**
 351      * @throws NullPointerException {@inheritDoc}
 352      */
 353     public boolean offerLast(E e) {
 354         if (e == null) throw new NullPointerException();
 355         Node<E> node = new Node<E>(e);
 356         final ReentrantLock lock = this.lock;
 357         lock.lock();
 358         try {
 359             return linkLast(node);
 360         } finally {
 361             lock.unlock();
 362         }
 363     }
 364 
 365     /**
 366      * @throws NullPointerException {@inheritDoc}
 367      * @throws InterruptedException {@inheritDoc}
 368      */
 369     public void putFirst(E e) throws InterruptedException {
 370         if (e == null) throw new NullPointerException();
 371         Node<E> node = new Node<E>(e);
 372         final ReentrantLock lock = this.lock;
 373         lock.lock();
 374         try {
 375             while (!linkFirst(node))
 376                 notFull.await();
 377         } finally {
 378             lock.unlock();
 379         }
 380     }
 381 
 382     /**
 383      * @throws NullPointerException {@inheritDoc}
 384      * @throws InterruptedException {@inheritDoc}
 385      */
 386     public void putLast(E e) throws InterruptedException {
 387         if (e == null) throw new NullPointerException();
 388         Node<E> node = new Node<E>(e);
 389         final ReentrantLock lock = this.lock;
 390         lock.lock();
 391         try {
 392             while (!linkLast(node))
 393                 notFull.await();
 394         } finally {
 395             lock.unlock();
 396         }
 397     }
 398 
 399     /**
 400      * @throws NullPointerException {@inheritDoc}
 401      * @throws InterruptedException {@inheritDoc}
 402      */
 403     public boolean offerFirst(E e, long timeout, TimeUnit unit)
 404         throws InterruptedException {
 405         if (e == null) throw new NullPointerException();
 406         Node<E> node = new Node<E>(e);
 407         long nanos = unit.toNanos(timeout);
 408         final ReentrantLock lock = this.lock;
 409         lock.lockInterruptibly();
 410         try {
 411             while (!linkFirst(node)) {
 412                 if (nanos <= 0)
 413                     return false;
 414                 nanos = notFull.awaitNanos(nanos);
 415             }
 416             return true;
 417         } finally {
 418             lock.unlock();
 419         }
 420     }
 421 
 422     /**
 423      * @throws NullPointerException {@inheritDoc}
 424      * @throws InterruptedException {@inheritDoc}
 425      */
 426     public boolean offerLast(E e, long timeout, TimeUnit unit)
 427         throws InterruptedException {
 428         if (e == null) throw new NullPointerException();
 429         Node<E> node = new Node<E>(e);
 430         long nanos = unit.toNanos(timeout);
 431         final ReentrantLock lock = this.lock;
 432         lock.lockInterruptibly();
 433         try {
 434             while (!linkLast(node)) {
 435                 if (nanos <= 0)
 436                     return false;
 437                 nanos = notFull.awaitNanos(nanos);
 438             }
 439             return true;
 440         } finally {
 441             lock.unlock();
 442         }
 443     }
 444 
 445     /**
 446      * @throws NoSuchElementException {@inheritDoc}
 447      */
 448     public E removeFirst() {
 449         E x = pollFirst();
 450         if (x == null) throw new NoSuchElementException();
 451         return x;
 452     }
 453 
 454     /**
 455      * @throws NoSuchElementException {@inheritDoc}
 456      */
 457     public E removeLast() {
 458         E x = pollLast();
 459         if (x == null) throw new NoSuchElementException();
 460         return x;
 461     }
 462 
 463     public E pollFirst() {
 464         final ReentrantLock lock = this.lock;
 465         lock.lock();
 466         try {
 467             return unlinkFirst();
 468         } finally {
 469             lock.unlock();
 470         }
 471     }
 472 
 473     public E pollLast() {
 474         final ReentrantLock lock = this.lock;
 475         lock.lock();
 476         try {
 477             return unlinkLast();
 478         } finally {
 479             lock.unlock();
 480         }
 481     }
 482 
 483     public E takeFirst() throws InterruptedException {
 484         final ReentrantLock lock = this.lock;
 485         lock.lock();
 486         try {
 487             E x;
 488             while ( (x = unlinkFirst()) == null)
 489                 notEmpty.await();
 490             return x;
 491         } finally {
 492             lock.unlock();
 493         }
 494     }
 495 
 496     public E takeLast() throws InterruptedException {
 497         final ReentrantLock lock = this.lock;
 498         lock.lock();
 499         try {
 500             E x;
 501             while ( (x = unlinkLast()) == null)
 502                 notEmpty.await();
 503             return x;
 504         } finally {
 505             lock.unlock();
 506         }
 507     }
 508 
 509     public E pollFirst(long timeout, TimeUnit unit)
 510         throws InterruptedException {
 511         long nanos = unit.toNanos(timeout);
 512         final ReentrantLock lock = this.lock;
 513         lock.lockInterruptibly();
 514         try {
 515             E x;
 516             while ( (x = unlinkFirst()) == null) {
 517                 if (nanos <= 0)
 518                     return null;
 519                 nanos = notEmpty.awaitNanos(nanos);
 520             }
 521             return x;
 522         } finally {
 523             lock.unlock();
 524         }
 525     }
 526 
 527     public E pollLast(long timeout, TimeUnit unit)
 528         throws InterruptedException {
 529         long nanos = unit.toNanos(timeout);
 530         final ReentrantLock lock = this.lock;
 531         lock.lockInterruptibly();
 532         try {
 533             E x;
 534             while ( (x = unlinkLast()) == null) {
 535                 if (nanos <= 0)
 536                     return null;
 537                 nanos = notEmpty.awaitNanos(nanos);
 538             }
 539             return x;
 540         } finally {
 541             lock.unlock();
 542         }
 543     }
 544 
 545     /**
 546      * @throws NoSuchElementException {@inheritDoc}
 547      */
 548     public E getFirst() {
 549         E x = peekFirst();
 550         if (x == null) throw new NoSuchElementException();
 551         return x;
 552     }
 553 
 554     /**
 555      * @throws NoSuchElementException {@inheritDoc}
 556      */
 557     public E getLast() {
 558         E x = peekLast();
 559         if (x == null) throw new NoSuchElementException();
 560         return x;
 561     }
 562 
 563     public E peekFirst() {
 564         final ReentrantLock lock = this.lock;
 565         lock.lock();
 566         try {
 567             return (first == null) ? null : first.item;
 568         } finally {
 569             lock.unlock();
 570         }
 571     }
 572 
 573     public E peekLast() {
 574         final ReentrantLock lock = this.lock;
 575         lock.lock();
 576         try {
 577             return (last == null) ? null : last.item;
 578         } finally {
 579             lock.unlock();
 580         }
 581     }
 582 
 583     public boolean removeFirstOccurrence(Object o) {
 584         if (o == null) return false;
 585         final ReentrantLock lock = this.lock;
 586         lock.lock();
 587         try {
 588             for (Node<E> p = first; p != null; p = p.next) {
 589                 if (o.equals(p.item)) {
 590                     unlink(p);
 591                     return true;
 592                 }
 593             }
 594             return false;
 595         } finally {
 596             lock.unlock();
 597         }
 598     }
 599 
 600     public boolean removeLastOccurrence(Object o) {
 601         if (o == null) return false;
 602         final ReentrantLock lock = this.lock;
 603         lock.lock();
 604         try {
 605             for (Node<E> p = last; p != null; p = p.prev) {
 606                 if (o.equals(p.item)) {
 607                     unlink(p);
 608                     return true;
 609                 }
 610             }
 611             return false;
 612         } finally {
 613             lock.unlock();
 614         }
 615     }
 616 
 617     // BlockingQueue methods
 618 
 619     /**
 620      * Inserts the specified element at the end of this deque unless it would
 621      * violate capacity restrictions.  When using a capacity-restricted deque,
 622      * it is generally preferable to use method {@link #offer(Object) offer}.
 623      *
 624      * <p>This method is equivalent to {@link #addLast}.
 625      *
 626      * @throws IllegalStateException if the element cannot be added at this
 627      *         time due to capacity restrictions
 628      * @throws NullPointerException if the specified element is null
 629      */
 630     public boolean add(E e) {
 631         addLast(e);
 632         return true;
 633     }
 634 
 635     /**
 636      * @throws NullPointerException if the specified element is null
 637      */
 638     public boolean offer(E e) {
 639         return offerLast(e);
 640     }
 641 
 642     /**
 643      * @throws NullPointerException {@inheritDoc}
 644      * @throws InterruptedException {@inheritDoc}
 645      */
 646     public void put(E e) throws InterruptedException {
 647         putLast(e);
 648     }
 649 
 650     /**
 651      * @throws NullPointerException {@inheritDoc}
 652      * @throws InterruptedException {@inheritDoc}
 653      */
 654     public boolean offer(E e, long timeout, TimeUnit unit)
 655         throws InterruptedException {
 656         return offerLast(e, timeout, unit);
 657     }
 658 
 659     /**
 660      * Retrieves and removes the head of the queue represented by this deque.
 661      * This method differs from {@link #poll poll} only in that it throws an
 662      * exception if this deque is empty.
 663      *
 664      * <p>This method is equivalent to {@link #removeFirst() removeFirst}.
 665      *
 666      * @return the head of the queue represented by this deque
 667      * @throws NoSuchElementException if this deque is empty
 668      */
 669     public E remove() {
 670         return removeFirst();
 671     }
 672 
 673     public E poll() {
 674         return pollFirst();
 675     }
 676 
 677     public E take() throws InterruptedException {
 678         return takeFirst();
 679     }
 680 
 681     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 682         return pollFirst(timeout, unit);
 683     }
 684 
 685     /**
 686      * Retrieves, but does not remove, the head of the queue represented by
 687      * this deque.  This method differs from {@link #peek peek} only in that
 688      * it throws an exception if this deque is empty.
 689      *
 690      * <p>This method is equivalent to {@link #getFirst() getFirst}.
 691      *
 692      * @return the head of the queue represented by this deque
 693      * @throws NoSuchElementException if this deque is empty
 694      */
 695     public E element() {
 696         return getFirst();
 697     }
 698 
 699     public E peek() {
 700         return peekFirst();
 701     }
 702 
 703     /**
 704      * Returns the number of additional elements that this deque can ideally
 705      * (in the absence of memory or resource constraints) accept without
 706      * blocking. This is always equal to the initial capacity of this deque
 707      * less the current {@code size} of this deque.
 708      *
 709      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
 710      * an element will succeed by inspecting {@code remainingCapacity}
 711      * because it may be the case that another thread is about to
 712      * insert or remove an element.
 713      */
 714     public int remainingCapacity() {
 715         final ReentrantLock lock = this.lock;
 716         lock.lock();
 717         try {
 718             return capacity - count;
 719         } finally {
 720             lock.unlock();
 721         }
 722     }
 723 
 724     /**
 725      * @throws UnsupportedOperationException {@inheritDoc}
 726      * @throws ClassCastException            {@inheritDoc}
 727      * @throws NullPointerException          {@inheritDoc}
 728      * @throws IllegalArgumentException      {@inheritDoc}
 729      */
 730     public int drainTo(Collection<? super E> c) {
 731         return drainTo(c, Integer.MAX_VALUE);
 732     }
 733 
 734     /**
 735      * @throws UnsupportedOperationException {@inheritDoc}
 736      * @throws ClassCastException            {@inheritDoc}
 737      * @throws NullPointerException          {@inheritDoc}
 738      * @throws IllegalArgumentException      {@inheritDoc}
 739      */
 740     public int drainTo(Collection<? super E> c, int maxElements) {
 741         if (c == null)
 742             throw new NullPointerException();
 743         if (c == this)
 744             throw new IllegalArgumentException();
 745         final ReentrantLock lock = this.lock;
 746         lock.lock();
 747         try {
 748             int n = Math.min(maxElements, count);
 749             for (int i = 0; i < n; i++) {
 750                 c.add(first.item);   // In this order, in case add() throws.
 751                 unlinkFirst();
 752             }
 753             return n;
 754         } finally {
 755             lock.unlock();
 756         }
 757     }
 758 
 759     // Stack methods
 760 
 761     /**
 762      * @throws IllegalStateException {@inheritDoc}
 763      * @throws NullPointerException  {@inheritDoc}
 764      */
 765     public void push(E e) {
 766         addFirst(e);
 767     }
 768 
 769     /**
 770      * @throws NoSuchElementException {@inheritDoc}
 771      */
 772     public E pop() {
 773         return removeFirst();
 774     }
 775 
 776     // Collection methods
 777 
 778     /**
 779      * Removes the first occurrence of the specified element from this deque.
 780      * If the deque does not contain the element, it is unchanged.
 781      * More formally, removes the first element {@code e} such that
 782      * {@code o.equals(e)} (if such an element exists).
 783      * Returns {@code true} if this deque contained the specified element
 784      * (or equivalently, if this deque changed as a result of the call).
 785      *
 786      * <p>This method is equivalent to
 787      * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}.
 788      *
 789      * @param o element to be removed from this deque, if present
 790      * @return {@code true} if this deque changed as a result of the call
 791      */
 792     public boolean remove(Object o) {
 793         return removeFirstOccurrence(o);
 794     }
 795 
 796     /**
 797      * Returns the number of elements in this deque.
 798      *
 799      * @return the number of elements in this deque
 800      */
 801     public int size() {
 802         final ReentrantLock lock = this.lock;
 803         lock.lock();
 804         try {
 805             return count;
 806         } finally {
 807             lock.unlock();
 808         }
 809     }
 810 
 811     /**
 812      * Returns {@code true} if this deque contains the specified element.
 813      * More formally, returns {@code true} if and only if this deque contains
 814      * at least one element {@code e} such that {@code o.equals(e)}.
 815      *
 816      * @param o object to be checked for containment in this deque
 817      * @return {@code true} if this deque contains the specified element
 818      */
 819     public boolean contains(Object o) {
 820         if (o == null) return false;
 821         final ReentrantLock lock = this.lock;
 822         lock.lock();
 823         try {
 824             for (Node<E> p = first; p != null; p = p.next)
 825                 if (o.equals(p.item))
 826                     return true;
 827             return false;
 828         } finally {
 829             lock.unlock();
 830         }
 831     }
 832 
 833     /*
 834      * TODO: Add support for more efficient bulk operations.
 835      *
 836      * We don't want to acquire the lock for every iteration, but we
 837      * also want other threads a chance to interact with the
 838      * collection, especially when count is close to capacity.
 839      */
 840 
 841 //     /**
 842 //      * Adds all of the elements in the specified collection to this
 843 //      * queue.  Attempts to addAll of a queue to itself result in
 844 //      * {@code IllegalArgumentException}. Further, the behavior of
 845 //      * this operation is undefined if the specified collection is
 846 //      * modified while the operation is in progress.
 847 //      *
 848 //      * @param c collection containing elements to be added to this queue
 849 //      * @return {@code true} if this queue changed as a result of the call
 850 //      * @throws ClassCastException            {@inheritDoc}
 851 //      * @throws NullPointerException          {@inheritDoc}
 852 //      * @throws IllegalArgumentException      {@inheritDoc}
 853 //      * @throws IllegalStateException         {@inheritDoc}
 854 //      * @see #add(Object)
 855 //      */
 856 //     public boolean addAll(Collection<? extends E> c) {
 857 //         if (c == null)
 858 //             throw new NullPointerException();
 859 //         if (c == this)
 860 //             throw new IllegalArgumentException();
 861 //         final ReentrantLock lock = this.lock;
 862 //         lock.lock();
 863 //         try {
 864 //             boolean modified = false;
 865 //             for (E e : c)
 866 //                 if (linkLast(e))
 867 //                     modified = true;
 868 //             return modified;
 869 //         } finally {
 870 //             lock.unlock();
 871 //         }
 872 //     }
 873 
 874     /**
 875      * Returns an array containing all of the elements in this deque, in
 876      * proper sequence (from first to last element).
 877      *
 878      * <p>The returned array will be "safe" in that no references to it are
 879      * maintained by this deque.  (In other words, this method must allocate
 880      * a new array).  The caller is thus free to modify the returned array.
 881      *
 882      * <p>This method acts as bridge between array-based and collection-based
 883      * APIs.
 884      *
 885      * @return an array containing all of the elements in this deque
 886      */
 887     @SuppressWarnings("unchecked")
 888     public Object[] toArray() {
 889         final ReentrantLock lock = this.lock;
 890         lock.lock();
 891         try {
 892             Object[] a = new Object[count];
 893             int k = 0;
 894             for (Node<E> p = first; p != null; p = p.next)
 895                 a[k++] = p.item;
 896             return a;
 897         } finally {
 898             lock.unlock();
 899         }
 900     }
 901 
 902     /**
 903      * Returns an array containing all of the elements in this deque, in
 904      * proper sequence; the runtime type of the returned array is that of
 905      * the specified array.  If the deque fits in the specified array, it
 906      * is returned therein.  Otherwise, a new array is allocated with the
 907      * runtime type of the specified array and the size of this deque.
 908      *
 909      * <p>If this deque fits in the specified array with room to spare
 910      * (i.e., the array has more elements than this deque), the element in
 911      * the array immediately following the end of the deque is set to
 912      * {@code null}.
 913      *
 914      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 915      * array-based and collection-based APIs.  Further, this method allows
 916      * precise control over the runtime type of the output array, and may,
 917      * under certain circumstances, be used to save allocation costs.
 918      *
 919      * <p>Suppose {@code x} is a deque known to contain only strings.
 920      * The following code can be used to dump the deque into a newly
 921      * allocated array of {@code String}:
 922      *
 923      * <pre>
 924      *     String[] y = x.toArray(new String[0]);</pre>
 925      *
 926      * Note that {@code toArray(new Object[0])} is identical in function to
 927      * {@code toArray()}.
 928      *
 929      * @param a the array into which the elements of the deque are to
 930      *          be stored, if it is big enough; otherwise, a new array of the
 931      *          same runtime type is allocated for this purpose
 932      * @return an array containing all of the elements in this deque
 933      * @throws ArrayStoreException if the runtime type of the specified array
 934      *         is not a supertype of the runtime type of every element in
 935      *         this deque
 936      * @throws NullPointerException if the specified array is null
 937      */
 938     @SuppressWarnings("unchecked")
 939     public <T> T[] toArray(T[] a) {
 940         final ReentrantLock lock = this.lock;
 941         lock.lock();
 942         try {
 943             if (a.length < count)
 944                 a = (T[])java.lang.reflect.Array.newInstance
 945                     (a.getClass().getComponentType(), count);
 946 
 947             int k = 0;
 948             for (Node<E> p = first; p != null; p = p.next)
 949                 a[k++] = (T)p.item;
 950             if (a.length > k)
 951                 a[k] = null;
 952             return a;
 953         } finally {
 954             lock.unlock();
 955         }
 956     }
 957 
 958     public String toString() {
 959         final ReentrantLock lock = this.lock;
 960         lock.lock();
 961         try {
 962             Node<E> p = first;
 963             if (p == null)
 964                 return "[]";
 965 
 966             StringBuilder sb = new StringBuilder();
 967             sb.append('[');
 968             for (;;) {
 969                 E e = p.item;
 970                 sb.append(e == this ? "(this Collection)" : e);
 971                 p = p.next;
 972                 if (p == null)
 973                     return sb.append(']').toString();
 974                 sb.append(',').append(' ');
 975             }
 976         } finally {
 977             lock.unlock();
 978         }
 979     }
 980 
 981     /**
 982      * Atomically removes all of the elements from this deque.
 983      * The deque will be empty after this call returns.
 984      */
 985     public void clear() {
 986         final ReentrantLock lock = this.lock;
 987         lock.lock();
 988         try {
 989             for (Node<E> f = first; f != null; ) {
 990                 f.item = null;
 991                 Node<E> n = f.next;
 992                 f.prev = null;
 993                 f.next = null;
 994                 f = n;
 995             }
 996             first = last = null;
 997             count = 0;
 998             notFull.signalAll();
 999         } finally {
1000             lock.unlock();
1001         }
1002     }
1003 
1004     /**
1005      * Returns an iterator over the elements in this deque in proper sequence.
1006      * The elements will be returned in order from first (head) to last (tail).
1007      *
1008      * <p>The returned iterator is a "weakly consistent" iterator that
1009      * will never throw {@link java.util.ConcurrentModificationException
1010      * ConcurrentModificationException}, and guarantees to traverse
1011      * elements as they existed upon construction of the iterator, and
1012      * may (but is not guaranteed to) reflect any modifications
1013      * subsequent to construction.
1014      *
1015      * @return an iterator over the elements in this deque in proper sequence
1016      */
1017     public Iterator<E> iterator() {
1018         return new Itr();
1019     }
1020 
1021     /**
1022      * Returns an iterator over the elements in this deque in reverse
1023      * sequential order.  The elements will be returned in order from
1024      * last (tail) to first (head).
1025      *
1026      * <p>The returned iterator is a "weakly consistent" iterator that
1027      * will never throw {@link java.util.ConcurrentModificationException
1028      * ConcurrentModificationException}, and guarantees to traverse
1029      * elements as they existed upon construction of the iterator, and
1030      * may (but is not guaranteed to) reflect any modifications
1031      * subsequent to construction.
1032      *
1033      * @return an iterator over the elements in this deque in reverse order
1034      */
1035     public Iterator<E> descendingIterator() {
1036         return new DescendingItr();
1037     }
1038 
1039     /**
1040      * Base class for Iterators for LinkedBlockingDeque
1041      */
1042     private abstract class AbstractItr implements Iterator<E> {
1043         /**
1044          * The next node to return in next()
1045          */
1046          Node<E> next;
1047 
1048         /**
1049          * nextItem holds on to item fields because once we claim that
1050          * an element exists in hasNext(), we must return item read
1051          * under lock (in advance()) even if it was in the process of
1052          * being removed when hasNext() was called.
1053          */
1054         E nextItem;
1055 
1056         /**
1057          * Node returned by most recent call to next. Needed by remove.
1058          * Reset to null if this element is deleted by a call to remove.
1059          */
1060         private Node<E> lastRet;
1061 
1062         abstract Node<E> firstNode();
1063         abstract Node<E> nextNode(Node<E> n);
1064 
1065         AbstractItr() {
1066             // set to initial position
1067             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1068             lock.lock();
1069             try {
1070                 next = firstNode();
1071                 nextItem = (next == null) ? null : next.item;
1072             } finally {
1073                 lock.unlock();
1074             }
1075         }
1076 
1077         /**
1078          * Returns the successor node of the given non-null, but
1079          * possibly previously deleted, node.
1080          */
1081         private Node<E> succ(Node<E> n) {
1082             // Chains of deleted nodes ending in null or self-links
1083             // are possible if multiple interior nodes are removed.
1084             for (;;) {
1085                 Node<E> s = nextNode(n);
1086                 if (s == null)
1087                     return null;
1088                 else if (s.item != null)
1089                     return s;
1090                 else if (s == n)
1091                     return firstNode();
1092                 else
1093                     n = s;
1094             }
1095         }
1096 
1097         /**
1098          * Advances next.
1099          */
1100         void advance() {
1101             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1102             lock.lock();
1103             try {
1104                 // assert next != null;
1105                 next = succ(next);
1106                 nextItem = (next == null) ? null : next.item;
1107             } finally {
1108                 lock.unlock();
1109             }
1110         }
1111 
1112         public boolean hasNext() {
1113             return next != null;
1114         }
1115 
1116         public E next() {
1117             if (next == null)
1118                 throw new NoSuchElementException();
1119             lastRet = next;
1120             E x = nextItem;
1121             advance();
1122             return x;
1123         }
1124 
1125         public void remove() {
1126             Node<E> n = lastRet;
1127             if (n == null)
1128                 throw new IllegalStateException();
1129             lastRet = null;
1130             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1131             lock.lock();
1132             try {
1133                 if (n.item != null)
1134                     unlink(n);
1135             } finally {
1136                 lock.unlock();
1137             }
1138         }
1139     }
1140 
1141     /** Forward iterator */
1142     private class Itr extends AbstractItr {
1143         Node<E> firstNode() { return first; }
1144         Node<E> nextNode(Node<E> n) { return n.next; }
1145     }
1146 
1147     /** Descending iterator */
1148     private class DescendingItr extends AbstractItr {
1149         Node<E> firstNode() { return last; }
1150         Node<E> nextNode(Node<E> n) { return n.prev; }
1151     }
1152 
1153     /**
1154      * Save the state of this deque to a stream (that is, serialize it).
1155      *
1156      * @serialData The capacity (int), followed by elements (each an
1157      * {@code Object}) in the proper order, followed by a null
1158      * @param s the stream
1159      */
1160     private void writeObject(java.io.ObjectOutputStream s)
1161         throws java.io.IOException {
1162         final ReentrantLock lock = this.lock;
1163         lock.lock();
1164         try {
1165             // Write out capacity and any hidden stuff
1166             s.defaultWriteObject();
1167             // Write out all elements in the proper order.
1168             for (Node<E> p = first; p != null; p = p.next)
1169                 s.writeObject(p.item);
1170             // Use trailing null as sentinel
1171             s.writeObject(null);
1172         } finally {
1173             lock.unlock();
1174         }
1175     }
1176 
1177     /**
1178      * Reconstitute this deque from a stream (that is,
1179      * deserialize it).
1180      * @param s the stream
1181      */
1182     private void readObject(java.io.ObjectInputStream s)
1183         throws java.io.IOException, ClassNotFoundException {
1184         s.defaultReadObject();
1185         count = 0;
1186         first = null;
1187         last = null;
1188         // Read in all elements and place in queue
1189         for (;;) {
1190             @SuppressWarnings("unchecked")
1191             E item = (E)s.readObject();
1192             if (item == null)
1193                 break;
1194             add(item);
1195         }
1196     }
1197 
1198 }
View Code

 

下面從ArrayBlockingQueue的創建,添加,取出,遍歷這幾個方面對LinkedBlockingDeque進行分析

1. 創建

下面以LinkedBlockingDeque(int capacity)來進行說明。

public LinkedBlockingDeque(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
}

說明:capacity是“鏈式阻塞隊列”的容量。


LinkedBlockingDeque中相關的數據結果定義如下:

// “雙向隊列”的表頭
transient Node<E> first;
// “雙向隊列”的表尾
transient Node<E> last;
// 節點數量
private transient int count;
// 容量
private final int capacity;
// 互斥鎖 , 互斥鎖對應的“非空條件notEmpty”, 互斥鎖對應的“未滿條件notFull”
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

說明:lock是互斥鎖,用於控制多線程對LinkedBlockingDeque中元素的互斥訪問;而notEmpty和notFull是與lock綁定的條件,它們用於實現對多線程更精確的控制。

雙向鏈表的節點Node的定義如下:

static final class Node<E> {
    E item;       // 數據
    Node<E> prev; // 前一節點
    Node<E> next; // 后一節點

    Node(E x) { item = x; }
}

 

2. 添加

下面以offer(E e)為例,對LinkedBlockingDeque的添加方法進行說明。

public boolean offer(E e) {
    return offerLast(e);
}

offer()實際上是調用offerLast()將元素添加到隊列的末尾。

offerLast()的源碼如下:

public boolean offerLast(E e) {
    if (e == null) throw new NullPointerException();
    // 新建節點
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    // 獲取鎖
    lock.lock();
    try {
        // 將“新節點”添加到雙向鏈表的末尾
        return linkLast(node);
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}

說明:offerLast()的作用,是新建節點並將該節點插入到雙向鏈表的末尾。它在插入節點前,會獲取鎖;操作完畢,再釋放鎖。

linkLast()的源碼如下:

private boolean linkLast(Node<E> node) {
    // 如果“雙向鏈表的節點數量” > “容量”,則返回false,表示插入失敗。
    if (count >= capacity)
        return false;
    // 將“node添加到鏈表末尾”,並設置node為新的尾節點
    Node<E> l = last;
    node.prev = l;
    last = node;
    if (first == null)
        first = node;
    else
        l.next = node;
    // 將“節點數量”+1
    ++count;
    // 插入節點之后,喚醒notEmpty上的等待線程。
    notEmpty.signal();
    return true;
}

說明:linkLast()的作用,是將節點插入到雙向隊列的末尾;插入節點之后,喚醒notEmpty上的等待線程。


3. 刪除

下面以take()為例,對LinkedBlockingDeque的取出方法進行說明。

public E take() throws InterruptedException {
    return takeFirst();
}

take()實際上是調用takeFirst()隊列的第一個元素。

takeFirst()的源碼如下:

public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 獲取鎖
    lock.lock();
    try {
        E x;
        // 若“隊列為空”,則一直等待。否則,通過unlinkFirst()刪除第一個節點。
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}

說明:takeFirst()的作用,是刪除雙向鏈表的第一個節點,並返回節點對應的值。它在插入節點前,會獲取鎖;操作完畢,再釋放鎖。

unlinkFirst()的源碼如下:

private E unlinkFirst() {
    // assert lock.isHeldByCurrentThread();
    Node<E> f = first;
    if (f == null)
        return null;
    // 刪除並更新“第一個節點”
    Node<E> n = f.next;
    E item = f.item;
    f.item = null;
    f.next = f; // help GC
    first = n;
    if (n == null)
        last = null;
    else
        n.prev = null;
    // 將“節點數量”-1
    --count;
    // 刪除節點之后,喚醒notFull上的等待線程。
    notFull.signal();
    return item;
}

說明:unlinkFirst()的作用,是將雙向隊列的第一個節點刪除;刪除節點之后,喚醒notFull上的等待線程。

 

4. 遍歷

下面對LinkedBlockingDeque的遍歷方法進行說明。

public Iterator<E> iterator() {
    return new Itr();
}

iterator()實際上是返回一個Iter對象。

Itr類的定義如下:

private class Itr extends AbstractItr {
    // “雙向隊列”的表頭
    Node<E> firstNode() { return first; }
    // 獲取“節點n的下一個節點”
    Node<E> nextNode(Node<E> n) { return n.next; }
}

Itr繼承於AbstractItr,而AbstractItr的定義如下:

 

private abstract class AbstractItr implements Iterator<E> {
    // next是下一次調用next()會返回的節點。
    Node<E> next;
    // nextItem是next()返回節點對應的數據。
    E nextItem;
    // 上一次next()返回的節點。
    private Node<E> lastRet;
    // 返回第一個節點
    abstract Node<E> firstNode();
    // 返回下一個節點
    abstract Node<E> nextNode(Node<E> n);

    AbstractItr() {
        final ReentrantLock lock = LinkedBlockingDeque.this.lock;
        // 獲取“LinkedBlockingDeque的互斥鎖”
        lock.lock();
        try {
            // 獲取“雙向隊列”的表頭
            next = firstNode();
            // 獲取表頭對應的數據
            nextItem = (next == null) ? null : next.item;
        } finally {
            // 釋放“LinkedBlockingDeque的互斥鎖”
            lock.unlock();
        }
    }

    // 獲取n的后繼節點
    private Node<E> succ(Node<E> n) {
        // Chains of deleted nodes ending in null or self-links
        // are possible if multiple interior nodes are removed.
        for (;;) {
            Node<E> s = nextNode(n);
            if (s == null)
                return null;
            else if (s.item != null)
                return s;
            else if (s == n)
                return firstNode();
            else
                n = s;
        }
    }

    // 更新next和nextItem。
    void advance() {
        final ReentrantLock lock = LinkedBlockingDeque.this.lock;
        lock.lock();
        try {
            // assert next != null;
            next = succ(next);
            nextItem = (next == null) ? null : next.item;
        } finally {
            lock.unlock();
        }
    }

    // 返回“下一個節點是否為null”
    public boolean hasNext() {
        return next != null;
    }

    // 返回下一個節點
    public E next() {
        if (next == null)
            throw new NoSuchElementException();
        lastRet = next;
        E x = nextItem;
        advance();
        return x;
    }

    // 刪除下一個節點
    public void remove() {
        Node<E> n = lastRet;
        if (n == null)
            throw new IllegalStateException();
        lastRet = null;
        final ReentrantLock lock = LinkedBlockingDeque.this.lock;
        lock.lock();
        try {
            if (n.item != null)
                unlink(n);
        } finally {
            lock.unlock();
        }
    }
}

 

LinkedBlockingDeque示例

 1 import java.util.*;
 2 import java.util.concurrent.*;
 3 
 4 /*
 5  *   LinkedBlockingDeque是“線程安全”的隊列,而LinkedList是非線程安全的。
 6  *
 7  *   下面是“多個線程同時操作並且遍歷queue”的示例
 8  *   (01) 當queue是LinkedBlockingDeque對象時,程序能正常運行。
 9  *   (02) 當queue是LinkedList對象時,程序會產生ConcurrentModificationException異常。
10  *
11  * @author skywang
12  */
13 public class LinkedBlockingDequeDemo1 {
14 
15     // TODO: queue是LinkedList對象時,程序會出錯。
16     //private static Queue<String> queue = new LinkedList<String>();
17     private static Queue<String> queue = new LinkedBlockingDeque<String>();
18     public static void main(String[] args) {
19     
20         // 同時啟動兩個線程對queue進行操作!
21         new MyThread("ta").start();
22         new MyThread("tb").start();
23     }
24 
25     private static void printAll() {
26         String value;
27         Iterator iter = queue.iterator();
28         while(iter.hasNext()) {
29             value = (String)iter.next();
30             System.out.print(value+", ");
31         }
32         System.out.println();
33     }
34 
35     private static class MyThread extends Thread {
36         MyThread(String name) {
37             super(name);
38         }
39         @Override
40         public void run() {
41                 int i = 0;
42             while (i++ < 6) {
43                 // “線程名” + "-" + "序號"
44                 String val = Thread.currentThread().getName()+i;
45                 queue.add(val);
46                 // 通過“Iterator”遍歷queue。
47                 printAll();
48             }
49         }
50     }
51 }

(某一次)運行結果

ta1, ta1, tb1, tb1,

ta1, ta1, tb1, tb1, tb2, tb2, ta2, 
ta2, 
ta1, ta1, tb1, tb1, tb2, tb2, ta2, ta2, tb3, tb3, ta3, 
ta3, ta1, 
tb1, ta1, tb2, tb1, ta2, tb2, tb3, ta2, ta3, tb3, tb4, ta3, ta4, 
tb4, ta1, ta4, tb1, tb5, 
tb2, ta1, ta2, tb1, tb3, tb2, ta3, ta2, tb4, tb3, ta4, ta3, tb5, tb4, ta5, 
ta4, ta1, tb5, tb1, ta5, tb2, tb6, 
ta2, ta1, tb3, tb1, ta3, tb2, tb4, ta2, ta4, tb3, tb5, ta3, ta5, tb4, tb6, ta4, ta6, 
tb5, ta5, tb6, ta6,

結果說明示例程序中,啟動兩個線程(線程ta和線程tb)分別對LinkedBlockingDeque進行操作。以線程ta而言,它會先獲取“線程名”+“序號”,然后將該字符串添加到LinkedBlockingDeque中;接着,遍歷並輸出LinkedBlockingDeque中的全部元素。 線程tb的操作和線程ta一樣,只不過線程tb的名字和線程ta的名字不同。
當queue是LinkedBlockingDeque對象時,程序能正常運行。如果將queue改為LinkedList時,程序會產生ConcurrentModificationException異常。

 


更多內容

1. Java多線程系列--“JUC集合”01之 框架

2. Java多線程系列目錄(共xx篇)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM