本文開始介紹並發隊列,為后面介紹線程池打下基礎。並發隊列莫非也是出隊、入隊操作,還有一個比較重要的點就是如何保證其線程安全性,有些並發隊列保證線程安全是通過lock,有些是通過CAS。
我們從ConcurrentLinkedQueue開始吧。
1. 介紹
ConcurrentLinkedQueue是集合框架的一員,是一個無界限且線程安全,基於單向鏈表的隊列。該隊列的順序是FIFO。當多線程訪問公共集合時,使用這個類是一個不錯的選擇。不允許null元素。是一個非阻塞的隊列。
它的迭代器是弱一致性的,不會拋出java.util.ConcurrentModificationException,也可能在迭代期間,其他操作也正在進行。size()方法,不能保證是正確的,因為在迭代時,其他線程也可以操作該隊列。
1.1 類圖
(顯示的方法都是公有方法)
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>
繼承至AbstractQueue,他提供了隊列操作的一個框架,有基本的方法,add、remove,element等等,這些方法基於offer,poll,peek(最主要看這幾個方法)。
2. 源碼分析
2.1 類的整體結構
隊列中的元素Node
private static class Node<E> {
// 保證兩個字段的可見性
volatile E item;
volatile Node<E> next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
// putOrderedXXX是putXXXVolatile的延遲版本,設置某個值不會被其他線程立即看到(可見性)
// putOrderedXXX設置的值的修飾應該是volatile,這樣該方法才有用
// 關於為什么使用這個方法,主要目的肯定是提高效率,但是具體原理,我只能告訴大家跟內存屏障有關(我也不太清楚這一塊,待我研究后,再寫一篇文章)
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe類中的東西,可以去了解一下
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
構造器1:
// private transient volatile Node<E> head;
// private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
構造器2:
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}
下面開始講方法,從offer,poll,peek從這幾個方法入手
2.2 offer
添加元素到隊尾。因為隊列是無界的,這個方法永遠不會返回false
分為三種情況進行分析(一定自己跟着代碼debug,一步步的走)
- 單線程時(使用IDEA debug一直進入的是 else if把我搞迷茫了,我會寫一個博客來解釋原因)
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("A");
queue.offer("B");
以上面的代碼,分析每一個步驟。
執行構造函數后:
此時鏈表的head與tail指向哨兵節點
插入"A", 此時沒有設置tail('兩跳機制',這里的原因后面詳見)
插入"B",
單線程情況比較簡單
- 多線程offer時
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
// 只有一個線程能夠CAS成功,其余的都重試
if (p.casNext(null, newNode)) {
// 延遲設置tail,第一個node入隊不會設置tail,第二個node入隊才會設置tail
//以此類推, '兩跳機制'
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
// 這里是有其他線程正在poll操作才會進入,此時只考慮多線程offer的情況,暫不分析
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
// 存在tail被更改前,和更改后的兩種情況
p = (p != t && t != (t = tail)) ? t : q;
}
}
結合上面的代碼,看圖
- 步驟一,線程A、線程B都執行到
if (p.casNext(null, newNode))
- 步驟二,只有一個線程執行成功,假設線程A成功,線程B失敗
因為p(a) == t(a), 此時不執行casTail,tail不變。q = p.next, 所以此時q(b) = Node2 ,那么 p(b) != q(b), 線程B執行p = (p != t && t != (t = tail)) ? t : q;
線程B即將執行
p = (p != t && t != (t = tail)) ? t : q;
- 步驟三 此時線程C進入。
此時,p(c) != q(c), 線程C執行
p = (p != t && t != (t = tail)) ? t : q;
執行完后,q(c)賦值給p(c). 再次循環,此時,q(c) == null, 設置p(c)的next,線程C將值入隊
- 步驟四 p(c) != t(c), 線程C執行casTail(t, newNode), 線程C設置尾結點
- 此時線程B執行
p = (p != t && t != (t = tail)) ? t : q;
因為p(b) == t(b),所以 q(b) 賦值給 p(b)。繼續循環,最后得到
- 多線程的另一種情況,回到步驟三,此時線程C把值入隊了,但是還沒有設置tail
- 線程B,將值入隊成功
在步驟三的基礎上,線程B入隊成功后,目前的狀況如下:
此時,線程C執行casTail(t, newNode),但是現在的tail != t(c), CAS失敗, 直接返回。
2.2.1 小結
上面不管是多線程還是單線程,都是努力的去尋找next為null的節點,若為next節點為null,再判斷是否滿足設置tail的條件。
多線程offer的第一種情況存在設置tail滯后的問題,我把它稱之為"兩跳機制",后面講使用這種機制的原因。
我們看到上面的情況一直沒有進入else if (p == q)分支,進入else if分支只會發生在有其他線程在poll時,我們先講講poll,再講講何時進入else if分支。
2.3 poll
刪除並返回頭結點的值
簡單提一下單線程與多線程的poll,着重分析一下poll與offer共存的情況
-
單線程時
單線程比較簡單,就不畫圖了,按照上面的queue,進行一步一步的debug就行了 -
多線程,只有poll時
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// casItem這里只有一個線程能夠成功,其余的繼續下面的代碼
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
// 將之前的頭節點,自己指向自己,等待被GC
h.lazySetNext(h);
}
從上面代碼可以看出,修改item與head都會使用CAS,這些變量都是被volatile修飾,所以保證了這些變量的線程安全性。不管是單線程還是多線程的poll,它們都是去尋找一個有效的頭節點,刪除並返回該值,若不是有效的就繼續找,若隊列為空了,就返回null。
最后分析一下,offer與poll共存的情況
-
線程A做offer操作,線程B做poll操作,初始的狀態如下:
-
線程A進入。
-
線程A將要執行
Node<E> q = p.next;
線程B進入,進行poll操作
此時,線程B執行了一次內循環,將q(b)賦值給了p(b);
-
線程B再次執行內循環,此時將p(b).item置空,將p(b)賦值給head,之前的h(b)的next指向自己,線程B退出
-
線程A執行
Node<E> q = p.next;
此時,p(a).next 指向自己(等待被GC), 進入else if (p == q)分支,線程A退出,經過一番執行后,最后得到的狀態,如下:
進入else if (p == q)分支的情況,只會發生在poll與offer共存的情況下。
2.4 peek
獲取首個有效的節點,並返回
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
peek與poll的操作類似,這里就貼一下代碼就是了。
3. 總結
ConcurrentLinkedQueue是使用非阻塞的方式保證線程的安全性,在設置關系到整個Queue結構的變量時(這些變量都被volatile修飾),都使用CAS的方式對它們進行賦值。
- size方法是線程不安全的,返回的結果可能不准確
關於“兩跳機制”(自己取得名字),
Both head and tail are permitted to lag. In fact, failing to update them every time one could is a significant optimization (fewer CASes). As with LinkedTransferQueue (see the internal documentation for that class), we use a slack threshold of two; that is, we update head/tail when the current pointer appears to be two or more steps away from the first/last node.
Since head and tail are updated concurrently and independently, it is possible for tail to lag behind head (why not)? -- ConcurrentLinkedQueue
大致意思,head與tail允許被延遲設置。不是每次更新它們是一個重大的優化,這樣做就可以更少的CAS(這樣在很多線程使用時,積少成多,效率更高)。它的延遲閾值是2,設置head/tail時,當前的結點離first/last有兩步或更多的距離。 這就是“兩跳機制”
我們想不通的地方,可能是這個類或方法的一個優化的地方。向着大佬看齊~
4. 引用
- Java多線程 39 - ConcurrentLinkedQueue詳解,講的非常好,上面的思路是跟着他來的