阻塞隊列之六:LinkedBlockingDeque


一、LinkedBlockingDeque簡介

  java6增加了兩種容器類型,Deque和BlockingDeque,它們分別對Queue和BlockingQueue進行了擴展。
  Deque是一個雙端隊列,deque(雙端隊列) 是 "Double Ended Queue" 的縮寫。因此,雙端隊列是一個你可以從任意一端插入或者抽取元素的隊列。實現了在隊列頭和隊列尾的高效插入和移除。
  BlockingDeque 類是一個雙端隊列,在不能夠插入元素時,它將阻塞住試圖插入元素的線程;在不能夠抽取元素時,它將阻塞住試圖抽取的線程。
  正如阻塞隊列使用與生產者-消費者模式,雙端隊列同樣適用於另一種相關模式,即工作密取。在生產者-消費者設計中,所有消費者有一個共享的工作隊列,而在工作密取設計中,每個消費者都有各自的雙端隊列。如果一個消費者完成了自己雙端隊列中的全部工作,那么它可以從其它消費者雙端隊列末尾秘密地獲取工作。密取工作模式比傳統的生產者-消費者模式具有更高的可伸縮性,這是因為工作者線程不會在單個共享的任務隊列上發生競爭。在大多數時候,它們都只是訪問自己的雙端隊列,從而極大地減少了競爭。當工作者線程需要訪問另一個隊列時,它會從隊列的尾部而不是頭部獲取工作,因此進一步降低了隊列上的競爭程度。


LinkedBlockingDeque是雙向鏈表實現的雙向並發阻塞隊列。該阻塞隊列同時支持FIFO和FILO兩種操作方式,即可以從隊列的頭和尾同時操作(插入/刪除);並且,該阻塞隊列是支持線程安全。
此外,LinkedBlockingDeque還是可選容量的(防止過度膨脹),即可以指定隊列的容量。如果不指定,默認容量大小等於Integer.MAX_VALUE。

 

BlockingDeque 的使用

在線程既是一個隊列的生產者又是這個隊列的消費者的時候可以使用到 BlockingDeque。如果生產者線程需要在隊列的兩端都可以插入數據,消費者線程需要在隊列的兩端都可以移除數據,這個時候也可以使用 BlockingDeque。BlockingDeque 圖解:

一個 BlockingDeque - 線程在雙端隊列的兩端都可以插入和提取元素。
一個線程生產元素,並把它們插入到隊列的任意一端。如果雙端隊列已滿,插入線程將被阻塞,直到一個移除線程從該隊列中移出了一個元素。如果雙端隊列為空,移除線程將被阻塞,直到一個插入線程向該隊列插入了一個新元素。

BlockingDeque 的方法

BlockingDeque 具有 4 組不同的方法用於插入、移除以及對雙端隊列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:

  拋異常 特定值 阻塞 超時
插入 addFirst(o) offerFirst(o) putFirst(o) offerFirst(o, timeout, timeunit)
移除 removeFirst(o) pollFirst(o) takeFirst(o) pollFirst(timeout, timeunit)
檢查 getFirst(o) peekFirst(o)    

 

  拋異常 特定值 阻塞 超時
插入 addLast(o) offerLast(o) putLast(o) offerLast(o, timeout, timeunit)
移除 removeLast(o) pollLast(o) takeLast(o) pollLast(timeout, timeunit)
檢查 getLast(o) peekLast(o)    


四組不同的行為方式解釋:

  1. 拋異常:如果試圖的操作無法立即執行,拋一個異常。
  2. 特定值:如果試圖的操作無法立即執行,返回一個特定的值(常常是 true / false)。
  3. 阻塞:如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行。
  4. 超時:如果試圖的操作無法立即執行,該方法調用將會發生阻塞,直到能夠執行,但等待時間不會超過給定值。返回一個特定值以告知該操作是否成功(典型的是 true / false)。

BlockingDeque 繼承自 BlockingQueue

BlockingDeque 接口繼承自 BlockingQueue 接口。這就意味着你可以像使用一個 BlockingQueue 那樣使用 BlockingDeque。如果你這么干的話,各種插入方法將會把新元素添加到雙端隊列的尾端,而移除方法將會把雙端隊列的首端的元素移除。正如 BlockingQueue 接口的插入和移除方法一樣。
以下是 BlockingDeque 對 BlockingQueue 接口的方法的具體內部實現:

 

 

BlockingQueue BlockingDeque
add() addLast()
offer() x 2 offerLast() x 2
put() putLast()
   
remove() removeFirst()
poll() x 2 pollFirst()
take() takeFirst()
   
element() getFirst()
peek() peekFirst()

 

二、LinkedBlockingDeque源碼分析

2.1、LinkedBlockingDeque的lock

LinkedBlockingDeque的原理就是使用一個可重入鎖和這個鎖生成的兩個條件對象進行並發控制(classic two-condition algorithm)。LinkedBlockingDeque是一個帶有長度的阻塞隊列,初始化的時候可以指定隊列長度(如果不指定就是Integer.MAX_VALUE),且指定長度之后不允許進行修改。

    /** Main lock guarding all access */
    final ReentrantLock lock = new ReentrantLock();

    /** Condition for waiting takes */
    private final Condition notEmpty = lock.newCondition();

    /** Condition for waiting puts */
    private final Condition notFull = lock.newCondition();

2.2、數據結構

雙向鏈表

/** 雙向鏈表節點 */  
static final class Node<E> {  
    /** 
     * 元素值 
     */  
    E item;  
  
    /** 
     * 節點前驅 
     * 1.指向前驅;2.指向this,說明前驅是尾節點,看unlinklast;3.指向null說明沒有前驅 
     */  
    Node<E> prev;  
  
    /** 
     * 節點后繼 
     * 1.指向后繼;2.指向this,說明后繼是頭結點,看unlinkfirst;3.指向null說明沒有后繼 
     */  
    Node<E> next;  
  
    Node(E x) {  
        item = x;  
    }  
}  

 

2.3、成員變量

    //first是雙向鏈表的表頭
    transient Node<E> first;

    //last是雙向鏈表的表尾
    transient Node<E> last;

    //count是LinkedBlockingDeque的實際大小,即雙向鏈表中當前節點個數
    private transient int count;

    //是LinkedBlockingDeque的容量,它是在創建LinkedBlockingDeque時指定的
    private final int capacity;

 

2.4、構造函數

    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }
    
    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    public LinkedBlockingDeque(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock lock = this.lock;
        lock.lock(); // Never contended, but necessary for visibility
        try {
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (!linkLast(new Node<E>(e)))
                    throw new IllegalStateException("Deque full");
            }
        } finally {
            lock.unlock();
        }
    }

 

2.5、入隊

addFirst,addLast分別調用offerFirst,offerLast,而offerFirst,offerLast和putFirst,putLast都是調用了linkFirst和linkLast。

    public void addFirst(E e) {
        if (!offerFirst(e))
            throw new IllegalStateException("Deque full");
    }
    
    public void addLast(E e) {
        if (!offerLast(e))
            throw new IllegalStateException("Deque full");
    }
    
    public boolean offerFirst(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return linkFirst(node);
        } finally {
            lock.unlock();
        }
    }
    
    public boolean offerLast(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return linkLast(node);
        } finally {
            lock.unlock();
        }
    }
    
    public void putFirst(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkFirst(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }
    
    public void putLast(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkLast(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }

linkFirst和linkLast

/** 
 * 設置node為鏈表頭節點,鏈表滿時為false 
 */  
private boolean linkFirst(Node<E> node) {  
    // assert lock.isHeldByCurrentThread();  
    if (count >= capacity) //超過容量false  
        return false;  
    Node<E> f = first;  
    node.next = f; //新節點的next指向原first  
    first = node; //設置node為新的first  
    if (last == null) //沒有尾節點,就將node設置成尾節點  
        last = node;  
    else  
        f.prev = node; //有尾節點,那就將之前first的pre指向新增node  
    ++count; //累加節點數量  
    notEmpty.signal(); //有新節點入隊,通知非空條件隊列  
    return true;  
}  
  
/** 
 * 設置node為鏈表尾節點,鏈表滿時為false 
 */  
private boolean linkLast(Node<E> node) {  
    // assert lock.isHeldByCurrentThread();  
    if (count >= capacity)  
        return false;  
    Node<E> l = last;  
    node.prev = l;  
    last = node;  
    if (first == null) //為null,說明之前隊列空吧,那就first也指向node  
        first = node;  
    else  
        l.next = node; //非null,說明之前的last有值,就將之前的last的next指向node  
    ++count;  
    notEmpty.signal();  
    return true;  
}  

2.6、出隊

    public E removeFirst() {
        E x = pollFirst();
        if (x == null) throw new NoSuchElementException();
        return x;
    }
    
    public E removeLast() {
        E x = pollLast();
        if (x == null) throw new NoSuchElementException();
        return x;
    }
    
    public E pollFirst() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkFirst();
        } finally {
            lock.unlock();
        }
    }
    
    public E pollLast() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkLast();
        } finally {
            lock.unlock();
        }
    }
    
    public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkFirst()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
    
    public E takeLast() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkLast()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
    

核心方法

/** 
 * 移除頭結點,鏈表空返回null 
 */  
private E unlinkFirst() {  
    // assert lock.isHeldByCurrentThread();  
    Node<E> f = first;  
    if (f == null)  
        return null; //空返回null  
    Node<E> n = f.next;  
    E item = f.item;  
    f.item = null;  
    f.next = f; // help GC  
    first = n;  
    if (n == null) //說明之前應該只有一個節點,移除頭結點后,鏈表空,現在first和last都指向null了  
        last = null;  
    else  
        n.prev = null; //否則的話,n的pre原來指向之前的first,現在n變為first了,pre指向null  
    --count;  
    notFull.signal(); //通知非滿條件隊列  
    return item;  
}  
  
/** 
 * 移除尾結點,鏈表空返回null 
 */  
private E unlinkLast() {  
    // assert lock.isHeldByCurrentThread();  
    Node<E> l = last;  
    if (l == null)  
        return null;  
    Node<E> p = l.prev;  
    E item = l.item;  
    l.item = null;  
    l.prev = l; // help GC  
    last = p;  
    if (p == null)  
        first = null;  
    else  
        p.next = null;  
    --count;  
    notFull.signal();  
    return item;  
}  
  
/** 
 * 移除指定節點:p--》x--》n 
 */  
void unlink(Node<E> x) {  
    // assert lock.isHeldByCurrentThread();  
    Node<E> p = x.prev;  
    Node<E> n = x.next;   
    if (p == null) { //prev為null說明x節點為頭結點  
        unlinkFirst();  
    } else if (n == null) {  
        unlinkLast(); //nex為null說明待清除節點為尾節點  
    } else { //否則的話節點處於鏈表中間  
        p.next = n; //將p和n互鏈  
        n.prev = p;  
        x.item = null;  
        // 沒有斷開x節點鏈接,可能有其他線程在迭代鏈表  
        --count;  
        notFull.signal();  
    }  
} 

2.7、peek方法

    public E peek() {
        return peekFirst();
    }
    
    public E peekFirst() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (first == null) ? null : first.item;
        } finally {
            lock.unlock();
        }
    }
    //peekLast就不貼了

2.8、size方法

    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

 

三、JDK或開源框架中使用

四、使用示例


java.util.ArrayDeque 類提供了可調整大小的陣列,並實現了Deque接口。以下是關於陣列雙端隊列的要點:
Java.util.ArrayDeque

  • 數組雙端隊列沒有容量限制,使他們增長為必要支持使用。

  • 它們不是線程安全的;如果沒有外部同步。

  • 不支持多線程並發訪問。

  • null元素被禁止使用在數組deques。

  • 它們要比堆棧Stack和LinkedList快。

此類及其迭代器實現Collection和Iteratorinterfaces方法可選。

類的聲明

以下是java.util.ArrayDeque類的聲明:

public class ArrayDeque<E> extends AbstractCollection<E> implements Deque<E>, Cloneable, Serializable

這里<E>代表一個元素,它可以是任何類。例如,如果你正在構建一個整數數組列表,那么初始化可為

ArrayList<Integer> list = new ArrayList<Integer>();

 

 

S.N. 方法 & 描述
1 boolean add(E e) 
此方法將添加指定的元素,在此deque隊列的末尾。
2 void addFirst(E e) 
此方法將添加指定的元素,在此deque隊列的前面。
3 void addLast(E e) 
此方法將插入指定的元素,在此deque隊列的末尾。
4 void clear() 
此方法移除此deque隊列的元素。
5 ArrayDeque<E> clone() 
此方法返回此deque隊列的副本。
6 boolean contains(Object o) 
如果此deque 隊列包含指定的元素,此方法返回true。
7 Iterator<E> descendingIterator() 
此方法返回一個迭代器在此deque隊列以逆向順序的元素。
8 E element() 
此方法檢索,但是不移除此deque隊列表示的隊列的頭部。
9 E getFirst()
此方法檢索,但是不移除此deque隊列的第一個元素。
10 E getLast() 
此方法檢索,但是不移除此deque隊列的最后一個元素。
11 boolean isEmpty() 
如果此deque隊列不包含元素,此方法返回true。
12 Iterator<E> iterator() 
此方法返回一個迭代器在此deque隊列的元素。
13 boolean offer(E e)
此方法將指定的元素,在此deque隊列的末尾。
14 boolean offerFirst(E e) 
此方法將指定的元素,在此deque隊列的前面。
15 boolean offerLast(E e) 
此方法將指定的元素,在此deque隊列的末尾。
16 E peek() 
此方法檢索,但是不移除此deque隊列表示的隊列的頭部,如果此deque隊列為空,則返回null。
17 E peekFirst() 
此方法檢索,但是不移除此deque 隊列的第一個元素,或者如果此deque 隊列為空,則返回null。
18 E peekLast() 
此方法檢索,但是不移除此deque隊列的最后一個元素,如果此deque隊列為空,則返回null。
19 E poll() 
此方法檢索並移除此deque隊列表示的隊列的頭部,如果此deque隊列為空,則返回null。
20 E pollFirst() 
此方法檢索並移除此deque隊列的第一個元素,或者如果此deque隊列為空,則返回null。
21 E pollLast() 
此方法檢索並移除此deque隊列的最后一個元素,如果此deque隊列為空,則返回null。
22 E pop() 
這種方法的此deque隊列所表示的堆棧彈出一個元素。
23 void push(E e) 
這種方法將元素推入此deque隊列所表示的堆棧。
24 E remove() 
此方法檢索並移除此deque隊列表示的隊列的頭部。
25 boolean remove(Object o) 
此方法從此deque隊列中移除指定元素的單個實例。
26 E removeFirst() 
此方法檢索並移除此deque隊列的第一個元素。
27 boolean removeFirstOccurrence(Object o) 
此方法移除此deque隊列的指定元素的第一個匹配。
28 E removeLast() 
此方法檢索並移除此deque隊列的最后一個元素。
29 boolean removeLastOccurrence(Object o) 
此方法移除此deque隊列的指定元素的最后一次出現。
30 int size() 
此方法返回在此deque隊列的元素個數。
31 object[] toArray() 
這個方法返回一個包含所有在此deque隊列在適當的序列中元素的數組。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM