JUC系列(一) AQS
AQS是什么
AQS是java並發包里很多並發工具都使用的基類,全名稱是AbstractQueuedSynchronizer。例如ReentrantLock,Semaphore,其他的諸如SynchronousQueue,FutureTask都是基於AQS實現的,我們也可以基於AQS實現自己的同步器。
AQS原理
AQS內部維護一個FIFO隊列來完成獲取鎖線程的排隊工作。
AQS包含兩個類型的節點,一個指向隊列頭部,一個指向隊列尾部,未獲取到鎖的線程會利用cas操作線程安全的加入到隊列尾部,隊列頭部的線程是獲取到同步狀態(鎖)的線程。隊列頭部的線程在完成任務釋放鎖后會喚醒它的后繼,后繼會在獲取鎖成功后把自己設置為頭節點。
AQS源碼分析
static final class Node {
//共享
static final Node SHARED = new Node();
//獨占
static final Node EXCLUSIVE = null;
//waitStatus=1,表示線程被取消(被中斷或者等待超時)
static final int CANCELLED = 1;
//waitStatus=-1,表示后繼線程需要被喚醒(unparking)
static final int SIGNAL = -1;
//waitStatus=-2,表示線程在等待條件
static final int CONDITION = -2;
//waitStatus=-3,表示下次共享同步會無條件傳播
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
//與當前節點關聯的隊列中的線程
volatile Thread thread;
}
//aqs指向隊列頭部的節點
private transient volatile Node head;
//aqs指向隊列尾部的節點
private transient volatile Node tail;
//用於同步線程之間的共享狀態。通過 CAS 和 volatile 保證其原子性和可見性
private volatile int state;
獨占式源碼分析:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1:調用使用者重寫的tryAcquire方法嘗試獲取同步狀態,或獲取成功,則后面邏輯全部結束,方法退出,否則進2.
2:此時獲取同步狀態失敗,構造獨占隊列節點,以線程安全的方式加入到隊列尾部。
3:該獨占節點在隊列中嘗試獲取同步狀態,若獲取失敗,則阻塞節點線程直到被前驅喚醒或者中斷。
addWaiter:cas加入到隊列尾部,如果失敗就enq(node)
//cas加入到隊列尾部,如果失敗就enq(node)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq:加入隊列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//隊列為空必須初始化
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//cas操作設置尾節點,失敗自旋一直嘗試,是一種樂觀的並發操作
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued:在隊列中嘗試獲取同步狀態
//在隊列中嘗試獲取同步狀態
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//初始化獲取同步狀態失敗為true
try {
boolean interrupted = false;//初始化中斷標志為false
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//前驅是頭節點並且嘗試獲取同步狀態成功
setHead(node);//設置當前節點為頭節點
p.next = null; //幫助GC清理垃圾
failed = false;//獲取同步狀態失敗標志false
return interrupted;//返回中斷標志false
}
//如果執行到這里,表示之前獲取同步狀態失敗了,shouldParkAfterFailedAcquire判斷是否需要阻塞,如果需要就會調用parkAndCheckInterrupt阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire判斷是否需要阻塞當前線程
//cancelled=1 signal=-1 CONDITION = -2 PROPAGATE = -3
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//node的前驅節點的等待狀態
if (ws == Node.SIGNAL)//前驅是喚醒狀態,可以阻塞,安心等待前驅來喚醒自己去競爭同步狀態
return true;
if (ws > 0) {//cancelled狀態
//向前找到第一個狀態<=0的節點並賦值給node的前驅
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//cas設置狀態為signal
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
共享式源碼分析:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
//嘗試獲取共享鎖失敗,則進入doAcquireShared方法
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//如果新增的節點前任節點是頭節點
final Node p = node.predecessor();
if (p == head) {
//嘗試獲取共享鎖
int r = tryAcquireShared(arg);
if (r >= 0) {
//獲取共享鎖成功則設置自己為頭節點並嘗試喚醒后續節點
setHeadAndPropagate(node, r);
p.next = null; // help GC 回收之前的頭節點
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//如果獲取鎖失敗,前任節點狀態為SIGNAL,則當前節點阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
