走近AbstractQueuedSynchronizer
一、從類結構開始
Java並發包中的同步器是很多並發組件的基礎,如各種Lock,ConcurrentHashMap中的Segment,阻塞隊列,CountDownLatch等。按我們一貫的風格,讓我們直接走近設計者對其的詮釋。
在java.util.concurrent.locks包中, AbstractQueuedSynchronizer直接繼承自AbstractOwnableSynchronizer,在接下來的文字中有時會簡寫為AQS.
1. AbstractOwnableSynchronizer
AbstractOwnableSynchronizer是一種可由一個線程獨占的同步器,同時也是創建鎖的基礎,它還包含了一種叫所有權的概念。AbstractOwnableSynchronizer本身不管理內部數據,但是它的子類可以用來維護一些值並用於控制或監視線程的訪問。
AbstractOwnableSynchronizer內部只有一個屬性:獨占當前同步狀態的線程和該屬性的set/get方法。
代碼如下:
public abstract class AbstractOwnableSynchronizer { private transient Thread exclusiveOwnerThread; }
2. AbstractQueuedSynchronizer
AbstractQueuedSynchronizer提供了一種框架,用於實現阻塞鎖和其他基於先入先出(FIFO)等待隊列的同步組件。該類用一個Int類型的原子變量來表示一種狀態。子類必須實現該類的protect方法,以此來改變同步狀態。在獲取或釋放該狀態時,需要定義這個狀態值。類中的其他方法實現了線程入隊與阻塞功能,子類依然可以維護其他狀態字段,但是只能使用getState、setState、compareAndSetState方法來跟蹤同步狀態。
子類應該定義為非公共的內部工具類,並需要在類中實現相應的同步方法。AbstractQueuedSynchronizer本身沒有實現任何接口,支持獨占式與共享式的獲取同步狀態。如果是獨占模式,那么其他線程則不能獲取到,而共享式則允許多個線程同時獲取。兩種不同模式下的等待線程共享同一個隊列,通常實現的子類只支持一種模式,但是也有兩種都支持的,如ReadWriteLock。僅支持獨占或共享的子類可以不用實現對應模式所定義的方法。
AbstractQueuedSynchronizer類中定義了一個嵌套類ConditionObject。ConditionObject主要提供一種條件,由子類決定是否支持獨占模式,並由isHeldExclusively方法決定當前線程是否是獨占的獲取同步狀態。
除此,類中還定義了一些方法,用於檢查、監控內部隊列與條件對象。
二、隊列節點
正式走近AbstractQueuedSynchronizer。
在AbstractQueuedSynchronizer內部,有一個靜態的Node內部類,Doug對他解釋如下:
等待隊列節點
等待隊列是一種“CLH(自旋鎖)”鎖隊列。我們用自旋鎖來實現阻塞同步器,但用的是同樣的策略來控制一個線程的前驅節點的信息。每個節點中的status字段記錄了一個線程是否已阻塞。當一個節點的前驅節點釋放鎖后會以信號的形式通知該節點,隊列的每個結點作為一個特定通知風格(specific-notification-style)的監視器服務,會持有一個單獨的等待線程,但是status字段不會控制是否線程能被賦予鎖。如果一個線程是第一個進入隊列的節點,他就可以嘗試獲取鎖,但是也不能保證獲取成功,只是有了競爭的權利。所以當前釋放鎖的競爭者線程可能需要再次等待。
為了進入CLH鎖隊列,你只需要原子的拼接成一個尾節點。要出隊列的話,你僅需要設置head字段即可。
+------+ prev +-----+ +-----+ head | | <---- | | <---- | | tail +------+ +-----+ +-----+
插入節點到CLH隊列要求在tail節點上是原子性的操作,未到隊列的節點與入隊的節點之間的界定就是是否有一個簡單的原子指向操作執行該節點。類似的,節點出隊牽涉到操作的就是更新head節點。然而,對於節點來說卻需要花很多功夫來決定他們的后繼結點是什么,處理一部分因超時或中斷而導致的取消。
prev鏈向符主要是用於處理取消,如果一個節點被取消后,他的后繼節點可以重新鏈向一個有效的前驅節點。(想要了解自旋鎖的更多說明可參考Scott and Scherer的論文)
我們還使用了next鏈向符,用於實現阻塞的原理。每個節點里保留了一個線程的Id,因此一個前驅節點可以通過遍歷next節點來找到具體的線程然后喚醒next節點。決定后繼節點時需要避免與新入隊的節點競爭去設置他們前驅節點的next字段。
取消節點采用一些保守的算法。由於我們必需要根據某節點來輪詢取消,因此可能會錯過在之前或之后的節點。在執行取消時,會喚醒他的后繼節點,並允許他們穩定在一個新的前驅節點上。
CLH隊列需要一個虛擬的head節點來開始,但不會在構造方法中創建他,因為如果沒有競爭那么會很浪費。相反,在創建節點時遇到第一次競爭時才會設置head和tail節點。
等待線程使用的也是同樣的節點,只不過用的是額外的鏈向符。條件是用來鏈接隊列的,線程在等待時,就會新增一個節點到條件隊列中,再被得到通知時,該節點就轉入到主隊列中。節點用一個特殊的狀態值來表示在哪個隊列中。
三、節點狀態
類上的注釋說完了,開始說說類本身吧。從Node開始。
static final class Node { //靜態內部Final類 //標記符,表示在共享模式下的等待節點 static final Node SHARED = new Node(); //標記符,表示在獨占模式下的等待節點 static final Node EXCLUSIVE = null; //等待狀態值,表示一個線程已經被取消了 static final int CANCELLED = 1; //等待狀態值,表示一個后繼節點的線程需要喚醒 static final int SIGNAL = -1; //等待狀態值,表示線程等待在某種條件下 static final int CONDITION = -2; //等待狀態值,表示下一次共享式獲取狀態的操作應該無條件的傳播 static final int PROPAGATE = -3; /*** 狀態字段,取值如下: SIGNAL: 當前結點的后繼節點將會是阻塞的(通過park方法),因此當前結點需要喚醒他的后繼節點,當他釋放或取消后。為了避免競爭,獲取同步狀態的方法必須搶先表示自己需要信號,然后重新原子的獲取。最后可能是獲取失敗,或者再次被阻塞。 CANCELLED: 由於超時、中斷等原因,當前結點會被取消。取消后,節點不會釋放狀態。特殊情景下,被取消的節點中的線程將不會再被阻塞 CONDITION: 當前結點在一個條件隊列中,再被轉移之前將不會被作為同步節點。被轉移時該值會被設置為0。 PROPAGATE: 共享式方式釋放同步狀態后應該被傳播到其他節點。這種設置(僅對head節點)在doReleaseShared方法中確保了可以持續,及時有其他的干預操作。 0: 非以上狀態 **/ volatile int waitStatus; /** 當前節點(線程)依賴於等待的狀態值而鏈向的前驅節點。在入隊列時被賦值,在出隊時被置空(讓GC回收)。 */ volatile Node prev; /** 當前結點(線程)在釋放同步狀態后會喚醒的后繼節點 */ volatile Node next; //節點關聯的線程,構造時被初始化、用完后置空 volatile Thread thread; /** 鏈向的下一個等待節點,或是一個特殊值SHARED.由於只能是獨占式的訪問條件隊列,所以只需簡單的鏈向隊列就行了。又由於條件只能是獨占式的獲取,我們保留了一個字段並使用特殊的值來表示共享模式。 **/ Node nextWaiter; //如果節點是以共享模式在等待,則返回true final boolean isShared() {return nextWaiter == SHARED;} 一組構造方法 Node (){} Node(Thread thread, Node mode) { // 添加一個等待節點時,可使用 this.nextWaiter = mode; this.thread = thread; } //添加一個依賴於某條件的節點時,可使用 Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
四、類成員與幾個方法
AbstractQueuedSynchronizer定義的重要的屬性和方法如下:
//等待隊列中的頭節點,會延遲初始化。只能通過setHead方法修改頭節點。注意:如果頭節點存在,需要保證他的waitStatus不能是CANCELLED。
private transient volatile Node head; //等待隊列中的尾節點,會延遲初始化。只能通過enq方法來添加一個新的等待節點。 private transient volatile Node tail; //同步狀態 private volatile int state; //返回當前的同步狀態值。該操作擁有volatile讀的內存語義。 protected final int getState() { return state; } //設置同步狀態的值 protected final void setState(int newState) { state = newState; } //如果當前的狀態值等於預期的值,則原子的設置同步狀態值為給定的update值,設置成功后返回true.如果實際的值不等於預期的expect值,則返回false protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
看看一些隊列操作的工具方法。
/**
新增節點到隊列中。 **/ private Node enq(final Node node) { for (;;) { Node t = tail; //先驗證尾節點是否為空 if (t == null) { // 如果為空,則必須初始化 if (compareAndSetHead(new Node())) tail = head; //第一次入隊時,頭節點和尾節點相同 } else { node.prev = t; if (compareAndSetTail(t, node)) { //原子的設置尾節點為當前結點,並鏈接好前后節點 t.next = node; return t; } } } } /*** 用當前的線程和給定的模式來創建一個節點,並加入到隊列中。 mode為Node.EXCLUSIVE表示獨占式,為Node.SHARED表示共享式 **/ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 嘗試快速入隊,如果失敗則候補到全隊列中 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } /** 設置隊列中的head節點為給的的node,從而出隊列。僅會被acquire方法調用。 */ private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } /*** 如果存在后繼節點,則喚醒該節點 **/ private void unparkSuccessor(Node node) { /* 如果狀態是負數(表示需要一個信號),先搶先設置狀態為0,表示自己需要信號。 當然也可能獲取失敗,然后則進入等待隊列 */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* 正常情況下,直接喚醒后繼節點。但是后繼節點是空的或被取消了,則從尾節點開始遍歷出一個沒有被取消的節點 */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t