JDK 中基於鏈表的非阻塞無界隊列 ConcurrentLinkedQueue 原理剖析,ConcurrentLinkedQueue 內部是如何使用 CAS 非阻塞算法來保證多線程下入隊出隊操作的線程安全?
ConcurrentLinkedQueue是線程安全的無界非阻塞隊列,其底層數據結構是使用單向鏈表實現,入隊和出隊操作是使用我們經常提到的CAS來保證線程安全的。
我們首先看一下ConcurrentLinkedQueue的類圖結構先,好有一個內部邏輯有一個大概的印象,如下圖所示:

可以清楚的看到ConcurrentLinkedQueue內部的隊列是使用單向鏈表方式實現,類中兩個volatile 類型的Node 節點分別用來存放隊列的首位節點。
首先我們先來看一下ConcurrentLinkedQueue的構造函數,如下:
public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
通過無參構造函數可知默認頭尾節點都是指向 item 為 null 的哨兵節點。
Node節點內部則維護一個volatile 修飾的變量item 用來存放節點的值,next用來存放鏈表的下一個節點,從而鏈接為一個單向無界鏈表,這就是單向無界的根本原因。如下圖:

接下來看ConcurrentLinkedQueue 主要關注入隊,出隊,獲取隊列元素的方法的源碼,如下所示:
1.首先看入隊方法offer,offer 操作是在隊列末尾添加一個元素,如果傳遞的參數是 null 則拋出 NPE 異常,否者由於 ConcurrentLinkedQueue 是無界隊列該方法一直會返回 true。另外由於使用 CAS 無阻塞算法,該方法不會阻塞調用線程,其源碼如下:
public boolean offer(E e) { //(1)e為null則拋出空指針異常 checkNotNull(e); //(2)構造Node節點 final Node<E> newNode = new Node<E>(e); //(3)從尾節點進行插入 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; //(4)如果q==null說明p是尾節點,則執行插入 if (q == null) { //(5)使用CAS設置p節點的next節點 if (p.casNext(null, newNode)) { //(6)cas成功,則說明新增節點已經被放入鏈表,然后設置當前尾節點 if (p != t) casTail(t, newNode); // Failure is OK. return true; } } else if (p == q)//(7) //多線程操作時候,由於poll操作移除元素后有可能會把head變為自引用,然后head的next變為新head,所以這里需要 //重新找新的head,因為新的head后面的節點才是正常的節點。 p = (t != (t = tail)) ? t : head; else //(8) 尋找尾節點 p = (p != t && t != (t = tail)) ? t : q; } }
類圖結構時候談到構造隊列時候參構造函數創建了一個 item 為 null 的哨兵節點,並且 head 和 tail 都是指向這個節點,下面通過圖形結合來講解下 offer 操作的代碼實現。
1.首先看一下,當一個線程調用offer(item)時候情況:首先代碼(1)對傳參判斷空檢查,如果為null 則拋出空指針異常,然后代碼(2)則使用item作為構造函數參數創建一個新的節點,
代碼(3)從隊列尾部節點開始循環,目的是從隊列尾部添加元素。如下圖:

上圖是執行代碼(4)時候隊列的情況,這時候節點 p , t ,head ,tail 同時指向了item為null的哨兵節點,由於哨兵節點的next節點為null,所以這里q指向也是null。
代碼(4)發現q==null 則執行代碼(5),通過CAS原子操作判斷p 節點的next節點是否為null,如果為null 則使用節點newNode替換p 的next節點,
然后執行代碼(6),由於 p == t ,所以沒有設置尾部節點,然后退出offer方法,這時候隊列的狀態圖如下:

上面講解的是一個線程調用offer方法的情況下,如果多個線程同時調用,就會存在多個線程同時執行到代碼(5),假設線程A調用offer(item1),
線程B調用offer(item2),線程 A 和線程B同時到 p.casNext(null,newNode)。而CAS的比較並設置操作是原子性的,假設線程A先執行了比較設置操作,
則發現當前P的next節點確實是null ,則會原子性更新next節點為newNode,這時候線程B 也會判斷p 的next節點是否為null,結果發現不是null,(因為線程 A 已經設置了 p 的 next 為 newNode)則會跳到代碼(3),
然后執行到代碼(4)的時候的隊列分布圖如下:

根據這個狀態圖可知線程B會執行代碼(8),然后q 賦值給了p,這個時候狀態圖為:

然后線程B再次跳轉到代碼(3)執行,當執行到代碼(4)時候隊列狀態圖為:

由於這時候q == null ,所以線程B 會執行步驟(5),通過CAS操作判斷 當前p的next 節點是否為null ,不是則再次循環后嘗試,是則使用newNode替換,假設CAS成功了,那么執行步驟(6),
由於 p != t 所以設置tail節點為newNode ,然后退出offer方法。這時候隊列的狀態圖為:

到現在為止,offer代碼在執行路徑現在就差步驟(7)還沒有執行過,其實這個要在執行poll操作才會出現的,這里先看一下執行poll操作后可能會存在的一種情況,如下圖所示:

下面分析下當隊列處於這種狀態調用offer添加元素代碼執行到代碼(4)的時候的隊列狀態圖,如下:

由於q節點不為空並且p==q 所以執行代碼(7),因為 t == tail所以p 被賦值為head ,然后進入循環,循環后執行到代碼(4)的時候的隊列狀態圖,如下:

由於 q ==null,所以執行代碼(5),進行CAS操作,如果當前沒有其他線程執行offer操作,則CAS操作會成功,p的next節點被設置為新增節點,然后執行代碼(6),
由於p != t 所以設置新節點為隊列尾節點,現在隊列狀態圖,如下:

在這里的自引用的節點會被垃圾回收掉,可見offer操作里面關鍵步驟是代碼(5)通過原子CAS操作來進行控制同時只有一個線程可以追加元素到隊列末尾,進行cas競爭失敗的線程,
則會通過循環一次次嘗試進行cas操作,知道cas成功才會返回,也就是通過使用無限循環里面不斷進行CAS嘗試方式來替代阻塞算法掛起調用線程,相比阻塞算法,這是使用CPU資源換取阻塞帶來的開銷。
2.poll操作,poll 操作是在隊列頭部獲取並且移除一個元素,如果隊列為空則返回 null,我們首先看改方法的源碼,如下:
public E poll() { //(1) goto標記 restartFromHead: //(2)無限循環 for (;;) { for (Node<E> h = head, p = h, q;;) { //(3)保存當前節點值 E item = p.item; //(4)當前節點有值則cas變為null if (item != null && p.casItem(item, null)) { //(5)cas成功標志當前節點以及從鏈表中移除 if (p != h) updateHead(h, ((q = p.next) != null) ? q : p); return item; } //(6)當前隊列為空則返回null else if ((q = p.next) == null) { updateHead(h, p); return null; } //(7)自引用了,則重新找新的隊列頭節點 else if (p == q) continue restartFromHead; else//(8) p = q; } } }
final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
poll操作是從隊頭獲取元素,所以代碼(2)內層循環是從head節點開始迭代,代碼(3)獲取當前隊頭的節點,當隊列一開始為空的時候隊列狀態為:

由於head 節點指向的item 為null 的哨兵節點,所以會執行到代碼(6),假設這個過程沒有線程調用offer,則此時q等於null ,如下圖:

所以執行updateHead方法,由於h 等於 p所以沒有設置頭節點,poll方法直接返回null。
假設執行到代碼(6)的時候已經有其他線程調用了offer 方法成功添加了一個元素到隊列,這時候q執行的是新增元素的節點,這時候隊列狀態圖為:

所以代碼(6)判斷結果為false,然后會轉向代碼(7)執行,而此時p不等於q,所以轉向代碼(8)執行,執行結果是p指向了節點q,此時的隊列狀態如下:

然后程序轉向代碼(3)執行,p現在指向的元素值不為null,則執行p.casItem(item, null) 通過 CAS 操作嘗試設置 p 的 item 值為 null,
如果此時沒有其他線程進行poll操作,CAS成功則執行代碼(5),由於此時 p != h ,所以設置頭節點為p,poll然后返回被從隊列移除的節點值item。此時隊列狀態為:

這個狀態就是前面提到offer操作的時候,offer代碼的執行路徑(7)執行的前提狀態。
假如現在一個線程調用了poll操作,則在執行代碼(4)的時候的隊列狀態為:

可以看到這時候執行代碼(6)返回null。
現在poll的代碼還有個分支(7)還沒有被執行過,那么什么時候會執行呢?假設線程A執行poll操作的時候,當前的隊列狀態,如下:

那么執行p.casItem(item, null) 通過 CAS 操作嘗試設置 p 的 item 值為 null。
假設 CAS 設置成功則標示該節點從隊列中移除了,此時隊列狀態為:

然后由於p != h,所以會執行updateHead 方法,假如線程A執行updateHead前,另外一個線程B開始poll操作,這時候線程B的p指向head節點,
但是還沒有執行到代碼(6),這時候隊列狀態為:

然后線程A執行 updateHead 操作,執行完畢后線程 A 退出,這時候隊列狀態為:

然后線程B繼續執行代碼(6)q=p.next由於該節點是自引用節點所以p==q,所以會執行代碼(7)跳到外層循環restartFromHead,重新獲取當前隊列隊頭 head, 現在狀態為:

總結:poll元素移除一個 元素的時候,只是簡單的使用CAS操作把當前節點的item值設置為null,然后通過重新設置頭節點讓該元素從隊列里面摘除,
被摘除的節點就成了孤立節點,這個節點會被在GC的時候會被回收掉。另外,執行分支中如果發現頭節點被修改了要跳到外層循環重新獲取新的頭節點。
3.peek操作,peek 操作是獲取隊列頭部一個元素(只不獲取不移除),如果隊列為空則返回 null,其源碼如下:
public E peek() { //(1) restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { //(2) E item = p.item; //(3) if (item != null || (q = p.next) == null) { updateHead(h, p); return item; } //(4) else if (p == q) continue restartFromHead; else //(5) p = q; } } }
代碼結構與poll操作類似,不同於代碼(3)的使用只是少了castItem 操作,其實這很正常,因為peek只是獲取隊列頭元素值,並不清空其值,
根據前面我們知道第一次執行 offer 后 head 指向的是哨兵節點(也就是 item 為 null 的節點),那么第一次peek的時候,代碼(3)中會發現item==null,
然后會執行 q = p.next, 這時候 q 節點指向的才是隊列里面第一個真正的元素或者如果隊列為 null 則 q 指向 null。
當隊列為空的時候,隊列狀態圖,如下:

這時候執行updateHead 由於 h 節點等於 p 節點所以不進行任何操作,然后 peek 操作會返回 null。
當隊列中至少有一個元素的時候(假如只有一個),這時候隊列狀態為:

這時候執行代碼(5)這時候 p 指向了 q 節點,然后執行代碼(3)這時候隊列狀態為:

執行代碼(3)發現 item 不為 null,則執行 updateHead 方法,由於 h!=p, 所以設置頭結點,設置后隊列狀態為:

可以看到其實就是剔除了哨兵節點。
總結:peek操作代碼與poll操作類似,只是前者只獲取隊列頭元素,但是並不從隊列里面刪除,而后者獲取后需要從隊列里面刪除,另外,在第一次調用peek操作的時候,
會刪除哨兵節點,並讓隊列的head節點指向隊列里面第一個元素或者null。
4.size方法,獲取當前隊列元素個數,在並發環境下不是很有用,因為 CAS 沒有加鎖所以從調用 size 函數到返回結果期間有可能增刪元素,導致統計的元素個數不精確。源碼如下:
public int size() { int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) // 最大返回Integer.MAX_VALUE if (++count == Integer.MAX_VALUE) break; return count; }
//獲取第一個隊列元素(哨兵元素不算),沒有則為null Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; } } }
//獲取當前節點的next元素,如果是自引入節點則返回真正頭節點 final Node<E> succ(Node<E> p) { Node<E> next = p.next; return (p == next) ? head : next; }
5.remove方法,如果隊列里面存在該元素則刪除給元素,如果存在多個則刪除第一個,並返回 true,否者返回 false。源碼如下:
public boolean remove(Object o) { //查找元素為空,直接返回false if (o == null) return false; Node<E> pred = null; for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; //相等則使用cas值null,同時一個線程成功,失敗的線程循環查找隊列中其它元素是否有匹配的。 if (item != null && o.equals(item) && p.casItem(item, null)) { //獲取next元素 Node<E> next = succ(p); //如果有前驅節點,並且next不為空則鏈接前驅節點到next, if (pred != null && next != null) pred.casNext(p, next); return true; } pred = p; } return false; }
ConcurrentLinkedQueue 底層使用單向鏈表數據結構來保存隊列元素,每個元素被包裝為了一個 Node 節點,隊列是靠頭尾節點來維護的,創建隊列時候頭尾節點指向一個 item 為 null 的哨兵節點,
第一次 peek 或者 first 時候會把 head 指向第一個真正的隊列元素。由於使用非阻塞 CAS 算法,沒有加鎖,所以獲取 size 的時候有可能進行了 offer,poll 或者 remove 操作,導致獲取的元素個數不精確,所以在並發情況下 size 函數不是很有用。
- JDK 中基於鏈表的非阻塞無界隊列 ConcurrentLinkedQueue 原理剖析,ConcurrentLinkedQueue 內部是如何使用 CAS 非阻塞算法來保證多線程下入隊出隊操作的線程安全?
