Java並發包--LinkedBlockingDeque


轉載請注明出處:http://www.cnblogs.com/skywang12345/p/3503480.html

 

LinkedBlockingDeque介紹

LinkedBlockingDeque是雙向鏈表實現的雙向並發阻塞隊列。該阻塞隊列同時支持FIFO和FILO兩種操作方式,即可以從隊列的頭和尾同時操作(插入/刪除);並且,該阻塞隊列是支持線程安全。

此外,LinkedBlockingDeque還是可選容量的(防止過度膨脹),即可以指定隊列的容量。如果不指定,默認容量大小等於Integer.MAX_VALUE。

 

LinkedBlockingDeque原理和數據結構

LinkedBlockingDeque的數據結構,如下圖所示:

說明
1. LinkedBlockingDeque繼承於AbstractQueue,它本質上是一個支持FIFO和FILO的雙向的隊列。
2. LinkedBlockingDeque實現了BlockingDeque接口,它支持多線程並發。當多線程競爭同一個資源時,某線程獲取到該資源之后,其它線程需要阻塞等待。
3. LinkedBlockingDeque是通過雙向鏈表實現的。
3.1 first是雙向鏈表的表頭。
3.2 last是雙向鏈表的表尾。
3.3 count是LinkedBlockingDeque的實際大小,即雙向鏈表中當前節點個數。
3.4 capacity是LinkedBlockingDeque的容量,它是在創建LinkedBlockingDeque時指定的。
3.5 lock是控制對LinkedBlockingDeque的互斥鎖,當多個線程競爭同時訪問LinkedBlockingDeque時,某線程獲取到了互斥鎖lock,其它線程則需要阻塞等待,直到該線程釋放lock,其它線程才有機會獲取lock從而獲取cpu執行權。
3.6 notEmpty和notFull分別是“非空條件”和“未滿條件”。通過它們能夠更加細膩進行並發控制。

     -- 若某線程(線程A)要取出數據時,隊列正好為空,則該線程會執行notEmpty.await()進行等待;當其它某個線程(線程B)向隊列中插入了數據之后,會調用notEmpty.signal()喚醒“notEmpty上的等待線程”。此時,線程A會被喚醒從而得以繼續運行。 此外,線程A在執行取操作前,會獲取takeLock,在取操作執行完畢再釋放takeLock。
     -- 若某線程(線程H)要插入數據時,隊列已滿,則該線程會它執行notFull.await()進行等待;當其它某個線程(線程I)取出數據之后,會調用notFull.signal()喚醒“notFull上的等待線程”。此時,線程H就會被喚醒從而得以繼續運行。 此外,線程H在執行插入操作前,會獲取putLock,在插入操作執行完畢才釋放putLock。

關於ReentrantLock 和 Condition等更多的內容,可以參考:
    (01) Java多線程系列--“JUC鎖”02之 互斥鎖ReentrantLock
    (02) Java多線程系列--“JUC鎖”03之 公平鎖(一)
    (03) Java多線程系列--“JUC鎖”04之 公平鎖(二)
    (04) Java多線程系列--“JUC鎖”05之 非公平鎖
    (05) Java多線程系列--“JUC鎖”06之 Condition條件

 

LinkedBlockingDeque函數列表

復制代碼
// 創建一個容量為 Integer.MAX_VALUE 的 LinkedBlockingDeque。
LinkedBlockingDeque()
// 創建一個容量為 Integer.MAX_VALUE 的 LinkedBlockingDeque,最初包含給定 collection 的元素,以該 collection 迭代器的遍歷順序添加。
LinkedBlockingDeque(Collection<? extends E> c)
// 創建一個具有給定(固定)容量的 LinkedBlockingDeque。
LinkedBlockingDeque(int capacity)

// 在不違反容量限制的情況下,將指定的元素插入此雙端隊列的末尾。
boolean add(E e)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列的開頭;如果當前沒有空間可用,則拋出 IllegalStateException。
void addFirst(E e)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列的末尾;如果當前沒有空間可用,則拋出 IllegalStateException。
void addLast(E e)
// 以原子方式 (atomically) 從此雙端隊列移除所有元素。
void clear()
// 如果此雙端隊列包含指定的元素,則返回 true。
boolean contains(Object o)
// 返回在此雙端隊列的元素上以逆向連續順序進行迭代的迭代器。
Iterator<E> descendingIterator()
// 移除此隊列中所有可用的元素,並將它們添加到給定 collection 中。
int drainTo(Collection<? super E> c)
// 最多從此隊列中移除給定數量的可用元素,並將這些元素添加到給定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 獲取但不移除此雙端隊列表示的隊列的頭部。
E element()
// 獲取,但不移除此雙端隊列的第一個元素。
E getFirst()
// 獲取,但不移除此雙端隊列的最后一個元素。
E getLast()
// 返回在此雙端隊列元素上以恰當順序進行迭代的迭代器。
Iterator<E> iterator()
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列表示的隊列中(即此雙端隊列的尾部),並在成功時返回 true;如果當前沒有空間可用,則返回 false。
boolean offer(E e)
// 將指定的元素插入此雙端隊列表示的隊列中(即此雙端隊列的尾部),必要時將在指定的等待時間內一直等待可用空間。
boolean offer(E e, long timeout, TimeUnit unit)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列的開頭,並在成功時返回 true;如果當前沒有空間可用,則返回 false。
boolean offerFirst(E e)
// 將指定的元素插入此雙端隊列的開頭,必要時將在指定的等待時間內等待可用空間。
boolean offerFirst(E e, long timeout, TimeUnit unit)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列的末尾,並在成功時返回 true;如果當前沒有空間可用,則返回 false。
boolean offerLast(E e)
// 將指定的元素插入此雙端隊列的末尾,必要時將在指定的等待時間內等待可用空間。
boolean offerLast(E e, long timeout, TimeUnit unit)
// 獲取但不移除此雙端隊列表示的隊列的頭部(即此雙端隊列的第一個元素);如果此雙端隊列為空,則返回 null。
E peek()
// 獲取,但不移除此雙端隊列的第一個元素;如果此雙端隊列為空,則返回 null。
E peekFirst()
// 獲取,但不移除此雙端隊列的最后一個元素;如果此雙端隊列為空,則返回 null。
E peekLast()
// 獲取並移除此雙端隊列表示的隊列的頭部(即此雙端隊列的第一個元素);如果此雙端隊列為空,則返回 null。
E poll()
// 獲取並移除此雙端隊列表示的隊列的頭部(即此雙端隊列的第一個元素),如有必要將在指定的等待時間內等待可用元素。
E poll(long timeout, TimeUnit unit)
// 獲取並移除此雙端隊列的第一個元素;如果此雙端隊列為空,則返回 null。
E pollFirst()
// 獲取並移除此雙端隊列的第一個元素,必要時將在指定的等待時間等待可用元素。
E pollFirst(long timeout, TimeUnit unit)
// 獲取並移除此雙端隊列的最后一個元素;如果此雙端隊列為空,則返回 null。
E pollLast()
// 獲取並移除此雙端隊列的最后一個元素,必要時將在指定的等待時間內等待可用元素。
E pollLast(long timeout, TimeUnit unit)
// 從此雙端隊列所表示的堆棧中彈出一個元素。
E pop()
// 將元素推入此雙端隊列表示的棧。
void push(E e)
// 將指定的元素插入此雙端隊列表示的隊列中(即此雙端隊列的尾部),必要時將一直等待可用空間。
void put(E e)
// 將指定的元素插入此雙端隊列的開頭,必要時將一直等待可用空間。
void putFirst(E e)
// 將指定的元素插入此雙端隊列的末尾,必要時將一直等待可用空間。
void putLast(E e)
// 返回理想情況下(沒有內存和資源約束)此雙端隊列可不受阻塞地接受的額外元素數。
int remainingCapacity()
// 獲取並移除此雙端隊列表示的隊列的頭部。
E remove()
// 從此雙端隊列移除第一次出現的指定元素。
boolean remove(Object o)
// 獲取並移除此雙端隊列第一個元素。
E removeFirst()
// 從此雙端隊列移除第一次出現的指定元素。
boolean removeFirstOccurrence(Object o)
// 獲取並移除此雙端隊列的最后一個元素。
E removeLast()
// 從此雙端隊列移除最后一次出現的指定元素。
boolean removeLastOccurrence(Object o)
// 返回此雙端隊列中的元素數。
int size()
// 獲取並移除此雙端隊列表示的隊列的頭部(即此雙端隊列的第一個元素),必要時將一直等待可用元素。
E take()
// 獲取並移除此雙端隊列的第一個元素,必要時將一直等待可用元素。
E takeFirst()
// 獲取並移除此雙端隊列的最后一個元素,必要時將一直等待可用元素。
E takeLast()
// 返回以恰當順序(從第一個元素到最后一個元素)包含此雙端隊列所有元素的數組。
Object[] toArray()
// 返回以恰當順序包含此雙端隊列所有元素的數組;返回數組的運行時類型是指定數組的運行時類型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()
復制代碼

 

LinkedBlockingDeque源碼分析(JDK1.7.0_40版本)

LinkedBlockingDeque.java的完整源碼如下:

  View Code

 

下面從ArrayBlockingQueue的創建,添加,取出,遍歷這幾個方面對LinkedBlockingDeque進行分析

1. 創建

下面以LinkedBlockingDeque(int capacity)來進行說明。

public LinkedBlockingDeque(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
}

說明:capacity是“鏈式阻塞隊列”的容量。


LinkedBlockingDeque中相關的數據結果定義如下:

復制代碼
// “雙向隊列”的表頭
transient Node<E> first;
// “雙向隊列”的表尾
transient Node<E> last;
// 節點數量
private transient int count;
// 容量
private final int capacity;
// 互斥鎖 , 互斥鎖對應的“非空條件notEmpty”, 互斥鎖對應的“未滿條件notFull”
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
復制代碼

說明:lock是互斥鎖,用於控制多線程對LinkedBlockingDeque中元素的互斥訪問;而notEmpty和notFull是與lock綁定的條件,它們用於實現對多線程更精確的控制。

雙向鏈表的節點Node的定義如下:

復制代碼
static final class Node<E> {
    E item;       // 數據
    Node<E> prev; // 前一節點
    Node<E> next; // 后一節點

    Node(E x) { item = x; }
}
復制代碼

 

2. 添加

下面以offer(E e)為例,對LinkedBlockingDeque的添加方法進行說明。

public boolean offer(E e) {
    return offerLast(e);
}

offer()實際上是調用offerLast()將元素添加到隊列的末尾。

offerLast()的源碼如下:

復制代碼
public boolean offerLast(E e) {
    if (e == null) throw new NullPointerException();
    // 新建節點
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    // 獲取鎖
    lock.lock();
    try {
        // 將“新節點”添加到雙向鏈表的末尾
        return linkLast(node);
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}
復制代碼

說明:offerLast()的作用,是新建節點並將該節點插入到雙向鏈表的末尾。它在插入節點前,會獲取鎖;操作完畢,再釋放鎖。

linkLast()的源碼如下:

復制代碼
private boolean linkLast(Node<E> node) {
    // 如果“雙向鏈表的節點數量” > “容量”,則返回false,表示插入失敗。
    if (count >= capacity)
        return false;
    // 將“node添加到鏈表末尾”,並設置node為新的尾節點
    Node<E> l = last;
    node.prev = l;
    last = node;
    if (first == null)
        first = node;
    else
        l.next = node;
    // 將“節點數量”+1
    ++count;
    // 插入節點之后,喚醒notEmpty上的等待線程。
    notEmpty.signal();
    return true;
}
復制代碼

說明:linkLast()的作用,是將節點插入到雙向隊列的末尾;插入節點之后,喚醒notEmpty上的等待線程。


3. 刪除

下面以take()為例,對LinkedBlockingDeque的取出方法進行說明。

public E take() throws InterruptedException {
    return takeFirst();
}

take()實際上是調用takeFirst()隊列的第一個元素。

takeFirst()的源碼如下:

復制代碼
public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 獲取鎖
    lock.lock();
    try {
        E x;
        // 若“隊列為空”,則一直等待。否則,通過unlinkFirst()刪除第一個節點。
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}
復制代碼

說明:takeFirst()的作用,是刪除雙向鏈表的第一個節點,並返回節點對應的值。它在插入節點前,會獲取鎖;操作完畢,再釋放鎖。

unlinkFirst()的源碼如下:

復制代碼
private E unlinkFirst() {
    // assert lock.isHeldByCurrentThread();
    Node<E> f = first;
    if (f == null)
        return null;
    // 刪除並更新“第一個節點”
    Node<E> n = f.next;
    E item = f.item;
    f.item = null;
    f.next = f; // help GC
    first = n;
    if (n == null)
        last = null;
    else
        n.prev = null;
    // 將“節點數量”-1
    --count;
    // 刪除節點之后,喚醒notFull上的等待線程。
    notFull.signal();
    return item;
}
復制代碼

說明:unlinkFirst()的作用,是將雙向隊列的第一個節點刪除;刪除節點之后,喚醒notFull上的等待線程。

 

4. 遍歷

下面對LinkedBlockingDeque的遍歷方法進行說明。

public Iterator<E> iterator() {
    return new Itr();
}

iterator()實際上是返回一個Iter對象。

Itr類的定義如下:

private class Itr extends AbstractItr {
    // “雙向隊列”的表頭
    Node<E> firstNode() { return first; }
    // 獲取“節點n的下一個節點”
    Node<E> nextNode(Node<E> n) { return n.next; }
}

Itr繼承於AbstractItr,而AbstractItr的定義如下:

 

復制代碼
private abstract class AbstractItr implements Iterator<E> {
    // next是下一次調用next()會返回的節點。
    Node<E> next;
    // nextItem是next()返回節點對應的數據。
    E nextItem;
    // 上一次next()返回的節點。
    private Node<E> lastRet;
    // 返回第一個節點
    abstract Node<E> firstNode();
    // 返回下一個節點
    abstract Node<E> nextNode(Node<E> n);

    AbstractItr() {
        final ReentrantLock lock = LinkedBlockingDeque.this.lock;
        // 獲取“LinkedBlockingDeque的互斥鎖”
        lock.lock();
        try {
            // 獲取“雙向隊列”的表頭
            next = firstNode();
            // 獲取表頭對應的數據
            nextItem = (next == null) ? null : next.item;
        } finally {
            // 釋放“LinkedBlockingDeque的互斥鎖”
            lock.unlock();
        }
    }

    // 獲取n的后繼節點
    private Node<E> succ(Node<E> n) {
        // Chains of deleted nodes ending in null or self-links
        // are possible if multiple interior nodes are removed.
        for (;;) {
            Node<E> s = nextNode(n);
            if (s == null)
                return null;
            else if (s.item != null)
                return s;
            else if (s == n)
                return firstNode();
            else
                n = s;
        }
    }

    // 更新next和nextItem。
    void advance() {
        final ReentrantLock lock = LinkedBlockingDeque.this.lock;
        lock.lock();
        try {
            // assert next != null;
            next = succ(next);
            nextItem = (next == null) ? null : next.item;
        } finally {
            lock.unlock();
        }
    }

    // 返回“下一個節點是否為null”
    public boolean hasNext() {
        return next != null;
    }

    // 返回下一個節點
    public E next() {
        if (next == null)
            throw new NoSuchElementException();
        lastRet = next;
        E x = nextItem;
        advance();
        return x;
    }

    // 刪除下一個節點
    public void remove() {
        Node<E> n = lastRet;
        if (n == null)
            throw new IllegalStateException();
        lastRet = null;
        final ReentrantLock lock = LinkedBlockingDeque.this.lock;
        lock.lock();
        try {
            if (n.item != null)
                unlink(n);
        } finally {
            lock.unlock();
        }
    }
}
復制代碼

 

LinkedBlockingDeque示例

復制代碼
 1 import java.util.*;
 2 import java.util.concurrent.*;
 3 
 4 /*
 5  *   LinkedBlockingDeque是“線程安全”的隊列,而LinkedList是非線程安全的。
 6  *
 7  *   下面是“多個線程同時操作並且遍歷queue”的示例
 8  *   (01) 當queue是LinkedBlockingDeque對象時,程序能正常運行。
 9  *   (02) 當queue是LinkedList對象時,程序會產生ConcurrentModificationException異常。
10  *
11  * @author skywang
12  */
13 public class LinkedBlockingDequeDemo1 {
14 
15     // TODO: queue是LinkedList對象時,程序會出錯。
16     //private static Queue<String> queue = new LinkedList<String>();
17     private static Queue<String> queue = new LinkedBlockingDeque<String>();
18     public static void main(String[] args) {
19     
20         // 同時啟動兩個線程對queue進行操作!
21         new MyThread("ta").start();
22         new MyThread("tb").start();
23     }
24 
25     private static void printAll() {
26         String value;
27         Iterator iter = queue.iterator();
28         while(iter.hasNext()) {
29             value = (String)iter.next();
30             System.out.print(value+", ");
31         }
32         System.out.println();
33     }
34 
35     private static class MyThread extends Thread {
36         MyThread(String name) {
37             super(name);
38         }
39         @Override
40         public void run() {
41                 int i = 0;
42             while (i++ < 6) {
43                 // “線程名” + "-" + "序號"
44                 String val = Thread.currentThread().getName()+i;
45                 queue.add(val);
46                 // 通過“Iterator”遍歷queue。
47                 printAll();
48             }
49         }
50     }
51 }
復制代碼

(某一次)運行結果

復制代碼
ta1, ta1, tb1, tb1,

ta1, ta1, tb1, tb1, tb2, tb2, ta2, 
ta2, 
ta1, ta1, tb1, tb1, tb2, tb2, ta2, ta2, tb3, tb3, ta3, 
ta3, ta1, 
tb1, ta1, tb2, tb1, ta2, tb2, tb3, ta2, ta3, tb3, tb4, ta3, ta4, 
tb4, ta1, ta4, tb1, tb5, 
tb2, ta1, ta2, tb1, tb3, tb2, ta3, ta2, tb4, tb3, ta4, ta3, tb5, tb4, ta5, 
ta4, ta1, tb5, tb1, ta5, tb2, tb6, 
ta2, ta1, tb3, tb1, ta3, tb2, tb4, ta2, ta4, tb3, tb5, ta3, ta5, tb4, tb6, ta4, ta6, 
tb5, ta5, tb6, ta6,
復制代碼

結果說明:示例程序中,啟動兩個線程(線程ta和線程tb)分別對LinkedBlockingDeque進行操作。以線程ta而言,它會先獲取“線程名”+“序號”,然后將該字符串添加到LinkedBlockingDeque中;接着,遍歷並輸出LinkedBlockingDeque中的全部元素。 線程tb的操作和線程ta一樣,只不過線程tb的名字和線程ta的名字不同。
當queue是LinkedBlockingDeque對象時,程序能正常運行。如果將queue改為LinkedList時,程序會產生ConcurrentModificationException異常。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM