一、AQS介紹
AQS,即AbstractQueuedSynchronizer, 抽象隊列同步器,它是Java多線程模塊用來構建鎖和其他同步組件的基礎框架。來看下同步組件對AQS的使用:
AQS是一個抽象類,主是是以繼承的方式使用。AQS本身是沒有實現任何同步接口的,它僅僅只是定義了同步狀態的獲取和釋放的方法來供自定義的同步組件的使用。從圖中可以看出,在java的同步組件中,AQS的子類(Sync等)一般是同步組件的靜態內部類,即通過組合的方式使用。
二、AQS原理介紹
AQS的實現依賴於內部的同步隊列,它是一個FIFO雙向隊列,如果當前線程獲取同步狀態失敗,AQS會將該線程以及等待狀態等信息構造成一個Node,將其加入同步隊列的尾部,同時阻塞當前線程,當同步狀態釋放時,喚醒隊列的頭節點。
具體看下,首先來看AQS最主要的三個成員變量:
private transient volatile Node head; private transient volatile Node tail; private volatile int state;
上面提到的同步狀態就是變量state。 head和tail分別是同步隊列的頭結點和尾結點。state=0表示同步狀態可用(如果用於鎖,則表示鎖可用),state=1表示同步狀態已被占用(鎖被占用)。
下面舉例說下獲取和釋放同步狀態的過程:
1.獲取同步狀態
假設線程A要獲取同步狀態,初始狀態state=0,線程A可以順利獲取鎖,A獲取鎖后將state置為1。在A沒有釋放鎖期間,線程B來獲取鎖,此時因為state=1,鎖被占用,所以將B的線程信息和等待狀態等數據構成一個Node節點,放入同步隊列中,head和tail分別指向隊列頭部和尾部(此時隊列中有一個空的Node節點作為頭點,head指向這個空節點,空Node的后繼節點是B對應的Node節點,tail指向它),同時阻塞線程B,阻塞使用的是LockSupport.park()方法。后續如果還有線程要獲取鎖,都會加入隊列尾部並阻塞。
2.釋放同步狀態
當線程A釋放鎖時,將state置為0,此時線程A會喚醒頭節點的后繼節點,喚醒其實是調用LockSupport.unpark(B)方法,即線程B從LockSupport.park()方法返回,此時線程B發現state已為0,所以線程B可以順利獲取鎖,線程B獲取鎖后,B的Node節點出隊。
上面就是AQS獲取和釋放的大致過程,下面結合AQS和ReentrantLock源碼來具體看下JDK是如何實現的,特別要注意JDK是如何保證同步和並發操作的。
三、AQS源碼分析
以ReentrantLock的源碼入手來深入理解下AQS的實現。
上面說過AQS一般是以繼承的方式被使用,同步組件內部組合一個繼承了AQS的子類。
在ReentrantLock類中,有一個Sync成員變量,即是繼承了AQS的子類,源碼如下:
public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; /** Synchronizer providing all implementation mechanics */
private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */
abstract static class Sync extends AbstractQueuedSynchronizer { ... } }
這里的Sync也是一個抽象類,其實現為FairSync和NonfairSync,分別對應公平鎖和非公平鎖。ReentrantLock的提供一個入參為boolean值的構造方法,來確定使用公平鎖還是非公平鎖:
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
1.獲取鎖
這里以NonfairSync類為例,看下它的Lock()的實現:
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
lock方法先CAS嘗試將同步狀態(state屬性)從0修改為1。若直接修改成功,則將占用鎖的線程設置為當前線程。看下compareAndSetState()和setExclusiveOwnerThread()實現:
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
可以看到compareAndSetState其實是調用的unsafe的CAS方法。
protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
exclusiveOwnerThread屬性是AQS從父類AbstractOwnableSynchronizer中繼承的屬性,用來保存當前占用同步狀態的線程。
如果CAS操作未成功,說明state已不為0,此時繼續acquire(1)操作,這個acquire()由AQS實現提供:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire方法嘗試獲取鎖,如果成功就返回,如果不成功,則把當前線程和狀態信息構建成一個Node節點,並將節點放入隊列尾部。然后為隊列中的當前節點循環等待獲取鎖,直到成功。
首先看tryAcquire(arg)在NonfairSync中的實現(這里arg=1):
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
先獲取AQS的同步狀態(state屬性),也就是鎖的狀態,如果狀態為0,則嘗試設置狀態為arg(這里為1), 若設置成功則表示當前線程獲取鎖,返回true。
如果狀態不為0,再判斷當前線程是否是鎖的owne,如果是owner, 則嘗試將狀態值增加acquires,如果這個狀態值越界,拋出異常;如果沒有越界,則設置后返回true。可以看到非公平鎖的含義,即獲取鎖並不會嚴格根據爭用鎖的先后順序決定。這里的實現邏輯類似synchroized的偏向鎖的做法,即可重入而不用進一步進行鎖的競爭,這也是ReentrantLock可重入的原因。
如果狀態不為0,且當前線程不是owner,則返回false。
回到上面的代碼,tryAcquire返回false,接着執行addWaiter(Node.EXCLUSIVE),這個方法創建節點並入隊,來看下源碼:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure
Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
先創建一個Node對象,Node中包含了當前線程和mode,mode用來記錄鎖模式,這里是排他鎖模式。tail是AQS的中表示同步隊列隊尾的屬性,剛開始為null,所以進行enq(node)方法,具體入隊細節:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
這里有一個死循環,本身無鎖,但可以多個線程並發訪問,假如某個線程進入方法,此時head, tail都為null, 進入if(t==null)區域,從方法名可以看出這里是用CAS的方式創建一個空的Node作為頭結點,因為此時隊列中只一個頭結點,所以tail也指向它,第一次循環執行結束。
進行第二次循環時或者是其他線程enq時,tail不為null,進入else區域。將當前線程的Node結點的prev指向tail,然后使用CAS將tail指向當前結點。
private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
expect為t, t此時指向tail,所以可以CAS成功,將tail重新指向當前線程結點。此時t為更新前的tail的值,指向的是空的頭結點,t.next=node,就將頭結點的后續結點指向當前線程結點,返回頭結點。其他線程插入節點以此類推,都是追加到鏈表尾部,並且通過CAS操作保證線程安全。
通過上面分析,AQS的寫入是一種雙向鏈表的插入操作。
addWaiter返回了插入的節點,作為acquireQueued方法的入參,看下源碼:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued方法也是一個死循環,直到進入 if (p == head && tryAcquire(arg))。acquireQueued接收的參數是addWaiter方法的返回值,也就是剛才的線程節點,arg=1。node.predecessor()返回當前線程結點(CNode)的前置節點,在這里也就是head節點,所以p==head成立,進而進行tryAcquire操作,即爭用鎖, 如果獲取成功,則進入if方法體,看下接下來的操作:
1) 將CNode設置為頭節點。
2) 將CNode的前置節點設置的next設置為null。
此時隊列如圖:
上面操作即完成了FIFO的出隊操作。
從上面的分析,只有隊列的第二個節點可以有機會爭用鎖,如果成功獲取鎖,則此節點晉升為頭節點。對於第三個及以后的節點,if (p == head)條件不成立,首先進行shouldParkAfterFailedAcquire(p, node)操作。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */
return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
shouldParkAfterFailedAcquire方法是判斷一個爭用鎖的線程是否應該被阻塞。它首先判斷一個節點的前置節點的狀態是否為Node.SIGNAL,如果是,是說明此節點已經將狀態設置,如果鎖釋放則應當通知它,所以它可以安全的阻塞,返回true。
如果前節點的狀態大於0,為CANCELLED狀態時,則從前節點開始循環找到一個沒有被CANCELLED節點設置為當前節點的前節點,返回false。在下次循環執行shouldParkAfterFailedAcquire時,返回true。這里是把隊列中CANCELLED的節點刪除掉。
如果shouldParkAfterFailedAcquire返回了true,則會執行parkAndCheckInterrupt()方法,它是通過LockSupport.park(this)將當前線程掛起到WATING狀態,它需要等待一個中斷、unpark方法來喚醒它。
2.釋放鎖
通過ReentrantLock的unlock方法來看下AQS的鎖釋放過程。來看下源碼:
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
unlock調用AQS的release()來完成, 由具體子類實現。如果tryRelease返回true,則會將head傳入到unparkSuccessor(Node)方法中並返回true,否則返回false。看看Sync中tryRelease(int)方法實現:
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
這就是一個設置鎖狀態的操作,而且是將狀態減掉傳入的參數值(arg是1),如果狀態為0了,就將排它鎖的Owner設置為null,以使得其它的線程有機會執行。
在排它鎖中,加鎖的時候狀態會增加1,在解鎖的時候減掉1,同一個鎖,在可以重入后,可能會被疊加為2、3、4這些值,只有unlock()的次數與lock()的次數對應才會將Owner線程設置為null,而且也只有這種情況下才會返回true。
在方法unparkSuccessor(Node)中,就意味着真正要釋放鎖了,它傳入的是head節點,head節點是占用鎖的節點:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
這里先會發生的動作是獲取head的next節點,如果獲取到的節點不為null,則直接LockSupport.unpark()來釋放與之對應的被掛起的線程,這樣就將使得有一個節點喚醒后繼續進入循環,然后去嘗試tryAcquire()方法來獲取鎖。