Java並發容器--ConcurrentLinkedQueue


概述

  ConcurrentLinkedQueue是一種基於鏈表實現的無界非阻塞線程安全隊列,遵循先入先出規則。

  線程安全隊列有兩種實現方式:

    阻塞方式:對入隊和出隊操作加鎖。阻塞隊列。

    非阻塞方式:通過自旋CAS實現。例如:ConcurrentLinkedQueue

  下面從源代碼中分析ConcurrentLinkedQueue的實現方法。

類關系圖

      

    從類圖可以看出,ConcurrentLinkedQueue有head和tail兩個volatile域,節點是用靜態內部類Node表示,每個Node含有元素item和指向下一個節點的指針next,都是volatile變量。

源碼分析

  Node源碼

    Node的item和next兩個域都是volatile變量,保證可見性。casItem和casNext方法使用了UNSAFE提供的CAS方法保證操作的原子性。

 1         //Node代碼中使用了UNSAFE提供的CAS方法保證操作的原子性,
 2         //UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); 
 3         //第一個參數表示要更新的對象,第二個參數nextOffset是Field的偏移量,第三個參數表示期望值,最后一個參數更新后的值。若next域的值等於cmp,則把next域更新為val並返回true;否則不更新並返回false。
 4         private static class Node<E> {
 5             volatile E item;    //Node值,volatile保證可見性
 6             volatile Node<E> next;    //Node的下一個元素,volatile保證可見性
 7 
 8             /**
 9              * Constructs a new node.  Uses relaxed write because item can
10              * only be seen after publication via casNext.
11              */
12             Node(E item) {
13                 UNSAFE.putObject(this, itemOffset, item);
14             }
15 
16             boolean casItem(E cmp, E val) {
17                 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
18             }
19 
20             void lazySetNext(Node<E> val) {
21                 UNSAFE.putOrderedObject(this, nextOffset, val);
22             }
23 
24             boolean casNext(Node<E> cmp, Node<E> val) {
25                 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
26             }
27 
28             // Unsafe mechanics
29 
30             private static final sun.misc.Unsafe UNSAFE;
31             private static final long itemOffset;
32             private static final long nextOffset;
33 
34             static {
35                 //初始化UNSAFE和各個域在類中的偏移量
36                 try {
37                     UNSAFE = sun.misc.Unsafe.getUnsafe();//初始化UNSAFE
38                     Class k = Node.class;
39                     //itemOffset是指類中item字段在Node類中的偏移量,先通過反射獲取類的item域,然后通過UNSAFE獲取item域在內存中相對於Node類首地址的偏移量。
40                     itemOffset = UNSAFE.objectFieldOffset
41                         (k.getDeclaredField("item"));
42                     //nextOffset是指類中next字段在Node類中的偏移量
43                     nextOffset = UNSAFE.objectFieldOffset
44                         (k.getDeclaredField("next"));
45                 } catch (Exception e) {
46                     throw new Error(e);
47                 }
48             }
49         }
View Code

    Node類中的lazySetNext(Node<E> val)方法,可以理解為延遲設置Next,內部是使用UNSAFE類的putOrderedObject方法實現,putOrderedXXX方法是putXXXVolatile方法的延遲實現,不保證值的改變被其他線程立即看到。為什么要lazySetNext這個方法呢?其實它是一種低級別的優化手段,就是在不需要讓共享變量的修改立刻讓其他線程可見的時候,以設置普通變量的方式來修改共享狀態,可以減少不必要的內存屏障,從而提高程序執行的效率。

    《Java內存模型中》提到volatile變量可以實現可見性,其原理就是插入內存屏障以保證不會重排序指令,使用的是store-load內存屏障,開銷較大。UNSAFE類的putOrderedXXX方法則是在指令中插入StoreStore內存屏障,避免發生寫操作重排序,由於StoreStore屏障的性能損耗小於StoreLoad屏障,所以lazySetNext方法比直接寫volatile變量的性能要高。需要注意的是,StoreStore屏障僅可以避免寫寫重排序,不保證內存可見性。

    在出隊操作中更新Queue的Head節點時用到了lazySetNext(Node<E> val)方法,將舊head節點的next指向自己。

  初始化

    創建一個空的Queue,head節點為null且tail節點等於head節點。

1             //創建一個空的Queue,head節點為null且tail節點等於head節點
2             public ConcurrentLinkedQueue() {
3                 head = tail = new Node<E>(null);
4         
5             }
View Code

 

  入隊

    入隊的方法為offer,向隊列的尾部插入指定的元素,由於ConcurrentLinkedQueue是無界的,所以offer永遠返回true,不能通過返回值來判斷是否入隊成功。

    入隊大致有以下幾個步驟:

      1)根據tail節點定位出尾節點(last node);

      2)將新節點置為尾節點的下一個節點;

      3)更新尾節點casTail。

 1         //向隊列的尾部插入指定的元素
 2         public boolean offer(E e) {
 3             checkNotNull(e);
 4             final Node<E> newNode = new Node<E>(e);//構造新Node
 5             //循環CAS直到入隊成功。1、根據tail節點定位出尾節點(last node);2、將新節點置為尾節點的下一個節點,3、更新尾節點casTail。
 6             for (Node<E> t = tail, p = t;;) {
 7                 Node<E> q = p.next;
 8                 if (q == null) {    //判斷p是不是尾節點,tail節點不一定是尾節點,判斷是不是尾節點的依據是該節點的next是不是null
 9                     // p is last node
10                     if (p.casNext(null, newNode)) {    
11                     //設置P節點的下一個節點為新節點,如果p的next為null,說明p是尾節點,casNext返回true;如果p的next不為null,說明有其他線程更新過隊列的尾節點,casNext返回false。
12                         // Successful CAS is the linearization point
13                         // for e to become an element of this queue,
14                         // and for newNode to become "live".
15                         if (p != t) // hop two nodes at a time
16                             casTail(t, newNode);  // Failure is OK.
17                         return true;
18                     }
19                     // Lost CAS race to another thread; re-read next
20                 }
21                 else if (p == q)
22                     //p節點是null的head節點剛好被出隊,更新head節點時h.lazySetNext(h)把舊的head節點指向自己
23                     // We have fallen off list.  If tail is unchanged, it
24                     // will also be off-list, in which case we need to
25                     // jump to head, from which all live nodes are always
26                     // reachable.  Else the new tail is a better bet.
27                     p = (t != (t = tail)) ? t : head;
28                 else
29                     // Check for tail updates after two hops.
30                     p = (p != t && t != (t = tail)) ? t : q;
31                     //判斷tail節點有沒有被更新,如果沒被更新,1)p=q:p指向p.next繼續尋找尾節點;
32                     //如果被更新了,2)p=t:P賦值為新的tail節點
33                     //p != t && t != (t = tail)是怎么執行的?見隨筆附錄《通過字節碼指令分析 p != t && t != (t = tail) 語句的執行》
34                     //什么情況下p!=t.只有本分支和else if (p == q)分支含有更新變量p和t的語句,所以在p!=t出現之前已經循環過這兩個分支至少一次。
35                     
36             }
37         }
38         
39         private boolean casTail(Node<E> cmp, Node<E> val) {
40             return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
41         }
View Code

    需要注意的是:tail不總是尾節點(last node)。DougLea大師為什么這么設計呢?把tail節點永遠作為Queue的尾節點實現起來不是更簡單嗎?

    下面是tail節點永遠作為Queue的尾節點的入隊方法代碼:

 1         public boolean offer(E e) {
 2             if (e == null)
 3                 throw new NullPointerException();
 4             Node<E> n = new Node<E>(e);
 5             for (;;) {
 6                 Node<E> t = tail;
 7                 //此處如果casNext成功,那么casTail可能會成功。因為在這兩個原子操作期間,其他線程的casNext操作都會失敗,之后的casTail不會被執行,即tail節點不變。
 8                 if (t.casNext(null, n) && casTail(t, n)) {
 9                     return true;
10                 }
11             }
12         }
View Code

    這么做的缺點是每次入隊都會自旋CAS更新tail節點,入隊效率會降低,而DougLea的設計通過hops變量來減少入隊時減少更新tail節點的次數,默認情況下hops為1。當tail節點與尾節點的距離大於等於hops值時才更新Queue的tail節點。這樣帶來的壞處是入隊時需要根據tail定位尾節點,hops的值越大,定位時間就越長。DougLea的設計思想是通過增加對volatile變量的讀來減少對volatile變量的寫,而寫操作的開銷遠遠大於讀操作。所以從總體上來說入隊效率是提升的。

 

  出隊

    和入隊相似,出隊時也不是每次都會更新head節點,當head節點的item不為null時,直接彈出item;否則會更新head節點。更新head節點成功時,會把舊的head節點指向自己。

 1             public E poll() {
 2                 restartFromHead:
 3                 //兩層循環
 4                 for (;;) {
 5                     for (Node<E> h = head, p = h, q;;) {
 6                         E item = p.item;
 7 
 8                         if (item != null && p.casItem(item, null)) {
 9                             // Successful CAS is the linearization point
10                             // for item to be removed from this queue.
11                             if (p != h) // hop two nodes at a time
12                                 updateHead(h, ((q = p.next) != null) ? q : p);
13                             return item;
14                         }
15                         //隊列為空,更新head節點
16                         else if ((q = p.next) == null) {
17                             updateHead(h, p);
18                             return null;
19                         }
20                         else if (p == q)
21                             //p節點是null的head節點剛好被出隊,更新head節點時h.lazySetNext(h);把舊的head節點指向自己。
22                             //重新從head節點開始
23                             continue restartFromHead;
24                         else
25                             p = q;    //將p執行p的下一個節點
26                     }
27                 }
28             }
29             
30             //更新head節點
31             final void updateHead(Node<E> h, Node<E> p) {
32                 //通過CAS將head更新為P
33                 if (h != p && casHead(h, p))
34                     h.lazySetNext(h);//把舊的head節點指向自己
35             }
36             
37             void lazySetNext(Node<E> val) {
38                 UNSAFE.putOrderedObject(this, nextOffset, val);
39             }
View Code

  

  隊列大小

    注意:size()需要遍歷隊列中的所有元素,時間復雜度為O(n),開銷較大。並且如果在遍歷的過程中,Queue有入隊或出隊的操作,會導致該方法統計的結果不准確。所以size()方法不太有用。那如何判斷Queue是否為空呢?使用isEmpty()方法,判斷第一個節點是否為null,時間復雜度為O(1)

1         public int size() {
2             int count = 0;
3             for (Node<E> p = first(); p != null; p = succ(p))
4                 if (p.item != null)
5                     // Collection.size() spec says to max out
6                     if (++count == Integer.MAX_VALUE)
7                         break;
8             return count;
9         }
View Code

 

 

附錄:通過字節碼指令分析 p != t && t != (t = tail) 語句的執行

  在讀ConcurrentLinkedQueue源代碼時,在入隊方法的定位尾節點中讀到 p = (p != t && t != (t = tail)) ? t : q; 語句,不太理解 p != t && t != (t = tail) 的執行順序,遂通過反匯編語句仔細研究一下。

  我們都知道 A && B 運算,在A不滿足條件的情況下,B將不會執行。那在字節碼指令中是怎么實現的呢?

  通過以下代碼模擬:

1             public class Test {
2                 public static void main(String[] args) {
3                     int t = 8;
4                     int p = t;
5                     int tail = 9;
6                     boolean result = (p != t && t != (t = tail));
7                     System.out.println("p=" + p + ", t=" + t + ", result=" + result);
8                 }
9             }
View Code

 

  不出所料,運行結果為p=8, t=8, result=false。t=8說明沒有執行t != (t = tail)語句。

  看反匯編后的字節碼指令:

 1         public class Test {
 2           public static void main(java.lang.String[] args);
 3              0  bipush 8                //將單字節常量(-128~127)壓入棧頂
 4              2  istore_1 [t]            //將棧頂int型數值存入第二個本地變量,即賦值給變量t,同時常量8出棧
 5              3  iload_1 [t]                //將第二個int型本地變量(t)壓入棧頂 
 6              4  istore_2 [p]            //將棧頂int型數值存入第三個本地變量,即賦值給變量P,同時t出棧
 7              5  bipush 9                
 8              7  istore_3 [tail]
 9              8  iload_2 [p]
10              9  iload_1 [t]
11             10  if_icmpeq 24            //比較棧頂兩int型數值大小,當結果等於0時跳轉。即比較p!=t,結果為false(0),跳轉到24行,同時p和t出棧
12             13  iload_1 [t]
13             14  iload_3 [tail]
14             15  dup
15             16  istore_1 [t]
16             17  if_icmpeq 24
17             20  iconst_1
18             21  goto 25
19             24  iconst_0                //將int型0壓入棧頂。
20             25  istore 4 [result]        //將棧頂int型數值存入指定本地變量。即將result賦值為0(false)
21             27  return
22         }
View Code

  接下來再看一下第一個條件成立時的情況。代碼將p != t改為p == t:

1             public class Test {
2                 public static void main(String[] args) {
3                     int t = 8;
4                     int p = t;
5                     int tail = 9;
6                     boolean result = (p == t && t != (t = tail));
7                     System.out.println("p=" + p + ", t=" + t + ", result=" + result);
8                 }
9             }
View Code

 

  先來看運行結果p=8, t=9, result=true。說明執行了t != (t = tail)語句。

  看反匯編后的字節碼指令:

 1         public class Test {
 2           public static void main(java.lang.String[] args);
 3              0  bipush 8
 4              2  istore_1 [t]
 5              3  iload_1 [t]
 6              4  istore_2 [p]
 7              5  bipush 9
 8              7  istore_3 [tail]
 9              8  iload_2 [p]
10              9  iload_1 [t]
11             10  if_icmpne 24            //比較棧頂兩int型數值大小,當結果不等於0時跳轉。即比較p == t,結果為true(1)。所以不會跳轉到24行,繼續執行下一行。
12             13  iload_1 [t]                //將變量t壓入棧頂,此時t=8
13             14  iload_3 [tail]            //將變量tail壓入棧頂,tail=9
14             15  dup                        //復制棧頂數值並將復制值壓入棧頂。即復制tail變量值並壓入棧頂,tail=9
15             16  istore_1 [t]            //將棧頂數值存入t變量,同時出棧
16             17  if_icmpeq 24            //比較棧頂兩int型數值大小,當結果等於0時跳轉。此時棧頂有9、8。比較9!=8,結果為true(1)。所以不會跳轉到24行,繼續執行下一行。
17             20  iconst_1                //將int型1壓入棧頂
18             21  goto 25                    //無條件跳轉到25行
19             24  iconst_0
20             25  istore 4 [result]        //將棧頂1存入result,同時出棧。即result返回true
21             27  return
22         }
View Code

 

  通過字節碼指令分析可知,編譯器是通過if_icmpeq和if_icmpne比較並條件跳轉指令實現&&短路與運算的。在第二種情況中,還分析了t != (t = tail)語句的執行過程,理解會更加深入。

 

參考資料:

  《Java並發編程的藝術》

  ConcurrentLinkedQueue源碼分析(http://www.jianshu.com/p/7816c1361439)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM