1.什么是AQS?
AQS的核心思想是基於volatile int state這樣的volatile變量,配合Unsafe工具對其原子性的操作來實現對當前鎖狀態進行修改。同步器內部依賴一個FIFO的雙向隊列來完成資源獲取線程的排隊工作。
2.同步器的應用
同步器主要使用方式是繼承,子類通過繼承同步器並實現它的抽象方法來管理同步狀態,對同步狀態的修改或者訪問主要通過同步器提供的3個方法:
- getState() 獲取當前的同步狀態
- setState(int newState) 設置當前同步狀態
- compareAndSetState(int expect,int update) 使用CAS設置當前狀態,該方法能夠保證狀態設置的原子性。
同步器可以支持獨占式的獲取同步狀態,也可以支持共享式的獲取同步狀態,這樣可以方便實現不同類型的同步組件。
同步器也是實現鎖的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。
3.AQS同步隊列
同步器AQS內部的實現是依賴同步隊列(一個FIFO的雙向隊列,其實就是數據結構雙向鏈表)來完成同步狀態的管理。
當前線程獲取同步狀態失敗時,同步器AQS會將當前線程和等待狀態等信息構造成為一個節點(node)加入到同步隊列,同時會阻塞當前線程;
當同步狀態釋放的時候,會把首節點中的線程喚醒,使首節點的線程再次嘗試獲取同步狀態。AQS是獨占鎖和共享鎖的實現的父類。
4.AQS鎖的類別:分為獨占鎖和共享鎖兩種。
- 獨占鎖:鎖在一個時間點只能被一個線程占有。根據鎖的獲取機制,又分為“公平鎖”和“非公平鎖”。等待隊列中按照FIFO的原則獲取鎖,等待時間越長的線程越先獲取到鎖,這就是公平的獲取鎖,即公平鎖。而非公平鎖,線程獲取的鎖的時候,無視等待隊列直接獲取鎖。ReentrantLock和ReentrantReadWriteLock.Writelock是獨占鎖。
- 共享鎖:同一個時候能夠被多個線程獲取的鎖,能被共享的鎖。JUC包中ReentrantReadWriteLock.ReadLock,CyclicBarrier,CountDownLatch和Semaphore都是共享鎖。
JUC包中的鎖的包括:Lock接口,ReadWriteLock接口;Condition條件,LockSupport阻塞原語。
AbstractOwnableSynchronizer/AbstractQueuedSynchronizer/AbstractQueuedLongSynchronizer三個抽象類,
ReentrantLock獨占鎖,ReentrantReadWriteLock讀寫鎖。CountDownLatch,CyclicBarrier和Semaphore也是通過AQS來實現的。
下面是AQS和使用AQS實現的一些鎖,以及通過AQS實現的一些工具類的架構圖:
圖 1.依賴AQS實現的鎖和工具類
5.AQS同步器的結構:同步器擁有首節點(head)和尾節點(tail)。同步隊列的基本結構如下:
圖 1.同步隊列的基本結構 compareAndSetTail(Node expect,Node update)
- 同步隊列設置尾節點(未獲取到鎖的線程加入同步隊列): 同步器AQS中包含兩個節點類型的引用:一個指向頭結點的引用(head),一個指向尾節點的引用(tail),當一個線程成功的獲取到鎖(同步狀態),其他線程無法獲取到鎖,而是被構造成節點(包含當前線程,等待狀態)加入到同步隊列中等待獲取到鎖的線程釋放鎖。這個加入隊列的過程,必須要保證線程安全。否則如果多個線程的環境下,可能造成添加到隊列等待的節點順序錯誤,或者數量不對。因此同步器提供了CAS原子的設置尾節點的方法(保證一個未獲取到同步狀態的線程加入到同步隊列后,下一個未獲取的線程才能夠加入)。 如下圖,設置尾節點:
圖 2.尾節點的設置 節點加入到同步隊列
- 同步隊列設置首節點(原頭節點釋放鎖,喚醒后繼節點):同步隊列遵循FIFO,頭節點是獲取鎖(同步狀態)成功的節點,頭節點在釋放同步狀態的時候,會喚醒后繼節點,而后繼節點將會在獲取鎖(同步狀態)成功時候將自己設置為頭節點。設置頭節點是由獲取鎖(同步狀態)成功的線程來完成的,由於只有一個線程能夠獲取同步狀態,則設置頭節點的方法不需要CAS保證,只需要將頭節點設置成為原首節點的后繼節點 ,並斷開原頭結點的next引用。如下圖,設置首節點:
圖 3.首節點的設置
6.獨占式的鎖的獲取:調用同步器的acquire(int arg)方法可以獲取同步狀態,該方法對中斷不敏感,即線程獲取同步狀態失敗后進入同步隊列,后續對線程進行中斷操作時,線程不會從同步隊列中移除。
(1) 當前線程實現通過tryAcquire()方法嘗試獲取鎖,獲取成功的話直接返回,如果嘗試失敗的話,進入等待隊列排隊等待,可以保證線程安全(CAS)的獲取同步狀態。
(2) 如果嘗試獲取鎖失敗的話,構造同步節點(獨占式的Node.EXCLUSIVE),通過addWaiter(Node node,int args)方法,將節點加入到同步隊列的隊列尾部。
(3) 最后調用acquireQueued(final Node node, int args)方法,使該節點以死循環的方式獲取同步狀態,如果獲取不到,則阻塞節點中的線程。acquireQueued方法當前線程在死循環中獲取同步狀態,而只有前驅節點是頭節點的時候才能嘗試獲取鎖(同步狀態)( p == head && tryAcquire(arg))。
原因是:1.頭結點是成功獲取同步狀態的節點,而頭結點的線程釋放鎖以后,將喚醒后繼節點,后繼節點線程被喚醒后要檢查自己的前驅節點是否為頭結點。
2.維護同步隊列的FIFO原則,節點進入同步隊列以后,就進入了一個自旋的過程,每個節點(后者說每個線程)都在自省的觀察。
下圖為節點自旋檢查自己的前驅節點是否為頭結點:
圖 4 節點自旋獲取同步狀態
獨占式的鎖的獲取源碼:
acquire方法源碼如下
/** * Acquires in exclusive(互斥) mode, ignoring(忽視) interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued(排隊), possibly * repeatedly(反復的) blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed(傳達) to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. * * 獨占式的獲取同步狀態 * */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
嘗試獲取鎖:tryAcquire方法:如果獲取到了鎖,tryAcquire返回true,反之,返回false。
//方法2: protected final boolean tryAcquire(int acquires) { // 獲取當前線程 final Thread current = Thread.currentThread(); // 獲取“獨占鎖”的狀態,獲取父類AQS的標志位 int c = getState(); //c == 0 意思是鎖(同步狀態)沒有被任何線程所獲取 //1.當前線程是否是同步隊列中頭結點Node,如果是的話,則獲取該鎖,設置鎖的狀態,並設置鎖的擁有者為當前線程 if (c == 0) { if (!hasQueuedPredecessors() &&
// 修改下狀態為,這里的acquires的值是1,是寫死的調用子類的lock的方法的時候傳進來的,如果c == 0,compareAndSetState操作會更新成功為1. compareAndSetState(0, acquires)) {
// 上面CAS操作更新成功為1,表示當前線程獲取到了鎖,因為將當前線程設置為AQS的一個變量中,代表這個線程拿走了鎖。 setExclusiveOwnerThread(current); return true; } } //2.如果c不為0,即狀態不為0,表示鎖已經被拿走。
//因為ReetrantLock是可重入鎖,是可以重復lock和unlock的,所以這里還要判斷一次,獲取鎖的線程是否為當前請求鎖的線程。 else if (current == getExclusiveOwnerThread()) {
//如果是,state繼續加1,這里nextc的結果就會 > 1,這個判斷表示獲取到的鎖的線程,還可以再獲取鎖,這里就是說的可重入的意思 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
addWaiter方法的源碼:回到aquire方法,如果嘗試獲取同步狀態(鎖)失敗的話,則構造同步節點(獨占式的Node.EXCLUSIVE),
通過addWaiter(Node node,int args)方法
將該節點加入到同步隊列的隊尾。
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node * * * 如果嘗試獲取同步狀態失敗的話,則構造同步節點(獨占式的Node.EXCLUSIVE),通過addWaiter(Node node,int args)方法將該節點加入到同步隊列的隊尾。 * */ private Node addWaiter(Node mode) {
// 用當前線程夠着一個Node對象,mode是一個表示Node類型的字段,或者說是這個節點是獨占的還是共享的,或者說AQS的這個隊列中,哪些節點是獨占的,哪些節點是共享的。 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail;
//隊列不為空的時候 if (pred != null) { node.prev = pred; // 確保節點能夠被線程安全的添加,使用CAS方法
// 嘗試修改為節點為最新的節點,如果修改失敗,意味着有並發,這個時候進入enq中的死循環,進行“自旋”的方式修改 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } }
//進入自旋 enq(node); return node; }
enq方法的源碼:同步器通過死循環的方式來保證節點的正確添加,在“死循環” 中通過CAS將節點設置成為尾節點之后,當前線程才能從該方法中返回,否則
當前線程不斷的嘗試設置。
enq方法將並發添加節點的請求通過CAS變得“串行化”了。
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor * * 同步器通過死循環的方式來保證節點的正確添加,在“死循環” 中通過CAS將節點設置成為尾節點之后,當前線程才能從該方法中返回,否則當前線程不斷的嘗試設置。 * enq方法將並發添加節點的請求通過CAS變得“串行化”了。 * */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued方法:在隊列中的線程獲取鎖的過程:
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting * * acquireQueued方法當前線程在死循環中獲取同步狀態,而只有前驅節點是頭節點才能嘗試獲取同步狀態(鎖)( p == head && tryAcquire(arg)) * 原因是:1.頭結點是成功獲取同步狀態(鎖)的節點,而頭節點的線程釋放了同步狀態以后,將會喚醒其后繼節點,后繼節點的線程被喚醒后要檢查自己的前驅節點是否為頭結點。 * 2.維護同步隊列的FIFO原則,節點進入同步隊列之后,就進入了一個自旋的過程,每個節點(或者說是每個線程)都在自省的觀察。 * */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false;
//死循環檢查(自旋檢查)當前節點的前驅節點是否為頭結點,才能獲取鎖 for (;;) {
// 獲取節點的前驅節點 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//節點中的線程循環的檢查,自己的前驅節點是否為頭節點
//將當前節點設置為頭結點,移除之前的頭節點 setHead(node); p.next = null; // help GC failed = false; return interrupted; }
// 否則檢查前一個節點的狀態,看當前獲取鎖失敗的線程是否要掛起 if (shouldParkAfterFailedAcquire(p, node) &&
//如果需要掛起,借助JUC包下面的LockSupport類的靜態方法park掛起當前線程,直到被喚醒
parkAndCheckInterrupt()) interrupted = true; } } finally {
//如果有異常 if (failed)
//取消請求,將當前節點從隊列中移除 cancelAcquire(node); } }
獨占式的獲取同步狀態的流程如下:
圖5 獨占式的獲取同步狀態的流程
7.獨占鎖的釋放:下面直接看源碼:
/*
1. unlock():unlock()是解鎖函數,它是通過AQS的release()函數來實現的。 * 在這里,“1”的含義和“獲取鎖的函數acquire(1)的含義”一樣,它是設置“釋放鎖的狀態”的參數。 * 由於“公平鎖”是可重入的,所以對於同一個線程,每釋放鎖一次,鎖的狀態-1。 unlock()在ReentrantLock.java中實現的,源碼如下: */ public void unlock() { sync.release(1); }
release()會調用tryRelease方法嘗試釋放當前線程持有的鎖(同步狀態),成功的話喚醒后繼線程,並返回true,否則直接返回false
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} * * * */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
// tryRelease() 嘗試釋放當前線程的同步狀態(鎖) protected final boolean tryRelease(int releases) { //c為釋放后的同步狀態 int c = getState() - releases; //判斷當前釋放鎖的線程是否為獲取到鎖(同步狀態)的線程,不是拋出異常(非法監視器狀態異常) if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //如果鎖(同步狀態)已經被當前線程徹底釋放,則設置鎖的持有者為null,同步狀態(鎖)變的可獲取 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
釋放鎖成功后,找到AQS的頭結點,並喚醒它即可:
// 4. 喚醒頭結點的后繼節點 private void unparkSuccessor(Node node) { //獲取頭結點(線程)的狀態 int ws = node.waitStatus; //如果狀態<0,設置當前線程對應的鎖的狀態為0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //解釋:Thread to unpark is held in successor, which is normally just the next node. //But if cancelled or apparently(顯然) null, traverse backwards(向后遍歷) from tail to find the actual(實際的) non-cancelled successor(前繼節點). //從隊列尾部開始往前去找最前面的一個waitStatus小於0的節點。 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //喚醒后繼節點對應的線程 if (s != null) LockSupport.unpark(s.thread); }
上面說的是ReentrantLock的公平鎖獲取和釋放的AQS的源碼,唯獨還剩下一個非公平鎖NonfairSync沒說,其實,它和公平鎖的唯一區別就是獲取鎖的方式不同,公平鎖是按前后順序一次獲取鎖,非公平鎖是搶占式的獲取鎖,那ReentrantLock中的非公平鎖NonfairSync是怎么實現的呢?
/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
非公平鎖的lock的時候多了上面加粗的代碼:在lock的時候先直接用cas判斷state變量是否為0(嘗試獲取鎖),成功的話更新成1,表示當前線程獲取到了鎖,不需要在排隊,從而直接搶占的目的。而對於公平鎖的lock方法是一開始就走AQS的雙向隊列排隊獲取鎖。更詳細的關於ReentrantLock的實現請看后面寫的一篇文章:http://www.cnblogs.com/200911/p/6035765.html
總結:在獲取同步狀態的時候,同步器維護一個同步隊列,獲取失敗的線程會被加入到隊列中並在隊列中自旋;移除隊列(或停止自旋)的條件是前驅節點為頭結點並且獲取到了同步狀態。在釋放同步狀態時,同步器調用tryRelease(int args)方法釋放同步狀態,然后喚醒頭結點的后繼節點。AQS的實現思路其實並不復雜,用一句話准確的描述的話,其實就是使用標志狀態位status(volatile int state)和 一個雙向隊列的入隊和出隊來實現。AQS維護一個線程何時訪問的狀態,它只是對狀態負責,而這個狀態的含義,子類可以自己去定義。
自己注釋的AQS的源碼:如下:

public class AbstractQueuedSynchronizerTest { /** * * (AQS節點的定義,同步隊列的節點定義) * * <p> * 修改歷史: <br> * 修改日期 修改人員 版本 修改內容<br> * -------------------------------------------------<br> * 2016年7月4日 上午10:26:38 user 1.0 初始化創建<br> * </p> * * @author Peng.Li * @version 1.0 * @since JDK1.7 */ static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode * * */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled * 在同步隊列中等待的線程等待超時或者被中斷,需要從同步隊列中取消等待 * */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking(喚醒) * 后繼節點的線程處於等待狀態,而當前的節點如果釋放了同步狀態或者被取消,將會通知后繼節點,使后繼節點的線程得以運行。 **/ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition * 節點在等待隊列中,節點的線程等待在Condition上,當其他線程對Condition調用了signal()方法后,該節點會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中 **/ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally(無條件的) propagate(傳播) * * 表示下一次共享式同步狀態獲取將會被無條件地傳播下去 */ static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated(傳播) to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened(干涉). * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). * * 使用CAS更改狀態,volatile保證線程可見性,即被一個線程修改后,狀態會立馬讓其他線程可見。 * */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueing(入隊), and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. * * 前驅節點,當前節點加入到同步隊列中被設置 */ volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. * * 后繼節點 */ volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. * * 獲取同步狀態的線程 */ volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive(獨有的) mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred(移動到) to the queue(同步隊列) to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. * * 等待隊列中的后繼節點,如果當前節點是共享的,那么這個字段是一個SHARED常量, * 也就是說節點類型(獨占和共享)和等待隊列中的后繼節點共用同一個字段。 */ Node nextWaiter; /** * Returns true if node is waiting in shared mode */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } /** * Head of the wait queue, lazily initialized. Except for (除...以外) * initialization(初始化), it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED.(如果head引用已經存在,等待狀態保證不會被取消) */ private transient volatile Node head; /** * Tail of the wait queue(等待隊列), lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail; /** * The synchronization state. * 同步狀態,線程可見的,共享內存里面保存 * */ private volatile int state; /** * Returns the current value of synchronization state. * This operation has memory semantics of a <tt>volatile</tt> read. * @return current state value * * 得到同步狀態的值 * */ protected final int getState() { return state; } /** * Sets the value of synchronization state. * This operation has memory semantics of a <tt>volatile</tt> write. * @param newState the new state value */ protected final void setState(int newState) { state = newState; } /** * Acquires in exclusive(互斥) mode, ignoring(忽視) interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued(排隊), possibly * repeatedly(反復的) blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed(傳達) to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. * * 獨占式的獲取同步狀態 * */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node * * * 如果嘗試獲取同步狀態失敗的話,則構造同步節點(獨占式的Node.EXCLUSIVE),通過 addWaiter(Node node,int args)方法將該節點加入到同步隊列的隊尾。 * */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; // 確保節點能夠被安全的添加 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } /** * Convenience method to interrupt current thread. * 分析:如果在acquireQueued()中,當前線程被中斷過,則執行selfInterrupt();否則不會執行。 * 線程在阻塞狀態被“中斷喚醒”而獲取CPU的執行權;但是該線程前面還有其他等待鎖的線程,根據公平性原則,該線程仍然無法獲取到鎖,他會再次阻塞。 * 直到該線程被他前面等待鎖的線程喚醒;線程才會獲取鎖。該線程“成功獲取鎖並真正執行起來之前”,他的中斷會被忽略並且中斷標記會被清除,因為在parkAndCheckInterrupt()中, * 我們線程的中斷狀態時調用了Thread.interrupted(),這個函數在返回中斷狀態之后,還會清除中斷狀態,正因為清除了中斷狀態,所以在selfInterrupt重新產生一個中斷。 * * * 當前線程自己產生一個中斷 */ private static void selfInterrupt() { Thread.currentThread().interrupt(); } /** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting * * acquireQueued方法當前線程在死循環中獲取同步狀態,而只有前驅節點是頭節點才能嘗試獲取同步狀態( p == head && tryAcquire(arg)) * 原因是:1.頭結點是成功獲取同步狀態的節點,而頭節點的線程釋放了同步狀態以后,將會喚醒其后繼節點,后繼節點的線程被喚醒后要檢查自己的前驅節點是否為頭結點。 * 2.維護同步隊列的FIFO原則,節點進入同步隊列之后,就進入了一個自旋的過程,每個節點(或者說是每個線程)都在自省的觀察。 * */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor * * 同步器通過死循環的方式來保證節點的正確添加,在“死循環” 中通過CAS將節點設置成為尾節點之后,當前線程才能從該方法中返回,否則當前線程不斷的嘗試設置。 * enq方法將並發添加節點的請求通過CAS變得“串行化”了。 * */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted * * 阻塞當前線程 * */ private final boolean parkAndCheckInterrupt() { // 阻塞當前線程 LockSupport.park(this); // 線程被喚醒之后的中斷狀態 return Thread.interrupted(); } /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} * * 釋放公平鎖 * */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } /** * Attempts to set the state to reflect a release in exclusive * mode. * * <p>This method is always invoked by the thread performing release. * * <p>The default implementation throws * {@link UnsupportedOperationException}. * * @param arg the release argument. This value is always the one * passed to a release method, or the current state value upon * entry to a condition wait. The value is otherwise * uninterpreted and can represent anything you like. * @return {@code true} if this object is now in a fully released * state, so that any waiting threads may attempt to acquire; * and {@code false} otherwise. * @throws IllegalMonitorStateException if releasing would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block * 返回當前線程是否應該阻塞 * * 說明: (01) 關於waitStatus請參考下表(中擴號內為waitStatus的值),更多關於waitStatus的內容,可以參考前面的Node類的介紹。 CANCELLED[1] -- 當前線程已被取消 SIGNAL[-1] -- “當前線程的后繼線程需要被unpark(喚醒)”。一般發生情況是:當前線程的后繼線程處於阻塞狀態,而當前線程被release或cancel掉,因此需要喚醒當前線程的后繼線程。 CONDITION[-2] -- 當前線程(處在Condition休眠狀態)在等待Condition喚醒 PROPAGATE[-3] -- (共享鎖)其它線程獲取到“共享鎖” [0] -- 當前線程不屬於上面的任何一種狀態。 (02) shouldParkAfterFailedAcquire()通過以下規則,判斷“當前線程”是否需要被阻塞。 規則1:如果前繼節點狀態為SIGNAL,表明當前節點需要被unpark(喚醒),此時則返回true。 規則2:如果前繼節點狀態為CANCELLED(ws>0),說明前繼節點已經被取消,則通過先前回溯找到一個有效(非CANCELLED狀態)的節點,並返回false。 規則3:如果前繼節點狀態為非SIGNAL、非CANCELLED,則設置前繼的狀態為SIGNAL,並返回false。 * */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前驅節點的狀態 int ws = pred.waitStatus; // 如果前驅節點是SIGNAL狀態,則意味着當前線程需要unpark喚醒,此時返回true if (ws == Node.SIGNAL) /* * This node has already set status asking a release to signal it, so it can safely park. */ return true; // 如果前繼節點是取消的狀態,則設置當前節點的“當前前繼節點為”原節點的前繼節點 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we need a signal, but don't park yet. Caller will need to retry to make sure * it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * Cancels an ongoing attempt to acquire. * * @param node the node */ private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } } /** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try to clear in anticipation of signalling. It is OK if this fails or if * status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally just the next node. But if cancelled or apparently null, traverse * backwards from tail to find the actual non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } /** * Sets head of queue to be node, thus dequeuing. Called only by * acquire methods. Also nulls out unused fields for sake of GC * and to suppress unnecessary signals and traversals. * * @param node the node */ private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a <tt>volatile</tt> read * and write. * * @param expect the expected value * @param update the new value * @return true if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } /** * CAS waitStatus field of a node. */ private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } /** * CAS next field of a node. */ private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); } /** * CAS tail field. Used only by enq. */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } /** * CAS head field. Used only by enq. */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } /** * Setup to support compareAndSet. We need to natively implement * this here: For the sake of permitting future enhancements, we * cannot explicitly subclass AtomicInteger, which would be * efficient and useful otherwise. So, as the lesser of evils, we * natively implement using hotspot intrinsics(編譯器內部函數) API. And while we * are at it, we do the same for other CASable fields (which could * otherwise be done with atomic field updaters). */ private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } }
AbstractOwnableSynchronizer的源碼如下:

package concurrentMy.aqs; /** * * (設置和獲取鎖的持有者線程) * * <p> * 修改歷史: <br> * 修改日期 修改人員 版本 修改內容<br> * -------------------------------------------------<br> * 2016年7月5日 下午3:42:37 user 1.0 初始化創建<br> * </p> * * @author Peng.Li * @version 1.0 * @since JDK1.7 */ public abstract class AbstractOwnableSynchronizerTest implements java.io.Serializable { /** Use serial ID even though all fields transient. */ private static final long serialVersionUID = 3737899427754241961L; /** * Empty constructor for use by subclasses. */ protected AbstractOwnableSynchronizerTest() { } /** * The current owner of exclusive mode synchronization. * * 加 transient 表示exclusiveOwnerThread不能被串行化,不會被作為序列化的一部分 * * 鎖的持有線程 */ private transient Thread exclusiveOwnerThread; /** * Sets the thread that currently owns exclusive access. A * <tt>null</tt> argument indicates that no thread owns access. * This method does not otherwise impose any synchronization or * <tt>volatile</tt> field accesses. * * protected final來修飾,表示子類可以使用這個方法,但是不能重載這個方法,也就是不能修改這個方法 */ protected final void setExclusiveOwnerThread(Thread t) { exclusiveOwnerThread = t; } /** * Returns the thread last set by * <tt>setExclusiveOwnerThread</tt>, or <tt>null</tt> if never * set. This method does not otherwise impose any synchronization * or <tt>volatile</tt> field accesses. * @return the owner thread */ protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
參考文章:
1.Doug Lea的論文: http://gee.cs.oswego.edu/dl/papers/aqs.pdf
2. 深度解析Java 8:JDK1.8 AbstractQueuedSynchronizer的實現分析(上): http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer
3. 深度解析Java 8:AbstractQueuedSynchronizer的實現分析(下): http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizer
4. 深入淺出 Java Concurrency (7): 鎖機制 part 2 AQS: http://www.blogjava.net/xylz/archive/2010/07/06/325390.html
5 AQS:http://www.cnblogs.com/leesf456/p/5350186.html
6.參考:https://tech.meituan.com/Java_Lock.html