一:AQS概念
AQS是java.util.concurrent包的一個同步器,它實現了鎖的基本抽象功能,支持獨占鎖與共享鎖兩張方式,
獨占鎖:同一時刻只允許一個線程方法加鎖資源,例如:ReentrantLock
共享鎖:同一時刻允許多個線程方法資源,例如:countDownLatch
二:數據結構
AQS 隊列內部維護的是一個 FIFO 的雙向鏈表,這種結構的特點是每個數據結構都有兩個指針,分別指向直接的后繼節點和直接前驅節點。所以雙向鏈表可以從任
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
System.out.println("do something......");
} finally {
lock.unlock();
}
看一下new ReentrantLock()
默認創建了一個非公平鎖,ReentrantLock內部維護了一個Sync同步器,
public ReentrantLock() {
sync = new NonfairSync();
}

看一下lock方法:

首先是比較狀態,同步器類AbstractQueuedSynchronizer維護了一個同步狀態的字段
private volatile int state;
當狀態為0時,表示資源沒有被占用,大於0則表示被占用
來一個線程首先判斷當前狀態是否是0,如果是則把state設置為1,然后把當前線程設置為資源擁有者
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
這里的acquire都是同步器實現的:


如果當前的狀態是0,則說明此時沒有線程占用鎖,那么設置同步器狀態,並把當前線程設置為資源擁有者
如果當前狀態不是0,則判斷當前線程是否是資源擁有者(因為支持可重入,所以可能大於0),如果是則累加狀態值
如果不是則返回失敗
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果獲取鎖失敗,則入隊列

先看一下addWaiter方法:
CLH隊列底層維護的是一個雙向鏈表結構,每一個節點Node維護當前線程引用,前一個節點和后一個節點的引用,Node節點
還具有狀態
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
看一下addWaiter方法:
第一次進來tail為null,所以會調用enq這個方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
來看一下enq方法:
如果tail為null,則新建一個node節點,並設置為head,然后將head引用賦值給tail,這樣head和tail都指向一個空節點Node
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

初始化完成,新加進來的Node會被設置為tail尾部節點,然后之前最后一個節點建立pre、next連接
下面框里tail不為null,就不是第一個被放進來的node節點,直接把node設置到tail尾部。

看一下獲取對列方法acquireQueued:
前面已經將獲取鎖失敗的線程以node節點的形式放到了CLH對列的tail尾部,這里的node就是維護當前線程的node,
如果node的前驅節點為head(head為正在執行的線程的節點),那么會再次嘗試獲取鎖。
獲取鎖成功:把讓對列的head指向node,然后將node節點維護的前驅和線程置位null,在這里拿到鎖,其實並不需要前驅節點的線程喚醒,
因為當前線程並沒有阻塞。
獲取鎖失敗:如果前驅節點不是head或者是head獲取鎖失敗,那么就會park當前線程。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

看一下shouldParkAfterFailedAcquire:
如果前驅節點的狀態為signal,則可以安全的park阻塞當前線程,因為前驅為signal狀態,說明當前驅節點維護的線程釋放鎖后,會通知當前線程。
如果狀態大於0,就是已取消,則向前遍歷,直到找到一個未取消的,已取消狀態,可能該任務已經中斷或者超時。
如果不是signal狀態,也沒有取消,那么就把前驅節點的waitStatus設置為signal,然后該節點就可以安全的park了。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

這里很簡單,就是把當前線程park阻塞。然后當前線程就會在這里阻塞,直到被前驅節點的線程喚醒。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
如果此時前一個線程執行完畢,執行unpark,那么該線程就會被喚醒。
喚醒之后,又會進入for死循環,爭搶鎖資源(因為是非公平鎖),獲取到鎖則執行代碼,獲取不到,還會執行
到park方法阻塞,如果該線程在阻塞過程中被中斷,那么喚醒后會執行中斷方法。

上面是非公平鎖的實現,下面來看一下公平鎖的實現,公平鎖的實現應該更簡單一些,
就是獲取鎖失敗后,直接進入對列等待,不會在獲取鎖,進入對列的時候獲取鎖,非公平鎖,在阻塞之前
有三次獲取鎖的機會。
ReentrantLock的構造方法是有參數,可以設置是否采用公平鎖:


看lock方法:


公平鎖與非公平鎖的區別:
非公平鎖獲取失敗后,會再次嘗試獲取,而公平鎖直接獲取。
公平鎖:

非公平鎖:

如果當前狀態為0,當前沒有線程占有鎖,它會先判斷對列中是否有前驅節點,如果有則不會獲取鎖,然后進入對列中
等待。

鎖的釋放:


如果線程的狀態為0,則把排他線程設置為null,如果重入次數過多,那么就需要多次unlock才可以,到最后一次unlock才會
釋放鎖

喚醒下一個線程:
unparkSuccessor(h);
把head節點設置為0狀態,然后下一個節點不為null,則喚醒下一個節點,如果為null,
則從tail往前遍歷,找到node下面不能為null且最近的沒有被取消的節點,然后喚醒。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
