1.什么是AQS?
AQS的核心思想是基於volatile int state這樣的volatile變量,配合Unsafe工具對其原子性的操作來實現對當前鎖狀態進行修改。同步器內部依賴一個FIFO的雙向隊列來完成資源獲取線程的排隊工作。
2.同步器的應用
同步器主要使用方式是繼承,子類通過繼承同步器並實現它的抽象方法來管理同步狀態,對同步狀態的修改或者訪問主要通過同步器提供的3個方法:
- getState() 獲取當前的同步狀態
- setState(int newState) 設置當前同步狀態
- compareAndSetState(int expect,int update) 使用CAS設置當前狀態,該方法能夠保證狀態設置的原子性。
同步器可以支持獨占式的獲取同步狀態,也可以支持共享式的獲取同步狀態,這樣可以方便實現不同類型的同步組件。
同步器也是實現鎖的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。
3.AQS同步隊列
同步器AQS內部的實現是依賴同步隊列(一個FIFO的雙向隊列,其實就是數據結構雙向鏈表)來完成同步狀態的管理。
當前線程獲取同步狀態失敗時,同步器AQS會將當前線程和等待狀態等信息構造成為一個節點(node)加入到同步隊列,同時會阻塞當前線程;
當同步狀態釋放的時候,會把首節點中的線程喚醒,使首節點的線程再次嘗試獲取同步狀態。AQS是獨占鎖和共享鎖的實現的父類。
4.AQS鎖的類別:分為獨占鎖和共享鎖兩種。
- 獨占鎖:鎖在一個時間點只能被一個線程占有。根據鎖的獲取機制,又分為“公平鎖”和“非公平鎖”。等待隊列中按照FIFO的原則獲取鎖,等待時間越長的線程越先獲取到鎖,這就是公平的獲取鎖,即公平鎖。而非公平鎖,線程獲取的鎖的時候,無視等待隊列直接獲取鎖。ReentrantLock和ReentrantReadWriteLock.Writelock是獨占鎖。
- 共享鎖:同一個時候能夠被多個線程獲取的鎖,能被共享的鎖。JUC包中ReentrantReadWriteLock.ReadLock,CyclicBarrier,CountDownLatch和Semaphore都是共享鎖。
JUC包中的鎖的包括:Lock接口,ReadWriteLock接口;Condition條件,LockSupport阻塞原語。
AbstractOwnableSynchronizer/AbstractQueuedSynchronizer/AbstractQueuedLongSynchronizer三個抽象類,
ReentrantLock獨占鎖,ReentrantReadWriteLock讀寫鎖。CountDownLatch,CyclicBarrier和Semaphore也是通過AQS來實現的。
下面是AQS和使用AQS實現的一些鎖,以及通過AQS實現的一些工具類的架構圖:

圖 1.依賴AQS實現的鎖和工具類
5.AQS同步器的結構:同步器擁有首節點(head)和尾節點(tail)。同步隊列的基本結構如下:

圖 1.同步隊列的基本結構 compareAndSetTail(Node expect,Node update)
- 同步隊列設置尾節點(未獲取到鎖的線程加入同步隊列): 同步器AQS中包含兩個節點類型的引用:一個指向頭結點的引用(head),一個指向尾節點的引用(tail),當一個線程成功的獲取到鎖(同步狀態),其他線程無法獲取到鎖,而是被構造成節點(包含當前線程,等待狀態)加入到同步隊列中等待獲取到鎖的線程釋放鎖。這個加入隊列的過程,必須要保證線程安全。否則如果多個線程的環境下,可能造成添加到隊列等待的節點順序錯誤,或者數量不對。因此同步器提供了CAS原子的設置尾節點的方法(保證一個未獲取到同步狀態的線程加入到同步隊列后,下一個未獲取的線程才能夠加入)。 如下圖,設置尾節點:
圖 2.尾節點的設置 節點加入到同步隊列
- 同步隊列設置首節點(原頭節點釋放鎖,喚醒后繼節點):同步隊列遵循FIFO,頭節點是獲取鎖(同步狀態)成功的節點,頭節點在釋放同步狀態的時候,會喚醒后繼節點,而后繼節點將會在獲取鎖(同步狀態)成功時候將自己設置為頭節點。設置頭節點是由獲取鎖(同步狀態)成功的線程來完成的,由於只有一個線程能夠獲取同步狀態,則設置頭節點的方法不需要CAS保證,只需要將頭節點設置成為原首節點的后繼節點 ,並斷開原頭結點的next引用。如下圖,設置首節點:

圖 3.首節點的設置
6.獨占式的鎖的獲取:調用同步器的acquire(int arg)方法可以獲取同步狀態,該方法對中斷不敏感,即線程獲取同步狀態失敗后進入同步隊列,后續對線程進行中斷操作時,線程不會從同步隊列中移除。
(1) 當前線程實現通過tryAcquire()方法嘗試獲取鎖,獲取成功的話直接返回,如果嘗試失敗的話,進入等待隊列排隊等待,可以保證線程安全(CAS)的獲取同步狀態。
(2) 如果嘗試獲取鎖失敗的話,構造同步節點(獨占式的Node.EXCLUSIVE),通過addWaiter(Node node,int args)方法,將節點加入到同步隊列的隊列尾部。
(3) 最后調用acquireQueued(final Node node, int args)方法,使該節點以死循環的方式獲取同步狀態,如果獲取不到,則阻塞節點中的線程。acquireQueued方法當前線程在死循環中獲取同步狀態,而只有前驅節點是頭節點的時候才能嘗試獲取鎖(同步狀態)( p == head && tryAcquire(arg))。
原因是:1.頭結點是成功獲取同步狀態的節點,而頭結點的線程釋放鎖以后,將喚醒后繼節點,后繼節點線程被喚醒后要檢查自己的前驅節點是否為頭結點。
2.維護同步隊列的FIFO原則,節點進入同步隊列以后,就進入了一個自旋的過程,每個節點(后者說每個線程)都在自省的觀察。
下圖為節點自旋檢查自己的前驅節點是否為頭結點:

圖 4 節點自旋獲取同步狀態
獨占式的鎖的獲取源碼:
acquire方法源碼如下
1 /** 2 * Acquires in exclusive(互斥) mode, ignoring(忽視) interrupts. Implemented 3 * by invoking at least once {@link #tryAcquire}, 4 * returning on success. Otherwise the thread is queued(排隊), possibly 5 * repeatedly(反復的) blocking and unblocking, invoking {@link 6 * #tryAcquire} until success. This method can be used 7 * to implement method {@link Lock#lock}. 8 * 9 * @param arg the acquire argument. This value is conveyed(傳達) to 10 * {@link #tryAcquire} but is otherwise uninterpreted and 11 * can represent anything you like. 12 * 13 * 獨占式的獲取同步狀態 14 * 15 */ 16 public final void acquire(int arg) { 17 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 18 selfInterrupt(); 19 }
嘗試獲取鎖:tryAcquire方法:如果獲取到了鎖,tryAcquire返回true,反之,返回false。
1 //方法2: 2 protected final boolean tryAcquire(int acquires) { 3 // 獲取當前線程 4 final Thread current = Thread.currentThread(); 5 // 獲取“獨占鎖”的狀態,獲取父類AQS的標志位 6 int c = getState(); 7 //c == 0 意思是鎖(同步狀態)沒有被任何線程所獲取 8 //1.當前線程是否是同步隊列中頭結點Node,如果是的話,則獲取該鎖,設置鎖的狀態,並設置鎖的擁有者為當前線程 9 if (c == 0) { 10 if (!hasQueuedPredecessors() && 11 12 // 修改下狀態為,這里的acquires的值是1,是寫死的調用子類的lock的方法的時候傳進來的,如果c == 0,compareAndSetState操作會更新成功為1. 13 compareAndSetState(0, acquires)) { 14 // 上面CAS操作更新成功為1,表示當前線程獲取到了鎖,因為將當前線程設置為AQS的一個變量中,代表這個線程拿走了鎖。 15 setExclusiveOwnerThread(current); 16 return true; 17 } 18 } 19 //2.如果c不為0,即狀態不為0,表示鎖已經被拿走。 20 //因為ReetrantLock是可重入鎖,是可以重復lock和unlock的,所以這里還要判斷一次,獲取鎖的線程是否為當前請求鎖的線程。 21 else if (current == getExclusiveOwnerThread()) { 22 //如果是,state繼續加1,這里nextc的結果就會 > 1,這個判斷表示獲取到的鎖的線程,還可以再獲取鎖,這里就是說的可重入的意思 23 int nextc = c + acquires; 24 if (nextc < 0) 25 throw new Error("Maximum lock count exceeded"); 26 setState(nextc); 27 return true; 28 } 29 return false; 30 }
addWaiter方法的源碼:回到aquire方法,如果嘗試獲取同步狀態(鎖)失敗的話,則構造同步節點(獨占式的Node.EXCLUSIVE),
通過addWaiter(Node node,int args)方法
將該節點加入到同步隊列的隊尾。
1 /** 2 * Creates and enqueues node for current thread and given mode. 3 * 4 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 5 * @return the new node 6 * 7 * 8 * 如果嘗試獲取同步狀態失敗的話,則構造同步節點(獨占式的Node.EXCLUSIVE),通過addWaiter(Node node,int args)方法將該節點加入到同步隊列的隊尾。 9 * 10 */ 11 private Node addWaiter(Node mode) { 12 // 用當前線程夠着一個Node對象,mode是一個表示Node類型的字段,或者說是這個節點是獨占的還是共享的,或者說AQS的這個隊列中,哪些節點是獨占的,哪些節點是共享的。 13 Node node = new Node(Thread.currentThread(), mode); 14 // Try the fast path of enq; backup to full enq on failure 15 Node pred = tail; 16 //隊列不為空的時候 17 if (pred != null) { 18 node.prev = pred; 19 // 確保節點能夠被線程安全的添加,使用CAS方法 20 // 嘗試修改為節點為最新的節點,如果修改失敗,意味着有並發,這個時候進入enq中的死循環,進行“自旋”的方式修改 21 if (compareAndSetTail(pred, node)) { 22 pred.next = node; 23 return node; 24 } 25 } 26 //進入自旋 27 enq(node); 28 return node; 29 }
enq方法的源碼:同步器通過死循環的方式來保證節點的正確添加,在“死循環” 中通過CAS將節點設置成為尾節點之后,當前線程才能從該方法中返回,否則
當前線程不斷的嘗試設置。
enq方法將並發添加節點的請求通過CAS變得“串行化”了。
1 /** 2 * Inserts node into queue, initializing if necessary. See picture above. 3 * @param node the node to insert 4 * @return node's predecessor 5 * 6 * 同步器通過死循環的方式來保證節點的正確添加,在“死循環” 中通過CAS將節點設置成為尾節點之后,當前線程才能從該方法中返回,否則當前線程不斷的嘗試設置。 7 * enq方法將並發添加節點的請求通過CAS變得“串行化”了。 8 * 9 */ 10 private Node enq(final Node node) { 11 for (;;) { 12 Node t = tail; 13 if (t == null) { // Must initialize 14 if (compareAndSetHead(new Node())) 15 tail = head; 16 } else { 17 node.prev = t; 18 if (compareAndSetTail(t, node)) { 19 t.next = node; 20 return t; 21 } 22 } 23 } 24 }
acquireQueued方法:在隊列中的線程獲取鎖的過程:
1 /** 2 * Acquires in exclusive uninterruptible mode for thread already in 3 * queue. Used by condition wait methods as well as acquire. 4 * 5 * @param node the node 6 * @param arg the acquire argument 7 * @return {@code true} if interrupted while waiting 8 * 9 * acquireQueued方法當前線程在死循環中獲取同步狀態,而只有前驅節點是頭節點才能嘗試獲取同步狀態(鎖)( p == head && tryAcquire(arg)) 10 * 原因是:1.頭結點是成功獲取同步狀態(鎖)的節點,而頭節點的線程釋放了同步狀態以后,將會喚醒其后繼節點,后繼節點的線程被喚醒后要檢查自己的前驅節點是否為頭結點。 11 * 2.維護同步隊列的FIFO原則,節點進入同步隊列之后,就進入了一個自旋的過程,每個節點(或者說是每個線程)都在自省的觀察。 12 * 13 */ 14 final boolean acquireQueued(final Node node, int arg) { 15 boolean failed = true; 16 try { 17 boolean interrupted = false; 18 //死循環檢查(自旋檢查)當前節點的前驅節點是否為頭結點,才能獲取鎖 19 for (;;) { 20 // 獲取節點的前驅節點 21 final Node p = node.predecessor(); 22 if (p == head && tryAcquire(arg)) {//節點中的線程循環的檢查,自己的前驅節點是否為頭節點 23 //將當前節點設置為頭結點,移除之前的頭節點 24 setHead(node); 25 p.next = null; // help GC 26 failed = false; 27 return interrupted; 28 } 29 // 否則檢查前一個節點的狀態,看當前獲取鎖失敗的線程是否要掛起 30 if (shouldParkAfterFailedAcquire(p, node) && 31 //如果需要掛起,借助JUC包下面的LockSupport類的靜態方法park掛起當前線程,直到被喚醒 32 parkAndCheckInterrupt()) 33 interrupted = true; 34 } 35 } finally { 36 //如果有異常 37 if (failed) 38 //取消請求,將當前節點從隊列中移除 39 cancelAcquire(node); 40 } 41 }
獨占式的獲取同步狀態的流程如下:

圖5 獨占式的獲取同步狀態的流程
7.獨占鎖的釋放:下面直接看源碼:
1 /* 2 1. unlock():unlock()是解鎖函數,它是通過AQS的release()函數來實現的。 3 * 在這里,“1”的含義和“獲取鎖的函數acquire(1)的含義”一樣,它是設置“釋放鎖的狀態”的參數。 4 * 由於“公平鎖”是可重入的,所以對於同一個線程,每釋放鎖一次,鎖的狀態-1。 5 6 unlock()在ReentrantLock.java中實現的,源碼如下: 7 */ 8 public void unlock() { 9 sync.release(1); 10 }
release()會調用tryRelease方法嘗試釋放當前線程持有的鎖(同步狀態),成功的話喚醒后繼線程,並返回true,否則直接返回false
1 /** 2 * Releases in exclusive mode. Implemented by unblocking one or 3 * more threads if {@link #tryRelease} returns true. 4 * This method can be used to implement method {@link Lock#unlock}. 5 * 6 * @param arg the release argument. This value is conveyed to 7 * {@link #tryRelease} but is otherwise uninterpreted and 8 * can represent anything you like. 9 * @return the value returned from {@link #tryRelease} 10 * 11 * 12 * 13 */ 14 public final boolean release(int arg) { 15 if (tryRelease(arg)) { 16 Node h = head; 17 if (h != null && h.waitStatus != 0) 18 unparkSuccessor(h); 19 return true; 20 } 21 return false; 22 }
1 // tryRelease() 嘗試釋放當前線程的同步狀態(鎖) 2 protected final boolean tryRelease(int releases) { 3 //c為釋放后的同步狀態 4 int c = getState() - releases; 5 //判斷當前釋放鎖的線程是否為獲取到鎖(同步狀態)的線程,不是拋出異常(非法監視器狀態異常) 6 if (Thread.currentThread() != getExclusiveOwnerThread()) 7 throw new IllegalMonitorStateException(); 8 boolean free = false; 9 //如果鎖(同步狀態)已經被當前線程徹底釋放,則設置鎖的持有者為null,同步狀態(鎖)變的可獲取 10 if (c == 0) { 11 free = true; 12 setExclusiveOwnerThread(null); 13 } 14 setState(c); 15 return free; 16 }
釋放鎖成功后,找到AQS的頭結點,並喚醒它即可:
1 // 4. 喚醒頭結點的后繼節點 2 3 private void unparkSuccessor(Node node) { 4 //獲取頭結點(線程)的狀態 5 int ws = node.waitStatus; 6 //如果狀態<0,設置當前線程對應的鎖的狀態為0 7 if (ws < 0) 8 compareAndSetWaitStatus(node, ws, 0); 9 10 Node s = node.next; 11 12 //解釋:Thread to unpark is held in successor, which is normally just the next node. 13 //But if cancelled or apparently(顯然) null, traverse backwards(向后遍歷) from tail to find the actual(實際的) non-cancelled successor(前繼節點). 14 //從隊列尾部開始往前去找最前面的一個waitStatus小於0的節點。 15 if (s == null || s.waitStatus > 0) { 16 s = null; 17 for (Node t = tail; t != null && t != node; t = t.prev) 18 if (t.waitStatus <= 0) 19 s = t; 20 } 21 //喚醒后繼節點對應的線程 22 if (s != null) 23 LockSupport.unpark(s.thread); 24 }
上面說的是ReentrantLock的公平鎖獲取和釋放的AQS的源碼,唯獨還剩下一個非公平鎖NonfairSync沒說,其實,它和公平鎖的唯一區別就是獲取鎖的方式不同,公平鎖是按前后順序一次獲取鎖,非公平鎖是搶占式的獲取鎖,那ReentrantLock中的非公平鎖NonfairSync是怎么實現的呢?
1 /** 2 * Sync object for non-fair locks 3 */ 4 static final class NonfairSync extends Sync { 5 private static final long serialVersionUID = 7316153563782823691L; 6 7 /** 8 * Performs lock. Try immediate barge, backing up to normal 9 * acquire on failure. 10 */ 11 final void lock() { 12 if (compareAndSetState(0, 1)) 13 setExclusiveOwnerThread(Thread.currentThread()); 14 else 15 acquire(1); 16 } 17 18 protected final boolean tryAcquire(int acquires) { 19 return nonfairTryAcquire(acquires); 20 } 21 }
非公平鎖的lock的時候多了上面加粗的代碼:在lock的時候先直接用cas判斷state變量是否為0(嘗試獲取鎖),成功的話更新成1,表示當前線程獲取到了鎖,不需要在排隊,從而直接搶占的目的。而對於公平鎖的lock方法是一開始就走AQS的雙向隊列排隊獲取鎖。更詳細的關於ReentrantLock的實現請看后面寫的一篇文章:http://www.cnblogs.com/200911/p/6035765.html
總結:在獲取同步狀態的時候,同步器維護一個同步隊列,獲取失敗的線程會被加入到隊列中並在隊列中自旋;移除隊列(或停止自旋)的條件是前驅節點為頭結點並且獲取到了同步狀態。在釋放同步狀態時,同步器調用tryRelease(int args)方法釋放同步狀態,然后喚醒頭結點的后繼節點。AQS的實現思路其實並不復雜,用一句話准確的描述的話,其實就是使用標志狀態位status(volatile int state)和 一個雙向隊列的入隊和出隊來實現。AQS維護一個線程何時訪問的狀態,它只是對狀態負責,而這個狀態的含義,子類可以自己去定義。
