隊列(Queue)也是常用的數據結構之一。在JDK中提供了一個ConcurrentLinkedQueue類用來實現高並發的隊列。從名字可以看出,這個隊列是使用鏈表作為其數據的結構的。ConcurrentLinkedQueue應該算是在高並發的環境中性能最好的了。它之所以有很好的性能,是因為其內部復雜的實現。下面我們就來一探究竟,為何性能高?
ConcurrentLinkedQueue的介紹
ConcurrentLinkedQueue是一個基於鏈表的無界非阻塞隊列,並且是線程安全的,它采用的是先進先出的規則,當我們增加一個元素時,它會添加到隊列的末尾,當我們取一個元素時,它會返回一個隊列頭部的元素。
ConcurrentLinkedQueue原理探究
作為一個鏈表,自然需要定義有關鏈表內的節點,在ConcurrentLinkedQueue中,定義的節點Node核心如下:
1 static final class Node<E> { 2 volatile E item; 3 volatile Node<E> next; 4 5 /** 6 * Constructs a node holding item. Uses relaxed write because 7 * item can only be seen after piggy-backing publication via CAS. 8 */ 9 Node(E item) { 10 ITEM.set(this, item); 11 } 12 13 /** Constructs a dead dummy node. */ 14 Node() {} 15 16 void appendRelaxed(Node<E> next) { 17 // assert next != null; 18 // assert this.next == null; 19 NEXT.set(this, next); 20 } 21 22 boolean casItem(E cmp, E val) { 23 // assert item == cmp || item == null; 24 // assert cmp != null; 25 // assert val == null; 26 return ITEM.compareAndSet(this, cmp, val); 27 } 28 }
其中 item是用來表示目標元素的。比如,但隊列中存放String時,item就是String類型。字段next表示當前Node的下一個元素,這樣每個Node就能環環相扣,串在一起了。
方法casItem()表示設置當前Node的item值。它需要兩個參數,第一個參數為期望值,第二個參數為設置目標值。當當前值等於cmp期望值時,它就會將目標設置為val。這個方法使用了CAS操作。
head和tail
ConcurrentLinkedQueue內部有兩個重要的字段head和tail,分別表示鏈表的頭部和尾部,他們都是Node類型,對於head來說,它永遠不會為null,並且通過head以及succ()后繼方法一定能完整的遍歷整個鏈表。對於tail來說,它自然應該表示隊列的末尾。但ConcurrentLinkedQueue的內部實現非常復雜,它允許在運行時鏈表處於多個不同的狀態。以tail來說,我們期望tail總是為鏈表的末尾,但實際上,tail的更新並不是及時的,而是可能會產生拖延現象。來看一下源碼:
offer():將指定的元素插入隊列的尾部
1 public boolean offer(E e) { 2 final Node<E> newNode = new Node<E>(Objects.requireNonNull(e)); 3 4 for (Node<E> t = tail, p = t;;) { 5 Node<E> q = p.next; 6 if (q == null) { 7 // p是最后一個節點 8 if (NEXT.compareAndSet(p, null, newNode)) { 9 if (p != t) // 每兩次更新一下tail 10 TAIL.weakCompareAndSet(this, t, newNode); 11 return true; 12 } 13 // CAS競爭失敗,再次嘗試 14 } 15 else if (p == q) 16 // 遇到哨兵節點,從head開始遍歷 17 //但是如果tail被修改,則使用tail(因為可能被修改正確了) 18 p = (t != (t = tail)) ? t : head; 19 else 20 // 每兩次更新后,確認tail更新 21 p = (p != t && t != (t = tail)) ? t : q; 22 } 23 }
首先值得注意的是,這個方法沒有任何的鎖操作。線程安全完全由CAS操作和隊列的算法來保證。整個方法的核心是for循環,這個循環沒有出口,直到嘗試成功,這也符合CAS的操作流程。當第一次加入元素時,由於隊列為空,因此p.next為null。程序進入第8行,將p的next節點賦值為newNode也就是將新的元素加入到隊列中。此時p==t成立,因此並不會執行第9行代碼更新tail末尾,如果 NEXT.compareAndSet(p, null, newNode) 成功,程序直接返回,如果失敗,則再進行一次循環嘗試,直到成功。因此,增加一個元素后,tail並不會被更新。此時隊列是這樣的:
當程序試圖增加第二個元素時,這時tail在head的位置上,因此t表示的就是head,所以p.next就是Node1(即第一個元素),此時q != null ,p != q ,所以程序進入else環節,運行第21行代碼,獲得最后一個節點,此時 p = q,即p指向鏈表的第一個元素Node1,開始第二個循環,p.next = null,所以程序執行第8行代碼,p更新自己的next,讓它指向新加入的節點,如果成功,此時 p != t 為true,這樣就更新t的所在位置,將t移到鏈表的最后,即更新tail(這時tail才是鏈表的末尾)。
所以tail的更新會產生滯后,並且每兩次會跳躍兩個元素。如下圖所示:
再來看代碼的第15行,對p = q 的情況進行了處理;這種情況就屬於遇到了哨兵節點導致的。所謂的哨兵節點就是next指向了自己。這種節點在隊列中存在的價值不大,主要表示要刪除的節點或者空節點,當遇到哨兵節點時,無法通過next取得后續的節點,通過代碼第18行,很可能直接返回head,這樣鏈表就會從頭部開始遍歷,重新查找到鏈表末尾。當然也有可能返回t,這樣就進行了一次“打賭”,使用新的tail作為鏈表尾部(即t),這樣避免了重新查找tail的開銷。
對代碼的第18行,進行分析: p = (t != (t = tail)) ? t : head
這句代碼雖然只有一行,但是包含的信息比較多。首先“!=”不是原子操作,它是可以被中斷的,也就是說,在執行“!=”時,程序會先取得t的值,在執行t = tail,並取得新的t的值,然后比較這兩個值是否相等。在單線程中,t != t這種語句顯然不會成立,但是在並發環境下,有可能在獲得左邊的t值后,右邊的t值被其他線程修改,這樣,t != t 就成立了。這種情況表示在比較的過程中,tail被其他線程修改了,這時,我們就用新的tail為鏈表的尾,也就是等式右邊的t,但如果tail沒有被修改,則返回head,要求從頭部開始,重新查找鏈表末尾。
poll() :獲取並移除隊列的頭,如果隊列為空則返回null
先來看源代碼:
1 public E poll() { 2 restartFromHead: for (;;) { 3 for (Node<E> h = head, p = h, q;; p = q) { 4 final E item; 5 if ((item = p.item) != null && p.casItem(item, null)) { 6 // Successful CAS is the linearization point 7 // for item to be removed from this queue. 8 if (p != h) // hop two nodes at a time 9 updateHead(h, ((q = p.next) != null) ? q : p); 10 return item; 11 } 12 else if ((q = p.next) == null) { 13 updateHead(h, p); 14 return null; 15 } 16 else if (p == q) 17 continue restartFromHead; 18 } 19 } 20 }
說明:poll()方法的作用就是刪除鏈表的表頭節點,並返回被刪除節點的值。poll()的實現原理和offer()比較相似,下面對其分析:
代碼的第5行表示:表頭節點的數據不為null,並且設置表頭數據為null的操作成功。然后進入if,如果 p != h 為true,則更新表頭然后返回刪除元素的值item,若為false,則直接返回刪除元素的值,不更新表頭。這點說明表頭head也不是實時更新的,也是每兩次更新一次(通過注釋:hop two nodes at a time),跟tail的更新類似。
代碼的第12行,表示表頭的下一個節點為null,即鏈表只有一個內容為null的表頭節點。這樣則直接更新表頭為p,返回null。
代碼的第16行,當p == q 時,跳過當前循環,跳到restartFromHead標記重新操作。
即ConcurrentLinkedQueue的元素刪除示意圖如下所示:
ConcurrentLinkedQueue的其他方法:
peek():獲取表頭元素但不移除隊列的頭,如果隊列為空則返回null。
remove(Object obj):移除隊列已存在的元素,返回true,如果元素不存在,返回false。
add(E e):將指定元素插入隊列末尾,成功返回true,失敗返回false(此方法非線程安全的方法,不推薦使用)。
注意:
雖然ConcurrentLinkedQueue的性能很好,但是在調用size()方法的時候,會遍歷一遍集合,對性能損害較大,執行很慢,因此應該盡量的減少使用這個方法,如果判斷是否為空,最好用isEmpty()方法。
ConcurrentLinkedQueue不允許插入null元素,會拋出空指針異常。
ConcurrentLinkedQueue是無界的,所以使用時,一定要注意內存溢出的問題。即對並發不是很大中等的情況下使用,不然占用內存過多或者溢出,對程序的性能影響很大,甚至是致命的。
通過以上這些說明,可以明顯的感覺到,不使用鎖而單純的使用CAS操作要求在應用層面上保證線程安全,並處理一些可能存在的不一致問題,大大增加了程序的設計和實現的難度。但是它帶來的好處就是可以得到性能的飛速提升。因此,有些場合也是值得的。
參考:《Java高並發程序設計》 葛一鳴 郭超 編著: