Java並發包--ConcurrentLinkedQueue


轉載請注明出處:http://www.cnblogs.com/skywang12345/p/3498995.html

 

ConcurrentLinkedQueue介紹

ConcurrentLinkedQueue是線程安全的隊列,它適用於“高並發”的場景。
它是一個基於鏈接節點的無界線程安全隊列,按照 FIFO(先進先出)原則對元素進行排序。隊列元素中不可以放置null元素(內部實現的特殊節點除外)。

 

ConcurrentLinkedQueue原理和數據結構

ConcurrentLinkedQueue的數據結構,如下圖所示:

說明
1. ConcurrentLinkedQueue繼承於AbstractQueue。
2. ConcurrentLinkedQueue內部是通過鏈表來實現的。它同時包含鏈表的頭節點head和尾節點tail。ConcurrentLinkedQueue按照 FIFO(先進先出)原則對元素進行排序。元素都是從尾部插入到鏈表,從頭部開始返回。
3. ConcurrentLinkedQueue的鏈表Node中的next的類型是volatile,而且鏈表數據item的類型也是volatile。關於volatile,我們知道它的語義包含:“即對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫入”。ConcurrentLinkedQueue就是通過volatile來實現多線程對競爭資源的互斥訪問的。

 

ConcurrentLinkedQueue函數列表

復制代碼
// 創建一個最初為空的 ConcurrentLinkedQueue。
ConcurrentLinkedQueue()
// 創建一個最初包含給定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍歷順序來添加元素。
ConcurrentLinkedQueue(Collection<? extends E> c)

// 將指定元素插入此隊列的尾部。
boolean add(E e)
// 如果此隊列包含指定元素,則返回 true。
boolean contains(Object o)
// 如果此隊列不包含任何元素,則返回 true。
boolean isEmpty()
// 返回在此隊列元素上以恰當順序進行迭代的迭代器。
Iterator<E> iterator()
// 將指定元素插入此隊列的尾部。
boolean offer(E e)
// 獲取但不移除此隊列的頭;如果此隊列為空,則返回 null。
E peek()
// 獲取並移除此隊列的頭,如果此隊列為空,則返回 null。
E poll()
// 從隊列中移除指定元素的單個實例(如果存在)。
boolean remove(Object o)
// 返回此隊列中的元素數量。
int size()
// 返回以恰當順序包含此隊列所有元素的數組。
Object[] toArray()
// 返回以恰當順序包含此隊列所有元素的數組;返回數組的運行時類型是指定數組的運行時類型。
<T> T[] toArray(T[] a)
復制代碼

 

ConcurrentLinkedQueue源碼分析(JDK1.7.0_40版本)

ConcurrentLinkedQueue的完整源碼如下:

  View Code

 

下面從ConcurrentLinkedQueue的創建,添加,刪除這幾個方面對它進行分析。

1 創建

下面以ConcurrentLinkedQueue()來進行說明。

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

說明:在構造函數中,新建了一個“內容為null的節點”,並設置表頭head和表尾tail的值為新節點。

head和tail的定義如下:

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

head和tail都是volatile類型,他們具有volatile賦予的含義:“即對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫入”。


Node的聲明如下:

復制代碼
private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
復制代碼

說明
Node是個單向鏈表節點,next用於指向下一個Node,item用於存儲數據。Node中操作節點數據的API,都是通過Unsafe機制的CAS函數實現的;例如casNext()是通過CAS函數“比較並設置節點的下一個節點”。

 

2. 添加

下面以add(E e)為例對ConcurrentLinkedQueue中的添加進行說明。

public boolean add(E e) {
    return offer(e);
}

說明:add()實際上是調用的offer()來完成添加操作的。

offer()的源碼如下:

復制代碼
public boolean offer(E e) {
    // 檢查e是不是null,是的話拋出NullPointerException異常。
    checkNotNull(e);
    // 創建新的節點
    final Node<E> newNode = new Node<E>(e);

    // 將“新的節點”添加到鏈表的末尾。
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        // 情況1:q為空
        if (q == null) {
            // CAS操作:如果“p的下一個節點為null”(即p為尾節點),則設置p的下一個節點為newNode。
            // 如果該CAS操作成功的話,則比較“p和t”(若p不等於t,則設置newNode為新的尾節點),然后返回true。
            // 如果該CAS操作失敗,這意味着“其它線程對尾節點進行了修改”,則重新循環。
            if (p.casNext(null, newNode)) {
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
        }
        // 情況2:p和q相等
        else if (p == q)
            p = (t != (t = tail)) ? t : head;
        // 情況3:其它
        else
            p = (p != t && t != (t = tail)) ? t : q;
    }
}
復制代碼

說明:offer(E e)的作用就是將元素e添加到鏈表的末尾。offer()比較的地方是理解for循環,下面區分3種情況對for進行分析。

情況1 -- q為空。這意味着q是尾節點的下一個節點。此時,通過p.casNext(null, newNode)將“p的下一個節點設為newNode”,若設置成功的話,則比較“p和t”(若p不等於t,則設置newNode為新的尾節點),然后返回true。否則的話(意味着“其它線程對尾節點進行了修改”),什么也不做,繼續進行for循環。
p.casNext(null, newNode),是調用CAS對p進行操作。若“p的下一個節點等於null”,則設置“p的下一個節點等於newNode”;設置成功的話,返回true,失敗的話返回false。

情況2 -- p和q相等。這種情況什么時候會發生呢?通過“情況3”,我們知道,經過“情況3”的處理后,p的值可能等於q。
此時,若尾節點沒有發生變化的話,那么,應該是頭節點發生了變化,則設置p為頭節點,然后重新遍歷鏈表;否則(尾節點變化的話),則設置p為尾節點。

情況3 -- 其它。
我們將p = (p != t && t != (t = tail)) ? t : q;轉換成如下代碼。

復制代碼
if (p==t) {
    p = q;
} else {
    Node<E> tmp=t;
    t = tail;
    if (tmp==t) {
        p=q;
    } else {
        p=t;
    }
}
復制代碼

如果p和t相等,則設置p為q。否則的話,判斷“尾節點是否發生變化”,沒有變化的話,則設置p為q;否則,設置p為尾節點。


checkNotNull()的源碼如下:

private static void checkNotNull(Object v) {
    if (v == null)
        throw new NullPointerException();
}

 

3. 刪除

下面以poll()為例對ConcurrentLinkedQueue中的刪除進行說明。

復制代碼
public E poll() {
    // 設置“標記”
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            // 情況1
            // 表頭的數據不為null,並且“設置表頭的數據為null”這個操作成功的話;
            // 則比較“p和h”(若p!=h,即表頭發生了變化,則更新表頭,即設置表頭為p),然后返回原表頭的item值。
            if (item != null && p.casItem(item, null)) {
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // 情況2
            // 表頭的下一個節點為null,即鏈表只有一個“內容為null的表頭節點”。則更新表頭為p,並返回null。
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            // 情況3
            // 這可能到由於“情況4”的發生導致p=q,在該情況下跳轉到restartFromHead標記重新操作。
            else if (p == q)
                continue restartFromHead;
            // 情況4
            // 設置p為q
            else
                p = q;
        }
    }
}
復制代碼

說明:poll()的作用就是刪除鏈表的表頭節點,並返回被刪節點對應的值。poll()的實現原理和offer()比較類似,下面根將or循環划分為4種情況進行分析。

情況1:“表頭節點的數據”不為null,並且“設置表頭節點的數據為null”這個操作成功。
p.casItem(item, null) -- 調用CAS函數,比較“節點p的數據值”與item是否相等,是的話,設置節點p的數據值為null。
在情況1發生時,先比較“p和h”,若p!=h,即表頭發生了變化,則調用updateHead()更新表頭;然后返回刪除節點的item值。
updateHead()的源碼如下:

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

說明:updateHead()的最終目的是更新表頭為p,並設置h的下一個節點為h本身。
casHead(h,p)是通過CAS函數設置表頭,若表頭等於h的話,則設置表頭為p。
lazySetNext()的源碼如下:

void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
}

putOrderedObject()函數,我們在前面一章“TODO”中介紹過。h.lazySetNext(h)的作用是通過CAS函數設置h的下一個節點為h自身,該設置可能會延遲執行。

情況2:如果表頭的下一個節點為null,即鏈表只有一個“內容為null的表頭節點”。
則調用updateHead(h, p),將表頭更新p;然后返回null。

情況3:p=q
在“情況4”的發生后,會導致p=q;此時,“情況3”就會發生。當“情況3”發生后,它會跳轉到restartFromHead標記重新操作。

情況4:其它情況。
設置p=q。

 

ConcurrentLinkedQueue示例

復制代碼
 1 import java.util.*;
 2 import java.util.concurrent.*;
 3 
 4 /*
 5  *   ConcurrentLinkedQueue是“線程安全”的隊列,而LinkedList是非線程安全的。
 6  *
 7  *   下面是“多個線程同時操作並且遍歷queue”的示例
 8  *   (01) 當queue是ConcurrentLinkedQueue對象時,程序能正常運行。
 9  *   (02) 當queue是LinkedList對象時,程序會產生ConcurrentModificationException異常。
10  *
11  * @author skywang
12  */
13 public class ConcurrentLinkedQueueDemo1 {
14 
15     // TODO: queue是LinkedList對象時,程序會出錯。
16     //private static Queue<String> queue = new LinkedList<String>();
17     private static Queue<String> queue = new ConcurrentLinkedQueue<String>();
18     public static void main(String[] args) {
19     
20         // 同時啟動兩個線程對queue進行操作!
21         new MyThread("ta").start();
22         new MyThread("tb").start();
23     }
24 
25     private static void printAll() {
26         String value;
27         Iterator iter = queue.iterator();
28         while(iter.hasNext()) {
29             value = (String)iter.next();
30             System.out.print(value+", ");
31         }
32         System.out.println();
33     }
34 
35     private static class MyThread extends Thread {
36         MyThread(String name) {
37             super(name);
38         }
39         @Override
40         public void run() {
41                 int i = 0;
42             while (i++ < 6) {
43                 // “線程名” + "-" + "序號"
44                 String val = Thread.currentThread().getName()+i;
45                 queue.add(val);
46                 // 通過“Iterator”遍歷queue。
47                 printAll();
48             }
49         }
50     }
51 }
復制代碼

(某一次)運行結果

復制代碼
ta1, ta1, tb1, tb1,

ta1, ta1, tb1, tb1, ta2, ta2, tb2, 
tb2, 
ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta3, tb3, 
ta3, ta1, tb3, tb1, ta4, 
ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta4, ta3, tb4, 
tb3, ta1, ta4, tb1, tb4, ta2, ta5, 
tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta5, tb3, tb5, 
ta4, ta1, tb4, tb1, ta5, ta2, tb5, tb2, ta6, 
ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta6, ta4, tb6, 
tb4, ta5, tb5, ta6, tb6, 
復制代碼

結果說明:如果將源碼中的queue改成LinkedList對象時,程序會產生ConcurrentModificationException異常。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM