JAVA AQS的全稱為(AbstractQueuedSynchronizer),用於JAVA多線程的開發,從名稱我們也可以看出,它實現了同步的隊列,而這個隊列是指線程隊列。AQS類在java.util.concurrent.locks下面。
AQS和CAS作為JAVA5之后非常重要的特性,能在並發應用中提高程序性能,具體要就實際情況使用,因為JVM也在一直優化synchronized關鍵字,在JAVA7之后其性能也趨於穩定,不會隨着線程數增加而導致性能驟降(具體可以取網上搜索對比數據)。
總之,一般情況下還是建議用synchronized
CAS(CompareAndSet)是最小粒度的操作,保證了原子性,通過硬件指令集實現。簡單來說,CAS有3個操作數,內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改為B,否則返回V。
基於此,我們才能完成非阻塞同步操作(當然還有一些其他原子命令,例如FAI,LL/SC等),目的就是用樂觀鎖來換取性能的提升。
為什么要說CAS呢?因為AQS也是基於CAS實現的,下面進入正題,我們通過源碼來具體分析下AQS的實現:
-----------------------------------------------------------------------------華麗的分隔線---------------------------------------------------------------------------------------
AQS的包結構如下
繼承的子類有
我們從ReentrantLock來分析AQS的實現原理:
ReentrantLock是一個可重入的排他鎖。可重入指當前擁有鎖的對象可重復進入同步區域,防止重復操作,例如網頁登陸時重復點擊按鈕。排他鎖就是指該鎖(Lock)是互斥鎖,只允許一個對象擁有鎖。
先看看ReentrantLock的類結構
可以看到有3個內部類Sync,NonfairSync和FairSync。其中NonfairSync和FairSync均繼承Sync,而Sync繼承了AQS。NonfairSync和FairSync的區別在於當一個線程釋放了鎖的時候,隊列里的其他線程是否按照FIFO的規則去獲取鎖的。
換句話說,FairSync能夠保證先到的線程先拿到鎖(有一個特殊情況,就是隊列里的線程在unpark到獲取鎖的過程中有新的還未加入到隊列中的線程獲取到鎖,不過這種情況發生的概率很小,基本不用考慮),而NonfairSync不保證
下面我們看看ReentrantLock是如何實現上鎖的,這是lock函數:
public void lock() { sync.lock(); }
sync的lock函數為抽象的,由子類實現,這里只給出FairSync的實現
final void lock() { acquire(1); }
acquire就是AQS提供的接口
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
這里涉及到4個新方法,讓我們慢慢來分析:
首先,acquire有一個參數arg,是用於判斷鎖持有的次數,也就是重入的次數,當鎖持有者需要釋放鎖的時候,則要將鎖的state減去arg的值。在上面可以看到,調用acquire時傳入的參數為1。
1. tryAcquire:AQS里tryAcquire的實現如下
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
明顯是不對的,只拋出了一個異常,所以應該是子類覆蓋了,那么看看FairSync的實現
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
getState()是從ReentrantLock獲取的狀態,表示鎖當前是否被持有,為0時表示沒有線程持有鎖。此時當前線程會去爭取鎖的持有權。
首先判斷隊列中是否有排在當前線程之前的線程,有的話放棄爭搶鎖。hasQueuedPredecessors是AQS里的方法:
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
tail記錄隊列里的尾節點,head就是頭結點。這個函數就是判斷當前線程是否是下一個可持鎖的線程
然后,當當前線程滿足條件時,就通過CAS設置c的值,同時通過AbstractOwnableSynchronizer類提供的setExclusiveOwnerThread接口將當前線程鎖住,用於防止volatile字段被其他線程修改(這里是看注釋后的個人理解)
如果c != 0,同時鎖持有者為當前線程,那么這個請求就是重入請求了,將c+=acquire。如果上訴兩個條件都不滿足,返回false。
2. addWaiter:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
根據注釋可以知道,首先快速插入隊列,其實就是判斷隊列是否為空,否則就要通過enq(node)將當前節點添加至隊列中,為什么這樣會慢些?我們看看enq函數
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
這個方法可以好好看看:用for無限循環,考慮下多線程同時加入隊列,通過CAS保證設置當前節點到尾節點。這里可能隊列里的線程已經執行完任務釋放鎖了,所以還需要重新判斷隊列是否為空,因此其執行效率當然會有影響。
否則,設置前置節點,加入隊列。用圖例來幫助理解:
由於本身沒有鎖,可以有多個線程進來,如果有多個線程並發進入這個if判定區域,可能就會同時存在多個這樣的數據結構,在各自形成數據結構后,多個線程都會去做compareAndSetHead(h)的動作,也就是嘗試將這個臨時h節點設置為head,
顯然並發時只有一個線程會成功,因此成功的那個線程會執行tail = node的操作,整個AQS的鏈表就成為:
3. acquireQueued:節點加入隊列之后,就通過該函數去等待獲取鎖
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
同樣是for無限循環,判斷當前節點之前是否沒有線程節點了,如果是,就去爭搶鎖。用上面給出的子類tryAcquire函數,成功的話設置相關參數,這里也解釋了釋放指針,幫助垃圾回收(GC)。
如果爭搶失敗,判斷是否需要阻塞當前線程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
waitState參數有以下幾個:
- SIGNAL:當前線程持有鎖
- CANCELLED:超時獲取被中斷
- CONDITION
- PROPAGATE
- 0:不屬於以上任何一種情況
后面兩個沒怎么看,大致就是需要滿足多個條件,PROPAGATE用於shared鎖隊列。
回到上面的方法:因為AQS支持中斷等待,所以如果線程中斷了爭搶鎖(CANCELLED),那么就不需要阻塞,直接返回。acquireQueued方法沒有返回,而是設置一個interrupt參數為true而已,線程爭搶鎖失敗的話繼續休眠等待,而AQS里的doAcquireInterruptibly()發現爭搶失敗的話就直接throw new InterruptedException()。在ReentrantLock里需要調用ReentrantLock.lockInterruptibly()就會實現中斷返回。否則AQS會嘗試將當前線程狀態設置成SIGNAL,失敗就循環繼續嘗試
下面是parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
LockSupport.park通過Unsafe.park實現阻塞,它會設置一個AQS的blocker,讓隊列里的線程阻塞在一個地方,然后返回線程中斷的判斷
4. selfInterrupt
private static void selfInterrupt() { Thread.currentThread().interrupt(); }
acquireQueued會返回boolean值表示線程是否中斷,如果未中斷,就調用Thread.interrupt()中斷線程
---------------------------------------------------------------------------------------------------------------------
以上是鎖的實現原理,當tryAcquire()成功之后,線程獲取鎖,執行任務,執行完畢之后,會調用AQS的release方法:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease是子類Sync的實現:
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
其他就不多說了,比較簡單明晰。就看看unparkSuccessor怎么做的
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
通過CAS修改節點的waitStatus,然后將后續節點去除,這里會去遍歷后續節點,判斷其是否狀態為CANCELLED,將所有非CANCELLED的節點喚醒。