AQS是個啥?
AQS(AbstractQueuedSynchronizer)是Java並發用來構建鎖和其他同步組件的基礎框架。許多同步類實現都依賴於它,如常用的ReentrantLock/ReentrantReadWriterLock/CountDownLatch等
AQS提供了獨占(Exclusive)以及共享(Share)兩種資源共享方式:
acquire(acquireShare)/release(releaseShare)。
acquire:獲取資源,如果當前資源滿足條件,則直接返回,否則掛起當前線程,將該線程加入到隊列排隊。
release:釋放資源,喚醒掛起線程
AQS隊列
AQS隊列示意圖
AQS隊列中的主要屬性
// 等待隊列頭部 private transient volatile Node head; // 等待隊列尾部 private transient volatile Node tail; // 鎖的狀態(加鎖成功則為1,解鎖為0,重入再+1) private volatile int state; // 當前持有鎖的線程,注意這個屬性是從AbstractOwnableSynchronizer繼承而來 private transient Thread exclusiveOwnerThread;
Node類中的主要屬性
static final class Node { // 標記表示節點正在共享模式中等待 static final Node SHARED = new Node(); // 標記表示節點正在獨占模式下等待 static final Node EXCLUSIVE = null; // 節點的等待狀態 還有一個初始化狀態0 不屬於以下四種狀態 // 表示Node所代表的當前線程已經取消了排隊,即放棄獲取鎖 static final int CANCELLED = 1; // 當一個節點的waitStatus被置為SIGNAL,就說明它的下一個節點(即它的后繼節點)已經被掛起了(或者馬上就要被掛起了), // 只要前繼結點釋放鎖,就會通知標識為SIGNAL狀態的后繼結點的線程執行 static final int SIGNAL = -1; // 節點在等待隊列中 // 當其他線程對Condition調用了signal()后,該節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中 static final int CONDITION = -2; // 表示下一次共享式同步狀態獲取,將會無條件地傳播下去 static final int PROPAGATE = -3; // 節點等待狀態,該字段初始化為0, volatile int waitStatus; // 當前節點的前置節點 volatile Node prev; // 當前節點的后置節點 volatile Node next; // 在此節點上排隊的線程信息 volatile Thread thread; }
ReentrantLock實現
在引入ReentrantLock實現前,我先來科普一下 util.concurrent包的作者Doug Lea,相比較其他而言,並發包的源碼閱讀難度較大。臉上永遠掛着謙遜靦腆笑容的Doug Lea先生使用了大量相對復雜的邏輯判斷,比如一個判斷條件中執行多個或且方法,讓你很難跟上他的節奏,很難揣摩他的設計思想。小聲逼逼,還不是我太菜了,留下來沒有技術的淚水。

繼承關系圖
ReentrantLock是Lock接口的一個實現類,是一種可重入的獨占鎖。
ReentrantLock內部通過內部類實現了AQS框架(AbstractQueuedSynchronizer)的API來實現獨占鎖的功能。

主要屬性
private final Sync sync; // 公平鎖內部是FairSync,非公平鎖內部是NonfairSync。 // 兩者都通過繼承 Sync間接繼承自AbstractQueuedSynchronizer這個抽象類 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; // 加鎖 abstract void lock(); // 嘗試獲取鎖 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 嘗試釋放鎖 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } }
構造方法
//默認創建一個非公平鎖 public ReentrantLock() { sync = new NonfairSync(); } //傳入true創建公平鎖,false非公平鎖 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
ReentrantLock公平鎖
我們以公平鎖為例對其中重要方法源碼分析
// 繼承了 Sync,從而間接繼承了 AbstractQueuedSynchronizer這個抽象類 static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; // 上鎖 final void lock() { //調用 AQS 中 acquire方法 acquire(1); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // CAS操作設置 state // 設置當前線程為擁有鎖的線程 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
acquire方法源碼分析
public final void acquire(int arg) { // tryAcquire(arg)嘗試加鎖,如果加鎖失敗則會調用acquireQueued方法加入隊列去排隊,如果加鎖成功則不會調用 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire方法干了這么幾件事情
1、tryAcquire() 嘗試獲取資源,如果成功則直接返回;
2、addWaiter() 將該線程加入等待隊列, 更新AQS隊列鏈信息
3、acquireQueued() 使線程在等待隊列中獲取資源,直到獲取資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
4、selfInterrupt() 自我中斷,如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后再將中斷補上。
tryAcquire方法
protected final boolean tryAcquire(int acquires) { // 獲取當前線程 final Thread current = Thread.currentThread(); // 獲取lock對象的上鎖狀態,如果鎖是自由狀態則=0,如果被上鎖則為1,大於1表示重入 int c = getState(); // c=0 代表沒人占用鎖,當前線程可以直接獲取鎖資源執行 if (c == 0) { // 下面介紹hasQueuedPredecessors()方法,判斷自己是否需要排隊 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // CAS操作設置 state // 設置當前線程為擁有鎖的線程 setExclusiveOwnerThread(current); return true; } } // 非重入鎖直接返回false,加鎖失敗 else if (current == getExclusiveOwnerThread()) { // 若為重入鎖, state 加1 (acquires) int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
hasQueuedPredecessors方法
public final boolean hasQueuedPredecessors() { // 獲取隊列頭、尾節點信息 Node t = tail; Node h = head; Node s; // h != t 有幾種情況 // 1、隊列尚未初始化完成,第一個線程獲取鎖資源, // 此時h和t都是null, h != t返回fasle初始化隊列 // 2、隊列已經被初始化了,其他的線程嘗試獲取資源, // 此時頭尾節點不相同,h!=t返回true, // 繼續判斷s.thread != Thread.currentThread() 當前來參與競爭鎖的線程和第一個排隊的線程是同一個線程,則需要排隊。 // 3、隊列已經被初始化了,但是由於鎖釋放的原因導致隊列里面只有一個數據 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
addWaiter方法
private Node addWaiter(Node mode) { // AQS隊列中的元素類型為Node,需要把當前線程封裝成為一個Node對象 Node node = new Node(Thread.currentThread(), mode); // tail為隊尾,賦值給pred Node pred = tai // 判斷pred是否為空,其實就是判斷隊尾是否有節點,其實只要隊列被初始化了隊尾肯定不為空, if (pred != null) { // 拼裝node隊列鏈的過程 // 直接把當前線程封裝的node的上一個節點設置成為pred即原來的隊尾 node.prev = pred; if (compareAndSetTail(pred, node)) { // pred的下一個節點設置為當node pred.next = node; return node; } } // 拼接aqs隊列鏈 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued方法
final boolean acquireQueued(final Node node, int arg) { // 標記是否成功拿到資源 boolean failed = true; try { // 標記等待過程中是否被中斷過 boolean interrupted = false; // 自旋 for (;;) { final Node p = node.predecessor(); // 判斷自己是否為隊列中的第二個節點 // 成為隊列中第二個節點才有資格獲取資源 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; // 返回等待過程中是否被中斷過 return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
公平鎖和非公平鎖的主要區別
為了方便對比,在這里列舉了兩種鎖的上鎖過程源碼,注意紅色標識片段
// 公平鎖上鎖過程 final void lock() { //調用 AQS 中 acquire方法 acquire(1); }
// 非公平鎖上鎖過程 final void lock() { // 嘗試獲取鎖,加鎖不成功則排隊。排隊之前僅有的一次插隊機會。 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
總結
1、如果第一個線程嘗試獲取資源時,此時和AQS隊列無關,線程直接持有鎖。並且不會初始化隊列,如果接下來的線程都是交替執行,那么和AQS隊列永遠無關,均為線程直接持有鎖。
2、在線程發生資源競爭的情況下,才會初始化AQS隊列,AQS隊列的頭部永遠是一個虛擬的Thread為NULL的node。
3、未能獲取到資源的線程將會處於park狀態,此時只有隊列中第二個node等待被喚醒,嘗試去獲取資源。其他node並不去競爭資源,這也是AQS隊列的精髓所在,減少了CPU的占用。
4、公平鎖的上鎖是必須判斷自己是不是需要排隊;而非公平鎖是直接進行CAS修改計數器看能不能加鎖成功;如果加鎖不成功則乖乖排隊(調用acquire);所以不管公平還是不公平;只要進到了AQS隊列當中那么他就會排隊;一朝排隊;永遠排隊!