1. MyAQS介紹
在這個系列博客中,我們會參考着jdk的AbstractQueuedLongSynchronizer,從零開始自己動手實現一個AQS(MyAQS)。通過模仿,自己造輪子來學習主要有兩個好處,一是可以從簡單到復雜,從核心邏輯再到旁路邏輯的實現,學習曲線較為平滑;二是可以站在設計者的角度去思考實現具體功能時可以采用的各種方案的優劣,更深刻的理解其設計的精妙、關鍵之處。
AQS支持互斥和共享這兩種工作模式,其中互斥模式比共享模式要簡單許多。本着由淺入深的原則,本篇博客實現的MyAQS暫時只支持互斥模式。
MyAQS會按照順序,逐步的實現互斥模式、共享模式、允許取消加鎖(中斷、超時退出)和支持條件變量這四個模塊,具體內容會在后續的博客中分享出來。
1.1 樂觀鎖與CAS原理介紹
基於CAS策略的樂觀鎖機制是實現無鎖並發的關鍵所在,因此在展開AQS的實現原理前需要先簡單介紹一下樂觀鎖和CAS機制的原理。
悲觀鎖與樂觀鎖是用來控制線程並發安全的兩種機制。為了防止臨界區數據被不同線程並發的讀寫出現問題,悲觀鎖只允許一個線程進入臨界區以進行臨界區數據的訪問,而其余沒有爭用到鎖的線程則會停留在臨界區外(自旋或者進入阻塞態)。而樂觀鎖則是基於比較並設置這一思想來實現的,其允許不同線程並發的訪問、修改某一臨界區數據,但保證同一瞬間只有一個線程能夠修改成功。從這個角度看樂觀鎖其實並不是傳統概念上的鎖,而更像是一種策略。
具體來說,樂觀鎖中每個線程在需要修改某一臨界區數據前需要先讀取當前數據的快照值(expect),然后執行一次cas操作(compareAndSet),如果CAS操作返回成功則說明修改成功,如果返回失敗則說明對應數據在當前線程讀取快照后、執行cas操作前的這段時間內有別的線程已經進行過修改,則需要重新讀取出當前最新的快照值進行處理后再次嘗試cas操作。
並發場景下cas操作可能會失敗很多次,所以一般是放在一個循環中執行的,無限循環直到cas操作成功才結束。
CAS操作示例偽代碼(CAS自增):
boolean compareAndSet(expect,update,targetDataMemory){ if(expect == targetDataMemory.data){ // compare 比較 targetDataMemory.data = update; // set 設置 return true; }else{ return false; } } Integer atomicIntegerAdd(){ // targetDataMemory.data標識對應的整數 do{ expect = targetDataMemory.data; newUpdate = expect+1; }while(compareAndSet(expect, newUpdate, targetDataMemory)); return newUpdate; }
從偽代碼中可以看到,compareAndSet中有一次比較操作(expect == targetDataMemory.data)和一次賦值操作(targetDataMemory.data = update),如果在比較和賦值操作中出現了並發(線程A執行比較為true,線程B執行比較也為true,線程B執行賦值,線程A再執行賦值),則線程A將會覆蓋掉線程B的改動,那這個cas操作就是有問題的。因此cas操作作為一個基礎的底層操作,其對數據的比較和修改必須是原子性的,即compare和set是一起執行的,在此期間不允許插入其它操作。
保證原子性操作的手段有很多,在應用程序層可以實現,在操作系統層、硬件層也可以實現。但由於上層功能的實現都是基於底層功能的,如果沒有硬件層提供的原子操作,是不可能在基於硬件上的軟件層實現原子操作的,而compareAndSet操作由於其指令集合足夠簡單,因此很自然的被cpu硬件實現了,使得可以通過一條匯編指令來控制cpu去執行cas指令。基於硬件提供的各種基礎的原子性操作,上層的操作系統、應用程序才能夠實現更高級的阻塞/喚醒、信號量、互斥鎖等等功能。
在java中提供了許多可以利用硬件CAS機制的工具類,例如juc包下的atomic系列工具類集合(AtomicInteger、AtomicReference等);unsafe類中提供的cas方法(compareAndSwapInt、compareAndSwapObject等)。
1.2 MyAQS基礎結構
MyAQS是jdkAQS的簡化版,因此整體的結構與jdk中的AQS差別不大,都有一個雙向鏈表結構的同步隊列作為底層支撐。由於本篇博客中的AQS只實現了互斥模式,並且暫時還未引入jdk實現中那么多處理取消加鎖的復雜邏輯和節點狀態(status),代碼相對簡單很多,也能夠更加准確的把握住互斥模式中最核心的邏輯。
下面開始介紹MyAQS第一版的基礎結構。
1. 同步隊列頭、尾節點以及相關的CAS操作封裝
AQS的同步隊列是基於有顯式前驅結點引用的CLH鎖隊列的一個變種實現(理解CLH鎖工作原理對后續理解AQS的實現會有很大幫助,可以參考我之前的博客AQS學習(一)自旋鎖原理介紹)
和CLH鎖一樣,AQS的底層隊列同樣有一個虛擬的Dummy頭節點(head),以及一個標識當前隊尾節點的tail節點。由於需要實現隊列的無鎖並發,因此head節點與tail節點都是使用volatile關鍵字修飾的,並且在會出現並發的臨界區代碼中使用CAS操作+樂觀重試的機制來保證隊列並發訪問時的線程安全。
相關方法為:compareAndSetHead和compareAndSetTail。
2. 需要上層同步器實現的個性化嘗試加鎖、嘗試解鎖抽象方法
AQS作為一個底層的框架,是無法在內部同時去為各種類型的同步器實現線程何為加鎖成功/失敗的邏輯的,因此AQS將這些個性化的邏輯提取為了幾個抽象方法交給特定的同步器按照約定去實現,而AQS的內部會在需要的時候調用這些方法,這也是AQS被定義為抽象類的原因。
其中在互斥模式下需要子類去實現兩個方法:tryAcquire和tryRelease。
- tryAcquire用於嘗試着去爭用互斥鎖,約定加鎖成功返回true、加鎖失敗返回false。
- tryRelease用於嘗試着去釋放釋放鎖,約定解鎖成功返回true,解鎖失敗返回false。
這兩個方法具體的使用會在下面的互斥模式工作原理中展開介紹。
3 提供給上層同步器使用的state字段
AQS提供了一個使用volatile關鍵字修飾的int類型屬性state,特定的同步器可以利用state屬性來靈活的實現自己的需求。
相關方法為:getState、setState和compareAndSetState(CAS設置state的值)。
MyAQS基礎結構代碼:
public abstract class MyAqsV1 { private volatile int state; private transient volatile Node head; private transient volatile Node tail; private transient Thread exclusiveOwnerThread; private static final Unsafe unsafe; private static final long stateOffset; private static final long headOffset; private static final long tailOffset; static { try { // 由於提供給cas內存中字段偏移量的unsafe類只能在被jdk信任的類中直接使用,這里使用反射來繞過這一限制 Field getUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); getUnsafe.setAccessible(true); unsafe = (Unsafe) getUnsafe.get(null); stateOffset = unsafe.objectFieldOffset(MyAqsV1.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset(MyAqsV1.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset(MyAqsV1.class.getDeclaredField("tail")); } catch (Exception var1) { throw new Error(var1); } } /** * 設置獨占當前aqs的線程 * */ protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } /** * 獲取獨占當前aqs的線程 * */ protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } private boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } private boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } /** * 嘗試着去申請互斥鎖(抽象方法,由具體的實現類控制) * */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /** * 嘗試着去釋放互斥鎖(抽象方法,由具體的實現類控制) * */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } }
2. MyAQS同步隊列
2.1 MyAQS隊列節點
MyAQS隊列節點中的成員屬性可能會被多個線程並發的訪問,所以需要用volatile關鍵字修飾以保證其在不同線程間內存的可見性。
最初的版本中節點屬性比較簡單,后續隨着所要支持功能的增多,MyAQS中Node節點類也會隨之拓展而變得復雜。
AQS節點類實現:
/** * 內部同步隊列的節點 * */ static final class Node { volatile Node prev; volatile Node next; volatile Thread thread; public Node() { } Node(Thread thread) { this.thread = thread; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) { throw new NullPointerException(); } else { return p; } } }
MyAQS同步隊列示意圖:
2.2 AQS隊列入隊
AQS作為互斥鎖等同步器的底層支撐框架,當線程爭用鎖失敗時需要令當前線程節點插入同步隊列的隊尾,入隊之后線程會陷入阻塞而讓出CPU,等待其前驅節點在釋放鎖后將其喚醒。
AQS隊列入隊相關實現:
/** * 嘗試加互斥鎖,如果加鎖失敗則當前線程進入阻塞狀態 * */ public final boolean acquire(int arg) { // 嘗試着去申請互斥鎖 boolean acquireResult = tryAcquire(arg); if(!acquireResult){ // 申請互斥鎖失敗,新創建一個綁定當前線程的節點,並令其插入隊尾 Node newWaiterNode = addWaiter(); // 嘗試着加入同步隊列 acquireQueued(newWaiterNode,arg); } // 不支持中斷功能,返回false return false; } /** * 創建當前線程對應的同步隊列節點 * 令該隊列節點插入隊尾 * */ private Node addWaiter() { Node node = new Node(Thread.currentThread()); Node pred = tail; if (pred != null) { node.prev = pred; // 當隊列不為空時,執行一次快速的入隊操作(因為少了一次enq方法調用,會快一點?) if (compareAndSetTail(pred, node)) { // 快速入隊成功,直接返回 pred.next = node; return node; } } // 上面的快速入隊操作失敗了,使用enq循環cas直到入隊(隊列為空,利用enq方法初始化同步隊列) enq(node); return node; } /** * 入隊操作 * 使用CAS操作+無限重試的方式來解決並發沖突的問題 * @return 返回新的隊尾節點 * */ private Node enq(final Node node) { for (;;) { Node currentTailNode = tail; if (currentTailNode == null) { // AQS的同步隊列是惰性加載的,如果tail為null說明隊列為空(head=null && tail=null) if (compareAndSetHead(new Node())) { // 使用cas的方式先創建一個新節點,令tail和head同時指向這一新節點 // 並發時多個線程同時執行,只會有一個線程成功執行compareAndSetHead這一cas操作 tail = head; } } else { // 令當前入隊節點node插入隊尾 node.prev = currentTailNode; // 使用cas的方式令aqs的tail指向node,使得新節點node成為新的隊尾元素 if (compareAndSetTail(currentTailNode, node)) { // 並發時多個線程同時執行,獲取到的tail引用值是一樣的,只有最先執行compareAndSetTail的線程會成功 // compareAndSetTail執行成功后令tail引用指向了一個新的節點,因此同一時刻獲取到相同tail引用的線程cas插入隊尾的操作會失敗(expect不對了) currentTailNode.next = node; return currentTailNode; } // compareAndSetTail執行失敗的線程會進入新的循環,反復嘗試compareAndSetTail的cas操作直到最終成功 } } } /** * 令已經入隊后的節點陷入阻塞態 * */ private void acquireQueued(final Node node, int arg) { for (; ; ) { final Node p = node.predecessor();
// 如果需要當前節點是aqs頭節點的next節點,則嘗試tryAcquire獲取鎖
if (p == head && tryAcquire(arg)) { // tryAcquire獲取鎖成功成功,說明頭節點對應的線程已經釋放了鎖 // 令當前入隊的節點成為新的head節點 setHead(node); p.next = null; // help GC return; }else{ // 阻塞當前線程 LockSupport.park(this); } } }
上述入隊相關邏輯中有幾個需要重點關注的邏輯:
2.2.1 acquire入隊總控方法
對外提供的public方法acquire是AQS互斥模式下加鎖的總控方法。當子類實現的tryAcquire嘗試加鎖失敗而返回false時,會先調用addWaiter方法獲得已入隊的線程節點,再調用acquireQueued方法令對應線程陷入阻塞態。
2.2.2 創建同步隊列節點並入隊
addWaiter方法用於將當前線程節點置入隊尾。而其中head節點是懶加載的,即只有當第一個線程節點需要入隊時才會被初始化。這樣做的主要好處是當AQS中只有一個線程占用鎖,但不存在其它爭用鎖失敗的線程時,通過推遲head節點的創建時機而提高AQS的空間利用率。
2.2.3 CAS的無鎖並發隊列
由於是無鎖並發的隊列,addWaiter和enq方法中如果存在多個線程同時並發的入隊並通過CAS設置tail節點引用時,至多只會有一個線程CAS返回成功,而其它線程則會CAS失敗。
失敗的線程由於處於for的無限循環中,會重新讀取新的tail節點引用值並再次嘗試着CAS加入隊尾(因為之前成功的節點已經成為了新的隊尾節點,當前線程需要掛在最新的隊尾節點之后)。在多個線程並發操作時,部分的線程可能會失敗很多次,但只要並發的線程數不再持續增加,最終所有的入隊CAS操作都將成功。
2.2.4 當前入隊節點其前驅節點不是頭節點時,入隊后會陷入阻塞態
一般的,爭用鎖失敗而加入同步隊列成功的線程在acquireQueued中會通過LockSupport.park方法而陷入阻塞態,等待前驅節點在釋放鎖時將其重新喚醒。
2.2.5 當前節點前驅節點恰好為頭節點時,陷入阻塞態前先嘗試一次tryAcquire
特別的,如果acquireQueued中當前節點node其前驅恰好是頭節點,則會再嘗試一次tryAcquire方法,若此時子類實現的tryAcquire返回true則認為加鎖成功。加鎖成功后,當前線程不必進入阻塞態而是直接返回,返回前會通過setHead方法令自己成為新的線程節點。setHead和p.next = null這兩行代碼會將之前老的head節點相關引用全部設置為null,之后老的head節點便會被自動GC給回收掉。
雖然在總控函數acquire中已經先執行過了一次tryAcquire,並且當時返回false時才會執行到acquireQueued方法中。但由於多線程並發時存在非常多的臨界情況,頭節點所對應的線程可能在當前線程執行到acquireQueued時就已經釋放了鎖,此時再次執行tryAcquire便會返回true了。
2.3 AQS隊列出隊
AQS和CLH鎖在線程釋放鎖時的行為略有不同。
CLH鎖在釋放鎖時只是簡單的將當前節點的isLocked鎖標記修改為false,如果此時后繼節點存在的話,就會在自旋循環中及時的感知到這一變化后而加鎖成功。
但AQS中加鎖失敗並入隊的后繼節點一般是位於阻塞態的,無法主動發現這一變化,因此需要由釋放鎖的線程負責去將其喚醒。這也是為什么AQS的隊列要在CLH鎖的基礎上引入顯式next后繼節點引用的原因(CLH鎖隊列的變種)。因為這樣可以在大多數情況下將獲取后繼節點引用的時間復雜度由O(n)(從tail隊尾通過prev引用一個一個找過來)優化為O(1)(head.next即可)。
AQS隊列出隊相關實現:
/** * 嘗試着釋放互斥鎖 * @return true:釋放成功;false:釋放失敗 * */ @Override public final boolean release(int arg) { // 嘗試着釋放鎖 if (tryRelease(arg)) { // 成功釋放 Node h = this.head; if (h != null) { unparkSuccessor(h); } return true; } return false; } /** * 喚醒后繼節點 * */ private void unparkSuccessor(Node node) { Node next = node.next; if(next != null) { LockSupport.unpark(next.thread); } }
對外提供的public方法release是AQS互斥模式下釋放鎖的總控方法。當子類實現的tryRelease嘗試解鎖成功而返回true時,若當前同步隊列head頭節點存在,且頭節點的next后繼也存在,則將頭節點的直接后繼節點對應的線程喚醒。
2.4 AQS多線程入隊/出隊時的臨界狀態分析
將加鎖失敗入隊阻塞的acquireQueued方法和解鎖成功時喚醒后繼的unparkSuccessor方法結合起來就會發現,大多數時候互斥模式下head節點對應線程在釋放鎖時,會通過LockSupport.unpark方法將此前被LockSupport.park阻塞的后繼節點喚醒。被喚醒的后繼節點由於處於循環中,會再度嘗試tryAcquire方法,如果返回true則加鎖成功成為新的頭節點;如果返回false則將被LockSupport.park再次阻塞,等待被喚醒。
但在上述場景之外還存在幾個微妙的臨界狀態需要仔細分析。
1. 加鎖失敗的線程節點還未完全入隊時,恰好此時head節點對應線程釋放了互斥鎖
已經獲取到鎖的線程A在釋放互斥鎖時,爭用鎖失敗的線程B正在入隊(假設當前只有A、B兩個線程存在),但其前驅head頭節點還未與之建立next關聯(線程B的節點已經成為新的隊尾節點但在addWaiter中的pred.next = node執行前)。此時線程A執行release方法,發現頭節點的next引用不存在,因此不會去執行LockSupport.unpark方法。但只要子類在tryRelease中正確的進行了處理,那么將不會出現lost wakeup問題。
具體來說,當線程B執行acquireQueued時,tryAcquire要么返回true,線程B成功加鎖而無需陷入阻塞態;或者非公平模式下有別的線程C已經搶先獲得了鎖,那么線程B節點執行accuireQueued時已經是head節點的直接后繼了,tryAcquire由於C已經搶險加鎖成功而返回false后線程B陷入阻塞,當搶先獲得鎖的線程C釋放鎖時執行release方法時便能正確的將線程A喚醒。
2. 入隊線程執行acquiredQueued與持有鎖的線程解鎖時unparkSuccessor並發執行時
現在考慮這樣一種場景,爭用鎖線程A調用acquire方法時加鎖失敗,其線程節點作為head頭節點的直接后繼已經入隊完成,並且已經執行到acquireQueued方法內部,正准備執行LockSupport.park將自己阻塞(還未執行park方法,且此時head.next = NodeA);另一方面之前持有鎖的線程B正在通過release釋放鎖,在tryRelease返回true后,准備通過unparkSuccessor中的LockSupport.unpark將后繼喚醒。
那么如果在線程A執行LockSupport.park(this)阻塞自己之前,線程B先一步執行了LockSupport.unpark(next.thread),線程A會不會出現lost wakeup問題呢?
答案是不會的,LockSupport.park/unpark一般成對的使用用於線程同步,其內部有一個基於線程級別的、允許預先設置的許可標記,且默認情況下許可是不存在的。
LockSupport.park時如果發現許可不存在會令當前線程陷入阻塞態;若許可已存在則不必阻塞直接返回,並消費掉許可位(再次執行的話就會被阻塞了)。而LockSupport.unpark(thread)時如果發現對應線程thread此時沒有因為執行了LockSupport.park陷入阻塞,則會為參數線程thread預先設置一個許可;若對應線程已經先執行了LockSupport.park,則會將線程從阻塞態喚醒。
若線程A先LockSupport.park則會先陷入阻塞,線程B隨后會通過LockSupport.unpark(thread)將線程A喚醒;若線程B先執行LockSupport.unpark(thread),則會為線程A預先設置許可,當線程A后執行LockSupport.park時發現許可已存在,則消費掉許可並直接返回而不會陷入阻塞。
綜上所屬,無論並發時線程A的線程是先於線程B調用LockSupport.unpark(thread)前調用LockSupport.park還是之后調用,都不會有問題。
2.5 AQS提供的hasQueuedPredecessors方法
對於需要先來先服務的公平鎖,如果之前同步隊列中已經存在其它需要爭用鎖的線程節點,則當前加鎖線程作為后來者將不能先於之前的線程獲得鎖。通過AQS提供的hasQueuedPredecessors方法就能很簡單的輔助同步器實現先來先服務的公平鎖機制。
hasQueuedPredecessors實現:
public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; if(h == t){ // tail=head兩種情況,都為null說明隊列為空;或者都指向同一個節點,隊列長度為1 // 說明此時隊列中並沒有其它等待鎖的線程,返回false return false; } Node secondNode = h.next; if(secondNode != null){ if(secondNode.thread == Thread.currentThread()){ // 頭節點存在后繼節點,且后繼節點就是當前線程自己,因此不需要排隊 return false; }else{ // 頭節點存在后繼節點,但后繼節點不是當前線程,因此需要排隊 return true; } }else{ // tail != head,但是頭節點卻沒有next節點,這是一種特殊的場景 // 在enq入隊操作的初始化隊列操作時可能會出現,先通過compareAndSetHead設置了頭節點,但是還沒執行tail = head操作前的瞬間會出現 // 此時,說明已經有一個別的線程正在執行入隊操作,而當前線程此時還未進行入隊,相對進度更慢,所以還是需要去排隊的 return true; } }
熟悉jdk AQS源碼的讀者可能會注意到上述代碼形式於jdk中的代碼結構不太一樣,但實際上這里只是將jdk源碼中的代碼在保持邏輯不變的情況下進行了適當改寫,使得各個判斷的邏輯分支更為清晰而更易理解。
3.ReentrantLock可重入互斥鎖工作原理
在理解了AQS的同步隊列工作原理后,AQS互斥模式的工作原理就比較容易理解了。對於一個基於AQS互斥模式的同步鎖,AQS要做的就是在線程爭用鎖失敗時為對應線程創建一個線程節點並使其插入隊尾,同時令其暫時進入阻塞態以等待之前已獲得鎖的線程在釋放鎖之后將其喚醒。
雖然已經進行了關於AQS的同步隊列與互斥模式工作原理的介紹,但AQS抽象同步隊列無愧於抽象之名,光是站在設計者的角度對內部各個模塊進行拆解學習是不夠的,還需要站在使用者的角度將上面關於AQS的各個模塊串聯起來,從AQS的外部以一個更全面的視角來學習和加深對其的理解。而同樣位於juc包下的可重入互斥鎖ReentrantLock就是一個基於AQS互斥模式,非常常見的同步器。通過對ReentrantLock實現的學習,將其與AQS的工作原理結合起來,看看ReentrantLock是如何基於AQS框架實現互斥、可重入、公平/非公平等特性的。
為了更好地測試我們實現的AQS,這里將ReentrantLock的源碼進行一定的簡化(MyReentrantLock),暫時去除了條件變量Condition等內容,僅保留了最基本的互斥加鎖功能,同時將使用到jdk中AQS的地方替換成我們自己實現的MyAQS版本。
MyReentrantLock類實現:
/** * 將jdk的ReentrantLock中的aqs改成MyAqsV1 */ public class MyReentrantLockV1 { private final MyReentrantLockV1.Sync sync; public MyReentrantLockV1(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } abstract static class Sync extends MyAqsV1 { abstract void lock(); final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } } static final class NonfairSync extends Sync { /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } static final class FairSync extends Sync { final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } public void lock() { sync.lock(); } public void unlock() { sync.release(1); } }
juc包中並沒有對同步器的API做一個統一的定義,因此不同的同步器可以有不同的api,所以AQS在ReentrantLock中是以組合,而不是被直接繼承的形式使用的。ReentrantLock內部抽象出了一個叫做Sync的類用於繼承並實現AQS中的抽象方法,將AQS的acquire、release等較為抽象的方法名給封裝起來,以更加符合語義的方法名lock、unlock對外暴露接口。
ReentrantLock中利用AQS提供的state屬性判斷當前是否有線程已經獲得互斥鎖,以及存儲重入加鎖的次數。具體來說,state=0代表當前沒有線程持有該互斥鎖,state>0時代表當前已經有線程持有了鎖;ReentrantLock是可重入的,持有鎖的線程在每次加鎖成功時state自增1,解鎖時state自減1,當state減至0時便代表釋放了鎖。因此ReentrantLock在使用時,需要如果線程多次調用lock加鎖成功時,也需要調用同樣次數的unlock方法才能成功釋放鎖。
ReentrantLock支持以公平互斥鎖和非公平互斥鎖兩種模式工作,可以通過構造方法傳入boolean類型參數指定。
3.1 ReentrantLock公平鎖工作原理
公平模式下的ReentrantLock是由FairSync來實現sync抽象類的。
公平模式下FairSync加鎖
ReentrantLock公平模式下lock加鎖時執行sync.lock,實際上是執行子類FairSync的lock方法。FairSync的lock實現中會直接調用AQS的acquire方法,並且傳入參數1(acquire(1))。而AQS的acquire方法中將arg參數1透傳給由子類FairSync實現的tryAcquire方法。
FairSync的tryAcquire實現中,首先判斷state的值.
如果state為0,則先通過AQS的hasQueuedPredecessors判斷當前同步隊列是否已經存在其它的線程節點。作為公平鎖hasQueuedPredecessors如果返回true,則tryAcquire直接返回false嘗試加鎖失敗;如果hasQueuedPredecessors返回false,則進一步通過CAS的方式嘗試着將state由0變為1,如果CAS成功則代表加鎖成功,CAS失敗則代表有其它線程也在這個瞬間並發的進行了加鎖操作且CAS設置state由0到1成功,則當前線程也是tryAcquire加鎖失敗返回false。
如果state不為0,說明當前ReentrantLock已經有線程持有鎖了,則進一步的通過current == getExclusiveOwnerThread()判斷當前加鎖的線程是否就是持有鎖的線程,如果加鎖線程不是持有鎖的線程,基於互斥性則加鎖失敗tryAcquire返回false;而如果當前加鎖的線程正是持有鎖的線程,則將當前state自增1,返回true代表加鎖成功。
加鎖失敗時tryAcquire返回false,則會接着執行AQS的acquire方法中后續的線程節點入隊和線程阻塞邏輯(addWaiter、acquireQueued)。
公平模式下解鎖
ReentrantLock公平模式下解鎖時執行sync.unlock方法,實際上是執行父類中AQS的release方法。AQS的release方法傳入arg參數1,release方法中調用並透傳arg參數1給由子類實現的tryRelease方法。ReentrantLock中sync類實現了tryRelease方法中,先將當前state減去1(arg參數),若發現減去1之后的state為0,則代表當前線程已經成功釋放了鎖,tryRelease返回true;若state減去1之后state依然大於0,說明當前可重入鎖還不能釋放鎖,tryRelease返回false。
如果解鎖時tryRelease了返回true,則AQS的release方法中會接着喚醒當前持有鎖節點的(head節點)直接后繼節點(unparkSuccessor)。
3.2 ReentrantLock非公平鎖工作原理
非公平模式下的ReentrantLock是由NonFairSync來實現sync抽象類的。
非公平模式下NonFairSync加鎖
與公平模式下FairSync的實現不同的是,非公平鎖下線程搶占鎖時不需要考慮公平性,即加鎖時不需要去考慮當前是否已經有別的線程先一步等待加鎖。
因此NonFairSync的lock實現中首先嘗試着通過CAS的方式去嘗試着獲得鎖(compareAndSetState(0,1)),如果CAS操作成功則直接令當前線程獲得鎖(setExclusiveOwnerThread(Thread.currentThread()))。而如果CAS操作失敗則和FairSync一樣會去調用AQS的acquire方法,並且傳入參數1(acquire(1))。而AQS的acquire方法中將arg參數1透傳給由子類NonFairSync實現的tryAcquire方法。
NonFairSync的tryAcquire實現和公平鎖中的FairSync基本一致,唯一的區別在於當發現state為0時NonFairSync不會通過hasQueuedPredecessors校驗隊列中是否已存在其它等待獲得鎖的線程,而是允許當前線程直接通過CAS操作嘗試爭搶鎖。因此非公平模式下,當之前獲取到鎖的線程通過unlock釋放鎖將state設置為0時,即使當前隊列中已經存在其它被阻塞等待被喚醒的線程時,新來爭用鎖的線程也可能搶在隊列中線程被喚醒並拿到鎖之前搶先一步獲得鎖(state+1)。此時被喚醒的線程接着執行acquireQueued方法時再一次嘗試tryAcquire,因為state不為0而返回失敗后便又會繼續陷入阻塞態。
非公平模式下解鎖
ReentrantLock非公平模式下的解鎖與公平模式下的解鎖邏輯完全一致,不再贅述。
3.3 ReentrantLock公平鎖與非公平鎖的差別
- 從功能上來說,ReentrantLock的公平鎖和非公平鎖的主要區別在於非公平鎖允許后申請爭用鎖的線程搶先獲得鎖,而公平鎖不能。
- 從代碼實現上來說,非公平模式下允許爭搶鎖的時機主要有兩處:一是在lock時允許嘗試一次CAS操作進行爭搶(NonFairSync.lock方法);二是在tryAcquire時如果發現state為0,允許當前線程再一次的通過CAS嘗試爭搶(NonFairSync.tryAcquire方法)。但是非互斥模式下加鎖時若這兩次爭用都未能成功獲取到鎖,則當前線程依然會和公平鎖模式下一樣乖乖的加入同步隊列,等待之前已在隊列中的線程依次釋放鎖后來將其喚醒。
-
從性能上來說,由於頭節點線程釋放鎖時,喚醒隊列中被阻塞的線程時由於涉及到操作系統中線程的上下文切換而存在一定的延遲。若此時能允許新的線程在加鎖時搶先獲得鎖去執行被互斥鎖保護的臨界區邏輯,則總體的加鎖/解鎖吞吐量會較之不允許搶占的策略更高。因此非公平鎖的性能會略高於公平鎖,但在並發較高的場景下新加鎖的線程可能會頻繁的搶先,導致已存在於隊列中的線程長時間無法獲得鎖而出現飢餓問題。
總結
MyAQSV1完整實現:

import sun.misc.Unsafe; import java.lang.reflect.Field; import java.util.concurrent.locks.LockSupport; /** * 自己實現的aqs,v1版本 * 只支持互斥鎖模式(無法處理被阻塞線程發生被中斷) */ public abstract class MyAqsV1 implements MyAqs { private volatile int state; private transient volatile Node head; private transient volatile Node tail; private transient Thread exclusiveOwnerThread; private static final Unsafe unsafe; private static final long stateOffset; private static final long headOffset; private static final long tailOffset; static { try { // 由於提供給cas內存中字段偏移量的unsafe類只能在被jdk信任的類中直接使用,這里使用反射來繞過這一限制 Field getUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); getUnsafe.setAccessible(true); unsafe = (Unsafe) getUnsafe.get(null); stateOffset = unsafe.objectFieldOffset(MyAqsV1.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset(MyAqsV1.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset(MyAqsV1.class.getDeclaredField("tail")); } catch (Exception var1) { throw new Error(var1); } } /** * 設置獨占當前aqs的線程 * */ protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } /** * 獲取獨占當前aqs的線程 * */ protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } /** * 嘗試加互斥鎖,如果加鎖失敗則當前線程進入阻塞狀態 * */ @Override public final boolean acquire(int arg) { // 嘗試着去申請互斥鎖 boolean acquireResult = tryAcquire(arg); if(!acquireResult){ // 申請互斥鎖失敗,新創建一個綁定當前線程的節點 Node newWaiterNode = addWaiter(); // 嘗試着加入同步隊列 acquireQueued(newWaiterNode,arg); } // 不支持中斷功能,返回false return false; } /** * 嘗試着釋放互斥鎖 * @return true:釋放成功;false:釋放失敗 * */ @Override public final boolean release(int arg) { // 嘗試着釋放鎖 if (tryRelease(arg)) { // 成功釋放 Node h = this.head; if (h != null) { unparkSuccessor(h); } return true; } return false; } @Override public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; if(h == t){ // tail=head兩種情況,都為null說明隊列為空;或者都指向同一個節點,隊列長度為1 // 說明此時隊列中並沒有其它等待鎖的線程,返回false return false; } Node secondNode = h.next; if(secondNode != null){ if(secondNode.thread == Thread.currentThread()){ // 頭節點存在后繼節點,且就是當前線程,因此不需要排隊 return false; }else{ // 頭節點存在后繼節點,但不是當前線程,因此需要排隊 return true; } }else{ // tail != head,但是頭節點卻沒有next節點,這是一種特殊的場景 // 在enq入隊操作的初始化隊列操作時可能會出現,先通過compareAndSetHead設置了頭節點,但是還沒執行tail = head操作前的瞬間會出現 // 此時,說明已經有一個別的線程正在執行入隊操作,當前線程此時還未進行入隊,進度更慢,所以還是需要去排隊的 return true; } } /** * 嘗試着加入隊列 * */ private void acquireQueued(final Node node, int arg) { for (; ; ) { final Node p = node.predecessor(); // 如果需要入隊的節點是aqs頭節點的next節點,則最后嘗試一次tryAcquire獲取鎖 // 這里的判斷有兩個作用 // 1 當前線程第一次執行acquireQueued還未被LockSupport.park阻塞前,若當前線程的前驅恰好是頭節點則 // 最后再通過tryAcquire判斷一次,若恰好這個臨界點上頭節點對應的線程已經釋放了鎖,則可以免去一次LockSupport.park // 2 當前線程已經不是第一次執行acquireQueued,而是已經至少被LockSupport.park阻塞過一次 // 則在被前驅節點喚醒后在for的無限循環中通過tryAcquired再嘗試一次加鎖 // 若是公平鎖模式下,則此時tryAcquire應該會返回true而加鎖成功return退出 // 若是非公平鎖模式下,若此時有別的線程搶先獲得了鎖,則tryAcquire返回false,當前被喚醒的線程再一次通過LockSupport.park陷入阻塞 if (p == head && tryAcquire(arg)) { // tryAcquire獲取鎖成功成功,說明此前的瞬間頭節點對應的線程已經釋放了鎖 // 令當前入隊的節點成為aqs中新的head節點 setHead(node); p.next = null; // help GC return; }else{ // 阻塞當前線程 LockSupport.park(this); } } } /** * 喚醒后繼節點 * */ private void unparkSuccessor(Node node) { Node next = node.next; if(next != null) { LockSupport.unpark(next.thread); } } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } /** * 創建當前線程對應的同步隊列節點 * 令該隊列節點插入隊尾 * */ private Node addWaiter() { Node node = new Node(Thread.currentThread()); Node pred = tail; if (pred != null) { node.prev = pred; // 當隊列不為空時,執行一次快速的入隊操作(因為少了一次enq方法調用,會快一點?) if (compareAndSetTail(pred, node)) { // 快速入隊成功,直接返回 pred.next = node; return node; } } // 上面的快速入隊操作失敗了,使用enq循環cas直到入隊(隊列為空,利用enq方法初始化同步隊列) enq(node); return node; } /** * 入隊操作 * 使用CAS操作+無限重試的方式來解決並發沖突的問題 * @return 返回新的隊尾節點 * */ private Node enq(final Node node) { for (;;) { Node currentTailNode = tail; if (currentTailNode == null) { // AQS的同步隊列是惰性加載的,如果tail為null說明隊列為空(head=null && tail=null) if (compareAndSetHead(new Node())) { // 使用cas的方式先創建一個新節點,令tail和head同時指向這一新節點 // 並發時多個線程同時執行,只會有一個線程成功執行compareAndSetHead這一cas操作 tail = head; } } else { // 令當前入隊節點node插入隊尾 node.prev = currentTailNode; // 使用cas的方式令aqs的tail指向node,使得新節點node成為新的隊尾元素 if (compareAndSetTail(currentTailNode, node)) { // 並發時多個線程同時執行,獲取到的tail引用值是一樣的,只有最先執行compareAndSetTail的線程會成功 // compareAndSetTail執行成功后令tail引用指向了一個新的節點,因此同一時刻獲取到相同tail引用的線程cas插入隊尾的操作會失敗(expect不對了) currentTailNode.next = node; return currentTailNode; } // compareAndSetTail執行失敗的線程會進入新的循環,反復嘗試compareAndSetTail的cas操作直到最終成功 } } } /** * 嘗試着去申請互斥鎖(抽象方法,由具體的實現類控制) * */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /** * 嘗試着去釋放互斥鎖(抽象方法,由具體的實現類控制) * */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } /** * 內部同步隊列的節點 * */ static final class Node { volatile Node prev; volatile Node next; volatile Thread thread; public Node() { } Node(Thread thread) { this.thread = thread; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) { throw new NullPointerException(); } else { return p; } } } private boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } private boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } }
本篇博客通過模仿jdk的AQS,實現了一個暫時只支持互斥模式的簡易版AQS,即MyAQSV1。MyAQSV1在實現功能的基礎上,盡可能的裁剪了jdk實現中與互斥功能不相關的邏輯,以一個最小子集的形式工作;同時也對jdk實現中一些不易理解的地方進行了改寫使之更易理解。MyAQSV1從內部解析了AQS互斥模式的工作原理,而簡易版的互斥鎖MyReentrantLock則讓我們能夠以外部使用者的角度來觀察AQS的行為。
希望這篇博客能夠幫助讀者更好的理解AQS中無鎖並發的同步隊列和互斥模式下爭用鎖和釋放鎖的機制,以及ReentrantLock中公平鎖與非公平鎖具體實現上和性能上的差異。
本篇博客的完整代碼在我的github上:https://github.com/1399852153/Reinventing-the-wheel-for-learning(AQS模塊)。
由於AQS無鎖並發機制的復雜性,可能MyAQSV1在裁剪、改寫jdk實現的過程中無意中引入了一些bug,如有錯誤,還請多多指教。