1、AQS是AbstractQueuedSynchronizer的簡稱。提供用於實現阻塞鎖和同步器框架(信號量、事件等),依靠先入先出(FIFO)等待隊列。AQS為一系列同步器依賴於一個單獨的原子變量(state)的同步器提供了一個非常有用的基礎。AQS對於state的操作都是基於CAS操作,保證了state的原子性和可見性。
state使用
AQS 提供了三種操作state的方法
- getState()
- setState(int)
- compareAndSetState(int, int)
具體源碼如下
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS資源
AQS定義了兩種獲取資源的方式,獨占(exclusive)和共享(Shared)兩種方式,使用AQS值需要子類實現了以下方法的一部分或全部,如果沒有實現而調用會報UnsupportedOperationException異常
- tryAcquire(int) 嘗試獲取獨占資源
- tryRelease(int) 嘗試釋放獨占資源
- tryAcquireShared(int) 嘗試獲取共享資源
- tryReleaseShared(int) 嘗試釋放共享資源
- isHeldExclusively() 該線程是否正在獨占資源。只有用到condition才需要去實現它。
AQS的其他重要方法
- acquire(int arg) 在獨占模式中獲得,忽略中斷。
- 調用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
- 沒成功,則addWaiter()將該線程加入等待隊列的尾部,並標記為獨占模式;
- acquireQueued()使線程在等待隊列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
- 如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上。
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 如果沒有拿到鎖
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//提供中斷當前線程的方法
selfInterrupt();
}
/**將當前線程加入到等待隊列的隊尾,並返回代表當前線程的節點
* Creates and enqueues node for current thread and given mode.
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//EXCLUSIVE(獨占)和SHARED(共享)
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {// 直接放到隊尾。
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);// 按正常邏輯入隊列
return node;
}
/**將node加入隊尾
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) { // CAS + 循環 = 自旋
Node t = tail;
if (t == null) { // Must initialize 隊列為空
if (compareAndSetHead(new Node()))// 創建一個空的標志結點作為head結點
tail = head; // tail 和 head都是同一個節點
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // 放入tail尾部
t.next = node;
return t;
}
}
}
}
/**
* 線程阻塞等待獲取鎖
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 當前線程釋放中斷的標志位
for (;;) {// 循環搶鎖
final Node p = node.predecessor(); // 獲取前一個節點
if (p == head && tryAcquire(arg)) { // 如果前一個節點是head,就嘗試搶鎖,並且嘗試搶鎖成功
setHead(node); // 更換head,即當前Node出隊列
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&// 檢查狀態,是否需要掛起線程
parkAndCheckInterrupt()) // 掛起
interrupted = true; // 如果出現中斷,則修改標記
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- release(int) 釋放獨占模式。
- 一般來說,獨占都是可以釋放成功,但是得判斷該線程釋放完全釋放
public final boolean release(int arg) {
if (tryRelease(arg)) {//嘗試釋放
Node h = head; // 從頭開始找
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 喚醒下一個線程
return true;
}
return false;
}
/** 喚醒等待者
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; // 正在釋放鎖的線程節點狀態
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 修改當前節點狀態
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; // 找下一個節點
if (s == null || s.waitStatus > 0) { // 如果不存在或者被取消了,繼續尋找合適的下一個節點
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) // 如果找到了合適的節點,就喚醒它
LockSupport.unpark(s.thread);
}
- acquireShared(int) 在共享模式中獲取,忽略中斷。
- 只要沒有獨占資源,該方法都應該成功,需要注意喚醒線程時需要判斷隊列中的下個線程是否時共享資源,如果時,也要同時喚醒。
/** 線程獲取共享資源的入口
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) // 判斷量夠不夠
doAcquireShared(arg); // 沒拿到資源,需要等待
}
/**等待..
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 以共享模式加入隊列尾部
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 自旋
final Node p = node.predecessor(); // 前置節點
if (p == head) { // 如果前置為head
int r = tryAcquireShared(arg); // 嘗試獲取資源,返回資源剩余的數量
if (r >= 0) { // 拿到資源
setHeadAndPropagate(node, r); // 修改head節點
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- releaseShared(int)釋放共享資源
- 釋放共享資源都能成功
/** 線程釋放共享資源
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//嘗試
doReleaseShared();//執行
return true;
}
return false;
}
/** 共享模式下 - 喚醒當前head節點的后續節點
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 判定是否還有后續節點
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 如果狀態為SIGNAL,代表需要通知后續節點
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))// 修改狀態為0,通知一次
continue; // loop to recheck cases 修改失敗,代表已經通知,繼續處理
unparkSuccessor(h); // 喚醒
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 通知過后,修改節點狀態為PROPAGATE
continue; // loop on failed CAS
}
if (h == head) // loop if head changed 知道其他的節點,把這個head擠下來,它才跳出循環
break;
}
}