ConcurrentLinkedQueue
在考慮並發的時候可以先考慮單線程的情況,然后再將並發的情況考慮進來。
比如ConcurrentLinkedQueue:
- 先考慮單線的offer
- 再考慮多線程時候的offer:
- 多個線程offer
- 部分線程offer,部分線程poll
- offer比poll快
- poll比offer快
offer
public boolean offer(E e) {
checkNotNull(e);
// 新建一個node
final Node<E> newNode = new Node<E>(e);
// 不斷重試(for只有初始化條件,沒有判斷條件),直到將node加入隊列
// 初始化p、t都是指向tail
// 循環過程中一直讓p指向最后一個節點。讓t指向tail
for (Node<E> t = tail, p = t;;) {
// q一直指向p的下一個
Node<E> q = p.next;
if (q == null) {
// p is last node
// 如果q為null表示p是最后一個元素,嘗試加入隊列
// 如果失敗,表示其他線程已經修改了p指向的節點
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
// node加入隊列之后,tail距離最后一個節點已經相差大於一個了,需要更新tail
if (p != t) // hop two nodes at a time
// 這兒允許設置tail為最新節點的時候失敗,因為添加node的時候是根據p.next是不是為null判斷的,
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// 雖然q是p.next,但是因為是多線程,在offer的同時也在poll,如offer的時候正好p被poll了,那么在poll方法中的updateHead方法會將head指向當前的q,而把p.next指向自己,即:p.next == p
// 這個時候就會造成tail在head的前面,需要重新設置p
// 如果tail已經改變,將p指向tail,但這個時候tail依然可能在head前面
// 如果tail沒有改變,直接將p指向head
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
// tail已經不是最后一個節點,將p指向最后一個節點
p = (p != t && t != (t = tail)) ? t : q;
}
}
poll
public E poll() {
// 如果出現p被刪除的情況需要從head重新開始
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
// 隊列為空
updateHead(h, p);
return null;
}
else if (p == q)
// 當一個線程在poll的時候,另一個線程已經把當前的p從隊列中刪除——將p.next = p,p已經被移除不能繼續,需要重新開始
continue restartFromHead;
else
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
為什么會出現p == q
假設下面這種情況:

在第一種情況下,線程A執行offer操作:
第一次循環的時候
- tail = node0, p = t = node0
- 執行
Node<E> q = p.next;之后q = p.next,也就是node0.next - 執行
if (q == null),不滿足,繼續往下 - 到了
else if (p == q)這一步的時候線程A暫停
線程A現在的結果是:
head = node0
tail = node0
t = node0
p = node0
q = node0.next
因為程序時多線程的,我們假設線程A暫定在了第4步,接下來看線程B,線程B執行poll操作:
第一次循環:
- head = node0, p = h = node0;
- 執行
E item = p.item;,item = null if (item != null && p.casItem(item, null))不滿足- 執行
else if ((q = p.next) == null),q = node1,條件不滿足 - 執行
else if (p == q),條件不滿足 - 執行
p = q;,p = node1
第二次循環:
- head = node0, h = node0, p = node1;
- 執行
E item = p.item;,item = node2 if (item != null && p.casItem(item, null)),item != null滿足,如果CAS操作成功p != h成立,調用updateHead- 執行
updateHead(h, ((q = p.next) != null) ? q : p);之后,q = node2 - 在updateHead里面
h != p成立,如果CAS操作成功(將head設置為node2)- 執行
h.lazySetNext(h);,這個時候h = node0,這個方法執行完之后,node0.next = node0
- 將item返回
這個時候隊列就是圖中第二種情況,線程A結果為:
head = node2
tail = node0
t = node0
p = node0
q = node0.next = node0
看到結果了吧,這個時候 p = q = node0 !其實主要原因是在 updateHead方法的 h.lazySetNext(h) 操作里面,將舊的head.next設置成為了自己即 head.next = head。但是要注意:是舊的head
從上面分析的過程要注意:
- 多線程執行環境,單線程下一定不會出現這種情況
- 注意引用賦值比如
Node<E> q = p.next,q指向的是p.next,雖然目前p.next = node1,但是當p.next指向變了之后,q也就跟着變了 - 再就是閱讀源碼的時候一定要弄清楚調用的每個方法的作用,這樣才能對整個方法有一個准確的理解,比如這里的
h.lazySetNext(h);
參考
http://www.jianshu.com/p/7816c1361439
Java並發編程的藝術
