一、AQS原理
AQS(AbstractQueuedSynchronizer)隊列同步器是用來構建鎖、同步組件的基礎框架。
AQS內部通過一個volatile int類型的成員變量state控制同步狀態【0代表鎖未被占用,1表示已占用】,通過內部類Node構成FIFO的同步隊列實現等待獲取鎖的線程排隊工作,通過內部類ConditionObject構建條件等待隊列,來完成等待條件線程的排隊工作。當線程調用Condition對象的wait方法后會被加入等待隊列中,當有線程調用Condition的signal方法后,線程將從等待隊列移動到同步隊列進行鎖競爭。AQS內部會有一個同步隊列和可能多個等待隊列,前者存放等待獲取鎖的線程,后者分別存放等待不同條件的線程。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{ //指向同步隊列隊頭 private transient volatile Node head; //指向同步的隊尾 private transient volatile Node tail; //同步狀態,0代表鎖未被占用,1代表鎖已被占用 private volatile int state; static final class Node { //共享模式 static final Node SHARED = new Node(); //獨占模式 static final Node EXCLUSIVE = null; //標識線程已處於結束狀態 static final int CANCELLED = 1; //等待被喚醒狀態 static final int SIGNAL = -1; //條件狀態, static final int CONDITION = -2; //在共享模式中使用表示獲得的同步狀態會被傳播 static final int PROPAGATE = -3; //等待狀態,存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4種 volatile int waitStatus; //同步隊列中前驅結點 volatile Node prev; //同步隊列中后繼結點 volatile Node next; //請求鎖的線程 volatile Thread thread; //等待隊列中的后繼結點,這個與Condition有關,稍后會分析 Node nextWaiter; //判斷是否為共享模式 final boolean isShared() { return nextWaiter == SHARED; } //獲取前驅結點 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } //..... }
//上面Node head、tail對象構建同步隊列,這里用ConditionObject類的對象創建條件等待隊列 class ConditionObject implements Condition, java.io.Serializable { //等待隊列第一個等待結點 private transient Node firstWaiter; //等待隊列最后一個等待結點 private transient Node lastWaiter; //省略其他代碼....... } //AQS中的模板方法,由其子類實現 //獨占模式下獲取鎖的方法 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } //獨占模式下解鎖的方法 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } //共享模式下獲取鎖的方法 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } //共享模式下解鎖的方法 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } //判斷是否為持有獨占鎖 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } }
二、AQS的應用
AQS通過state狀態管理、同步隊列、等待隊列實現了多線程同步鎖獲取與釋放,多線程並發排隊、條件等待等復雜功能。
作為基礎組件,它對鎖的兩種模式【獨占模式和共享模式】都提供支持。設計上AQS采用模板方法模式構建,其內部提供了並發操作的核心方法、而將一些實現不同模式下實現可能有差異的操作定義為模板方法,讓其子類實現。如ReentrantLock通過內部類Sync及其子類繼承AQS實現tryAcuire()和tryRelease()方法來實現獨占鎖,而SemaPhore則通過內部類繼承AQS實現tryAcquireShared()方法和tryReleaseShared()方法實現共享模式鎖。AQS的繼承關系圖如下:

三、ReetrantLock非公平鎖實現分析AQS的用法
ReetrantLock中非公平鎖 //加鎖操作 public void lock() { sync.lock(); } /** * 非公平鎖實現sync.lock() */ static final class NonfairSync extends Sync { //加鎖 final void lock() { //執行CAS操作,獲取同步狀態 if (compareAndSetState(0, 1)) //成功則將獨占鎖線程設置為當前線程 setExclusiveOwnerThread(Thread.currentThread()); else //否則再次請求同步狀態 acquire(1); } } //acquire為AQS本身實現的方法,其實現如下: public final void acquire(int arg) { //再次嘗試獲取同步狀態 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //AQS子類Sync的子類NonfairSync中實現的tryAcquire方法 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } //nonfairTryAcquire方法 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //判斷同步狀態是否為0,並嘗試再次獲取同步狀態 if (c == 0) { //執行CAS操作 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果當前線程已獲取鎖,屬於重入鎖,再次獲取鎖后將status值加1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //設置當前同步狀態,當前只有一個線程持有鎖,因為不會發生線程安全問題,可以直接執行 setState(nextc); setState(nextc); return true; } return false; } //AQS本身實現的方法,將當前獲取鎖失敗的線程構造成node結點加入的同步隊列尾部,若隊列為空或者並發入隊失敗,則調用enq方法重試。 private Node addWaiter(Node mode) { //將請求同步狀態失敗的線程封裝成結點 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; //如果是第一個結點加入肯定為空,跳過。 //如果非第一個結點則直接執行CAS入隊操作,嘗試在尾部快速添加 if (pred != null) { node.prev = pred; //使用CAS執行尾部結點替換,嘗試在尾部快速添加 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //如果第一次加入或者CAS操作沒有成功執行enq入隊操作 enq(node); return node; } //AQS本身實現方法,通過循環CAS操作將當前線程構造的node結點入隊,解決上面隊列為空或者是並發入隊失敗的情況; private Node enq(final Node node) { //死循環 for (;;) { Node t = tail; //如果隊列為null,即沒有頭結點 if (t == null) { // Must initialize //創建並使用CAS設置頭結點 if (compareAndSetHead(new Node())) tail = head; } else {//隊尾添加新結點 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } //AQS本身方法,隊列中結點循環觀察,當自己的前驅是head結點執行的結點時嘗試獲取鎖,若成功將其設置為頭結點,否則循環嘗試;若當前結點前驅結點不是頭結點,則在設置其前驅結點狀態后將自己掛起 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //自旋,死循環 for (;;) { //獲取前驅結點 final Node p = node.predecessor(); 當且僅當p為頭結點才嘗試獲取同步狀態 if (p == head && tryAcquire(arg)) { //將node設置為頭結點 setHead(node); //清空原來頭結點的引用便於GC p.next = null; // help GC failed = false; return interrupted; } //如果前驅結點不是head,判斷是否掛起線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //最終都沒能獲取同步狀態,結束該線程的請求 cancelAcquire(node); } } //設置為頭結點 private void setHead(Node node) { head = node; //清空結點數據 node.thread = null; node.prev = null; } //如果前驅結點不是head,判斷是否掛起線程 if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) interrupted = true; } //AQS本身的方法,若前驅結點狀態為Node.SIGNAL則返回true,表示可以掛起當前結點,否則找到非結束狀態的前驅結點,並設置其狀態后,返回false private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //獲取當前結點的等待狀態 int ws = pred.waitStatus; //如果為等待喚醒(SIGNAL)狀態則返回true if (ws == Node.SIGNAL) return true; //如果ws>0 則說明是結束狀態, //遍歷前驅結點直到找到沒有結束狀態的結點 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果ws小於0又不是SIGNAL狀態, //則將其設置為SIGNAL狀態,代表該結點的線程正在等待喚醒。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //AQS本身方法,掛起當前線程並檢查其中斷狀態 private final boolean parkAndCheckInterrupt() { //將當前線程掛起 LockSupport.park(this); //獲取線程中斷狀態,interrupted()是判斷當前中斷狀態, //並非中斷線程,因此可能true也可能false,並返回 return Thread.interrupted(); } //ReentrantLock類的unlock public void unlock() { sync.release(1); } //AQS類的release()方法 public final boolean release(int arg) { //嘗試釋放鎖 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //喚醒后繼結點的線程 unparkSuccessor(h); return true; } return false; } //ReentrantLock類中的內部類Sync實現的tryRelease(int releases) protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //判斷狀態是否為0,如果是則說明已釋放同步狀態 if (c == 0) { free = true; //設置Owner為null setExclusiveOwnerThread(null); } //設置更新同步狀態 setState(c); return free; } //AQS本身方法,喚醒后續掛起的結點 private void unparkSuccessor(Node node) { //這里,node一般為當前線程所在的結點。 int ws = node.waitStatus; if (ws < 0)//置零當前線程所在的結點狀態,允許失敗。 compareAndSetWaitStatus(node, ws, 0); Node s = node.next;//找到下一個需要喚醒的結點s if (s == null || s.waitStatus > 0) {//如果為空或已取消 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0)//從這里可以看出,<=0的結點,都是還有效的結點。 s = t; } if (s != null) LockSupport.unpark(s.thread);//喚醒 }
