高效讀寫的隊列:ConcurrentLinkedQueue


隊列(Queue)也是常用的數據結構之一。在JDK中提供了一個ConcurrentLinkedQueue類用來實現高並發的隊列。從名字可以看出,這個隊列是使用鏈表作為其數據的結構的。ConcurrentLinkedQueue應該算是在高並發的環境中性能最好的了。它之所以有很好的性能,是因為其內部復雜的實現。下面我們就來一探究竟,為何性能高?

ConcurrentLinkedQueue的介紹

  ConcurrentLinkedQueue是一個基於鏈表的無界非阻塞隊列,並且是線程安全的,它采用的是先進先出的規則,當我們增加一個元素時,它會添加到隊列的末尾,當我們取一個元素時,它會返回一個隊列頭部的元素。

ConcurrentLinkedQueue原理探究

  作為一個鏈表,自然需要定義有關鏈表內的節點,在ConcurrentLinkedQueue中,定義的節點Node核心如下:

 1 static final class Node<E> {
 2         volatile E item;
 3         volatile Node<E> next;
 4 
 5         /**
 6          * Constructs a node holding item.  Uses relaxed write because
 7          * item can only be seen after piggy-backing publication via CAS.
 8          */
 9         Node(E item) {
10             ITEM.set(this, item);
11         }
12 
13         /** Constructs a dead dummy node. */
14         Node() {}
15 
16         void appendRelaxed(Node<E> next) {
17             // assert next != null;
18             // assert this.next == null;
19             NEXT.set(this, next);
20         }
21 
22         boolean casItem(E cmp, E val) {
23             // assert item == cmp || item == null;
24             // assert cmp != null;
25             // assert val == null;
26             return ITEM.compareAndSet(this, cmp, val);
27         }
28     }

 其中 item是用來表示目標元素的。比如,但隊列中存放String時,item就是String類型。字段next表示當前Node的下一個元素,這樣每個Node就能環環相扣,串在一起了。

方法casItem()表示設置當前Node的item值。它需要兩個參數,第一個參數為期望值,第二個參數為設置目標值。當當前值等於cmp期望值時,它就會將目標設置為val。這個方法使用了CAS操作。

head和tail

ConcurrentLinkedQueue內部有兩個重要的字段head和tail,分別表示鏈表的頭部和尾部,他們都是Node類型,對於head來說,它永遠不會為null,並且通過head以及succ()后繼方法一定能完整的遍歷整個鏈表。對於tail來說,它自然應該表示隊列的末尾。但ConcurrentLinkedQueue的內部實現非常復雜,它允許在運行時鏈表處於多個不同的狀態。以tail來說,我們期望tail總是為鏈表的末尾,但實際上,tail的更新並不是及時的,而是可能會產生拖延現象。來看一下源碼:

offer():將指定的元素插入隊列的尾部

 1 public boolean offer(E e) {
 2         final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
 3 
 4         for (Node<E> t = tail, p = t;;) {
 5             Node<E> q = p.next;
 6             if (q == null) {
 7                 // p是最后一個節點
 8                 if (NEXT.compareAndSet(p, null, newNode)) {
 9                     if (p != t) // 每兩次更新一下tail
10                         TAIL.weakCompareAndSet(this, t, newNode);
11                     return true;
12                 }
13                 // CAS競爭失敗,再次嘗試
14             }
15             else if (p == q)
16                 // 遇到哨兵節點,從head開始遍歷
17                 //但是如果tail被修改,則使用tail(因為可能被修改正確了)
18                 p = (t != (t = tail)) ? t : head;
19             else
20                 // 每兩次更新后,確認tail更新
21                 p = (p != t && t != (t = tail)) ? t : q;
22         }
23     }

  首先值得注意的是,這個方法沒有任何的鎖操作。線程安全完全由CAS操作和隊列的算法來保證。整個方法的核心是for循環,這個循環沒有出口,直到嘗試成功,這也符合CAS的操作流程。當第一次加入元素時,由於隊列為空,因此p.next為null。程序進入第8行,將p的next節點賦值為newNode也就是將新的元素加入到隊列中。此時p==t成立,因此並不會執行第9行代碼更新tail末尾,如果 NEXT.compareAndSet(p, null, newNode) 成功,程序直接返回,如果失敗,則再進行一次循環嘗試,直到成功。因此,增加一個元素后,tail並不會被更新。此時隊列是這樣的:

  當程序試圖增加第二個元素時,這時tail在head的位置上,因此t表示的就是head,所以p.next就是Node1(即第一個元素),此時q != null ,p != q ,所以程序進入else環節,運行第21行代碼,獲得最后一個節點,此時 p = q,即p指向鏈表的第一個元素Node1,開始第二個循環,p.next = null,所以程序執行第8行代碼,p更新自己的next,讓它指向新加入的節點,如果成功,此時 p != t  為true,這樣就更新t的所在位置,將t移到鏈表的最后,即更新tail(這時tail才是鏈表的末尾)。

  所以tail的更新會產生滯后,並且每兩次會跳躍兩個元素。如下圖所示:

 

再來看代碼的第15行,對p = q 的情況進行了處理;這種情況就屬於遇到了哨兵節點導致的。所謂的哨兵節點就是next指向了自己。這種節點在隊列中存在的價值不大,主要表示要刪除的節點或者空節點,當遇到哨兵節點時,無法通過next取得后續的節點,通過代碼第18行,很可能直接返回head,這樣鏈表就會從頭部開始遍歷,重新查找到鏈表末尾。當然也有可能返回t,這樣就進行了一次“打賭”,使用新的tail作為鏈表尾部(即t),這樣避免了重新查找tail的開銷。

對代碼的第18行,進行分析: p = (t != (t = tail)) ? t : head

  這句代碼雖然只有一行,但是包含的信息比較多。首先“!=”不是原子操作,它是可以被中斷的,也就是說,在執行“!=”時,程序會先取得t的值,在執行t = tail,並取得新的t的值,然后比較這兩個值是否相等。在單線程中,t != t這種語句顯然不會成立,但是在並發環境下,有可能在獲得左邊的t值后,右邊的t值被其他線程修改,這樣,t != t 就成立了。這種情況表示在比較的過程中,tail被其他線程修改了,這時,我們就用新的tail為鏈表的尾,也就是等式右邊的t,但如果tail沒有被修改,則返回head,要求從頭部開始,重新查找鏈表末尾。

poll() :獲取並移除隊列的頭,如果隊列為空則返回null

先來看源代碼:

 1 public E poll() {
 2         restartFromHead: for (;;) {
 3             for (Node<E> h = head, p = h, q;; p = q) {
 4                 final E item;
 5                 if ((item = p.item) != null && p.casItem(item, null)) {
 6                     // Successful CAS is the linearization point
 7                     // for item to be removed from this queue.
 8                     if (p != h) // hop two nodes at a time
 9                         updateHead(h, ((q = p.next) != null) ? q : p);
10                     return item;
11                 }
12                 else if ((q = p.next) == null) {
13                     updateHead(h, p);
14                     return null;
15                 }
16                 else if (p == q)
17                     continue restartFromHead;
18             }
19         }
20     }

 說明:poll()方法的作用就是刪除鏈表的表頭節點,並返回被刪除節點的值。poll()的實現原理和offer()比較相似,下面對其分析:

代碼的第5行表示:表頭節點的數據不為null,並且設置表頭數據為null的操作成功。然后進入if,如果 p != h 為true,則更新表頭然后返回刪除元素的值item,若為false,則直接返回刪除元素的值,不更新表頭。這點說明表頭head也不是實時更新的,也是每兩次更新一次(通過注釋:hop two nodes at a time),跟tail的更新類似。

代碼的第12行,表示表頭的下一個節點為null,即鏈表只有一個內容為null的表頭節點。這樣則直接更新表頭為p,返回null。

代碼的第16行,當p == q 時,跳過當前循環,跳到restartFromHead標記重新操作。

即ConcurrentLinkedQueue的元素刪除示意圖如下所示:

 ConcurrentLinkedQueue的其他方法:

  peek():獲取表頭元素但不移除隊列的頭,如果隊列為空則返回null。

  remove(Object obj):移除隊列已存在的元素,返回true,如果元素不存在,返回false。

  add(E e):將指定元素插入隊列末尾,成功返回true,失敗返回false(此方法非線程安全的方法,不推薦使用)。

注意:

  雖然ConcurrentLinkedQueue的性能很好,但是在調用size()方法的時候,會遍歷一遍集合,對性能損害較大,執行很慢,因此應該盡量的減少使用這個方法,如果判斷是否為空,最好用isEmpty()方法。

  ConcurrentLinkedQueue不允許插入null元素,會拋出空指針異常。

  ConcurrentLinkedQueue是無界的,所以使用時,一定要注意內存溢出的問題。即對並發不是很大中等的情況下使用,不然占用內存過多或者溢出,對程序的性能影響很大,甚至是致命的。

 

通過以上這些說明,可以明顯的感覺到,不使用鎖而單純的使用CAS操作要求在應用層面上保證線程安全,並處理一些可能存在的不一致問題,大大增加了程序的設計和實現的難度。但是它帶來的好處就是可以得到性能的飛速提升。因此,有些場合也是值得的。

 

參考:《Java高並發程序設計》 葛一鳴 郭超 編著:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM