首先我們從java.util.concurrent.locks包中的AbstraceQueuedSynchronizer說起,在下文中稱為AQS。
AQS是一個用於構建鎖和同步器的框架。例如在並發包中的ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock等都是基於AOS構建,這些鎖都有一個特點,都不是直接擴展自AQS,而是都有一個內部類繼承自AQS。為什么會這么設計而不是直接繼承呢?簡而言之,鎖面向的是使用者,同步器面向的是線程控制,在鎖的實現中聚合同步器而不是直接繼承AQS很好的隔離了二者所關注的領域。
AbstractQueuedSynchronizer在內部依賴一個雙向同步隊列來完成同步狀態的管理,當前線程獲取同步狀態失敗時,同步器會將該線程和等待狀態信息構造成一個節點並將其加入到同步隊列中。Node節點以AQS的內部類存在,其字段屬性如下:
AbstractQueuedSynchronizer$Node |
|
屬性 |
描述 |
volatile int waitStatus |
等待狀態,並不是同步狀態,而是在隊列中的線程節點等待狀態(Node節點中一共定義四種狀態) CANCELLED = 1 //線程由於超時或被中斷會被取消在隊列中的等待,被取消了的線程不會再被阻塞,即狀態不會再改變 SIGNAL = -1 //后繼節點處於等待狀態,當前節點釋放鎖或者取消等待時會通知后繼節點 CONDITION = -2 //暫時忽略,涉及Condition PROPAGATE = -3 //下一次共享式同步狀態獲取將會無條件傳播下去 |
volatile Node prev |
前驅節點 |
volatile Node next |
后繼節點 |
volatile Thread thread |
保持對當前獲取同步狀態線程的引用 |
Node nextWaiter |
等待隊列中的后繼節點,同時也表示該節點是共享模式(SHARED)還是獨占(EXCLUSIVE)模式。它們共用一個字段。因為在等待隊列中的線程一定是獨占模式。所以如果nextWatiter == SHARED,那么表示該節點為共享模式。 |
在AQS同步器中由一個頭節點和尾節點來維護這個同步隊列。
AbstractQueuedAbatract |
|
屬性 |
描述 |
private transient volatile Node head |
同步隊列頭節點 |
private transient volatile Node tail |
同步隊列尾節點 |
以上內容我們需要知道一點的就是:同步器中是依靠一個同步隊列來完成的同步狀態管理,當線程獲取鎖(或者稱為同步狀態)失敗時,會將線程構造為一個Node節點新增到同步隊列的尾部。
在鎖的獲取當中,並不一定是只有一個線程才能持有這個鎖(或者稱為同步狀態),所以此時有了獨占模式和共享模式的區別,也就是在Node節點中由nextWait來標識。比如ReentrantLock就是一個獨占鎖,只能有一個線程獲得鎖,而WriteAndReadLock的讀鎖則能由多個線程同時獲取,但它的寫鎖則只能由一個線程持有。本章先介紹獨占模式下鎖(或者稱為同步狀態)的獲取與釋放,在此之前要稍微提一下“模板方法模式”,在AQS同步器中提供了不少的模板方法,關於模板方法模式可以移至《模板方法模式》,總結就是一句話:定義一個操作中的算法的骨架,而將一些步驟的實現延遲到子類中。
1)獨占模式同步狀態的獲取
AbstractQueuedSynchronizer |
|
方法 |
描述 |
public final void acquire(int arg) |
獨占模式下獲取同步狀態,忽略中斷,即表示無論如何也會在獲得同步狀態后才返回。 |
此方法即為一個模板方法,它的實現代碼如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire:AQS中有一個默認實現,其默認實現即拋出一個UnsupportedOperationException異常,意為默認下獨占模式是不支持此操作的。而這個操作在子類又是怎樣的呢?我們可以通過查看ReentrantLock中的Sync實現:
//ReentrantLock protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
可以看到在AQS的其中一個實現中,ReentrantLock$Sync對它進行了重寫,具體意義在這里不做討論。這個在AQS定義的方法表示該方法保證線程安全的獲取同步狀態,如果同步狀態獲取失敗(返回false)則構造同步節點並將節點加入到同步隊列的尾部,這個操作即是addWaiter方法的實現:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //將線程構造成Node節點。 /*嘗試強行直接掛到同步隊列的尾部*/ Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } /*如果此時有多個線程都在想把自己掛到同步隊列的尾部,上面的操作就會 失敗,此時將“無限期”線程安全的等待着掛到同步隊列的尾部*/ enq(node); return node; }
在enq的實現中實際就是一個for“死循環”,其目的就是直到成功地添加到同步隊列尾部才推出循環。
在獲取同步狀態失敗(tryAcqurie) ->構造節點(addWaiter)->添加到同步隊列尾部(addWaiter)過后,接下來就是一個很重要的操作acquireQueued自旋。這個動作很重要,其目的就在於每個節點都各自的在做判斷是否能獲取到同步狀態,每個節點都在自省地觀察,當條件滿足獲取到了同步狀態則可以從自旋過程中退出,否則繼續。
final boolean acquireQueued(final Node node, int qrg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { interrupted = true; } } } finally { if (failed) cancelAcquire(node); } }
從上面的代碼實現我們可以看到,盡管每個節點都在“無限期”的獲取鎖,但並不是每個節點能有獲取鎖的這個資格,而是當它的前驅節點是頭節點時才會去獲取鎖(tryAcquire)。當這個節點獲取同步狀態時,接下來的方法shouldParkAfterFailedAcquire則會判斷當前線程是否需要被阻塞,而這個判斷方法則是通過它的前驅節點的waitStatus判斷。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //首先獲取當前節點的前驅節點等待狀態 if (ws == Node.SIGNAL) //當前線程需要被阻塞,即需要被unpark(喚醒) return true; if (ws > 0) { //pred.waitStatus == CANCELLED do { node.prev = pred = pred.prev; //前驅節點等待狀態已經處於取消,即不會再獲取同步狀態時,把前驅節點從同步狀態中移除。 } while (pred.waitStatus > 0); pred.next = node; } else { //pred.waitStatus == CONDITION || PROPAGATE compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
如果調用該方法判斷為當前線程需要被阻塞(返回true),則接着執行parkAndCheckInterrupt阻塞當前線程,直到當前線程被喚醒的時候才從parkAndCheckInterrupt返回。
關於獨占模式獲取同步狀態可以總結為下面一段話:
AQS的模板方法acquire通過調用子類自定義實現的tryAcquire獲取同步狀態失敗后->將線程構造成Node節點(addWaiter)->將Node節點添加到同步隊列對尾(addWaiter)->每個節點以自旋的方法獲取同步狀態(acquirQueued)。在節點自旋獲取同步狀態時,只有其前驅節點是頭節點的時候才會嘗試獲取同步狀態,如果該節點的前驅不是頭節點或者該節點的前驅節點是頭節點單獲取同步狀態失敗,則判斷當前線程需要阻塞,如果需要阻塞則需要被喚醒過后才返回。
2).獨占模式同步狀態的釋放
AbstractQueuedSynchronizer |
|
方法 |
描述 |
public final boolean release(int arg) |
釋放同步狀態,並喚醒后繼節點 |
當線程獲取到了同步狀態並且執行了相應的邏輯過后,此時就應該釋放同步狀態。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); //喚醒頭節點的后繼節點 return true; } return false; }
AQS中的release釋放同步狀態和acquire獲取同步狀態一樣,都是模板方法,tryRelease釋放的具體操作都有子類去實現,父類AQS只提供一個算法骨架。
private void unparkSucessor(Node node) { int ws = node.waitStatus; if (ws < 0) //ws != CANCELLED compareAndSetWaitStatus(node, ws, 0); //利用CAS將當前線程的等待狀態置為CANCELLE Node s = node.next; if (s == null || s.waitSatatus > 0) { //如果當前線程的后繼節點為空,則從同步隊列的尾節點開始向前尋找當前線程的下一個不為空的節點 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = r; } if (s != null) LockSuport.unpark(s.thread); //如果當前線程的后繼節點不為空,則調用LockSuport.unpark喚醒其后繼節點,使得后繼節點得以重新嘗試獲取同步狀態 }
對AQS的源碼解讀才剛剛開始,本節只介紹了AQS在內部使用一個同步隊列來管理同步狀態,並且介紹了在AQS在模板方法模式的基礎上實現獨占模式同步狀態的獲取與釋放。下一節會繼續解讀AQS共享模式下同步狀態的獲取與釋放。