目錄
📦 本文以及示例源碼已歸檔在 javacore
一、並發鎖簡介
確保線程安全最常見的做法是利用鎖機制(Lock
、sychronized
)來對共享數據做互斥同步,這樣在同一個時刻,只有一個線程可以執行某個方法或者某個代碼塊,那么操作必然是原子性的,線程安全的。
在工作、面試中,經常會聽到各種五花八門的鎖,聽的人雲里霧里。鎖的概念術語很多,它們是針對不同的問題所提出的,通過簡單的梳理,也不難理解。
可重入鎖
可重入鎖又名遞歸鎖,是指 同一個線程在外層方法獲取了鎖,在進入內層方法會自動獲取鎖。
可重入鎖可以在一定程度上避免死鎖。
ReentrantLock
、ReentrantReadWriteLock
是可重入鎖。這點,從其命名也不難看出。synchronized
也是一個可重入鎖。
synchronized void setA() throws Exception{ Thread.sleep(1000); setB(); } synchronized void setB() throws Exception{ Thread.sleep(1000); }
上面的代碼就是一個典型場景:如果使用的鎖不是可重入鎖的話,setB
可能不會被當前線程執行,從而造成死鎖。
公平鎖與非公平鎖
- 公平鎖 - 公平鎖是指 多線程按照申請鎖的順序來獲取鎖。
- 非公平鎖 - 非公平鎖是指 多線程不按照申請鎖的順序來獲取鎖 。這就可能會出現優先級反轉(后來者居上)或者飢餓現象(某線程總是搶不過別的線程,導致始終無法執行)。
公平鎖為了保證線程申請順序,勢必要付出一定的性能代價,因此其吞吐量一般低於非公平鎖。
公平鎖與非公平鎖 在 Java 中的典型實現:
synchronized
只支持非公平鎖。ReentrantLock
、ReentrantReadWriteLock
,默認是非公平鎖,但支持公平鎖。
獨享鎖與共享鎖
獨享鎖與共享鎖是一種廣義上的說法,從實際用途上來看,也常被稱為互斥鎖與讀寫鎖。
- 獨享鎖 - 獨享鎖是指 鎖一次只能被一個線程所持有。
- 共享鎖 - 共享鎖是指 鎖可被多個線程所持有。
獨享鎖與共享鎖在 Java 中的典型實現:
synchronized
、ReentrantLock
只支持獨享鎖。ReentrantReadWriteLock
其寫鎖是獨享鎖,其讀鎖是共享鎖。讀鎖是共享鎖使得並發讀是非常高效的,讀寫,寫讀 ,寫寫的過程是互斥的。
悲觀鎖與樂觀鎖
樂觀鎖與悲觀鎖不是指具體的什么類型的鎖,而是處理並發同步的策略。
- 悲觀鎖 - 悲觀鎖對於並發采取悲觀的態度,認為:不加鎖的並發操作一定會出問題。悲觀鎖適合寫操作頻繁的場景。
- 樂觀鎖 - 樂觀鎖對於並發采取樂觀的態度,認為:不加鎖的並發操作也沒什么問題。對於同一個數據的並發操作,是不會發生修改的。在更新數據的時候,會采用不斷嘗試更新的方式更新數據。樂觀鎖適合讀多寫少的場景。
悲觀鎖與樂觀鎖在 Java 中的典型實現:
-
悲觀鎖在 Java 中的應用就是通過使用
synchronized
和Lock
顯示加鎖來進行互斥同步,這是一種阻塞同步。 -
樂觀鎖在 Java 中的應用就是采用 CAS 機制(CAS 操作通過
Unsafe
類提供,但這個類不直接暴露為 API,所以都是間接使用,如各種原子類)。
輕量級鎖、重量級鎖與偏向鎖
所謂輕量級鎖與重量級鎖,指的是鎖控制粒度的粗細。顯然,控制粒度越細,阻塞開銷越小,並發性也就越高。
Java 1.6 以前,重量級鎖一般指的是 synchronized
,而輕量級鎖指的是 volatile
。
Java 1.6 以后,針對 synchronized
做了大量優化,引入 4 種鎖狀態: 無鎖狀態、偏向鎖、輕量級鎖和重量級鎖。鎖可以單向的從偏向鎖升級到輕量級鎖,再從輕量級鎖升級到重量級鎖 。
- 偏向鎖 - 偏向鎖是指一段同步代碼一直被一個線程所訪問,那么該線程會自動獲取鎖。降低獲取鎖的代價。
-
輕量級鎖 - 是指當鎖是偏向鎖的時候,被另一個線程所訪問,偏向鎖就會升級為輕量級鎖,其他線程會通過自旋的形式嘗試獲取鎖,不會阻塞,提高性能。
-
重量級鎖 - 是指當鎖為輕量級鎖的時候,另一個線程雖然是自旋,但自旋不會一直持續下去,當自旋一定次數的時候,還沒有獲取到鎖,就會進入阻塞,該鎖膨脹為重量級鎖。重量級鎖會讓其他申請的線程進入阻塞,性能降低。
分段鎖
分段鎖其實是一種鎖的設計,並不是具體的一種鎖。所謂分段鎖,就是把鎖的對象分成多段,每段獨立控制,使得鎖粒度更細,減少阻塞開銷,從而提高並發性。這其實很好理解,就像高速公路上的收費站,如果只有一個收費口,那所有的車只能排成一條隊繳費;如果有多個收費口,就可以分流了。
Hashtable
使用 synchronized
修飾方法來保證線程安全性,那么面對線程的訪問,Hashtable 就會鎖住整個對象,所有的其它線程只能等待,這種阻塞方式的吞吐量顯然很低。
Java 1.7 以前的 ConcurrentHashMap
就是分段鎖的典型案例。ConcurrentHashMap
維護了一個 Segment
數組,一般稱為分段桶。
final Segment<K,V>[] segments;
當有線程訪問 ConcurrentHashMap
的數據時,ConcurrentHashMap
會先根據 hashCode 計算出數據在哪個桶(即哪個 Segment),然后鎖住這個 Segment
。
顯示鎖和內置鎖
Java 1.5 之前,協調對共享對象的訪問時可以使用的機制只有 synchronized
和 volatile
。這兩個都屬於內置鎖,即鎖的申請和釋放都是由 JVM 所控制。
Java 1.5 之后,增加了新的機制:ReentrantLock
、ReentrantReadWriteLock
,這類鎖的申請和釋放都可以由程序所控制,所以常被稱為顯示鎖。
💡
synchronized
的用法和原理可以參考:Java 並發基礎機制 - synchronized 。🔔 注意:如果不需要
ReentrantLock
、ReentrantReadWriteLock
所提供的高級同步特性,應該優先考慮使用synchronized
。理由如下:
- Java 1.6 以后,
synchronized
做了大量的優化,其性能已經與ReentrantLock
、ReentrantReadWriteLock
基本上持平。- 從趨勢來看,Java 未來更可能會優化
synchronized
,而不是ReentrantLock
、ReentrantReadWriteLock
,因為synchronized
是 JVM 內置屬性,它能執行一些優化。ReentrantLock
、ReentrantReadWriteLock
申請和釋放鎖都是由程序控制,如果使用不當,可能造成死鎖,這是很危險的。
以下對比一下顯示鎖和內置鎖的差異:
- 主動獲取鎖和釋放鎖
synchronized
不能主動獲取鎖和釋放鎖。獲取鎖和釋放鎖都是 JVM 控制的。ReentrantLock
可以主動獲取鎖和釋放鎖。(如果忘記釋放鎖,就可能產生死鎖)。
- 響應中斷
synchronized
不能響應中斷。ReentrantLock
可以響應中斷。
- 超時機制
synchronized
沒有超時機制。ReentrantLock
有超時機制。ReentrantLock
可以設置超時時間,超時后自動釋放鎖,避免一直等待。
- 支持公平鎖
synchronized
只支持非公平鎖。ReentrantLock
支持非公平鎖和公平鎖。
- 是否支持共享
- 被
synchronized
修飾的方法或代碼塊,只能被一個線程訪問(獨享)。如果這個線程被阻塞,其他線程也只能等待 ReentrantLock
可以基於Condition
靈活的控制同步條件。
- 被
- 是否支持讀寫分離
synchronized
不支持讀寫鎖分離;ReentrantReadWriteLock
支持讀寫鎖,從而使阻塞讀寫的操作分開,有效提高並發性。
二、AQS
AbstractQueuedSynchronizer
(簡稱 AQS)是隊列同步器,顧名思義,其主要作用是處理同步。它是並發鎖和很多同步工具類的實現基石(如ReentrantLock
、ReentrantReadWriteLock
、Semaphore
等)。因此,要想深入理解
ReentrantLock
、ReentrantReadWriteLock
等並發鎖和同步工具,必須先理解 AQS 的要點和原理。
AQS 的要點
在 java.util.concurrent.locks
包中的相關鎖(常用的有 ReentrantLock
、 ReadWriteLock
)都是基於 AQS 來實現。這些鎖都沒有直接繼承 AQS,而是定義了一個 Sync
類去繼承 AQS。為什么要這樣呢?因為鎖面向的是使用用戶,而同步器面向的則是線程控制,那么在鎖的實現中聚合同步器而不是直接繼承 AQS 就可以很好的隔離二者所關注的事情。
AQS 提供了對獨享鎖與共享鎖的支持。
獨享鎖 API
獲取、釋放獨享鎖的主要 API 如下:
public final void acquire(int arg) public final void acquireInterruptibly(int arg) public final boolean tryAcquireNanos(int arg, long nanosTimeout) public final boolean release(int arg)
acquire
- 獲取獨占鎖。acquireInterruptibly
- 獲取可中斷的獨占鎖。tryAcquireNanos
- 嘗試在指定時間內獲取可中斷的獨占鎖。在以下三種情況下回返回:- 在超時時間內,當前線程成功獲取了鎖;
- 當前線程在超時時間內被中斷;
- 超時時間結束,仍未獲得鎖返回 false。
release
- 釋放獨占鎖。
共享鎖 API
獲取、釋放共享鎖的主要 API 如下:
public final void acquireShared(int arg) public final void acquireSharedInterruptibly(int arg) public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) public final boolean releaseShared(int arg)
acquireShared
- 獲取共享鎖。acquireSharedInterruptibly
- 獲取可中斷的共享鎖。tryAcquireSharedNanos
- 嘗試在指定時間內獲取可中斷的共享鎖。release
- 釋放共享鎖。
AQS 的原理
AQS 的數據結構
閱讀 AQS 的源碼,可以發現:AQS 繼承自 AbstractOwnableSynchronize
。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** 等待隊列的隊頭,懶加載。只能通過 setHead 方法修改。 */ private transient volatile Node head; /** 等待隊列的隊尾,懶加載。只能通過 enq 方法添加新的等待節點。*/ private transient volatile Node tail; /** 同步狀態 */ private volatile int state; }
state
- AQS 使用一個整型的volatile
變量來 維護同步狀態。- 這個整數狀態的意義由子類來賦予,如
ReentrantLock
中該狀態值表示所有者線程已經重復獲取該鎖的次數,Semaphore
中該狀態值表示剩余的許可數量。
- 這個整數狀態的意義由子類來賦予,如
head
和tail
- AQS 維護了一個Node
類型(AQS 的內部類)的雙鏈表來完成同步狀態的管理。這個雙鏈表是一個雙向的 FIFO 隊列,通過head
和tail
指針進行訪問。當 有線程獲取鎖失敗后,就被添加到隊列末尾。
再來看一下 Node
的源碼
static final class Node { /** 該等待同步的節點處於共享模式 */ static final Node SHARED = new Node(); /** 該等待同步的節點處於獨占模式 */ static final Node EXCLUSIVE = null; /** 線程等待狀態,狀態值有: 0、1、-1、-2、-3 */ volatile int waitStatus; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /** 前驅節點 */ volatile Node prev; /** 后繼節點 */ volatile Node next; /** 等待鎖的線程 */ volatile Thread thread; /** 和節點是否共享有關 */ Node nextWaiter; }
很顯然,Node 是一個雙鏈表結構。
waitStatus
-Node
使用一個整型的volatile
變量來 維護 AQS 同步隊列中線程節點的狀態。waitStatus
有五個狀態值:CANCELLED(1)
- 此狀態表示:該節點的線程可能由於超時或被中斷而 處於被取消(作廢)狀態,一旦處於這個狀態,表示這個節點應該從等待隊列中移除。SIGNAL(-1)
- 此狀態表示:后繼節點會被掛起,因此在當前節點釋放鎖或被取消之后,必須喚醒(unparking
)其后繼結點。CONDITION(-2)
- 此狀態表示:該節點的線程 處於等待條件狀態,不會被當作是同步隊列上的節點,直到被喚醒(signal
),設置其值為 0,再重新進入阻塞狀態。PROPAGATE(-3)
- 此狀態表示:下一個acquireShared
應無條件傳播。- 0 - 非以上狀態。
獨占鎖的獲取和釋放
獲取獨占鎖
AQS 中使用 acquire(int arg)
方法獲取獨占鎖,其大致流程如下:
- 先嘗試獲取同步狀態,如果獲取同步狀態成功,則結束方法,直接返回。
- 如果獲取同步狀態不成功,AQS 會不斷嘗試利用 CAS 操作將當前線程插入等待同步隊列的隊尾,直到成功為止。
- 接着,不斷嘗試為等待隊列中的線程節點獲取獨占鎖。
詳細流程可以用下圖來表示,請結合源碼來理解(一圖勝千言):
釋放獨占鎖
AQS 中使用 release(int arg)
方法釋放獨占鎖,其大致流程如下:
- 先嘗試獲取解鎖線程的同步狀態,如果獲取同步狀態不成功,則結束方法,直接返回。
- 如果獲取同步狀態成功,AQS 會嘗試喚醒當前線程節點的后繼節點。
獲取可中斷的獨占鎖
AQS 中使用 acquireInterruptibly(int arg)
方法獲取可中斷的獨占鎖。
acquireInterruptibly(int arg)
實現方式相較於獲取獨占鎖方法( acquire
)非常相似,區別僅在於它會通過 Thread.interrupted
檢測當前線程是否被中斷,如果是,則立即拋出中斷異常(InterruptedException
)。
獲取超時等待式的獨占鎖
AQS 中使用 tryAcquireNanos(int arg)
方法獲取超時等待的獨占鎖。
doAcquireNanos 的實現方式 相較於獲取獨占鎖方法( acquire
)非常相似,區別在於它會根據超時時間和當前時間計算出截止時間。在獲取鎖的流程中,會不斷判斷是否超時,如果超時,直接返回 false;如果沒超時,則用 LockSupport.parkNanos
來阻塞當前線程。
共享鎖的獲取和釋放
獲取共享鎖
AQS 中使用 acquireShared(int arg)
方法獲取共享鎖。
acquireShared
方法和 acquire
方法的邏輯很相似,區別僅在於自旋的條件以及節點出隊的操作有所不同。
成功獲得共享鎖的條件如下:
tryAcquireShared(arg)
返回值大於等於 0 (這意味着共享鎖的 permit 還沒有用完)。- 當前節點的前驅節點是頭結點。
釋放共享鎖
AQS 中使用 releaseShared(int arg)
方法釋放共享鎖。
releaseShared
首先會嘗試釋放同步狀態,如果成功,則解鎖一個或多個后繼線程節點。釋放共享鎖和釋放獨享鎖流程大體相似,區別在於:
對於獨享模式,如果需要 SIGNAL,釋放僅相當於調用頭節點的 unparkSuccessor
。
獲取可中斷的共享鎖
AQS 中使用 acquireSharedInterruptibly(int arg)
方法獲取可中斷的共享鎖。
acquireSharedInterruptibly
方法與 acquireInterruptibly
幾乎一致,不再贅述。
獲取超時等待式的共享鎖
AQS 中使用 tryAcquireSharedNanos(int arg)
方法獲取超時等待式的共享鎖。
tryAcquireSharedNanos
方法與 tryAcquireNanos
幾乎一致,不再贅述。
三、ReentrantLock
ReentrantLock
類是Lock
接口的具體實現,它是一個可重入鎖。與內置鎖synchronized
不同,ReentrantLock
提供了一組無條件的、可輪詢的、定時的以及可中斷的鎖操作,所有獲取鎖、釋放鎖的操作都是顯式的操作。
ReentrantLock 的特性
ReentrantLock
的特性如下:
ReentrantLock
提供了與synchronized
相同的互斥性、內存可見性和可重入性。ReentrantLock
支持公平鎖和非公平鎖(默認)兩種模式。ReentrantLock
實現了Lock
接口,支持了synchronized
所不具備的靈活性。synchronized
無法中斷一個正在等待獲取鎖的線程synchronized
無法在請求獲取一個鎖時無休止地等待
Lock
的接口定義如下:
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
lock()
- 獲取鎖。unlock()
- 釋放鎖。tryLock()
- 嘗試獲取鎖,僅在調用時鎖未被另一個線程持有的情況下,才獲取該鎖。tryLock(long time, TimeUnit unit)
- 和tryLock()
類似,區別僅在於限定時間,如果限定時間內未獲取到鎖,視為失敗。lockInterruptibly()
- 鎖未被另一個線程持有,且線程沒有被中斷的情況下,才能獲取鎖。newCondition()
- 返回一個綁定到Lock
對象上的Condition
實例。
ReentrantLock 的用法
前文了解了 ReentrantLock
的特性,接下來,我們要講述其具體用法。
ReentrantLock 的構造方法
ReentrantLock
有兩個構造方法:
public ReentrantLock() {} public ReentrantLock(boolean fair) {}
ReentrantLock()
- 默認構造方法會初始化一個非公平鎖(NonfairSync);ReentrantLock(boolean)
-new ReentrantLock(true)
會初始化一個公平鎖(FairSync)。
lock 和 unlock 方法
lock()
- 無條件獲取鎖。如果當前線程無法獲取鎖,則當前線程進入休眠狀態不可用,直至當前線程獲取到鎖。如果該鎖沒有被另一個線程持有,則獲取該鎖並立即返回,將鎖的持有計數設置為 1。unlock()
- 用於釋放鎖。
🔔 注意:請務必牢記,獲取鎖操作
lock()
必須在try catch
塊中進行,並且將釋放鎖操作unlock()
放在finally
塊中進行,以保證鎖一定被被釋放,防止死鎖的發生。
示例:ReentrantLock
的基本操作
public class ReentrantLockDemo { public static void main(String[] args) { Task task = new Task(); MyThread tA = new MyThread("Thread-A", task); MyThread tB = new MyThread("Thread-B", task); MyThread tC = new MyThread("Thread-C", task); tA.start(); tB.start(); tC.start(); } static class MyThread extends Thread { private Task task; public MyThread(String name, Task task) { super(name); this.task = task; } @Override public void run() { task.execute(); } } static class Task { private ReentrantLock lock = new ReentrantLock(); public void execute() { lock.lock(); try { for (int i = 0; i < 3; i++) { System.out.println(lock.toString()); // 查詢當前線程 hold 住此鎖的次數 System.out.println("\t holdCount: " + lock.getHoldCount()); // 查詢正等待獲取此鎖的線程數 System.out.println("\t queuedLength: " + lock.getQueueLength()); // 是否為公平鎖 System.out.println("\t isFair: " + lock.isFair()); // 是否被鎖住 System.out.println("\t isLocked: " + lock.isLocked()); // 是否被當前線程持有鎖 System.out.println("\t isHeldByCurrentThread: " + lock.isHeldByCurrentThread()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } finally { lock.unlock(); } } } }
輸出結果:
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-A] holdCount: 1 queuedLength: 2 isFair: false isLocked: true isHeldByCurrentThread: true java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-C] holdCount: 1 queuedLength: 1 isFair: false isLocked: true isHeldByCurrentThread: true // ...
tryLock 方法
與無條件獲取鎖相比,tryLock 有更完善的容錯機制。
tryLock()
- 可輪詢獲取鎖。如果成功,則返回 true;如果失敗,則返回 false。也就是說,這個方法無論成敗都會立即返回,獲取不到鎖(鎖已被其他線程獲取)時不會一直等待。tryLock(long, TimeUnit)
- 可定時獲取鎖。和tryLock()
類似,區別僅在於這個方法在獲取不到鎖時會等待一定的時間,在時間期限之內如果還獲取不到鎖,就返回 false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則返回 true。
示例:ReentrantLock
的 tryLock()
操作
修改上個示例中的 execute()
方法
public void execute() { if (lock.tryLock()) { try { for (int i = 0; i < 3; i++) { // 略... } } finally { lock.unlock(); } } else { System.out.println(Thread.currentThread().getName() + " 獲取鎖失敗"); } }
示例:ReentrantLock
的 tryLock(long, TimeUnit)
操作
修改上個示例中的 execute()
方法
public void execute() { try { if (lock.tryLock(2, TimeUnit.SECONDS)) { try { for (int i = 0; i < 3; i++) { // 略... } } finally { lock.unlock(); } } else { System.out.println(Thread.currentThread().getName() + " 獲取鎖失敗"); } } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " 獲取鎖超時"); e.printStackTrace(); } }
lockInterruptibly 方法
lockInterruptibly()
- 可中斷獲取鎖。可中斷獲取鎖可以在獲得鎖的同時保持對中斷的響應。可中斷獲取鎖比其它獲取鎖的方式稍微復雜一些,需要兩個try-catch
塊(如果在獲取鎖的操作中拋出了InterruptedException
,那么可以使用標准的try-finally
加鎖模式)。- 舉例來說:假設有兩個線程同時通過
lock.lockInterruptibly()
獲取某個鎖時,若線程 A 獲取到了鎖,則線程 B 只能等待。若此時對線程 B 調用threadB.interrupt()
方法能夠中斷線程 B 的等待過程。由於lockInterruptibly()
的聲明中拋出了異常,所以lock.lockInterruptibly()
必須放在try
塊中或者在調用lockInterruptibly()
的方法外聲明拋出InterruptedException
。
- 舉例來說:假設有兩個線程同時通過
🔔 注意:當一個線程獲取了鎖之后,是不會被
interrupt()
方法中斷的。單獨調用interrupt()
方法不能中斷正在運行狀態中的線程,只能中斷阻塞狀態中的線程。因此當通過lockInterruptibly()
方法獲取某個鎖時,如果未獲取到鎖,只有在等待的狀態下,才可以響應中斷。
示例:ReentrantLock
的 lockInterruptibly()
操作
修改上個示例中的 execute()
方法
public void execute() { try { lock.lockInterruptibly(); for (int i = 0; i < 3; i++) { // 略... } } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + "被中斷"); e.printStackTrace(); } finally { lock.unlock(); } }
newCondition 方法
newCondition()
- 返回一個綁定到 Lock
對象上的 Condition
實例。Condition
的特性和具體方法請閱讀下文 Condition
。
ReentrantLock 的原理
ReentrantLock 的數據結構
閱讀 ReentrantLock
的源碼,可以發現它有一個核心字段:
private final Sync sync;
sync
- 內部抽象類ReentrantLock.Sync
對象,Sync
繼承自 AQS。它有兩個子類:ReentrantLock.FairSync
- 公平鎖。ReentrantLock.NonfairSync
- 非公平鎖。
查看源碼可以發現,ReentrantLock
實現 Lock
接口其實是調用 ReentrantLock.FairSync
或 ReentrantLock.NonfairSync
中各自的實現,這里不一一列舉。
ReentrantLock 的獲取鎖和釋放鎖
ReentrantLock 獲取鎖和釋放鎖的接口,從表象看,是調用 ReentrantLock.FairSync
或 ReentrantLock.NonfairSync
中各自的實現;從本質上看,是基於 AQS 的實現。
仔細閱讀源碼很容易發現:
void lock()
調用 Sync 的 lock() 方法。-
void lockInterruptibly()
直接調用 AQS 的 獲取可中斷的獨占鎖 方法lockInterruptibly()
。 boolean tryLock()
調用 Sync 的nonfairTryAcquire()
。boolean tryLock(long time, TimeUnit unit)
直接調用 AQS 的 獲取超時等待式的獨占鎖 方法tryAcquireNanos(int arg, long nanosTimeout)
。-
void unlock()
直接調用 AQS 的 釋放獨占鎖 方法release(int arg)
。
直接調用 AQS 接口的方法就不再贅述了,其原理在 AQS 的原理 中已經用很大篇幅進行過講解。
nonfairTryAcquire
方法源碼如下:
// 公平鎖和非公平鎖都會用這個方法區嘗試獲取鎖 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { // 如果同步狀態為0,將其設為 acquires,並設置當前線程為排它線程 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
處理流程很簡單:
- 如果同步狀態為 0,設置同步狀態設為 acquires,並設置當前線程為排它線程,然后返回 true,獲取鎖成功。
- 如果同步狀態不為 0 且當前線程為排它線程,設置同步狀態為當前狀態值+acquires 值,然后返回 true,獲取鎖成功。
- 否則,返回 false,獲取鎖失敗。
lock 方法在公平鎖和非公平鎖中的實現:
二者的區別僅在於申請非公平鎖時,如果同步狀態為 0,嘗試將其設為 1,如果成功,直接將當前線程置為排它線程;否則和公平鎖一樣,調用 AQS 獲取獨占鎖方法 acquire
。
// 非公平鎖實現 final void lock() { if (compareAndSetState(0, 1)) // 如果同步狀態為0,將其設為1,並設置當前線程為排它線程 setExclusiveOwnerThread(Thread.currentThread()); else // 調用 AQS 獲取獨占鎖方法 acquire acquire(1); } // 公平鎖實現 final void lock() { // 調用 AQS 獲取獨占鎖方法 acquire acquire(1); }
四、ReentrantReadWriteLock
ReentrantReadWriteLock
類是ReadWriteLock
接口的具體實現,它是一個可重入的讀寫鎖。ReentrantReadWriteLock
維護了一對讀寫鎖,將讀寫鎖分開,有利於提高並發效率。
ReentrantLock
實現了一種標准的互斥鎖:每次最多只有一個線程能持有ReentrantLock
。但對於維護數據的完整性來說,互斥通常是一種過於強硬的加鎖策略,因此也就不必要地限制了並發性。大多數場景下,讀操作比寫操作頻繁,只要保證每個線程都能讀取到最新數據,並且在讀數據時不會有其它線程在修改數據,那么就不會出現線程安全問題。這種策略減少了互斥同步,自然也提升了並發性能,ReentrantReadWriteLock
就是這種策略的具體實現。
ReentrantReadWriteLock 的特性
ReentrantReadWriteLock 的特性如下:
ReentrantReadWriteLock
適用於讀多寫少的場景。如果是寫多讀少的場景,由於ReentrantReadWriteLock
其內部實現比ReentrantLock
復雜,性能可能反而要差一些。如果存在這樣的問題,需要具體問題具體分析。由於ReentrantReadWriteLock
的讀寫鎖(ReadLock
、WriteLock
)都實現了Lock
接口,所以要替換為ReentrantLock
也較為容易。ReentrantReadWriteLock
實現了ReadWriteLock
接口,支持了ReentrantLock
所不具備的讀寫鎖分離。ReentrantReadWriteLock
維護了一對讀寫鎖(ReadLock
、WriteLock
)。將讀寫鎖分開,有利於提高並發效率。ReentrantReadWriteLock
的加鎖策略是:允許多個讀操作並發執行,但每次只允許一個寫操作。ReentrantReadWriteLock
為讀寫鎖都提供了可重入的加鎖語義。ReentrantReadWriteLock
支持公平鎖和非公平鎖(默認)兩種模式。
ReadWriteLock
接口定義如下:
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }
readLock
- 返回用於讀操作的鎖(ReadLock
)。writeLock
- 返回用於寫操作的鎖(WriteLock
)。
在讀寫鎖和寫入鎖之間的交互可以采用多種實現方式,ReadWriteLock
的一些可選實現包括:
- 釋放優先 - 當一個寫入操作釋放寫鎖,並且隊列中同時存在讀線程和寫線程,那么應該優先選擇讀線程、寫線程,還是最先發出請求的線程?
- 讀線程插隊 - 如果鎖是由讀線程持有,但有寫線程正在等待,那么新到達的讀線程能否立即獲得訪問權,還是應該在寫線程后面等待?如果允許讀線程插隊到寫線程之前,那么將提高並發性,但可能造成線程飢餓問題。
- 重入性 - 讀鎖和寫鎖是否是可重入的?
- 降級 - 如果一個線程持有寫入鎖,那么它能否在不釋放該鎖的情況下獲得讀鎖?這可能會使得寫鎖被降級為讀鎖,同時不允許其他寫線程修改被保護的資源。
- 升級 - 讀鎖能否優先於其他正在等待的讀線程和寫線程而升級為一個寫鎖?在大多數的讀寫鎖實現中並不支持升級,因為如果沒有顯式的升級操作,那么很容易造成死鎖。
ReentrantReadWriteLock 的用法
前文了解了 ReentrantReadWriteLock
的特性,接下來,我們要講述其具體用法。
ReentrantReadWriteLock 的構造方法
ReentrantReadWriteLock
和 ReentrantLock
一樣,也有兩個構造方法,且用法相似。
public ReentrantReadWriteLock() {} public ReentrantReadWriteLock(boolean fair) {}
ReentrantReadWriteLock()
- 默認構造方法會初始化一個非公平鎖(NonfairSync)。在非公平的鎖中,線程獲得鎖的順序是不確定的。寫線程降級為讀線程是可以的,但讀線程升級為寫線程是不可以的(這樣會導致死鎖)。ReentrantReadWriteLock(boolean)
-new ReentrantLock(true)
會初始化一個公平鎖(FairSync)。對於公平鎖,等待時間最長的線程將優先獲得鎖。如果這個鎖是讀線程持有,則另一個線程請求寫鎖,那么其他讀線程都不能獲得讀鎖,直到寫線程釋放寫鎖。
ReentrantReadWriteLock 的使用實例
在 ReentrantReadWriteLock
的特性 中已經介紹過,ReentrantReadWriteLock
的讀寫鎖(ReadLock
、WriteLock
)都實現了 Lock
接口,所以其各自獨立的使用方式與 ReentrantLock
一樣,這里不再贅述。
ReentrantReadWriteLock
與 ReentrantLock
用法上的差異,主要在於讀寫鎖的配合使用。本文以一個典型使用場景來進行講解。
示例:基於 ReentrantReadWriteLock
實現一個簡單的本地緩存
/** * 簡單的無界緩存實現 * <p> * 使用 WeakHashMap 存儲鍵值對。WeakHashMap 中存儲的對象是弱引用,JVM GC 時會自動清除沒有被引用的弱引用對象。 */ static class UnboundedCache<K, V> { private final Map<K, V> cacheMap = new WeakHashMap<>(); private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(); public V get(K key) { cacheLock.readLock().lock(); V value; try { value = cacheMap.get(key); String log = String.format("%s 讀數據 %s:%s", Thread.currentThread().getName(), key, value); System.out.println(log); } finally { cacheLock.readLock().unlock(); } return value; } public V put(K key, V value) { cacheLock.writeLock().lock(); try { cacheMap.put(key, value); String log = String.format("%s 寫入數據 %s:%s", Thread.currentThread().getName(), key, value); System.out.println(log); } finally { cacheLock.writeLock().unlock(); } return value; } public V remove(K key) { cacheLock.writeLock().lock(); try { return cacheMap.remove(key); } finally { cacheLock.writeLock().unlock(); } } public void clear() { cacheLock.writeLock().lock(); try { this.cacheMap.clear(); } finally { cacheLock.writeLock().unlock(); } } }
說明:
- 使用
WeakHashMap
而不是HashMap
來存儲鍵值對。WeakHashMap
中存儲的對象是弱引用,JVM GC 時會自動清除沒有被引用的弱引用對象。 - 向
Map
寫數據前加寫鎖,寫完后,釋放寫鎖。 - 向
Map
讀數據前加讀鎖,讀完后,釋放讀鎖。
測試其線程安全性:
/** * @author <a href="mailto:forbreak@163.com">Zhang Peng</a> * @since 2020-01-01 */ public class ReentrantReadWriteLockDemo { static UnboundedCache<Integer, Integer> cache = new UnboundedCache<>(); public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 20; i++) { executorService.execute(new MyThread()); cache.get(0); } executorService.shutdown(); } /** 線程任務每次向緩存中寫入 3 個隨機值,key 固定 */ static class MyThread implements Runnable { @Override public void run() { Random random = new Random(); for (int i = 0; i < 3; i++) { cache.put(i, random.nextInt(100)); } } } }
說明:示例中,通過線程池啟動 20 個並發任務。任務每次向緩存中寫入 3 個隨機值,key 固定;然后主線程每次固定讀取緩存中第一個 key 的值。
輸出結果:
main 讀數據 0:null pool-1-thread-1 寫入數據 0:16 pool-1-thread-1 寫入數據 1:58 pool-1-thread-1 寫入數據 2:50 main 讀數據 0:16 pool-1-thread-1 寫入數據 0:85 pool-1-thread-1 寫入數據 1:76 pool-1-thread-1 寫入數據 2:46 pool-1-thread-2 寫入數據 0:21 pool-1-thread-2 寫入數據 1:41 pool-1-thread-2 寫入數據 2:63 main 讀數據 0:21 main 讀數據 0:21 // ...
ReentrantReadWriteLock 的原理
前面了解了 ReentrantLock
的原理,理解 ReentrantReadWriteLock
就容易多了。
ReentrantReadWriteLock 的數據結構
閱讀 ReentrantReadWriteLock 的源碼,可以發現它有三個核心字段:
/** Inner class providing readlock */ private final ReentrantReadWriteLock.ReadLock readerLock; /** Inner class providing writelock */ private final ReentrantReadWriteLock.WriteLock writerLock; /** Performs all synchronization mechanics */ final Sync sync; public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
sync
- 內部類ReentrantReadWriteLock.Sync
對象。與ReentrantLock
類似,它有兩個子類:ReentrantReadWriteLock.FairSync
和ReentrantReadWriteLock.NonfairSync
,分別表示公平鎖和非公平鎖的實現。readerLock
- 內部類ReentrantReadWriteLock.ReadLock
對象,這是一把讀鎖。writerLock
- 內部類ReentrantReadWriteLock.WriteLock
對象,這是一把寫鎖。
ReentrantReadWriteLock 的獲取鎖和釋放鎖
public static class ReadLock implements Lock, java.io.Serializable { // 調用 AQS 獲取共享鎖方法 public void lock() { sync.acquireShared(1); } // 調用 AQS 釋放共享鎖方法 public void unlock() { sync.releaseShared(1); } } public static class WriteLock implements Lock, java.io.Serializable { // 調用 AQS 獲取獨占鎖方法 public void lock() { sync.acquire(1); } // 調用 AQS 釋放獨占鎖方法 public void unlock() { sync.release(1); } }
五、Condition
前文中提過 Lock
接口中 有一個 newCondition()
方法用於返回一個綁定到 Lock
對象上的 Condition
實例。Condition
是什么?有什么作用?本節將一一講解。
在單線程中,一段代碼的執行可能依賴於某個狀態,如果不滿足狀態條件,代碼就不會被執行(典型的場景,如:if ... else ...)。在並發環境中,當一個線程判斷某個狀態條件時,其狀態可能是由於其他線程的操作而改變,這時就需要有一定的協調機制來確保在同一時刻,數據只能被一個線程鎖修改,且修改的數據狀態被所有線程所感知。
Java 1.5 之前,主要是利用 Object
類中的 wait
、notify
、notifyAll
配合 synchronized
來進行線程間通信(如果不了解其特性,可以參考:Java 線程基礎 - wait/notify/notifyAll)。
wait
、notify
、notifyAll
需要配合 synchronized
使用,不適用於 Lock
。而使用 Lock
的線程,彼此間通信應該使用 Condition
。這可以理解為,什么樣的鎖配什么樣的鑰匙。內置鎖(synchronized
)配合內置條件隊列(wait
、notify
、notifyAll
),顯式鎖(Lock
)配合顯式條件隊列(Condition
)。
Condition 的特性
Condition
接口定義如下:
public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
其中,await
、signal
、signalAll
與 wait
、notify
、notifyAll
相對應,功能也相似。除此以外,Condition
相比內置條件隊列( wait
、notify
、notifyAll
),提供了更為豐富的功能:
- 每個鎖(
Lock
)上可以存在多個Condition
,這意味着鎖的狀態條件可以有多個。 - 支持公平的或非公平的隊列操作。
- 支持可中斷的條件等待,相關方法:
awaitUninterruptibly()
。 - 支持可定時的等待,相關方法:
awaitNanos(long)
、await(long, TimeUnit)
、awaitUntil(Date)
。
Condition 的用法
這里以 Condition
來實現一個消費者、生產者模式。
🔔 注意:事實上,解決此類問題使用
CountDownLatch
、Semaphore
等工具更為便捷、安全。想了解詳情,可以參考 Java 並發工具類。
產品類
class Message { private final Lock lock = new ReentrantLock(); private final Condition producedMsg = lock.newCondition(); private final Condition consumedMsg = lock.newCondition(); private String message; private boolean state; private boolean end; public void consume() { //lock lock.lock(); try { // no new message wait for new message while (!state) { producedMsg.await(); } System.out.println("consume message : " + message); state = false; // message consumed, notify waiting thread consumedMsg.signal(); } catch (InterruptedException ie) { System.out.println("Thread interrupted - viewMessage"); } finally { lock.unlock(); } } public void produce(String message) { lock.lock(); try { // last message not consumed, wait for it be consumed while (state) { consumedMsg.await(); } System.out.println("produce msg: " + message); this.message = message; state = true; // new message added, notify waiting thread producedMsg.signal(); } catch (InterruptedException ie) { System.out.println("Thread interrupted - publishMessage"); } finally { lock.unlock(); } } public boolean isEnd() { return end; } public void setEnd(boolean end) { this.end = end; } }
消費者
class MessageConsumer implements Runnable { private Message message; public MessageConsumer(Message msg) { message = msg; } @Override public void run() { while (!message.isEnd()) { message.consume(); } } }
生產者
class MessageProducer implements Runnable { private Message message; public MessageProducer(Message msg) { message = msg; } @Override public void run() { produce(); } public void produce() { List<String> msgs = new ArrayList<>(); msgs.add("Begin"); msgs.add("Msg1"); msgs.add("Msg2"); for (String msg : msgs) { message.produce(msg); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } message.produce("End"); message.setEnd(true); } }
測試
public class LockConditionDemo { public static void main(String[] args) { Message msg = new Message(); Thread producer = new Thread(new MessageProducer(msg)); Thread consumer = new Thread(new MessageConsumer(msg)); producer.start(); consumer.start(); } }