一、AQS概述
AQS全名AbstractQueuedSynchronizer,意為抽象隊列同步器,JUC(java.util.concurrent包)下面的Lock和其他一些並發工具類都是基於它來實現的。AQS維護了一個volatile的state和一個CLH(FIFO)雙向隊列。
二、分析
state
state是一個由volatile修飾的int變量,它的訪問方式有三種:
- getState()
- setState(int newState)
- compareAndSetState(int expect, int update)
/**
* 由volatile修飾的state
*/
private volatile int state;
/**
* 基於內存可見性的讀
*/
protected final int getState() {
return state;
}
/**
* 基於內存可見性的寫
*/
protected final void setState(int newState) {
state = newState;
}
/**
* 使用CAS+volatile,基於原子性與可見性的對state進行設值
*/
protected final boolean compareAndSetState(int expect, int update) {
// 使用Unsafe類,調用JNI方法
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
資源獲取主要有兩種形式:
- 獨占式(EXCLUSIVE)
僅有一個線程能在同一時刻獲取到資源並處理,如ReentrantLock的實現。
- 共享式(SHARED)
多個線程可以同時獲取到資源並處理,如Semaphore/CountDownLatch等。
AQS中大部分邏輯已經被實現,集成類只需要重寫state的獲取(acquire)與釋放(release)方法,因為在AQS中,這些方法默認定義的實現方式都是拋出不支持操作異常,所以按需實現即可。
其中需要繼承類重寫的方法有:
- tryAcquire(int arg)
此方法是獨占式的獲取資源方法,成功則返回true,失敗返回false。
- tryRelease(int arg)
此方法是獨占式的釋放資源方法,成功則返回true,失敗返回false。
- tryAcquireShared(int arg)
此方法是共享式的獲取資源方法,返回負數表示失敗,0表示獲取成功,但是沒有可用資源,正數表示獲取成功,且有可用資源。
- tryReleaseShared(int arg)
此方法是共享式的釋放資源方法,如果允許喚醒后續等待線程則返回true,不允許則返回false。
- isHeldExclusively()
判斷當前線程是否正在獨享資源,是則返回true,否則返回false。
CLH(FIFO)隊列
AQS中是通過內部類Node來維護一個CLH隊列的。源碼如下:
static final class Node {
/** 標記共享式訪問 */
static final Node SHARED = new Node();
/** 標記獨占式訪問 */
static final Node EXCLUSIVE = null;
/** 字段waitStatus的值,表示當前節點已取消等待 */
static final int CANCELLED = 1;
/**字段waitStatus的值,表示當前節點取消或釋放資源后,通知下一個節點 */
static final int SIGNAL = -1;
/** 表示正在等待觸發條件 */
static final int CONDITION = -2;
/**
* 表示下一個共享獲取應無條件傳播
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: 表示當前節點取消或釋放資源后,通知下一個節點
* CANCELLED: 表示當前節點已取消等待
* CONDITION: 表示正在等待觸發條件
* PROPAGATE: 表示下一個共享獲取應無條件傳播
*/
volatile int waitStatus;
/**
* 前節點
*/
volatile Node prev;
/**
* 下一個節點
*/
volatile Node next;
/**
* 節點對應線程
*/
volatile Thread thread;
/**
* 下一個等待的節點
*/
Node nextWaiter;
/**
* 是否是共享式訪問
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回前節點
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // 共享式訪問的構造函數
}
Node(Thread thread, Node mode) { // 用於被添加等待者使用
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // 用於Condition使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
獨占模式-獲取資源
使用AQS中的acquire(int arg)方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
該方法分為4個部分:
- tryAcquire()
需要自己實現的方法,如果獲取到資源使用權,則返回true,反之fasle。如果獲取到資源,返回true,!true為false,根據&&的短路性,則不會執行后續方法,直接跳過程序。如果未獲取到資源,返回false,!false為true,則進入后續方法。
- addWaiter()
如果未獲取到資源使用權,則首先會調用此方法。上源碼:
private Node addWaiter(Node mode) {
// 封裝當前線程和獨占模式
Node node = new Node(Thread.currentThread(), mode);
// 獲取尾部節點
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CAS設置尾部節點
if (compareAndSetTail(pred, node)) {
// 將為節點的下一節點指向當前node
pred.next = node;
return node;
}
}
// 如果尾結點為空或者設置尾結點失敗
enq(node);
return node;
}
private Node enq(final Node node) {
// 如果CAS設置未成功則死循環
for (;;) {
// 獲得尾結點
Node t = tail;
// 如果尾節點為空,說明CLH隊列為空,需要初始化
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 設置當前節點的前驅節點
node.prev = t;
// CAS設置當前節點為尾結點
if (compareAndSetTail(t, node)) {
// 設置后驅節點
t.next = node;
return t;
}
}
}
}
- acquiredQueued()
final boolean acquireQueued(final Node node, int arg) {
// 標識資源獲取是否失敗
boolean failed = true;
try {
// 標識線程是否中斷
boolean interrupted = false;
for (;;) {
// 獲得當前節點的前驅節點
final Node p = node.predecessor();
// 如果前驅節點為頭結點,說明快到當前節點了,嘗試獲取資源
if (p == head && tryAcquire(arg)) {
// 獲取資源成功
// 設置當前節點為頭結點
setHead(node);
// 取消前驅節點(以前的頭部)的后節點,方便GC回收
p.next = null; // help GC
// 標識未失敗
failed = false;
// 返回中斷標志
return interrupted;
}
// 如果當前節點的前驅節點不是頭結點或獲取資源失敗
// 需要用shouldParkAfterFailedAcquire函數判斷是否需要阻塞該節點持有的線程
// 如果需要阻塞,則執行parkAndCheckInterrupt方法,並設置被中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果最終獲取資源失敗
if (failed)
// 當前節點取消獲取資源
cancelAcquire(node);
}
}
- selfInterrupt()
中斷當前線程
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
獨占模式-釋放資源
release() 釋放資源並喚醒后繼線程
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 獲取頭結點
Node h = head;
// 頭結點不為空且等待狀態值不為0
if (h != null && h.waitStatus != 0)
// 喚醒后續等待線程
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 如果等待狀態值小於0
if (ws < 0)
// 使用CAS將waitStatus設置為0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 如果當前節點沒有后繼節點或者后繼節點放棄競爭資源
if (s == null || s.waitStatus > 0) {
s = null;
// 從隊列尾部循環直到當前節點,找到最近的且等待狀態值小於0的節點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果找到的后繼節點不為空,則喚醒其持有的線程
if (s != null)
LockSupport.unpark(s.thread);
}