SynchronousQueue原理詳解-公平模式
一、介紹
SynchronousQueue是一個雙棧雙隊列算法,無空間的隊列或棧,任何一個對SynchronousQueue寫需要等到一個對SynchronousQueue的讀操作,反之亦然。一個讀操作需要等待一個寫操作,相當於是交換通道,提供者和消費者是需要組隊完成工作,缺少一個將會阻塞線程,知道等到配對為止。
SynchronousQueue是一個隊列和棧算法實現,在SynchronousQueue中雙隊列FIFO提供公平模式,而雙棧LIFO提供的則是非公平模式。
對於SynchronousQueue來說,他的put方法和take方法都被抽象成統一方法來進行操作,通過抽象出內部類Transferer,來實現不同的操作。
注意事項:本文分析主要是針對jdk1.8的版本進行分析,下面的代碼中的線程執行順序可能並不能完全保證順序性,執行時間比較短,所以暫且認定有序執行。
約定:圖片中以Reference-開頭的代表對象的引用地址,通過箭頭方式進行引用對象。
Transferer.transfer方法主要介紹如下所示:
abstract static class Transferer<E> {
/**
* 執行put和take方法.
*
* @param e 非空時,表示這個元素要傳遞給消費者(提供者-put);
* 為空時, 則表示當前操作要請求消費一個數據(消費者-take)。
* offered by producer.
* @param timed 決定是否存在timeout時間。
* @param nanos 超時時長。
* @return 如果返回非空, 代表數據已經被消費或者正常提供; 如果為空,
* 則表示由於超時或中斷導致失敗。可通過Thread.interrupted來檢查是那種。
*/
abstract E transfer(E e, boolean timed, long nanos);
}
接下來看一下SynchronousQueue的字段信息:
/** CPU數量 */
static final int NCPUS = Runtime.getRuntime().availableProcessors();
/**
* 自旋次數,如果transfer指定了timeout時間,則使用maxTimeSpins,如果CPU數量小於2則自旋次數為0,否則為32
* 此值為經驗值,不隨CPU數量增加而變化,這里只是個常量。
*/
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
/**
* 自旋次數,如果沒有指定時間設置,則使用maxUntimedSpins。如果NCPUS數量大於等於2則設定為為32*16,否則為0;
*/
static final int maxUntimedSpins = maxTimedSpins * 16;
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices.
*/
static final long spinForTimeoutThreshold = 1000L;
- NCPUS:代表CPU的數量
- maxTimedSpins:自旋次數,如果transfer指定了timeout時間,則使用maxTimeSpins,如果CPU數量小於2則自旋次數為0,否則為32,此值為經驗值,不隨CPU數量增加而變化,這里只是個常量。
- maxUntimedSpins:自旋次數,如果沒有指定時間設置,則使用maxUntimedSpins。如果NCPUS數量大於等於2則設定為為32*16,否則為0;
- spinForTimeoutThreshold:為了防止自定義的時間限過長,而設置的,如果設置的時間限長於這個值則取這個spinForTimeoutThreshold 為時間限。這是為了優化而考慮的。這個的單位為納秒。
公平模式-TransferQueue
TransferQueue內部是如何進行工作的,這里先大致講解下,隊列采用了互補模式進行等待,QNode中有一個字段是isData,如果模式相同或空隊列時進行等待操作,互補的情況下就進行消費操作。
入隊操作相同模式
不同模式時進行出隊列操作:
這時候來了一個isData=false的互補模式,隊列就會變成如下狀態:
TransferQueue繼承自Transferer抽象類,並且實現了transfer方法,它主要包含以下內容:
QNode
代表隊列中的節點元素,它內部包含以下字段信息:
- 字段信息描述
字段 | 描述 | 類型 |
---|---|---|
next | 下一個節點 | QNode |
item | 元素信息 | Object |
waiter | 當前等待的線程 | Thread |
isData | 是否是數據 | boolean |
- 方法信息描述
方法 | 描述 |
---|---|
casNext | 替換當前節點的next節點 |
casItem | 替換當前節點的item數據 |
tryCancel | 取消當前操作,將當前item賦值為this(當前QNode節點) |
isCancelled | 如果item是this(當前QNode節點)的話就返回true,反之返回false |
isOffList | 如果已知此節點離隊列,判斷next節點是不是為this,則返回true,因為由於* advanceHead操作而忘記了其下一個指針。 |
E transfer(E e, boolean timed, long nanos) {
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/
QNode s = null; // constructed/reused as needed
// 分為兩種狀態1.有數據=true 2.無數據=false
boolean isData = (e != null);
// 循環內容
for (;;) {
// 尾部節點。
QNode t = tail;
// 頭部節點。
QNode h = head;
// 判斷頭部和尾部如果有一個為null則自旋轉。
if (t == null || h == null) // 還未進行初始化的值。
continue; // 自旋
// 頭結點和尾節點相同或者尾節點的模式和當前節點模式相同。
if (h == t || t.isData == isData) { // 空或同模式。
// tn為尾節點的下一個節點信息。
QNode tn = t.next;
// 這里我認為是閱讀不一致,原因是當前線程還沒有阻塞的時候其他線程已經修改了尾節點tail會導致當前線程的tail節點不一致。
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // 這里如果指定timed判斷時間小於等於0直接返回。
return null;
// 判斷新增節點是否為null,為null直接構建新節點。
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // 如果next節點不為null說明已經有其他線程進行tail操作
continue;
// 將t節點替換為s節點
advanceTail(t, s);
// 等待有消費者消費線程。
Object x = awaitFulfill(s, e, timed, nanos);
// 如果返回的x,指的是s.item,如果s.item指向自己的話清除操作。
if (x == s) {
clean(t, s);
return null;
}
// 如果沒有取消聯系
if (!s.isOffList()) {
// 將當前節點替換頭結點
advanceHead(t, s); // unlink if head
if (x != null) // 取消item值,這里是take方法時會進行item賦值為this
s.item = s;
// 將等待線程設置為null
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
// 獲取頭結點下一個節點
QNode m = h.next; // node to fulfill
// 如果當前線程尾節點和全局尾節點不一致,重新開始
// 頭結點的next節點為空,代表無下一個節點,則重新開始,
// 當前線程頭結點和全局頭結點不相等,則重新開始
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
我們來看一下awaitFulfill方法內容:
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
// 如果指定了timed則為System.nanoTime() + nanos,反之為0。
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 獲取當前線程。
Thread w = Thread.currentThread();
// 如果頭節點下一個節點是當前s節點(以防止其他線程已經修改了head節點)
// 則運算(timed ? maxTimedSpins : maxUntimedSpins),否則直接返回。
// 指定了timed則使用maxTimedSpins,反之使用maxUntimedSpins
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
// 自旋
for (;;) {
// 判斷是否已經被中斷。
if (w.isInterrupted())
//嘗試取消,將當前節點的item修改為當前節點(this)。
s.tryCancel(e);
// 獲取當前節點內容。
Object x = s.item;
// 判斷當前值和節點值不相同是返回,因為彈出時會將item值賦值為null。
if (x != e)
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
- 首先先判斷有沒有被中斷,如果被中斷則取消本次操作,將當前節點的item內容賦值為當前節點。
- 判斷當前節點和節點值不相同是返回
- 將當前線程賦值給當前節點
- 自旋,如果指定了timed則使用
LockSupport.parkNanos(this, nanos);
,如果沒有指定則使用LockSupport.park(this);
。 - 中斷相應是在下次才能被執行。
通過上面源碼分析我們這里做出簡單的示例代碼演示一下put操作和take操作是如何進行運作的,首先看一下示例代碼,如下所示:
/**
* SynchronousQueue進行put和take操作。
*
* @author battleheart
*/
public class SynchronousQueueDemo {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);
Thread thread1 = new Thread(() -> {
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread1.start();
Thread.sleep(2000);
Thread thread2 = new Thread(() -> {
try {
queue.put(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread2.start();
Thread.sleep(10000);
Thread thread3 = new Thread(() -> {
try {
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread3.start();
}
}
首先上來之后進行的是兩次put操作,然后再take操作,默認隊列上來會進行初始化,初始化的內容如下代碼所示:
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
初始化后隊列的狀態如下圖所示:
當線程1執行put操作時,來分析下代碼:
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue;
首先執行局部變量t代表隊尾指針,h代表隊頭指針,判斷隊頭和隊尾不為空則進行下面的操作,接下來是if…else語句這里是分水嶺,當相同模式操作的時候執行if語句,當進行不同模式操作時執行的是else語句,程序是如何控制這樣的操作的呢?接下來我們慢慢分析一下:
if (h == t || t.isData == isData) { // 隊列為空或者模式相同時進行if語句
QNode tn = t.next;
if (t != tail) // 判斷t是否是隊尾,不是則重新循環。
continue;
if (tn != null) { // tn是隊尾的下個節點,如果tn有內容則將隊尾更換為tn,並且重新循環操作。
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // 如果指定了timed並且延時時間用盡則直接返回空,這里操作主要是offer操作時,因為隊列無存儲空間的當offer時不允許插入。
return null;
if (s == null) // 這里是新節點生成。
s = new QNode(e, isData);
if (!t.casNext(null, s)) // 將尾節點的next節點修改為當前節點。
continue;
advanceTail(t, s); // 隊尾移動
Object x = awaitFulfill(s, e, timed, nanos); //自旋並且設置線程。
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
}
上面代碼是if語句中的內容,進入到if語句中的判斷是如果頭結點和尾節點相等代表隊列為空,並沒有元素所有要進行插入隊列的操作,或者是隊尾的節點的isData標志和當前操作的節點的類型一樣時,會進行入隊操作,isData標識當前元素是否是數據,如果為true代表是數據,如果為false則代表不是數據,換句話說只有模式相同的時候才會往隊列中存放,如果不是模式相同的時候則代表互補模式,就不走if語句了,而是走了else語句,上面代碼中做有注釋講解,下面看一下這里:
if (s == null) // 這里是新節點生成。
s = new QNode(e, isData);
if (!t.casNext(null, s)) // 將尾節點的next節點修改為當前節點。
continue
當執行上面代碼后,隊列的情況如下圖所示:(這里視為插入第一個元素
圖,方便下面的引用)
接下來執行這段代碼:
advanceTail(t, s); // 隊尾移動
修改了tail節點后,這時候就需要進行自旋操作,並且設置QNode的waiter等待線程,並且將線程等待,等到喚醒線程進行喚醒操作
Object x = awaitFulfill(s, e, timed, nanos); //自旋並且設置線程。
方法內部分析局部內容,上面已經全部內容的分析:
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
如果自旋時間spins還有則進行循環遞減操作,接下來判斷如果當前節點的waiter是空則價格當前線程賦值給waiter,上圖中顯然是為空的所以會把當前線程進行賦值給我waiter,接下來就是等待操作了。
上面線程則處於等待狀態,接下來是線程二進行操作,這里不進行重復進行,插入第二個元素隊列的狀況,此時線程二也處於等待狀態。
上面的主要是put了兩次操作后隊列的情況,接下來分析一下take操作時又是如何進行操作的,當take操作時,isData為false,而隊尾的isData為true兩個不相等,所以不會進入到if語句,而是進入到了else語句
} else { // 互補模式
QNode m = h.next; // 獲取頭結點的下一個節點,進行互補操作。
if (t != tail || m == null || h != head)
continue; // 這里就是為了防止閱讀不一致的問題
Object x = m.item;
if (isData == (x != null) || // 如果x=null說明已經被讀取了。
x == m || // x節點和m節點相等說明被中斷操作,被取消操作了。
!m.casItem(x, e)) { // 這里是將item值設置為null
advanceHead(h, m); // 移動頭結點到頭結點的下一個節點
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
首先獲取頭結點的下一個節點用於互補操作,也就是take操作,接下來進行閱讀不一致的判斷,防止其他線程進行了閱讀操作,接下來獲取需要彈出內容x=1,首先進行判斷節點內容是不是已經被消費了,節點內容為null時則代表被消費了,接下來判斷節點的item值是不是和本身相等如果相等話說明節點被取消了或者被中斷了,然后移動頭結點到下一個節點上,然后將refenrence-715
的item值修改為null,至於為什么修改為null這里留下一個懸念,這里還是比較重要的,大家看到這里的時候需要注意下
,顯然這些都不會成立,所以if語句中內容不會被執行,接下來的隊列的狀態是是這個樣子的:
OK,接下來就開始移動隊頭head了,將head移動到m節點上,執行代碼如下所示:
advanceHead(h, m);
此時隊列的狀態是這個樣子的:
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
接下來將執行喚醒被等待的線程,也就是thread-0,然后返回獲取item值1,take方法結束,但是這里並沒有結束,因為喚醒了put的線程,此時會切換到put方法中,這時候線程喚醒后會執行awaitFulfill
方法,此時循環時,有與item值修改為null則直接返回內容。
Object x = s.item;
if (x != e)
return x;
這里的代碼我們可以對照插入第一個元素
圖,s節點也就是當前m節點,獲取值得時候已經修改為null,但是當時插入的值時1,所以兩個不想等了,則直接返回null值。
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
又返回到了transfer方法的if語句中,此時x和s並不相等所以不用進行clean操作,首先判斷s節點是否已經離隊了,顯然並沒有進行離隊操作,advanceHead(t, s);
操作不會被執行因為上面已近將頭節點修改了,但是第一次插入的時候頭結點還是reference-716
,此時已經是reference-715
,而t節點的引用地址是reference-716
,所以不會操作,接下來就是將waiter設置為null,也就是忘記掉等待的線程。
分析了正常的take和put操作,接下來分析下中斷操作,由於中斷相應后,會被執行if(w.isInterrupted())
這段代碼,它會執行s.tryCancel(e)
方法,這個方法的作用的是將QNode節點的item節點賦值為當前QNode,這時候x和e值就不相等了( if (x != e)
),x的值是s.item,則為當前QNode,而e的值是用戶指定的值,這時候返回x(s.item)。返回到函數調用地方transfer
中,這時候要執行下面語句:
if (x == s) {
clean(t, s);
return null;
}
進入到clean方法執行清理當前節點,下面是方法clean代碼:
/**
* Gets rid of cancelled node s with original predecessor pred.
*/
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
/*
* At any given time, exactly one node on list cannot be
* deleted -- the last inserted node. To accommodate this,
* if we cannot delete s, we save its predecessor as
* "cleanMe", deleting the previously saved version
* first. At least one of node s or the node previously
* saved can always be deleted, so this always terminates.
*/
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
QNode t = tail; // Ensure consistent read for tail
if (t == h)
return;
QNode tn = t.next;
// 判斷現在的t是不是末尾節點,可能其他線程插入了內容導致不是最后的節點。
if (t != tail)
continue;
// 如果不是最后節點的話將其現在t.next節點作為tail尾節點。
if (tn != null) {
advanceTail(t, tn);
continue;
}
// 如果當前節點不是尾節點進入到這里面。
if (s != t) { // If not tail, try to unsplice
// 獲取當前節點(被取消的節點)的下一個節點。
QNode sn = s.next;
// 修改上一個節點的next(下一個)元素為下下個節點。
if (sn == s || pred.casNext(s, sn))
//返回。
return;
}
QNode dp = cleanMe;
if (dp != null) { // 嘗試清除上一個標記為清除的節點。
QNode d = dp.next; //1.獲取要被清除的節點
QNode dn;
if (d == null || // 被清除節點不為空
d == dp || // 被清除節點已經離隊
!d.isCancelled() || // 被清除節點是標記為Cancel狀態的。
(d != t && // 被清除節點不是尾節點
(dn = d.next) != null && // 被清除節點下一個節點不為null
dn != d && // that is on list
dp.casNext(d, dn))) // 將被清除的節點的前一個節點的下一個節點修改為被清除節點的下一個節點。
casCleanMe(dp, null); // 清空cleanMe節點。
if (dp == pred)
return; // s is already saved node
} else if (casCleanMe(null, pred)) // 這里將上一個節點標記為被清除操作,但是其實要操作的是下一個節點。
return; // Postpone cleaning s
}
}
- 如果節點中取消的頭結點的下一個節點,只需要移動當前head節點到下一個節點即可。
- 如果取消的是中間的節點,則將當前節點next節點修改為下下個節點。
- 如果修改為末尾的節點,則將當前節點放入到QNode的clearMe中,等待有內容進來之后下一次進行清除操作。
實例一:清除頭結點下一個節點,下面是實例代碼進行講解:
/**
* 清除頭結點的下一個節點實例代碼。
*
* @author battleheart
*/
public class SynchronousQueueDemo {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);
AtomicInteger atomicInteger = new AtomicInteger(0);
Thread thread1 = new Thread(() -> {
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread1.start();
Thread.sleep(200);
Thread thread2 = new Thread(() -> {
try {
queue.put(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread2.start();
Thread.sleep(2000);
thread1.interrupt();
}
}
上面例子說明我們啟動了兩個線程,分別向SynchronousQueue隊列中添加了元素1和元素2,添加成功之后的,讓主線程休眠一會,然后將第一個線程進行中斷操作,添加兩個元素后節點所處在的狀態為下圖所示:
當我們調用thread1.interrupt
時,此時線程1等待的消費操作將被終止,會相應上面awaitFulfill
方法,該方法會運行下面代碼:
if (w.isInterrupted())
//嘗試取消,將當前節點的item修改為當前節點(this)。
s.tryCancel(e);
// 獲取當前節點內容。
Object x = s.item;
// 判斷當前值和節點值不相同是返回,因為彈出時會將item值賦值為null。
if (x != e)
return x;
首先上來現將s節點(上圖中的Reference-715引用對象)的item節點設置為當前節點引用(Reference-715引用對象),所以s節點和e=1不相等則直接返回,此時節點的狀態變化如下所示:
退出awaitFulfill
並且返回的是s節點內容(實際上返回的就是s節點),接下來返回到調用awaitFulfill
的方法transfer
方法中
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // 是否是被取消了
clean(t, s);
return null;
}
首先判斷的事x節點和s節點是否相等,上面我們也說了明顯是相等的所以這里會進入到clean方法中,clean(QNode pred, QNode s)
clean方法一個是前節點,一個是當前被取消的節點,也就是當前s節點的前節點是head節點,接下來我們一步一步的分析代碼:
s.waiter = null; // 刪除等待的線程。
進入到方法體之后首先先進行的是將當前節點的等待線程刪除,如下圖所示:
接下來進入while循環,循環內容時pred.next == s
如果不是則表示已經移除了節點,反之還在隊列中,則進行下面的操作:
QNode h = head;
QNode hn = h.next; // 如果取消的是第一個節點則進入下面語句
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
可以看到首先h節點為head節點,hn為頭結點的下一個節點,在進行判斷頭結點的下一個節點不為空並且頭結點下一個節點是被中斷的節點(取消的節點),則進入到if語句中,if語句其實也很簡單就是將頭結點修改為頭結點的下一個節點(s節點,別取消節點,並且將前節點的next節點修改為自己,也就是移除了之前的節點,我們看下advanceHead方法:
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}
首先上來先進行CAS移動頭結點,再講原來頭結點h的next節點修改為自己(h),為什么這樣做呢?因為上面進行advanceHead
之后並沒有退出循環,是進行continue操作,也就是它並沒有跳出while循環,他還會循環一次prev.next此時已經不能等於s所以退出循環,如下圖所示:
實例二:清除中間的節點
/**
* SynchronousQueue實例二,清除中間的節點。
*
* @author battleheart
*/
public class SynchronousQueueDemo {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);
AtomicInteger atomicInteger = new AtomicInteger(0);
Thread thread1 = new Thread(() -> {
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread1.start();
//休眠一會。
Thread.sleep(200);
Thread thread2 = new Thread(() -> {
try {
queue.put(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread2.start();
//休眠一會。
Thread.sleep(200);
Thread thread3 = new Thread(() -> {
try {
queue.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread3.start();
//休眠一會。
Thread.sleep(10000);
thread2.interrupt();
}
}
看上面例子,首先先進行put操作三次,也就是入隊3條數據,分別是整型值1,整型值2,整型值3,然后將當前線程休眠一下,對中間線程進行中斷操作,通過讓主線程休眠一會保證線程執行順序性(當然上面線程不一定能保證執行順序,因為put操作一下子就執行完了所以這點時間是可以的),此時隊列所處的狀態來看一下下圖:
當休眠一會之后,進入到threa2進行中斷操作,目前上圖中表示Reference-723
被中斷操作,此時也會進入到awaitFulfill
方法中,將Reference-723
的item節點修改為當前節點,如下圖所示:
進入到clear方法中此時的prev節點為Reference-715
,s節點是被清除節點,還是首先進入clear方法中先將waiter設置為null,取消當前線程內容,如下圖所示:
接下來進入到循環中,進行下面處理
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
QNode t = tail; // Ensure consistent read for tail
if (t == h)
return;
QNode tn = t.next;
if (t != tail)
continue;
if (tn != null) {
advanceTail(t, tn);
continue;
}
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))
return;
}
第一個if語句已經分析過了所以說這里不會進入到里面去,接下來是進行尾節點t是否是等於head節點如果相等則代表沒有元素,在判斷當前方法的t尾節點是不是真正的尾節點tail如果不是則進行修改尾節點,先來看一下現在的狀態:
tn != null
判斷如果tn不是尾節點,則將tn作為尾節點處理,如果處理之后還不是尾節點還會進行處理直到tail是尾節點未知,我們現在這個是尾節點所以跳過這段代碼。s != t
通過上圖可以看到s節點是被清除節點,並不是尾節點所以進入到循環中:
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))
return;
}
首先獲取的s節點的下一個節點,上圖中表示Reference-725
節點,判斷sn是都等於當前節點顯然這一條不成立,pred節點為Reference-715
節點,將715節點的next節點變成Reference-725
節點,這里就將原來的節點清理出去了,現在的狀態如下所示:
實例三:刪除的節點是尾節點
/**
* SynchronousQueue實例三,刪除的節點為尾節點
*
* @author battleheart
*/
public class SynchronousQueueDemo {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
SynchronousQueue<Integer> queue = new SynchronousQueue<>(true);
AtomicInteger atomicInteger = new AtomicInteger(0);
Thread thread1 = new Thread(() -> {
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread1.start();
Thread thread2 = new Thread(() -> {
try {
queue.put(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread2.start();
Thread.sleep(10000);
thread2.interrupt();
Thread.sleep(10000);
Thread thread3 = new Thread(() -> {
try {
queue.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread3.start();
Thread.sleep(10000);
thread3.interrupt();
}
}
該例子主要說明一個問題就是刪除的節點如果是末尾節點的話,clear
方法又是如何處理的,首先啟動了三個線程其中主線程休眠了一會,為了能讓插入的順序保持線程1,線程2,線程3這樣子,啟動第二個線程后,又將第二個線程中斷,這是第二個線程插入的節點為尾節點,然后再啟動第三個節點插入值,再中斷了第三個節點末尾節點,說一下為啥這樣操作,因為當清除尾節點時,並不是直接移除當前節點,而是將被清除的節點的前節點設置到QNode的CleanMe中,等待下次clear方法時進行清除上次保存在CleanMe的節點,然后再處理當前被中斷節點,將新的被清理的節點prev設置為cleanMe當中,等待下次進行處理,接下來一步一步分析,首先我們先來看一下第二個線程啟動后節點的狀態。
此時運行thread2.interrupt();
將第二個線程中斷,這時候會進入到clear方法中,前面的代碼都不會被返回,會執行下面的語句:
QNode dp = cleanMe;
if (dp != null) { // Try unlinking previous cancelled node
QNode d = dp.next;
QNode dn;
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
casCleanMe(dp, null);
if (dp == pred)
return; // s is already saved node
} else if (casCleanMe(null, pred))
return;
首先獲得TransferQueue當中cleanMe節點,此時獲取的為null,當判斷dp!=null時就會被跳過,直接執行
casCleanMe(null, pred)
此時pred傳入的值時t節點指向的內容,也就是當前節點的上一個節點,它會被標記為清除操作節點(其實並不清楚它而是清除它下一個節點,也就是說item=this的節點),此時看一下節點狀態為下圖所示:
接下來第三個線程啟動了這時候又往隊列中添加了元素3,此時隊列的狀況如下圖所示:
此時thread3也被中斷操作了,這時候還是運行上面的代碼,但是這次不同的點在於cleanMe已經不是空值,是有內容的,首先獲取的是cleanMe的下一個節點(d),然我來把變量標記在圖上然后看起來好分析一些,如下圖所示:
dp表示d節點的前一個pred節點,dn表示d節點的next節點,主要邏輯在這里:
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
casCleanMe(dp, null);
if (dp == pred)
return; // s
首先判斷d節點是不是為null,如果d節點為null代表已經清除掉了,如果cleanMe節點的下一個節點和自己相等,說明需要清除的節點已經離隊了,判斷下個節點是不是需要被清除的節點,目前看d節點是被清除的節點,然后就將被清除的節點的下一個節點賦值給dn並且判斷d節點是不是末尾節點,如果不是末尾節點則進行dp.casNext
方法,這個地方是關鍵點,它將被清除節點d的前節點的next節點修改為被清除節點d的后面節點dn,然后調用caseCleanMe將TransferQueue中的cleanMe節點清空,此時節點的內容如下所示:
可以看出將上一次標記為清除的節點清除了隊列中,清除完了就完事兒?那這次的怎么弄呢?因為現在運行的是thread3的中斷程序,所以上面並沒有退出,而是再次進入循環,循環之后發現dp為null則會運行casCleanMe(null, pred)
,此時當前節點s的前一個節點已經被清除隊列,但是並不影響后續的清除操作,因為前節點的next節點還在維護中,也是前節點的next指向還是reference-725
,如下圖所示:
就此分析完畢如果有不正確的地方請指正。