要介紹AQS,首先要介紹“同步器”的概念。
同步器是一種抽象數據類型,在該類型的內部,維護了以下內容:
1.一個狀態變量,該變量的不同取值可以表征不同的同步狀態語義(例如表示一個鎖已經被線程持有了還是沒有任何線程持有);
2.能夠更新和檢查該狀態變量值的操作(方法)集合;
3.至少有一個方法——當同步狀態的值需要時可調用該方法阻塞來修改該狀態的線程;或當其他的線程修改了同步狀態值,可允許調用該方法喚醒其他阻塞線程
簡單說,同步器中包含一個可表征同步狀態的變量,可操作該變量的方法集,以及可阻塞或喚醒其他來修改該狀態的線程的方法集。
互斥鎖,讀寫鎖,信號量,屏障,事件指示器等等都是同步器。
AQS,全稱是AbstractQueuedSynchronizer,中文譯為抽象隊列式同步器。這個抽象類對於JUC並發包非常重要,JUC包中的ReentrantLock,,Semaphore,ReentrantReadWriteLock,CountDownLatch等等幾乎所有的類都是基於AQS實現的。
“抽象”是說該類是一個抽象類,“隊列式同步器”是說AQS使用隊列來管理多個搶占資源的線程。AQS在其內部實現了上面所說的同步器的三要素,而且它會把搶占資源失敗的線程放入自己內部的一個隊列當中維護起來,在這個隊列內部的線程會排隊等待獲取線程。
線程獲取或釋放鎖的本質是去修改AQS內部那個可以表征同步狀態的變量的值。比如說,我們創建一個ReentrantLock的實例,此時該鎖實例內部的狀態的值為0,表征它還沒有被任何線程所持有。當多個線程同時調用它的lock()方法獲取鎖時,它們的本質操作其實就是將該鎖實例的同步狀態變量的值由0修改為1,第1個搶到這個操作執行的線程就成功獲取了鎖,后續執行操作的線程就會看到狀態變量的值已經為1了,即表明該鎖已經被其他線程獲取,它們搶占鎖失敗了。這些搶占鎖失敗的線程會被AQS放入到一個隊列里面去維護起來。當然,實際的情況肯定要稍微復雜些,但本質上是這個道理。
AQS是一個抽象類,當我們繼承AQS去實現自己的同步器時,要做的僅僅是根據自己同步器需要滿足的性質實現線程獲取和釋放資源的方式(修改同步狀態變量的方式)即可,至於具體線程等待隊列的維護(如獲取資源失敗入隊、喚醒出隊、以及線程在隊列中行為的管理等),AQS在其頂層已經幫我們實現好了,AQS的這種設計使用的正是模板方法模式。
AQS支持線程搶占兩種鎖——獨占鎖和共享鎖:
- 獨占鎖:同一個時刻只能被一個線程占有,如ReentrantLock,ReentrantWriteLock等,它又可分為:
- 公平鎖:按照線程在隊列中的排隊順序,先到者先拿到鎖
- 非公平鎖:當線程要獲取鎖時,無視隊列順序直接去搶鎖,誰搶到就是誰的
- 共享鎖:同一時間點可以被多個線程同時占有,如ReentrantReadLock,Semaphore等
1.AQS中的核心成員和內部類
AQS使用一個int成員變量state去表征當前資源的同步狀態。AQS使用CAS對該同步狀態進行原子操作實現對其值的修改。
private volatile int state; //共享變量,表征同步狀態的值,用volatile修飾保證線程間可見性
AQS可以修改該同步狀態值的方法:
/** 返回同步狀態的當前值(此操作具有volatile變量的讀語義) **/ protected final int getState() { //方法被final修飾,不允許被重寫 return state; } /** 設置同步狀態的值(此操作具有volatile變量的寫語義) **/ protected final void setState(int newState) { //方法被final修飾,不允許被重寫 state = newState; } /** * 原子地(CAS操作)將同步狀態值設置為給定值update如果當前同步狀態的值等於expect(此操作具有volatile變量的讀寫語義) * @return 成功返回true,失敗返回false,意味着當操作進行時同步狀態的當前值不是expect **/ protected final boolean compareAndSetState(int expect, int update) { //方法被final修飾,不允許被重寫 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS通過維護一個等待獲取鎖的線程隊列來管理(獲取資源失敗入隊/喚醒出隊)搶占鎖的線程,這個隊列是一種CLH隊列的變體。如下圖:
1.AQS持有head指針和tail指針,頭結點是搶占鎖成功而持有鎖的線程對應的結點,若有線程搶鎖失敗,AQS會創建新結點並用CAS操作使其成為新的尾結點
2.AQS把對某線程的一些控制信息放到了其前驅中維護,當某結點的前驅釋放鎖或被取消時會喚醒其后繼,而其后繼會在獲取鎖成功后將自己設為新的頭結點
可以看到,AQS對這個維護等待線程隊列的操作都是非阻塞的,也是線程安全的。
隊列中的每個結點都是類Node的一個實例,類Node的定義如下:
private transient volatile Node head;//隊頭,延遲加載,除初始化外其他情況都使用setHead操作;其waitStatus值不為CANCELLED private transient volatile Node tail;//隊尾 /** 隊列中的結點,每個等待中的結點可能會處於幾種不同的等待狀態 **/ static final class Node{ static final Node SHARED = new Node(); //表示結點對應線程想共享地搶占鎖 static final Node EXCLUSIVE = null; //表示結點對應線程想獨占地搶占鎖 volatile int waitStatus; //結點的等待狀態,CLH隊列中初始默認為0,Condition隊列中初始默認為-2 static final int CANCELLED = 1; // 結點已被取消,表示線程放棄搶鎖,結點狀態以后不再變直到GC回收它 static final int SIGNAL = -1;//結點的后繼已經或很快就阻塞,在結點釋放鎖或被取消時要喚醒其后面第1個非CANCELLED結點 /** Condition隊列中結點的狀態,CLH隊列中結點沒有該狀態,當Condition的signal方法被調用, Condition隊列中的結點被轉移進CLH隊列並且狀態變為0 **/ static final int CONDITION = -2; //與共享模式相關,當線程以共享模式去獲取或釋放鎖時,對后續線程的釋放動作需要不斷往后傳播 static final int PROGAGATE = -3; volatile Node prev; //指向結點在隊列中的前驅 volatile Node next; //指向結點在隊列中的后繼 volatile Thread thread; //使當前結點進隊的線程(與當前結點關聯的線程) Node nextWaiter;//Condition隊列中指向結點在隊列中的后繼;在CLH隊列中共享模式下值取SHARED,獨占模式下為null final boolean isShared() { //若結點在CLH隊列中以共享模式等待則返回true return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { //返回結點前驅 Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() {} // Used to establish initial head or SHARED marker Node(Thread thread, Node mode) { //往CLH隊列中添加結點時調用此構造器構造結點 this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { //往Condition隊列中添加結點時調用此構造器構造結點 this.waitStatus = waitStatus; //傳入的waitStatus為CONDITION this.thread = thread; } }
二、AQS的核心模板方法
AQS使用了模板方法模式,它核心模板方法包括:acquire--release,acquireShared--releaseShared四個方法,接下來,我們就一一介紹這幾個模板方法。
1.acquire(int arg)方法:AQS獨占模式下獲取鎖的頂層入口
線程獲取鎖,成功直接返回,失敗則進入等待隊列中排隊獲取鎖。在獲取鎖的過程中不響應發生的中斷而是記錄下來,最后檢查是否中斷過,如果中斷過再將中斷標記補上。
public final void acquire(int arg) { //獨占模式獲取鎖的模板方法 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //線程具體的獲取鎖方法,此方法由具體同步器(即AQS子類)實現,獲取鎖成功時要返回true,失敗返回false protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /** * 根據當前線程的獲取鎖模式創建一個結點並加入隊列中 * @param mode Node.EXCLUSIVE表示獨占模式, Node.SHARED表示共享模式 * @param return 返回創建的新結點 **/ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //先在當前方法中用CAS進隊試一次,不成功則進入enq()方法中反復嘗試直到進隊成功 Node pred = tail; if (pred != null) { node.prev = pred; //注意這里先置了node的前驅 if (compareAndSetTail(pred, node)) { //CAS操作嘗試原子地將tail置為指向當前新建結點 pred.next = node; //成功說明tail已指向當前結點,則給當前結點前驅的next指針賦值 return node; } } enq(node); //失敗進入enq()方法反復嘗試直到成功 return node; } //反復嘗試直到結點進入等待隊列成為隊尾,注意該方法返回輸入結點的前驅結點 private Node enq(final Node node) { for (;;) { //經典“CAS + 失敗重試” Node t = tail; if (t == null) { //需要初始化等待隊列 if (compareAndSetHead(new Node())) tail = head; } else { //下面這部分和addWaiter方法中一樣 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } //已經進入等待隊列的線程在隊列中獨占(且不響應中斷)地獲取鎖的行為 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; //獲取失敗標志,初值為true try { boolean interrupted = false; //記錄線程在隊列中獲取鎖的過程中是否發生過中斷 //死循環,在循環中線程可能會被阻塞0次,1次,或多次,直到獲取鎖成功才跳出循環,方法返回 for (;;) { final Node p = node.predecessor(); //獲取當前結點的前驅 //只有當前結點的前驅是頭結點,當前線程才被允許嘗試獲取鎖;只有獲取鎖成功才會跳出循環方法返回 if (p == head && tryAcquire(arg)) { setHead(node); //獲取鎖成功會將當前結點設為頭結點 p.next = null; // help GC failed = false; return interrupted; //返回是否發生過中斷 } //線程不被允許獲取鎖或獲取失敗都會進入下面的方法檢查是否自己可以阻塞;被喚醒后記錄是否是被中斷喚醒的 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; //如果是被中斷喚醒的,會記錄下來被中斷過 } } finally { if (failed) //線程發生異常則取消獲取鎖行為 cancelAcquire(node); } } //等待隊列中的線程不被允許獲取鎖或嘗試獲取鎖失敗后調用,檢查自己是否可以阻塞 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //獲取前驅的等待狀態 if (ws == Node.SIGNAL) //前驅的等待狀態已經是SIGNAL,則當前線程可以放心阻塞 return true; //表示要阻塞 if (ws > 0) { //前驅等待狀態為CANCELLED,說明前驅已無效 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0);//不斷向前尋找狀態不為CANCELLED的結點,同時將無效結點鏈成一個不可達的環,便於GC pred.next = node; //找到狀態不為CANCELLED的結點 } else {//前驅狀態是PROGAGATE或0時,將其前驅的狀態設為SIGNAL,在再次嘗試失敗后才阻塞(?) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; //表示還要再嘗試 } //線程阻塞,被喚醒時會返回是否是被中斷喚醒的 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
2.release(int arg)方法:AQS獨占模式下釋放鎖的頂層入口
調用tryRelease嘗試釋放鎖,如果成功了,需要查看head的waitStatus狀態,如果是0,表示CLH隊列中沒有后繼節點了,不需要喚醒后繼;否則調用unparkSuccessor喚醒后繼。而unparkSuccessor喚醒后繼的原理是:找到node后面的第一個非cancelled結點進行喚醒。
public final boolean release(int arg) { if (tryRelease(arg)) { //嘗試釋放鎖 Node h = head; //如果head的waitStatus為0說明沒有后繼了,因為如果有后繼,它的后繼在阻塞前一定會把它的waitStatus設為SIGNAL if (h != null && h.waitStatus != 0) unparkSuccessor(h); //喚醒后繼 return true; } return false; } //喚醒后繼 private void unparkSuccessor(Node node) { int ws = node.waitStatus; //node是獲取了鎖的結點 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //喚醒后繼前,重置waitStatus為0 Node s = node.next; //node的next指針不一定指向其后繼,當node的狀態為cancelled的時候,其next指向自己 if (s == null || s.waitStatus > 0) { //這里的s == null的條件判斷不理解(?) s = null; for (Node t = tail; t != null && t != node; t = t.prev) //從后往前找node的后繼中第一個沒有被取消的結點 if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); //喚醒該結點線程 }
有了上面的獨占鎖獲取和釋放的頂層入口解釋以及它們在隊列中的行為說明,現在就可以去看ReentrantLock的原理了。
3.acquireShared(int arg)方法:AQS共享模式下獲取鎖的頂層入口
共享鎖的獲取鎖方式和獨占鎖的獲取鎖方式有部分相似,有又一些不同。
剛開始時,在共享資源允許范圍內會有多個線程同時共享該鎖,剩下的線程就被加入到CLH等待隊列中排隊阻塞等待;當持有鎖的線程釋放鎖時,它會喚醒在隊列中等待的后繼,而這個后繼在獲取鎖之后會繼續檢查資源的剩余量,如果還有剩余,它會接着喚醒自己的后繼。也就是說,共享模式下,線程無論是在獲取鎖或者釋放鎖的時候,都可能會喚醒其后繼,而且在共享資源允許的條件下會引起多個線程被連續喚醒。如果有多個線程同時獲取了共享鎖,則head指向的那個是CLH隊列中最后一個持有鎖的線程,其他的都已經出隊了。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) //嘗試獲取鎖,成功返回值大於等於0,失敗返回值小於0 doAcquireShared(arg); //如果失敗,則調用doAcquireShared方法獲取鎖 } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); //線程入隊 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //取當前結點的前驅 if (p == head) { //前驅為頭結點才允許嘗試獲取鎖,這里體現了入隊之后獲取資源的順序性,只要入隊,就是順序的了 int r = tryAcquireShared(arg); if (r >= 0) { //獲取鎖成功 setHeadAndPropagate(node, r); //將當前線程設為頭,然后可能執行對后繼SHARED結點的連續喚醒 p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //獲取鎖失敗,設置前驅waitStatus為SIGNAL,然后阻塞,這個過程與獨占模式相同 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //獲取鎖過程中發生異常造成未成功獲取,則取消獲取 cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { //propagate是資源剩余量,從上面的調用中可以看到 Node h = head; //將舊的頭結點先記錄下來 setHead(node); //將當前node線程設為頭結點,node已經獲取了鎖 //如果資源有剩余量,或者原來的頭結點的waitStatus小於0,進一步檢查node的后繼是否也是共享模式 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //得到node的后繼 if (s == null || s.isShared()) //如果后繼是共享模式或者現在還看不到后繼的狀態,則都繼續喚醒后繼線程 doReleaseShared(); } } private void doReleaseShared() { for (;;) { Node h = head; //記錄下當前的head if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //如果head的waitStatus為SIGNAL,一定是它的后繼設的,共享模式下要喚醒它的后繼 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //先將head的waitStatus設置為0,成功后喚醒其后繼 continue; // loop to recheck cases unparkSuccessor(h); //關鍵,若成功喚醒了它的后繼,它的后繼就會去獲取鎖,如果獲取成功,會造成head的改變 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //沒有后繼結點,設為PROPAGATE continue; // loop on failed CAS } if (h == head) //若head發生改變,說明后繼成功獲取了鎖,此時要檢查新head的waitStatus,判斷是否繼續喚醒(下次循環) break; //head沒有發生改變則停止持續喚醒 } }
共享鎖獲取總結:
1.與獨占模式的最大不同是,共享模式下,線程無論是對共享鎖成功獲取還是對資源的釋放都可能會引起連續喚醒。獨占模式下只有當線程釋放鎖時才喚醒其后繼,而且不會連續喚醒(暫時忽略取消造成的喚醒)
2.每次喚醒新的線程,這個線程嘗試獲取鎖,如果獲取到了鎖,新線程除了將自己設為頭結點之外,還會檢查是否滿足繼續喚醒條件,如果滿足,則繼續喚醒其后繼。(這里共享模式的獲取沒有仔細分析,但是只要大體理解就好)
3.在共享模式下,當隊列中某個結點的waitStatus為0時,表明它沒有后繼(因為如果有后繼,后繼就會把它的waitStatus置為-1了),這時候線程會把它的waitStatus設置為PROPAGATE,表示一旦出現一個新的共享結點連接在該結點后,該結點的共享鎖將傳播下去。
4.releaseShared(int arg)方法:共享鎖釋放的頂層入口
共享鎖釋放的邏輯很簡單,如果釋放成功,則啟動對后繼可能的連續喚醒。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //如果釋放鎖成功 doReleaseShared(); //啟動對后繼的持續喚醒 return true; } return false; } private void doReleaseShared() { for (;;) { Node h = head; //記錄下當前的head if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //如果head的waitStatus為SIGNAL,一定是它的后繼設的,共享模式下要喚醒它的后繼 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //先將head的waitStatus設置為0,成功后喚醒其后繼 continue; // loop to recheck cases unparkSuccessor(h); //關鍵,若成功喚醒了它的后繼,它的后繼就會去獲取鎖,如果獲取成功,會造成head的改變 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //沒有后繼結點,設為PROPAGATE continue; // loop on failed CAS } if (h == head) //若head發生改變,說明后繼成功獲取了鎖,此時要檢查新head的waitStatus,判斷是否繼續喚醒(下次循環) break; //head沒有發生改變則停止持續喚醒 } }