AQS源碼分析
AQS全稱AbstractQueuedSynchronizer(抽象隊列同步器)
AQS中維護了一個被volatile修飾的int類型的同步狀態state,以及CLH等待隊列。
state同步狀態用於維護同步資源被使用的情況,AQS本身並不關心state的值及其含義,完全由AQS的子類去定義以及維護。
CLH等待隊列是由一個雙向鏈表來實現的,存在head和tail指針分別指向鏈表中的頭節點以及尾節點,同時鏈表中的節點由AQS中的Node靜態內部類來表示。
ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore底層都是基於AQS來實現的。
AQS支持的模式
AQS支持兩種模式,一種是獨占模式,一種是共享模式。
獨占模式表示,同步資源在同一時刻只能被一個線程所持有,對應AQS的acquire()以及release()方法。
共享模式表示,同步資源在同一時刻可以被多個線程所持有,對應AQS的acquireShared()以及releaseShared()方法。
acquire()方法:獨占模式下獲取同步資源。
release()方法:獨占模式下釋放同步資源。
acquireShared()方法:共享模式下獲取同步資源。
releaseShared()方法:共享模式下釋放同步資源。
AQS使用了模板方法設計模式,在acquire()、release()、acquireShared()、releaseShared()方法中都會調用其對應的try方法,比如acquire()方法中會調用tryAcquire()方法,release()方法中會調用tryRelease()方法,AQS子類只需要重寫AQS提供的tryAcquire()、tryRelease()或tryAcquireShared()、tryReleaseShared()方法即可,只需要告訴AQS嘗試獲取同步資源或嘗試釋放同步資源是否成功,同時需要保證方法的實現是線程安全的。
tryAcquire()、tryRelease()、tryAcquireShared()、tryReleaseShared()方法都沒有使用abstract進行修飾,同時方法中都會直接拋出UnsupportedOperationException異常,好處是不需要強制子類同時實現獨占模式和共享模式中的方法,因為大多數AQS的子類都僅支持一種模式,用戶只需要根據實際情況進行選擇即可。
tryAcquire(int arg)方法:獨占模式下嘗試獲取同步資源,同時AQS規定,如果獲取同步資源成功則返回true,否則返回false。
tryRelease(int arg)方法:獨占模式下嘗試釋放同步資源,同時AQS規定,如果釋放同步資源成功則返回true,否則返回false。
tryAcquireShared(int arg)方法:共享模式下嘗試獲取同步資源,同時AQS規定,如果獲取同步資源成功則返回剩余的可用資源個數,否則返回負數。
tryReleaseShared(int arg)方法:共享模式下嘗試釋放同步資源,同時AQS規定,如果釋放同步資源成功則返回true,否則返回false。
剖析AQS中的Node類
Node類提供的核心屬性
// 節點封裝的線程
volatile Thread thread;
// 指向前驅節點的指針
volatile Node prev;
// 指向后繼節點的指針
volatile Node next;
// 節點的等待狀態(默認為0)(默認為0)(默認為0)
volatile int waitStatus;
// 下一個正在等待的節點
Node nextWaiter;
// 共享模式下的標識節點
static final Node SHARED = new Node();
// 獨占模式下的標識節點
static final Node EXCLUSIVE = null;
同時Node類中維護了一系列節點的等待狀態值
// CANCELLED狀態,表示線程已超時等等,處於CANCELLED狀態的節點會從等待隊列中剔除,不會參與到同步資源的競爭當中
static final int CANCELLED = 1;
// SIGNAL狀態,如果節點的等待狀態為SIGNAL,那么當它釋放同步資源時,將會喚醒離它最近的同時等待狀態不為CANCELLED的后繼節點(同時也能說明節點存在后繼節點)
static final int SIGNAL = -1;
// 表示線程在指定的條件下進行等待
static final int CONDITION = -2;
// PROPAGATE狀態,表示實際存在可用資源,需要再往下傳播(喚醒)
static final int PROPAGATE = -3;
因此每個Node節點中都會包含節點封裝的線程、分別指向前驅和后繼節點的指針、節點的等待狀態、指向下一個正在等待的節點的指針。
自定義AQS獨占模式下的同步器來實現獨享鎖
/**
* 自定義AQS獨占模式下的同步器來實現獨享鎖
*/
public class Mutex implements Lock, java.io.Serializable {
/**
* 自定義AQS獨占模式下的同步器
* 使用state為0表示當前鎖沒有被線程所持有
* 使用state為1表示當前鎖已經被線程所持有
*/
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 判斷鎖是否被當前線程所持有
*/
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
/**
* 嘗試獲取鎖
* 判斷鎖是否存在,如果鎖不存在則獲取鎖(通過CAS控制)
*/
public boolean tryAcquire(int acquires) {
assert acquires == 1; // 值必須是1(獨享鎖只有一把鎖嘛)
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread()); // 將當前線程設置為獨占模式下擁有同步資源的線程
return true;
}
return false;
}
/**
* 嘗試釋放鎖(要求被誰加的鎖只能被誰釋放)
* 判斷當前擁有同步資源的線程是否為當前線程,如果不是則拋出異常,否則釋放鎖
* 這里有三種調用情況,鎖空閑的狀態下調用、鎖已經被線程所持有但被並非擁有鎖的線程調用、鎖已經被線程所持有並被擁有鎖的線程調用,只有第三種情況才能夠解鎖成功
*/
protected boolean tryRelease(int releases) {
assert releases == 1; // 值必須是1(獨享鎖只有一把鎖嘛)
if (Thread.currentThread() != getExclusiveOwnerThread()) // 要求被誰加的鎖只能被誰釋放
throw new IllegalMonitorStateException();
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null); // 將獨占模式中擁有同步資源的線程置為NULL
setState(0);
return true;
}
/**
* 提供一個Condition實例
*/
Condition newCondition() {
return new ConditionObject();
}
/**
* 判斷鎖是否被線程所持有
*/
final boolean isLocked() {
return getState() == 1;
}
}
/**
* 同步器
*/
private final Sync sync = new Sync();
/**
* 加鎖
*/
public void lock() {
sync.acquire(1);
}
/**
* 嘗試獲取鎖
*/
public boolean tryLock() {
return sync.tryAcquire(1);
}
/**
* 解鎖
* 解鎖只能調用同步器的release(),不能調用tryRelease()方法,因為tryRelease()方法只是簡單的修改一下同步狀態的值而已,並沒有去喚醒等待隊列中的線程,正常是需要喚醒等待隊列中離頭節點最近的同時等待狀態不為CANCELLED的節點
*/
public void unlock() {
sync.release(1);
}
/**
* 返回與此Mutex綁定的Condition實例
*/
public Condition newCondition() {
return sync.newCondition();
}
/**
* 判斷鎖是否被線程所持有
*/
public boolean isLocked() {
return sync.isLocked();
}
/**
* 判斷是否有線程在等待獲取鎖
*/
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* 可能拋出InterruptedException的加鎖(如果線程被設置了中斷標識那么直接拋出異常)
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* 在指定的時間內嘗試獲取鎖
*/
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
AQS子類(同步器)一般都是通過內部類實現,然后作為內部組件來使用。
public class Main {
static class MyRunnable implements Runnable {
private Mutex mutex = new Mutex();
@Override
public void run() {
System.out.println(String.format("%s Running", Thread.currentThread().getName()));
mutex.lock();
System.out.println(String.format("%s加鎖", Thread.currentThread().getName()));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
mutex.unlock();
System.out.println(String.format("%s解鎖", Thread.currentThread().getName()));
}
}
public static void main(String[] args) {
Runnable runnable = new MyRunnable();
Thread threadA = new Thread(runnable, "線程A");
Thread threadB = new Thread(runnable, "線程B");
Thread threadC = new Thread(runnable, "線程C");
threadA.start();
threadB.start();
threadC.start();
}
}
當等待隊列中的線程被喚醒,在嘗試獲取同步資源之前,其他線程可以調用acquire()或tryAcquire()方法來獲取同步資源,因此該鎖是非公平鎖。
獨占模式下獲取同步資源的源碼分析
acquire()方法
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
總結:當線程要獲取同步資源時,可以調用acquire()或者tryAcquire()方法,acquire()方法中會調用AQS子類的tryAcquire()方法,嘗試獲取同步資源,如果獲取同步資源成功,則直接返回,做自己的事情,否則將會執行addWaiter()方法,將當前線程封裝成Node節點然后加入到等待隊列當中,然后執行acquireQueued()方法,用於自旋獲取同步資源,如果所有條件都滿足那么最后將會執行selfInterrupt()方法。
addWaiter()方法
private Node addWaiter(Node mode) {
// 將當前線程封裝成Node節點,並且指定為獨占模式,獨占模式Node.EXCLUSIVE為NULL,也就是說節點的nextWaiter為NULL
Node node = new Node(Thread.currentThread(), mode);
// 將節點加入到隊尾當中
Node pred = tail;
if (pred != null) {
// 將當前節點的前驅指針指向尾節點
node.prev = pred;
// 通過CAS設置尾節點(如果pred指針所指向的尾節點就是當前的尾節點,也就是在這個過程當中沒有其他節點插入到隊尾,則將tail指針指向當前節點)
if (compareAndSetTail(pred, node)) {
// 將之前尾節點的后繼指針指向當前節點
pred.next = node;
return node;
}
}
// 如果不存在尾節點,也就是隊列為空,或者通過CAS設置尾節點失敗(也就是在這個過程當中有其他節點插入到隊尾),那么將會通過enq()方法死循環進行設置。
enq(node);
// 無論怎么樣該方法最終都會返回封裝了當前線程的節點。
return node;
}
總結:addWaiter()方法用於將當前線程封裝成Node節點然后加入到等待隊列當中,如果在這個過程中,等待隊列為空或者通過CAS設置尾節點失敗,那么將會通過enq()方法死循環進行設置。
enq()方法
private Node enq(final Node node) {
// 死循環
for (;;) {
Node t = tail;
// 如果尾節點為空則初始化隊列,創建一個空的節點,並且將head和tail指針都指向這個節點
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 將當前節點的前驅指針指向尾節點
node.prev = t;
// 通過CAS設置尾節點(如果t指針所指向的節點就是當前的尾節點,也就是在這個過程當中沒有其他節點插入到隊尾,則將tail指針指向當前節點)
if (compareAndSetTail(t, node)) {
// 將之前的尾節點的后繼指針指向當前節點
t.next = node;
return t;
}
}
}
}
總結:enq()方法中使用死循環初始化隊列以及通過CAS設置尾節點,直到尾節點被設置成功,同時需要注意的是當隊列初始化后會有一個空的頭節點,該節點不包含任何的線程,然后再將當前節點加入到隊列當中。
acquireQueued()方法
final boolean acquireQueued(final Node node, int arg) {
// 失敗標識
boolean failed = true;
try {
// 中斷標識
boolean interrupted = false;
// 自旋
for (;;) {
// 獲取節點的前驅節點
final Node p = node.predecessor();
// 如果節點的前驅節點是頭節點那么嘗試獲取同步資源
// 強制要求隊列中的節點獲取同步資源的順序必須是從隊頭到隊尾,否則將會造成節點丟失,丟失了的節點中的線程將會永遠處於阻塞狀態,同時只有當線程獲取了同步資源后,它才能成為頭節點(隊列初始化后的頭節點除外),因此頭節點肯定是已經獲取過同步資源的(隊列初始化后的頭節點除外),因此為了遵循隊列中的節點獲取同步資源的順序必須是從隊頭到隊尾,所以永遠只有頭節點的后繼節點擁有嘗試獲取同步資源的權利,因此當在嘗試獲取同步資源之前,需要先判斷一下當前節點的前驅節點是否是頭節點,如果不是就不用獲取了
if (p == head && tryAcquire(arg)) {
// 當獲取同步資源成功,則將當前節點設置為頭節點
setHead(node);
// 將之前頭節點的后繼指針設置為null,幫助GC
p.next = null;
failed = false;
// 返回中斷標識
return interrupted;
}
// 如果節點的前驅節點不是頭節點,或者嘗試獲取同步資源失敗,那么將會調用shouldParkAfterFailedAcquire()方法,判斷線程能否進行阻塞,當線程能夠被阻塞時,將會調用parkAndCheckInterrupt()方法阻塞線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果在執行該方法的過程中,拋出了異常(線程超時等等),則failed標識為true,那么將會執行cancelAcquire()方法,將當前節點的等待狀態設置為CANCELLED,同時從等待隊列中剔除。
if (failed)
cancelAcquire(node);
}
}
總結:acquireQueued()方法用於自旋獲取同步資源,同時該方法的方法出口只有一個,也就是當節點的前驅節點是頭節點,同時嘗試獲取同步資源成功,那么就會將當前節點設置為頭節點,否則就會調用shouldParkAfterFailedAcquire()方法,判斷線程能否進行阻塞,當線程能夠被阻塞時,將會調用parkAndCheckInterrupt()方法阻塞線程,等待被喚醒,同時在執行acquireQueued()方法的過程中,如果拋出了異常,則failed標識為true,那么將會執行cancelAcquire()方法,將當前節點的等待狀態設置為CANCELLED,同時從等待隊列中剔除。
shouldParkAfterFailedAcquire()方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取節點的前驅節點的等待狀態
int ws = pred.waitStatus;
// 如果前驅節點的等待狀態為SIGNAL,那么當它釋放同步資源時,將會自動喚醒離它最近的同時等待狀態不為CANCELLED的后繼節點,因此當前節點就可以直接阻塞了,等待被喚醒時再去嘗試獲取同步資源
if (ws == Node.SIGNAL)
return true;
// 如果前驅節點的等待狀態為CANCELLED,那么通過循環找到前一個不為CANCELLED狀態的節點,並且將當前節點的前驅指針指向該節點,將該節點的后繼指針指向當前節點
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 通過CAS將前驅節點的等待狀態設置為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
總結:shouldParkAfterFailedAcquire()方法用於判斷線程能否進行阻塞,以及剔除被設置為CANCELLED狀態的節點。
正常情況下,線程第一次進來shouldParkAfterFailedAcquire()方法時,會將前驅節點的等待狀態設置為SIGNAL,然后再次自旋進來該方法,判斷到前驅節點的等待狀態為SIGNAL,直接返回,然后就進入待阻塞狀態。
當該節點的前驅節點被CANCELLED時,如果前驅節點的前驅節點是頭節點,那么將會喚醒當前節點,那么它會再次自旋進來該方法,判斷到前驅節點的等待狀態為CANCELLED,就會將當前節點的前驅指針指向前一個不為CANCELLED狀態的節點,也就是頭節點,然后再將頭節點的后繼指針指向當前節點,然后再次自旋進來該方法,判斷到前驅節點的等待狀態為SIGNAL,直接返回,再次進入待阻塞狀態。
無論怎么樣通過shouldParkAfterFailedAcquire()方法的所有節點最終都會進入待阻塞狀態,也就是說等待隊列中除了頭節點以外的所有線程都會處於阻塞狀態。
parkAndCheckInterrupt()方法
private final boolean parkAndCheckInterrupt() {
// 阻塞當前線程,blocker對象使用當前對象
LockSupport.park(this);
// 當被喚醒時返回線程的中斷標識
return Thread.interrupted();
}
總結:parkAndCheckInterrupt()方法用於阻塞線程,同時當線程被喚醒時會返回線程的中斷標識,盡管如果線程被設置了中斷標識,但也不會影響線程繼續往下執行,只不過當它成功獲取到同步資源時,會調用一次selfInterrupt()方法,再次為線程設置中斷標識。
selfInterrupt()方法
static void selfInterrupt() {
// 為線程設置中斷標識
Thread.currentThread().interrupt();
}
總結:當獲取了同步資源的線程被設置了中斷標識,才會調用selfInterrupt()方法,再次為線程設置中斷標識,因為在parkAndCheckInterrupt()方法中已經調用過一次Thread.interrupted()方法,避免外部又再次調用Thread.interrupted()方法導致線程的中斷標識被清除。
cancelAcquire()方法
private void cancelAcquire(Node node) {
if (node == null)
return;
// 將當前節點封裝的線程設置為NULL
node.thread = null;
// 通過循環獲取當前節點不為CANCELLED狀態的前驅節點
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 獲取前驅節點的后繼節點(如果節點的前驅節點不是CANCELLED狀態,那么前驅節點的后繼節點就是它自己)
Node predNext = pred.next;
// 將節點的等待狀態設置為CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果當前節點是尾節點,則直接通過CAS將tail指針指向當前節點不為CANCELLED狀態的前驅節點,同時通過CAS將前驅節點的后繼指針設置為NULL
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果當前節點的前驅節點不是頭節點 同時 前驅節點的等待狀態為SIGNAL(如果不是SIGNAL那就設置為SIGNAL) 且 前驅節點封裝的線程不為NULL
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// 獲取節點的后繼節點
Node next = node.next;
// 如果后繼節點的等待狀態不為CANCELLED,則通過CAS將前驅節點的后繼指針指向當前節點的后繼節點
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next); // 這里並沒有將當前節點的后繼節點的前驅指針指向前驅節點(不用設置,unparkSuccessor()方法會自動跳過)
} else {
// 如果當前節點的前驅節點是頭節點,則直接喚醒當前節點的后繼節點,讓它來剔除當前節點
unparkSuccessor(node);
}
node.next = node;
}
}
總結:如果線程在阻塞的過程當中拋出了異常,也就是直接中斷acquireQueued()方法,然后執行finally語句塊,由於failed標識為true,因此會執行cancelAcquire()方法,將當前節點的等待狀態設置為CANCELLED,如果當前節點是尾節點,則直接通過CAS將tail指針指向當前節點不為CANCELLED狀態的前驅節點,同時將該前驅節點的后繼指針設置為NULL,如果當前節點的前驅節點不是頭節點,則通過CAS將前驅節點的后繼指針指向當前節點的后繼節點,如果當前節點的前驅節點是頭節點,那么喚醒當前節點的后繼節點,讓它來剔除當前節點。
獨占模式下釋放同步資源的源碼分析
release()方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 如果隊列不等於空,同時頭節點的等待狀態不為0,也就是頭節點存在后繼節點,那么調用unparkSuccessor()方法,喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點。
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
總結:當獲取了同步資源的線程釋放同步資源時(外部線程或者頭節點中的線程),將會調用release()方法,release()方法中會調用AQS子類的tryRelease()方法,嘗試釋放同步資源,如果釋放同步資源成功,同時隊列不為空以及頭節點的等待狀態不為0,也就是頭節點存在后繼節點,那么就會調用unparkSuccessor()方法,喚醒離頭節點最近的(也就是頭節點的后繼節點)同時等待狀態不為CANCELLED的后繼節點,那么該節點將會通過自旋嘗試獲取同步資源。
unparkSuccessor()方法
private void unparkSuccessor(Node node) {
// 獲取節點的等待狀態
int ws = node.waitStatus;
// 如果節點的等待狀態不為CANCELLED,則通過CAS將節點的等待狀態設置為0(恢復成隊列初始化后的狀態)
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 獲取節點的后繼節點
Node s = node.next;
// 如果節點的后繼指針為NULL(不能說明節點就沒有后繼節點)或者后繼節點為CANCELLED狀態,那么就從后往前尋找離當前節點最近的同時等待狀態不為CANCELLED的后繼節點
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒該后繼節點中的線程
LockSupport.unpark(s.thread);
}
總結:unparkSuccessor()方法用於喚醒離節點最近的同時等待狀態不為CANCELLED的后繼節點,如果節點的后繼指針為NULL,不能說明節點就沒有后繼節點,或者后繼節點的等待狀態為CANCELLED,則從后往前,尋找離節點最近的同時等待狀態不為CANCELLED的節點,最終喚醒該節點中的線程。
獨占模式下源碼分析后的總結
1.當線程要獲取同步資源時,可以調用acquire()或者tryAcquire()方法,acquire()方法中會調用AQS子類的tryAcquire()方法,嘗試獲取同步資源,如果獲取同步資源成功,則直接返回,做自己的事情,否則將會執行addWaiter()方法,將當前線程封裝成Node節點然后加入到等待隊列當中,然后執行acquireQueued()方法,用於自旋獲取同步資源,如果所有條件都滿足那么最終將會執行selfInterrupt()方法。
2.addWaiter()方法用於將當前線程封裝成Node節點然后加入到等待隊列當中,如果在這個過程中,等待隊列為空或者通過CAS設置尾節點失敗(也就是當前指針所指向的尾節點並不是真正的尾節點,也就是在這個過程當中有其他節點插入到隊尾),那么將會通過enq()方法死循環進行設置。
3.enq()方法中使用死循環初始化隊列以及通過CAS設置尾節點,直到尾節點被設置成功,同時需要注意的是當隊列初始化后會有一個空的頭節點,該節點不包含任何的線程,然后再將當前節點加入到隊列當中。
4.acquireQueued()方法用於自旋獲取同步資源,同時該方法的方法出口只有一個,也就是當節點的前驅節點是頭節點,同時嘗試獲取同步資源成功,那么就會將當前節點設置為頭節點,否則就會調用shouldParkAfterFailedAcquire()方法,判斷線程能否進行阻塞,當線程能夠被阻塞時,將會調用parkAndCheckInterrupt()方法阻塞線程,等待被喚醒,同時在執行acquireQueued()方法的過程中,如果拋出了異常,則failed標識為true,那么將會執行cancelAcquire()方法,將當前節點的等待狀態設置為CANCELLED,同時從等待隊列中剔除。
5.shouldParkAfterFailedAcquire()方法用於判斷線程能否進行阻塞,以及剔除被設置為CANCELLED狀態的節點,正常情況下,線程第一次進來shouldParkAfterFailedAcquire()方法時,會將前驅節點的等待狀態設置為SIGNAL,然后再次自旋進來該方法,判斷到前驅節點的等待狀態為SIGNAL,直接返回,然后就進入待阻塞狀態,當該節點的前驅節點被CANCELLED時,如果前驅節點的前驅節點是頭節點,那么將會喚醒當前節點,那么它會再次自旋進來該方法,判斷到前驅節點的等待狀態為CANCELLED,就會將當前節點的前驅指針指向前一個不為CANCELLED狀態的節點,也就是頭節點,然后再將頭節點的后繼指針指向當前節點,然后再次自旋進來該方法,判斷到前驅節點的等待狀態為SIGNAL,直接返回,再次進入待阻塞狀態,無論怎么樣通過shouldParkAfterFailedAcquire()方法的所有節點最終都會進入待阻塞狀態,也就是說等待隊列中除了頭節點以外的所有線程都會處於阻塞狀態。
6.parkAndCheckInterrupt()方法用於阻塞線程,同時當線程被喚醒時會返回線程的中斷標識,盡管如果線程被設置了中斷標識,但也不會影響線程繼續往下執行,只不過當它成功獲取到同步資源時,會調用一次selfInterrupt()方法,再次為線程設置中斷標識,因為在parkAndCheckInterrupt()方法中已經調用過一次Thread.interrupted()方法,避免外部又再次調用Thread.interrupted()方法導致線程的中斷標識被清除。
此時等待隊列中除了頭節點以外的所有線程都會處於阻塞狀態
1.如果線程在阻塞的過程當中拋出了異常,也就是直接中斷acquireQueued()方法,然后執行finally語句塊,由於failed標識為true,因此會執行cancelAcquire()方法,將當前節點的等待狀態設置為CANCELLED,如果當前節點是尾節點,則直接通過CAS將tail指針指向當前節點不為CANCELLED狀態的前驅節點,同時將該前驅節點的后繼指針設置為NULL,如果當前節點的前驅節點不是頭節點,則通過CAS將前驅節點的后繼指針指向當前節點的后繼節點,如果當前節點的前驅節點是頭節點,那么喚醒當前節點的后繼節點,讓它來剔除當前節點。
2.當獲取了同步資源的線程釋放同步資源時(外部線程或者頭節點中的線程),將會調用release()方法,release()方法中會調用AQS子類的tryRelease()方法,嘗試釋放同步資源,如果釋放同步資源成功,同時隊列不為空以及頭節點的等待狀態不為0,也就是頭節點存在后繼節點,那么就會調用unparkSuccessor()方法,喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點,那么該節點將會通過自旋嘗試獲取同步資源。
3.在調用unparkSuccessor()方法喚醒離節點最近的同時等待狀態不為CANCELLED的后繼節點時,如果節點的后繼指針為NULL,不能說明節點就沒有后繼節點,或者后繼節點的等待狀態為CANCELLED,則從后往前,尋找離節點最近的同時等待狀態不為CANCELLED的節點,最終喚醒該節點中的線程。
獨占模式FAQ
為什么要用CAS設置尾節點?
如果在設置尾節點的這個過程當中,有其他節點插入到隊尾,然后將tail指針指向當前節點,當前節點的前驅指針指向之前的尾節點,之前的尾節點的后繼指針指向當前節點,那么中間插入的節點就會丟失。
在acquireQueued()方法中,為什么嘗試獲取同步資源之前,需要先判斷一下當前節點的前驅節點是否是頭節點?
強制要求等待隊列中的節點獲取同步資源的順序必須是從隊頭到隊尾,否則將會造成節點丟失,丟失了的節點中的線程將會永遠處於阻塞狀態(當同步資源被釋放時,還沒來得及喚醒離頭節點最近同時等待狀態不為CANCELLED的后繼節點時,等待隊列中一個排在很后的節點被喚醒,然后它將會通過自旋嘗試獲取同步資源,一旦它獲取了同步資源,那么它將成為頭節點,最終它與之前頭節點之間的所有節點中的線程將會永遠處於阻塞狀態),同時只有當線程獲取了同步資源后,它才能成為頭節點(隊列初始化后的頭節點除外),因此頭節點肯定是已經獲取過同步資源的(隊列初始化后的頭節點除外),因此為了遵循隊列中的節點獲取同步資源的順序必須是從隊頭到隊尾,所以永遠只有頭節點的后繼節點擁有嘗試獲取同步資源的權利,因此當在嘗試獲取同步資源之前,需要先判斷一下當前節點的前驅節點是否是頭節點,如果不是就不用獲取了,至於頭節點釋放同步資源后,能否被后繼節點獲取到同步資源另說,因為當同步資源被釋放時,被喚醒的后繼節點可能還沒來得獲取同步資源,此時就被外部線程直接獲取了,因此被喚醒的這個線程又只能再次進入阻塞狀態。
為什么在unparkSuccessor()方法中,如果節點的后繼指針為NULL,需要從后往前尋找離節點最近的同時等待狀態不為CANCELLED的后繼節點,而不從前往后進行尋找?
如果節點的后繼指針為NULL,不能說明節點就沒有后繼節點,因為無論是在addWaiter()方法還是enq()方法將節點加入到隊列,它總是先將當前節點的前驅指針指向尾節點,然后再通過CAS將tail指針指向當前節點,如果在將之前尾節點的后繼指針指向當前節點之前,需要喚醒尾節點的后繼節點,由於此時尾節點的后繼指針仍然為NULL,因此無法通過next指針從前往后尋找,只能通過pred指針從后往前尋找。
線程在什么情況會被喚醒?
線程被喚醒只有兩種情況
一種是外部線程或者頭節點釋放同步資源時,需要喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點,那么該節點就會通過自旋嘗試獲取同步資源。
一種是當節點的前驅節點被CANCELLED時,如果前驅節點的前驅節點是頭節點,那么將會喚醒當前節點,將當前節點的前驅指針指向前一個不為CANCELLED狀態的節點,也就是頭節點,然后再將頭節點的后繼指針指向當前節點。
等待隊列中處於CANCELLED狀態的節點什么時候被剔除?
cancelAcquire()和shouldParkAfterFailedAcquire()方法都可以剔除等待隊列中處於CANCELLED狀態的節點。
*在unparkSuccessor()中需要剔除處於CANCELLED狀態的節點是為了避免同步問題,可能存在一個處於CANCELLED狀態的節點未來得及被剔除,然后它又作為要喚醒的節點的后繼節點。
自定義AQS共享模式下的同步器來實現共享鎖
/**
* 自定義AQS共享模式下的同步器來實現共享鎖
*/
public class Share {
/**
* 自定義AQS共享模式下的同步器
*/
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 存儲線程獲取同步資源的情況
*/
private static ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
/**
* 初始化同步資源
*/
public Sync(int state) {
setState(state);
}
/**
* 嘗試獲取同步資源(需要保證是線程安全的)
*/
@Override
protected int tryAcquireShared(int arg) {
int state = getState();
int available = state - arg;
if (available >= 0 && compareAndSetState(state, available)) { // 通過CAS保證原子性
threadLocal.set(arg);
return available;
}
return -1;
}
/**
* 釋放同步資源(線程釋放同步資源的個數必須等於它獲取同步資源的個數)
*/
@Override
protected boolean tryReleaseShared(int arg) {
if (threadLocal.get() != arg)
throw new UnsupportedOperationException();
if (compareAndSetState(getState(), getState() + arg)) { // 通過CAS保證原子性
threadLocal.set(null);
return true;
}
return false;
}
}
/**
* 初始化同步器的同步資源
*/
public Share(int permits) {
sync = new Sync(permits);
}
public Sync sync;
/**
* 獲取許可
*/
public void acquire(int permits) {
sync.acquireShared(permits);
}
/**
* 嘗試獲取許可
*/
public boolean tryAcquire(int permits) {
return sync.tryAcquireShared(permits) >= 0;
}
/**
* 釋放許可
*/
public boolean release(int permits) {
return sync.releaseShared(permits);
}
}
public class Main {
static class MyRunnable implements Runnable {
private Share share;
private int permits;
@Override
public void run() {
System.out.println(String.format("%s Running", Thread.currentThread().getName()));
share.acquire(permits);
System.out.println(String.format("%s獲取了%s個許可", Thread.currentThread().getName(), permits));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
share.release(permits);
System.out.println(String.format("%s釋放了%s個許可", Thread.currentThread().getName(), permits));
}
public MyRunnable(Share share, int permits) {
this.share = share;
this.permits = permits;
}
}
public static void main(String[] args) {
Share share = new Share(10);
Thread threadA = new Thread(new MyRunnable(share,5),"線程A");
Thread threadB = new Thread(new MyRunnable(share,4),"線程B");
Thread threadC = new Thread(new MyRunnable(share,3),"線程C");
threadA.start();
threadB.start();
threadC.start();
}
}
共享模式下獲取同步資源的源碼分析
acquireShared()方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
總結:當線程要獲取同步資源時,可以調用acquireShared()或tryAcquireShared()方法,acquireShared()方法中會調用AQS子類的tryAcquireShared()方法,嘗試獲取同步資源,如果獲取同步資源成功,則直接返回,做自己的事情,否則將會調用doAcquireShared()方法。
doAcquireShared()方法
private void doAcquireShared(int arg) {
// 將當前線程封裝成Node節點,然后加入到等待隊列當中
// 當前節點會被指定為共享模式,共享模式Node.SHARED為一個空的節點,也就是說節點的nextWaiter不為NULL(isShared()方法返回true)
// 在調用addWaiter()方法的過程中,如果等待隊列為空或者通過CAS設置尾節點失敗,那么將會通過enq()方法死循環進行設置
final Node node = addWaiter(Node.SHARED);
// 失敗標識
boolean failed = true;
try {
// 中斷標識
boolean interrupted = false;
// 自旋
for (;;) {
// 獲取節點的前驅節點
final Node p = node.predecessor();
// 如果節點的前驅節點是頭節點,則嘗試獲取同步資源
if (p == head) {
int r = tryAcquireShared(arg);
// 如果獲取同步資源成功,則調用setHeadAndPropagate()方法
if (r >= 0) {
setHeadAndPropagate(node, r);
// 將之前的頭節點的后繼指針設置為NULL,help gc
p.next = null;
// 如果獲取了同步資源的線程被設置了中斷標識,那么調用selfInterrupt()方法,再次為線程設置一個中斷標識
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 如果節點的前驅節點不是頭節點,或者嘗試獲取同步資源失敗,那么將會調用shouldParkAfterFailedAcquire()方法,判斷線程能否進行阻塞,當線程能夠被阻塞時,將會調用parkAndCheckInterrupt()方法阻塞線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果在執行該方法的過程中,拋出了異常(線程超時等等),則failed標識為true,那么將會執行cancelAcquire()方法,將當前節點的等待狀態設置為CANCELLED,同時從等待隊列中剔除。
if (failed)
cancelAcquire(node);
}
}
總結:doAcquireShared()方法用於將當前線程封裝成Node節點然后加入到等待隊列當中,然后通過自旋獲取同步資源,同時該方法的方法出口只有一個,也就是當節點的前驅節點是頭節點,同時嘗試獲取同步資源成功,那么就會調用setHeadAndPropagate()方法,否則將會調用shouldParkAfterFailedAcquire()方法,判斷線程能否進行阻塞,當線程能夠被阻塞時,將會調用parkAndCheckInterrupt()方法阻塞線程,等待被喚醒,同時在執行doAcquireShared()方法的過程中,如果拋出了異常,則failed標識為true,那么將會執行cancelAcquire()方法,將當前節點的等待狀態設置為CANCELLED,同時從等待隊列中剔除。
setHeadAndPropagate()方法
private void setHeadAndPropagate(Node node, int propagate) {
// 獲取頭節點
Node h = head;
// 將當前節點設置為頭節點
setHead(node);
//如果線程獲取了同步資源后,仍然有剩余的可用資源(正常情況),或沒有剩余的可用資源但舊的和新的頭節點的等待狀態為PROPAGATE時(說明實際存在可用資源),那么將會調用doReleaseShared()方法
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) // 排除等待隊列中不為共享模式的節點
doReleaseShared();
}
}
總結:setHeadAndPropagate()方法用於將當前節點設置為頭節點,同時如果當線程獲取了同步資源后,仍然有剩余的可用資源(正常情況),或沒有剩余的可用資源但舊的和新的頭節點的等待狀態為PROPAGATE時(說明實際存在可用資源),那么將會調用doReleaseShared()方法。
doReleaseShared()方法
private void doReleaseShared() {
// 使用死循環來保證CAS操作最終肯定成功
for (;;) {
// 獲取頭節點
Node h = head;
// 如果head指針和tail指針不是指向同一個節點,說明頭節點肯定存在后繼節點(使用head != tail可以避免頭節點存在后繼節點但是頭節點的后繼指針又為NULL的情況)
if (h != null && h != tail) {
// 獲取頭節點的等待狀態,如果等待狀態為SIGNAL,則通過CAS將頭節點的等待狀態設置為0(重置),然后喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
// 如果頭節點的等待狀態為0,則通過CAS將頭節點的等待狀態設置為PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head) // 如果執行完以上步驟后,h指針指向的頭節點仍然為當前的頭節點,則退出循環,完成釋放過程,然后做自己的事情
break;
}
}
總結:當等待隊列中的線程獲取了同步資源后,仍然有剩余的可用資源,或沒有剩余的可用資源但舊的和新的頭節點的等待狀態為PROPAGATE,或者當線程釋放同步資源這兩種情況,都會調用doReleaseShared()方法,該方法使用死循環來保證CAS操作最終肯定成功,如果頭節點存在后繼節點,同時頭節點的等待狀態為SIGNAL時,那么將會通過CAS將頭節點的等待狀態設置為0(重置),然后喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點,如果判斷到頭節點的等待狀態為0,那么將會通過CAS將節點的等待狀態設置為PROPAGATE,表示需要傳播下去。
共享模式下釋放同步資源的源碼分析
releaseShared()方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
總結:當獲取了同步資源的線程釋放同步資源時,將會調用releaseShared()方法,releaseShared()方法中會調用AQS子類的tryReleaseShared()方法,嘗試釋放同步資源,如果釋放同步資源成功,則會調用doReleaseShared()方法,喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點。
共享模式下源碼分析后的總結
1.當線程要獲取同步資源時,可以調用acquireShared()或tryAcquireShared()方法,acquireShared()方法中會調用AQS子類的tryAcquireShared()方法,嘗試獲取同步資源,如果獲取同步資源成功,則直接返回,做自己的事情,否則將會調用doAcquireShared()方法。
2.doAcquireShared()方法用於將當前線程封裝成Node節點然后加入到等待隊列當中,然后通過自旋獲取同步資源,同時該方法的方法出口只有一個,也就是當節點的前驅節點是頭節點,同時嘗試獲取同步資源成功,那么就會調用setHeadAndPropagate()方法,否則將會調用shouldParkAfterFailedAcquire()方法,判斷線程能否進行阻塞,當線程能夠被阻塞時,將會調用parkAndCheckInterrupt()方法阻塞線程,等待被喚醒,同時在執行doAcquireShared()方法的過程中,如果拋出了異常,則failed標識為true,那么將會執行cancelAcquire()方法,將當前節點的等待狀態設置為CANCELLED,同時從等待隊列中剔除。
3.setHeadAndPropagate()方法用於將當前節點設置為頭節點,同時如果當線程獲取了同步資源后,仍然有剩余的可用資源(正常情況),或沒有剩余的可用資源但舊的和新的頭節點的等待狀態為PROPAGATE時(說明實際存在可用資源),那么將會調用doReleaseShared()方法。
4.當等待隊列中的線程獲取了同步資源后,仍然有剩余的可用資源,或沒有剩余的可用資源但舊的和新的頭節點的等待狀態為PROPAGATE,或者當線程釋放同步資源這兩種情況,都會調用doReleaseShared()方法,該方法使用死循環來保證CAS操作最終肯定成功,如果頭節點存在后繼節點,同時頭節點的等待狀態為SIGNAL時,那么將會通過CAS將頭節點的等待狀態設置為0(重置),然后喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點,如果判斷到頭節點的等待狀態為0(表示並發釋放同步資源),那么將會通過CAS將節點的等待狀態設置為PROPAGATE,表示需要傳播下去。
5.當獲取了同步資源的線程釋放同步資源時,將會調用releaseShared()方法,releaseShared()方法中會調用AQS子類的tryReleaseShared()方法,嘗試釋放同步資源,如果釋放同步資源成功,則會調用doReleaseShared()方法,喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點。
共享模式FAQ
有哪些場景會將節點的等待狀態設置為PROPAGATE,以及它的作用是什么?
1.當線程A釋放同步資源時,將當前的頭節點的等待狀態設置為0,然后喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點,如果被喚醒的節點獲取了同步資源,然后在調用setHeadAndPropagate()方法之前,線程B釋放了同步資源,此時判斷到頭節點的等待狀態為0,那么就會將頭節點的等待狀態設置為PROPAGATE,表示並發釋放了同步資源,目前還有可用的同步資源,然后被喚醒的節點在執行setHeadAndPropagate()方法時,如果沒有剩余的可用資源,但是判斷到舊的頭節點的等待狀態為PROPAGATE,說明實際存在可用資源,那么會再次調用doReleaseShared()方法,去喚醒后繼節點,嘗試獲取同步資源。
2.如果被喚醒的節點獲取了同步資源,在將當前節點設置為頭節點之后,線程A和B釋放了同步資源,那么就跟場景1一樣,線程B會將頭節點的等待狀態設置為PROPAGATE,然后被喚醒的節點在執行setHeadAndPropagate()方法時,如果沒有剩余的可用資源,除了判斷舊的頭節點的等待狀態是否為PROPAGATE以外,還需要判斷新的頭節點的等待狀態是否為PROPAGATE。
場景一和場景二的區別是獲取同步資源的線程在設置頭節點之前還是頭節點之后。