概述
AbstractQueuedSynchronizer抽象隊列同步器簡稱AQS,它是實現同步器的基礎組件,juc下面Lock的實現以及一些並發工具類就是通過AQS來實現的,這里我們通過AQS的類圖先看一下大概,下面我們總結一下AQS的實現原理。先看看AQS的類圖。
(1)AQS是一個通過內置的FIFO雙向隊列來完成線程的排隊工作(內部通過結點head和tail記錄隊首和隊尾元素,元素的結點類型為Node類型,后面我們會看到Node的具體構造)。
/*等待隊列的隊首結點(懶加載,這里體現為競爭失敗的情況下,加入同步隊列的線程執行到enq方法的時候會創
建一個Head結點)。該結點只能被setHead方法修改。並且結點的waitStatus不能為CANCELLED*/
private transient volatile Node head;
/**等待隊列的尾節點,也是懶加載的。(enq方法)。只在加入新的阻塞結點的情況下修改*/
private transient volatile Node tail;
(2)其中Node中的thread用來存放進入AQS隊列中的線程引用,Node結點內部的SHARED表示標記線程是因為獲取共享資源失敗被阻塞添加到隊列中的;Node中的EXCLUSIVE表示線程因為獲取獨占資源失敗被阻塞添加到隊列中的。waitStatus表示當前線程的等待狀態
①CANCELLED=1:表示線程因為中斷或者等待超時,需要從等待隊列中取消等待;
②SIGNAL=-1:當前線程thread1占有鎖,隊列中的head(僅僅代表頭結點,里面沒有存放線程引用)的后繼結點node1處於等待狀態,如果已占有鎖的線程thread1釋放鎖或被CANCEL之后就會通知這個結點node1去獲取鎖執行。
③CONDITION=-2:表示結點在等待隊列中(這里指的是等待在某個lock的condition上,關於Condition的原理下面會寫到),當持有鎖的線程調用了Condition的signal()方法之后,結點會從該condition的等待隊列轉移到該lock的同步隊列上,去競爭lock。(注意:這里的同步隊列就是我們說的AQS維護的FIFO隊列,等待隊列則是每個condition關聯的隊列)
④PROPAGTE=-3:表示下一次共享狀態獲取將會傳遞給后繼結點獲取這個共享同步狀態。
(3)AQS中維持了一個單一的volatile修飾的狀態信息state(AQS通過Unsafe的相關方法,以原子性的方式由線程去獲取這個state)。AQS提供了getState()、setState()、compareAndSetState()函數修改值(實際上調用的是unsafe的compareAndSwapInt方法)。下面是AQS中的部分成員變量以及更新state的方法
//這就是我們剛剛說到的head結點,懶加載的(只有競爭失敗需要構建同步隊列的時候,才會創建這個head),如果頭節點存在,它的waitStatus不能為CANCELLED
private transient volatile Node head;
//當前同步隊列尾節點的引用,也是懶加載的,只有調用enq方法的時候會添加一個新的wait node
private transient volatile Node tail;
//AQS核心:同步狀態
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
(4)AQS的設計師基於模板方法模式的。使用時候需要繼承同步器並重寫指定的方法,並且通常將子類推薦為定義同步組件的靜態內部類,子類重寫這些方法之后,AQS工作時使用的是提供的模板方法,在這些模板方法中調用子類重寫的方法。其中子類可以重寫的方法:
//獨占式的獲取同步狀態,實現該方法需要查詢當前狀態並判斷同步狀態是否符合預期,然后再進行CAS設置同步狀態
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
//獨占式的釋放同步狀態,等待獲取同步狀態的線程可以有機會獲取同步狀態
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
//共享式的獲取同步狀態
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
//嘗試將狀態設置為以共享模式釋放同步狀態。 該方法總是由執行釋放的線程調用。
protected int tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
//當前同步器是否在獨占模式下被線程占用,一般該方法表示是否被當前線程所獨占
protected int isHeldExclusively(int arg) { throw new UnsupportedOperationException();}
(5)AQS的內部類ConditionObject是通過結合鎖實現線程同步,ConditionObject可以直接訪問AQS的變量(state、queue),ConditionObject是個條件變量 ,每個ConditionObject對應一個隊列用來存放線程調用condition條件變量的await方法之后被阻塞的線程。
非公平鎖的加鎖流程
AQS會把所有的請求線程構成一個CLH隊列,當一個線程執行完畢(lock.unlock())時會激活自己的后繼節點,但正在執行的線程並不在隊列中,而那些等待執行的線程全部處於阻塞狀態(park())。如下圖所示。
(1)假設這個時候在初始情況下,還沒有多任務來請求競爭這個state,這時候如果第一個線程thread1調用了lock方法請求獲得鎖,首先會通過CAS的方式將state更新為1,表示自己thread1獲得了鎖,並將獨占鎖的線程持有者設置為thread1。
final void lock() {
if (compareAndSetState(0, 1))
//setExclusiveOwnerThread是AbstractOwnableSynchronizer的方法,AQS繼承了AbstractOwnableSynchronizer
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
(2)這個時候有另一個線程thread2來嘗試或者鎖,同樣也調用lock方法,嘗試通過CAS的方式將state更新為1,但是由於之前已經有線程持有了state,所以thread2這一步CAS失敗(前面的thread1已經獲取state並且沒有釋放),就會調用acquire(1)方法(該方法是AQS提供的模板方法,它會調用子類的tryAcquire方法)。非公平鎖的實現中,AQS的模板方法acquire(1)就會調用NofairSync的tryAcquire方法,而tryAcquire方法又調用的Sync的nonfairTryAcquire方法,所以我們看看nonfairTryAcquire的流程。
//NofairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//(1)獲取當前線程
final Thread current = Thread.currentThread();
//(2)獲得當前同步狀態state
int c = getState();
//(3)如果state==0,表示沒有線程獲取
if (c == 0) {
//(3-1)那么就嘗試以CAS的方式更新state的值
if (compareAndSetState(0, acquires)) {
//(3-2)如果更新成功,就設置當前獨占模式下同步狀態的持有者為當前線程
setExclusiveOwnerThread(current);
//(3-3)獲得成功之后,返回true
return true;
}
}
//(4)這里是重入鎖的邏輯
else if (current == getExclusiveOwnerThread()) {
//(4-1)判斷當前占有state的線程就是當前來再次獲取state的線程之后,就計算重入后的state
int nextc = c + acquires;
//(4-2)這里是風險處理
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//(4-3)通過setState無條件的設置state的值,(因為這里也只有一個線程操作state的值,即
//已經獲取到的線程,所以沒有進行CAS操作)
setState(nextc);
return true;
}
//(5)沒有獲得state,也不是重入,就返回false
return false;
}
總結來說就是:
1、獲取當前將要去獲取鎖的線程thread2。
2、獲取當前AQS的state的值。如果此時state的值是0,那么我們就通過CAS操作獲取鎖,然后設置AQS的線程占有者為thread2。很明顯,在當前的這個執行情況下,state的值是1不是0,因為我們的thread1還沒有釋放鎖。所以CAS失敗,后面第3步的重入邏輯也不會進行
3、如果當前將要去獲取鎖的線程等於此時AQS的exclusiveOwnerThread的線程,則此時將state的值加1,這是重入鎖的實現方式。
4、最終thread2執行到這里會返回false。
(3)上面的thread2加鎖失敗,返回false。那么根據開始我們講到的AQS概述就應該將thread2構造為一個Node結點加入同步隊列中。因為NofairSync的tryAcquire方法是由AQS的模板方法acquire()來調用的,那么我們看看該方法的源碼以及執行流程。
//(1)tryAcquire,這里thread2執行返回了false,那么就會執行addWaiter將當前線程構造為一個結點加入同步隊列中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
那么我們就看一下addWaiter方法的執行流程。
private Node addWaiter(Node mode) {
//(1)將當前線程以及阻塞原因(是因為SHARED模式獲取state失敗還是EXCLUSIVE獲取失敗)構造為Node結點
Node node = new Node(Thread.currentThread(), mode);
//(2)這一步是快速將當前線程插入隊列尾部
Node pred = tail;
if (pred != null) {
//(2-1)將構造后的node結點的前驅結點設置為tail
node.prev = pred;
//(2-2)以CAS的方式設置當前的node結點為tail結點
if (compareAndSetTail(pred, node)) {
//(2-3)CAS設置成功,就將原來的tail的next結點設置為當前的node結點。這樣這個雙向隊
//列就更新完成了
pred.next = node;
return node;
}
}
//(3)執行到這里,說明要么當前隊列為null,要么存在多個線程競爭失敗都去將自己設置為tail結點,
//那么就會有線程在上面(2-2)的CAS設置中失敗,就會到這里調用enq方法
enq(node);
return node;
}
那么總結一下add Waiter方法
1、將當前將要去獲取鎖的線程也就是thread2和獨占模式封裝為一個node對象。
2、嘗試快速的將當前線程構造的node結點添加作為tail結點(這里就是直接獲取當前tail,然后將node的前驅結點設置為tail),並且以CAS的方式將node設置為tail結點(CAS成功后將原tail的next設置為node,然后這個隊列更新成功)。
3、如果2設置失敗,就進入enq方法。
在剛剛的thread1和thread2的環境下,開始時候線程阻塞隊列是空的(因為thread1獲取了鎖,thread2也是剛剛來請求鎖,所以線程阻塞隊列里面是空的)。很明顯,這個時候隊列的尾部tail節點也是null,那么將直接進入到enq方法。所以我們看看enq方法的實現
private Node enq(final Node node) {
for (;;) {
//(4)還是先獲取當前隊列的tail結點
Node t = tail;
//(5)如果tail為null,表示當前同步隊列為null,就必須初始化這個同步隊列的head和tail(建
//立一個哨兵結點)
if (t == null) {
//(5-1)初始情況下,多個線程競爭失敗,在檢查的時候都發現沒有哨兵結點,所以需要CAS的
//設置哨兵結點
if (compareAndSetHead(new Node()))
tail = head;
}
//(6)tail不為null
else {
//(6-1)直接將當前結點的前驅結點設置為tail結點
node.prev = t;
//(6-2)前驅結點設置完畢之后,還需要以CAS的方式將自己設置為tail結點,如果設置失敗,
//就會重新進入循環判斷一遍
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq方法內部是一個自旋循環,第一次循環默認情況如下圖所示
1、首先代碼塊(4)處將t指向了tail,判斷得到t==null,如圖(1)所示;
2、於是需要新建一個哨兵結點作為整個同步隊列的頭節點(代碼塊5-1處執行)
3、完了之后如圖(2)所示。這樣第一次循環執行完畢。
第二次循環整體執行如下圖所示。
1、還是先獲取當前tail結點然后將t指向tail結點。如下圖的(3)
2、然后判斷得到當前t!=null,所以enq方法中進入代碼塊(6).
3、在(6-1)代碼塊中將node的前驅結點設置為原來隊列的tail結點,如下圖的(4)所示。
4、設置完前驅結點之后,代碼塊(6-2)會以CAS的方式將當前的node結點設置為tail結點,如果設置成功,就會是下圖(5)所示。更新完tail結點之后,需要保證雙向隊列的,所以將原來的指向哨兵結點的t的next結點指向node結點,如下圖(6)所示。最后返回
總結來說,即使在多線程情況下,enq方法還是能夠保證每個線程結點會被安全的添加到同步隊列中,因為enq通過CAS方式將結點添加到同步隊列之后才會返回,否則就會不斷嘗試添加(這樣實際上就是在並發情況下,把向同步隊列添加Node變得串行化了)
(4)在上面AQS的模板方法中,acquire()方法還有一步acquireQueued,這個方法的主要作用就是在同步隊列中嗅探到自己的前驅結點,如果前驅結點是頭節點的話就會嘗試取獲取同步狀態,否則會先設置自己的waitStatus為-1,然后調用LockSupport的方法park自己。具體的實現如下面代碼所示
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//在這樣一個循環中嘗試tryAcquire同步狀態
for (;;) {
//獲取前驅結點
final Node p = node.predecessor();
//(1)如果前驅結點是頭節點,就嘗試取獲取同步狀態,這里的tryAcquire方法相當於還是調
//用NofairSync的tryAcquire方法,在上面已經說過
if (p == head && tryAcquire(arg)) {
//如果前驅結點是頭節點並且tryAcquire返回true,那么就重新設置頭節點為node
setHead(node);
p.next = null; //將原來的頭節點的next設置為null,交由GC去回收它
failed = false;
return interrupted;
}
//(2)如果不是頭節點,或者雖然前驅結點是頭節點但是嘗試獲取同步狀態失敗就會將node結點
//的waitStatus設置為-1(SIGNAL),並且park自己,等待前驅結點的喚醒。至於喚醒的細節
//下面會說到
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在上面的代碼中我們可以看出,這個方法也是一個自旋循環,繼續按照剛剛的thread1和thread2這個情況分析。在enq方法執行完之后,同步隊列的情況大概如下所示。
當前的node結點的前驅結點為head,所以會調用tryAcquire()方法去獲得同步狀態。但是由於state被thread1占有,所以tryAcquire失敗。這里就是執行acquireQueued方法的代碼塊(2)了。代碼塊(2)中首先調用了shouldParkAfterFailedAcquire方法,該方法會將同步隊列中node結點的前驅結點的waitStatus為CANCELLED的線程移除,並將當前調用該方法的線程所屬結點自己和他的前驅結點的waitStatus設置為-1(SIGNAL),然后返回。具體方法實現如下所示。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//(1)獲取前驅結點的waitStatus
int ws = pred.waitStatus;
//(2)如果前驅結點的waitStatus為SINGNAL,就直接返回true
if (ws == Node.SIGNAL)
//前驅結點的狀態為SIGNAL,那么該結點就能夠安全的調用park方法阻塞自己了。
return true;
if (ws > 0) {
//(3)這里就是將所有的前驅結點狀態為CANCELLED的都移除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//CAS操作將這個前驅節點設置成SIGHNAL。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
所以shouldParkAfterFailedAcquire方法執行完畢,現在的同步隊列情況大概就是這樣子,即哨兵結點的waitStatus值變為-1。
上面的執行完畢返回到acquireQueued方法的時候,在acquireQueued方法中就會進行第二次循環了,但是還是獲取state失敗,而當再次進入shouldParkAfterFailedAcquire方法的時候,當前結點node的前驅結點head的waitStatus已經為-1(SIGNAL)了,就會返回true,然后acquireQueued方法中就會接着執行parkAndCheckInterrupt將自己park阻塞掛起。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
(5)我們梳理一下整個方法調用的流程,假設現在又有一個thread3線程競爭這個state,那么這個方法調用的流程是什么樣的呢。
①首先肯定是調用ReentrantLock.lock()方法去嘗試加鎖;
②因為是非公平鎖,所以就會轉到調用NoFairSync.lock()方法;
③在NoFairSync.lock()方法中,會首先嘗試設置state的值,因為已經被占有那么肯定就是失敗的。這時候就會調用AQS的模板方法AQS.acquire(1)。
④在AQS的模板方法acquire(1)中,實際首先會調用的是子類的tryAcquire()方法,而在非公平鎖的實現中即Sync.nofairTryAcquire()方法。
⑤顯然tryAcquire()會返回false,所以acquire()繼續執行,即調用AQS.addWaiter(),就會將當前線程構造稱為一個Node結點,初始狀況下waitStatus為0。
⑥在addWaiter方法中,會首先嘗試直接將構建的node結點以CAS的方式(存在多個線程嘗試將自己設置為tail)設置為tail結點,如果設置成功就直接返回,失敗的話就會進入一個自旋循環的過程。即調用enq()方法。最終保證自己成功被添加到同步隊列中。
⑦加入同步隊列之后,就需要將自己掛起或者嗅探自己的前驅結點是否為頭結點以便嘗試獲取同步狀態。即調用acquireQueued()方法。
⑧在這里thread3的前驅結點不是head結點,所以就直接調用shouldParkAfterFailedAcquire()方法,該方法首先會將剛剛的thread2線程結點中的waitStatue的值改變為-1(初始的時候是沒有改變這個waitStatus的,每個新節點的添加就會改變前驅結點的waitStatus值)。
⑨thread2所在結點的waitStatus改變后,shouldParkAfterFailedAcquire方法會返回false。所以之后還會在acquireQueued中進行第二次循環。並再次調用shouldParkAfterFailedAcquire方法,然后返回true。最終調用parkAndCheckInterrupt()將自己掛起。
每個線程去競爭這個同步狀態失敗的話大概就會經歷上面的這些過程。假設現在thread3經歷上面這些過程之后也進入同步隊列,那么整個同步隊列大概就是下面這樣了.
將上面的流程整理一下大概就是下面這個圖
非公平鎖的釋放流程
上面說一ReentrantLock為例到了怎樣去獲得非公平鎖,那么thread1獲取鎖,執行完釋放鎖的流程是怎樣的呢。首先肯定是在finally中調用ReentrantLock.unlock()方法,所以我們就從這個方法開始看起。
(1)從下面的unlock方法中我們可以看出,實際上是調用AQS的release()方法,其中傳遞的參數為1,表示每一次調用unlock方法都是釋放所獲得的一次state。重入的情況下會多次調用unlock方法,也保證了lock和unlock是成對的。
public void unlock() {
sync.release(1); //這里ReentrantLock的unlock方法調用了AQS的release方法
}
public final boolean release(int arg) {
//這里調用了子類的tryRelease方法,即ReentrantLock的內部類Sync的tryRelease方法
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
(2)上面看到release方法首先會調用ReentrantLock的內部類Sync的tryRelease方法。而通過下面代碼的分析,大概知道tryRelease做了這些事情。
①獲取當前AQS的state,並減去1;
②判斷當前線程是否等於AQS的exclusiveOwnerThread,如果不是,就拋IllegalMonitorStateException異常,這就保證了加鎖和釋放鎖必須是同一個線程;
③如果(state-1)的結果不為0,說明鎖被重入了,需要多次unlock,這也是lock和unlock成對的原因;
④如果(state-1)等於0,我們就將AQS的ExclusiveOwnerThread設置為null;
⑤如果上述操作成功了,也就是tryRelase方法返回了true;返回false表示需要多次unlock。
protected final boolean tryRelease(int releases) {
//(1)獲取當前的state,然后減1,得到要更新的state
int c = getState() - releases;
//(2)判斷當前調用的線程是不是持有鎖的線程,如果不是拋出IllegalMonitorStateException
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//(3)判斷更新后的state是不是0
if (c == 0) {
free = true;
//(3-1)將當前鎖持者設為null
setExclusiveOwnerThread(null);
}
//(4)設置當前state=c=getState()-releases
setState(c);
//(5)只有state==0,才會返回true
return free;
}
(3)那么當tryRelease返回true之后,就會執行release方法中if語句塊中的內容。從上面我們看到,
if (tryRelease(arg)) {
//(1)獲取當前隊列的頭節點head
Node h = head;
//(2)判斷頭節點不為null,並且頭結點的waitStatus不為0(CACCELLED)
if (h != null && h.waitStatus != 0)
//(3-1)調用下面的方法喚醒同步隊列head結點的后繼結點中的線程
unparkSuccessor(h);
return true;
}
(4)在獲取鎖的流程分析中,我們知道當前同步隊列如下所示,所以判斷得到head!=null並且head的waitStatus=-1。所以會執行unparkSuccessor方法,傳遞的參數為指向head的一個引用h.那下面我們就看看unparkSuccessor方法中處理了什么事情。
private void unparkSuccessor(Node node) {
//(1)獲得node的waitStatus
int ws = node.waitStatus;
//(2)判斷waitStatus是否小於0
if (ws < 0)
//(2-1)如果waitStatus小於0需要將其以CAS的方式設置為0
compareAndSetWaitStatus(node, ws, 0);
//(2)獲得s的后繼結點,這里即head的后繼結點
Node s = node.next;
//(3)判斷后繼結點是否已經被移除,或者其waitStatus==CANCELLED
if (s == null || s.waitStatus > 0) {
//(3-1)如果s!=null,但是其waitStatus=CANCELLED需要將其設置為null
s = null;
//(3-2)會從尾部結點開始尋找,找到離head最近的不為null並且node.waitStatus的結點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//(4)node.next!=null或者找到的一個離head最近的結點不為null
if (s != null)
//(4-1)喚醒這個結點中的線程
LockSupport.unpark(s.thread);
}
從上面的代碼實現中可以總結,unparkSuccessor主要做了兩件事情:
①獲取head節點的waitStatus,如果小於0,就通過CAS操作將head節點的waitStatus修改為0
②尋找head節點的下一個節點,如果這個節點的waitStatus小於0,就喚醒這個節點,否則遍歷下去,找到第一個waitStatus<=0的節點,並喚醒。
(5)下面我們應該分析的是釋放掉state之后,喚醒同步隊列中的結點之后程序又是是怎樣執行的。按照上面的同步隊列示意圖,那么下面會執行這些
①thread1(獲取到鎖的線程)調用unlock方法之后,最終執行到unparkSuccessor方法會喚醒thread2結點。所以thread2被unpark。
②再回想一下,當時thread2是在調用acquireQueued方法之后的parkAndCheckInterrupt里面被park阻塞掛起了,所以thread2被喚醒之后繼續執行acquireQueued方法中的for循環(到這里可以往前回憶看一下acquireQueued方法中的for循環做了哪些事情);
③for循環中做的第一件事情就是查看自己的前驅結點是不是頭結點(按照上面的同步隊列情況是滿足的);
④前驅結點是head結點,就會調用tryAcquire方法嘗試獲取state,因為thread1已經釋放了state,即state=0,所以thread2調用tryAcquire方法時候,以CAS的方式去將state從0更新為1是成功的,所以這個時候thread2就獲取到了鎖
⑤thread2獲取state成功,就會從acquireQueued方法中退出。注意這時候的acquireQueued返回值為false,所以在AQS的模板方法的acquire中會直接從if條件退出,最后執行自己鎖住的代碼塊中的程序。
varhandle
jdk1.9新增的,可以實現原子性操作
原文地址:https://www.cnblogs.com/fsmly/p/11274572.html#_labelTop