ReentrantLock:表示重入鎖,它是唯一一個實現了Lock接口的類。重入鎖指的是 線程在獲得鎖之后,再次獲取該鎖不需要阻塞,而是直接關聯一次計數器增加重入次;
syschronized和reenttrantlock都支持重入鎖;
重入鎖的設計目的
比如調用demo方法獲得了當前的對象鎖,然后在這個方法中再去調用 demo2,demo2中的存在同一個實例鎖,這個時候當前線程會因為無法獲得 demo2的對象鎖而阻塞,就會產生死鎖。重入鎖的設計目的是避免線程的死 鎖。
ReentrantReadWriteLock
我們以前理解的鎖,基本都是排他鎖(互斥鎖),也就是這些鎖在同一時刻只允許一個線程進 行訪問,而讀寫所在同一時刻可以允許多個線程訪問,但是在寫線程訪問時,所有 的讀線程和其他寫線程都會被阻塞。讀寫鎖維護了一對鎖,一個讀鎖、一個寫鎖; 一般情況下,讀寫鎖的性能都會比排它鎖好,因為大多數場景讀是多於寫的。在讀 多於寫的情況下,讀寫鎖能夠提供比排它鎖更好的並發性和吞吐量.
public class LockDemo {
static Map<String,Object> cacheMap=new HashMap<>();
static ReentrantReadWriteLock rwl=new
ReentrantReadWriteLock();
static Lock read=rwl.readLock();
static Lock write=rwl.writeLock();
public static final Object get(String key) {
System.out.println("開始讀取數據");
read.lock(); // 讀鎖
try {
return cacheMap.get(key);
}finally {
read.unlock();
}
}
public static final Object put(String key,Object value){
write.lock();
System.out.println("開始寫數據");
try{
return cacheMap.put(key,value);
}finally {
write.unlock();
}
}
}
在這個案例中,通過hashmap來模擬了一個內存緩存,然后使用讀寫所來保證這 個內存緩存的線程安全性。當執行讀操作的時候,需要獲取讀鎖,在並發訪問的時候,讀鎖不會被阻塞,因為讀操作不會影響執行結果。 在執行寫操作是,線程必須要獲取寫鎖,當已經有線程持有寫鎖的情況下,當前線 程會被阻塞,只有當寫鎖釋放以后,其他讀寫操作才能繼續執行。使用讀寫鎖提升 讀操作的並發性,也保證每次寫操作對所有的讀寫操作的可見性
⚫ 讀鎖與讀鎖可以共享
⚫ 讀鎖與寫鎖不可以共享(排他)
⚫ 寫鎖與寫鎖不可以共享(排他)
ReentrantLock 的實現原理
我們知道鎖的基本原理是,基於將多線程並行任務通過某一種機制實現線程的串 行執行,從而達到線程安全性的目的。在 synchronized 中,我們分析了偏向鎖、 輕量級鎖、樂觀鎖。基於樂觀鎖以及自旋鎖來優化了 synchronized 的加鎖開銷, 同時在重量級鎖階段,通過線程的阻塞以及喚醒來達到線程競爭和同步的目的。 那么在ReentrantLock中,也一定會存在這樣的需要去解決的問題。就是在多線程 競爭重入鎖時,競爭失敗的線程是如何實現阻塞以及被喚醒的呢?
AQS 是什么
在 Lock 中,用到了一個同步隊列 AQS,全稱 AbstractQueuedSynchronizer,它 是一個同步工具也是Lock用來實現線程同步的核心組件。如果你搞懂了AQS,那 么J.U.C中絕大部分的工具都能輕松掌握
AQS 的兩種功能
從使用層面來說,AQS的功能分為兩種:獨占和共享 獨占鎖,
每次只能有一個線程持有鎖,比如前面給大家演示的ReentrantLock就是 以獨占方式實現的互斥鎖
共 享 鎖 , 允 許 多 個 線 程 同 時 獲 取 鎖 , 並 發 訪 問 共 享 資 源 , 比 如 ReentrantReadWriteLock
AQS 的內部實現
AQS 隊列內部維護的是一個 FIFO 的雙向鏈表,這種結構的特點是每個數據結構都有兩個指針,分別指向直接的后繼節點和直接前驅節點。所以雙向鏈表可以從任 意一個節點開始很方便的訪問前驅和后繼。每個 Node 其實是由線程封裝,當線 程爭搶鎖失敗后會封裝成Node加入到ASQ隊列中去;當獲取鎖的線程釋放鎖以 后,會從隊列中喚醒一個阻塞的節點(線程)。
ReentrantLock 的源碼分析
以ReentrantLock作為切入點,來看看在這個場景中是如何使用AQS來實現線程 的同步的
ReentrantLock 的時序圖
調用ReentrantLock中的lock()方法,源碼的調用過程我使用了時序圖來展現。
ReentrantLock.lock()
這個是reentrantLock獲取鎖的入口 public void lock() { sync.lock(); }
sync實際上是一個抽象的靜態內部類,它繼承了AQS來實現重入鎖的邏輯,我們前面說過AQS是一個同步隊列,它能夠實現線程的阻塞以及喚醒,但它並不具備 業務功能,所以在不同的同步場景中,會繼承AQS來實現對應場景的功能 Sync有兩個具體的實現類,
分別是: NofairSync:表示可以存在搶占鎖的功能,也就是說不管當前隊列上是否存在其他 線程等待,新線程都有機會搶占鎖
FailSync:表示所有線程嚴格按照FIFO來獲取鎖
NofairSync.lock(Reentrantlock.lock的具體實現)
以非公平鎖為例,來看看lock中的實現
1. 非公平鎖和公平鎖最大的區別在於,在非公平鎖中我搶占鎖的邏輯是,不管有 沒有線程排隊,我先上來cas去搶占一下
2. CAS成功,就表示成功獲得了鎖
3. CAS失敗,調用acquire(1)走鎖競爭邏輯
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
state 是 AQS 中的一個屬性,它在不同的實現中所表達的含義不一樣,對於重入 鎖的實現來說,表示一個同步狀態。它有兩個含義的表示
1. 當state=0時,表示無鎖狀態
2. 當 state>0 時,表示已經有線程獲得了鎖,也就是 state=1,
但是因為 ReentrantLock允許重入,所以同一個線程多次獲得同步鎖的時候,state會遞增, 比如重入5次,那么state=5。 而在釋放鎖的時候,
同樣需要釋放5次直到state=0 其他線程才有資格獲得鎖 ;
acquire 是 AQS 中的方法,如果 CAS 操作未能成功,說明 state 已經不為 0,此 時繼續acquire(1)操作 ➢ 大家思考一下,acquire方法中的1的參數是用來做什么呢?
這個方法的主要邏輯是
1. 通過tryAcquire嘗試獲取獨占鎖,如果成功返回true,失敗返回false
2. 如果tryAcquire失敗,則會通過addWaiter方法將當前線程封裝成Node添加 到AQS隊列尾部
3. acquireQueued,將Node作為參數,通過自旋去嘗試獲取鎖。
AQS.accquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire 是 AQS 中的方法,如果 CAS 操作未能成功,說明 state 已經不為 0,此 時繼續acquire(1)操作
➢ 大家思考一下,acquire方法中的1的參數是用來做什么呢? 這個方法的主要邏輯是
1. 通過tryAcquire嘗試獲取獨占鎖,如果成功返回true,失敗返回false
2. 如果tryAcquire失敗,則會通過addWaiter方法將當前線程封裝成Node添加 到AQS隊列尾部
3. acquireQueued,將Node作為參數,通過自旋去嘗試獲取鎖。
NonfairSync.tryAcquire
這個方法的作用是嘗試獲取鎖,如果成功返回true,不成功返回false 它是重寫 AQS 類中的 tryAcquire 方法,並且大家仔細看一下 AQS 中 tryAcquire 方法的定義,
並沒有實現,而是拋出異常。按照一般的思維模式,既然是一個不實 現的模版方法,那應該定義成abstract,讓子類來實現呀?大家想想為什么
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
ReentrantLock.nofairTryAcquire
1. 獲取當前線程,判斷當前的鎖的狀態 2. 如果state=0表示當前是無鎖狀態,通過cas更新state狀態的值 3. 當前線程是屬於重入,則增加重入次數
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();//獲取當前執行的線程
int c = getState();//獲取state的值 cas就是更新state的值
if (c == 0) {//表示無鎖狀態
if (compareAndSetState(0, acquires)) {//cas替換state的值,cas成功表示獲取鎖成功
setExclusiveOwnerThread(current);//保存當前獲得鎖的線 程,下次再來的時候不要再嘗試競爭鎖
return true;
}
}
else if (current == getExclusiveOwnerThread()) {{//如果同一個線程來獲得鎖,直接增加重入次數
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
AQS.addWaiter
當 tryAcquire 方法獲取鎖失敗以后,則會先調用 addWaiter 將當前線程封裝成 Node. 入參 mode 表示當前節點的狀態,傳遞的參數是 Node.EXCLUSIVE,表示獨占狀 態。意味着重入鎖用到了AQS的獨占鎖功能
1. 將當前線程封裝成Node
2. 當前鏈表中的 tail 節點是否為空,如果不為空,則通過 cas 操作把當前線程的 node添加到AQS隊列
3. 如果為空或者cas失敗,調用enq將節點添加到AQS隊列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//將當前線程封裝為Node
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;////tail 是 AQS 中表示同比隊列隊尾的屬性,默認 是 null
if (pred != null) {//tail 不為空的情況下,說明隊列中存在節點
node.prev = pred;//把當前線程的 Node 的 prev 指向 tail
if (compareAndSetTail(pred, node)) {//通過 cas 把 node 加入到 AQS 隊列,也就是設置為 tail
pred.next = node;//設置成功以后,把原 tail 節點的 next 指向當前 node
return node;
}
}
enq(node);//tail=null,把 node 添加到同步隊列
return node;
}
enq
enq就是通過自旋操作把當前節點加入到隊列中
private Node enq(final Node node) {
for (;;) {通過自旋
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
圖解分析
假設3個線程來爭搶鎖,那么截止到enq方法運行結束之后,或者調用addwaiter 方法結束后,AQS中的鏈表結構圖
AQS.acquireQueued
通過 addWaiter 方法把線程添加到鏈表后,會接着把 Node 作為參數傳遞給 acquireQueued方法,去競爭鎖
1. 獲取當前節點的prev節點
2. 如果prev節點為head節點,那么它就有資格去爭搶鎖,調用tryAcquire搶占 鎖
3. 搶占鎖成功以后,把獲得鎖的節點設置為 head,並且移除原來的初始化 head 節點
4. 如果獲得鎖失敗,則根據waitStatus決定是否需要掛起線程
5. 最后,通過cancelAcquire取消獲得鎖的操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//獲 取當前節點的 prev 節點
if (p == head && tryAcquire(arg)) {//如 果是 head 節點,說明有資格去爭搶鎖
setHead(node);//獲取鎖成功,也就是 ThreadA 已經釋放了鎖,然后設置 head 為 ThreadB 獲得執行權 限
p.next = null; // // 把原 head節點從鏈表中移除
failed = false;
return interrupted;
}
//ThreadA 可能還沒釋放鎖,使得 ThreadB 在執 行 tryAcquire 時會返回 false
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
NofairSync.tryAcquire
這個方法在前面分析過,就是通過state的狀態來判斷是否處於無鎖狀態,然后在 通過cas進行競爭鎖操作。成功表示獲得鎖,失敗表示獲得鎖失敗
shouldParkAfterFailedAcquire
如果ThreadA的鎖還沒有釋放的情況下,ThreadB和ThreadC來爭搶鎖肯定是會 失敗,那么失敗以后會調用shouldParkAfterFailedAcquire方法 Node 有 5 中狀態,
分別是:CANCELLED(1),SIGNAL(-1) 、CONDITION(2)、PROPAGATE(-3)、默認狀態(0) ;這個方法的主要作用是,通過 Node 的狀態來判斷,ThreadA 競爭鎖失敗以后是 否應該被掛起。
1. 如果ThreadA的pred節點狀態為SIGNAL,那就表示可以放心掛起當前線程
2. 通過循環掃描鏈表把CANCELLED狀態的節點移除
3. 修改pred節點的狀態為SIGNAL,返回false. 返回false時,也就是不需要掛起,返回true,則需要調用parkAndCheckInterrupt 掛起當前線程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//前置節點的 waitStatus
if (ws == Node.SIGNAL)//如果前置節點為 SIGNAL,意 味着只需要等待其他前置節點的線程被釋放
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;//返回 true,意味着可以直接放心的掛 起了
if (ws > 0) {//ws 大於 0,意味着 prev 節點取消了排 隊,直接移除這個節點就行
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;//相當於: pred=pred.prev; node.prev=pred;
} while (pred.waitStatus > 0); //這里采用循 環,從雙向列表中移除 CANCELLED 的節點
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//利用 cas 設置 prev 節點的狀態為 SIGNAL(1)
}
return false;
}
parkAndCheckInterrupt
使用LockSupport.park掛起當前線程編程WATING狀態
使用LockSupport.park掛起當前線程編程WATING狀態 Thread.interrupted,返回當前線程是否被其他線程觸發過中斷請求,
也就是 thread.interrupt(); 如果有觸發過中斷請求,那么這個方法會返回當前的中斷標識 true,
並且對中斷標識進行復位標識已經響應過了中斷請求。如果返回true,意味 着在acquire方法中會執行selfInterrupt()。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
通過acquireQueued方法來競爭鎖,如果ThreadA還在執行中沒有釋放鎖的話, 意味着ThreadB和ThreadC只能掛起了。
鎖的釋放流程
如果這個時候ThreadA釋放鎖了,那么我們來看鎖被釋放后會產生什么效果
ReentrantLock.unlock
在unlock中,會調用release方法來釋放鎖
public final boolean release(int arg) {
if (tryRelease(arg)) {//釋放鎖成功
Node h = head;//得到 aqs 中 head 節點
if (h != null && h.waitStatus != 0)//如果 head 節點不 為空並且狀態!=0.調用 unparkSuccessor(h)喚醒后續節點
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantLock.tryRelease
這個方法可以認為是一個設置鎖狀態的操作,通過將state狀態減掉傳入的參數值 (參數是1),如果結果狀態為0,就將排它鎖的Owner設置為null,
以使得其它的線程有機會進行執行。 在排它鎖中,加鎖的時候狀態會增加 1(當然可以自己修改這個值),在解鎖的時 候減掉1,同一個鎖,在可以重入后,
可能會被疊加為2、 3、 4這些值,
只有unlock() 的次數與 lock()的次數對應才會將 Owner 線程設置為空,而且也只有這種情況下 才會返回true。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;//獲得 head 節點的狀態
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);// 設置 head 節點 狀態為 0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {//如果下一個節點為 null 或者 status>0 表示 cancelled 狀態. //通過從尾部節點開始掃描,找到距離 head 最近的一個 waitStatus<=0 的節點
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); //next 節點不為空,直接喚醒這個線程即可
}
原本掛起的線程繼續執行
通過ReentrantLock.unlock,原本掛起的線程被喚醒以后繼續執行,應該從哪里執 行大家還有印象吧。 原來被掛起的線程是在 acquireQueued 方法中,所以被喚 醒以后繼續從這個方法開始執行
AQS.acquireQueued
這個方法前面已經完整分析過了,我們只關注一下 ThreadB 被喚醒以后的執行流 程。 由於ThreadB的prev節點指向的是head,並且ThreadA已經釋放了鎖。所以這 個時候調用tryAcquire方法時,可以順利獲取到鎖
1. 把ThreadB節點當成head 2. 把原head節點的next節點指向為null
公平鎖和非公平鎖的區別
鎖的公平性是相對於獲取鎖的順序而言的,如果是一個公平鎖,那么鎖的獲取順序 就應該符合請求的絕對時間順序,也就是 FIFO。 在上面分析的例子來說,只要 CAS 設置同步狀態成功,則表示當前線程獲取了鎖,而公平鎖則不一樣,差異點 有兩個
FairSync.tryAcquire
final void lock() {
acquire(1);
} 非公平鎖在獲取鎖的時候,會先通過CAS進行搶占,而公平鎖則不會
FairSync.tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
} 這個方法與 nonfair.TryAcquire(int acquires)比較,不同的地方在於判斷條件多了 hasQueuedPredecessors()方法,也就是加入了[同步隊列中當前節點是否有前驅節點]的判斷,如果該方法返回true,則表示有線程比當前線程更早地請求獲取鎖, 因此需要等待前驅線程獲取並釋放鎖之后才能繼續獲取鎖。
這個方法的主要作用是,通過 Node 的狀態來判斷,ThreadA 競爭鎖失敗以后是 否應該被掛起。 1. 如果ThreadA的pred節點狀態為SIGNAL,那就表示可以放心掛起當前線程 2. 通過循環掃描鏈表把CANCELLED狀態的節點移除 3. 修改pred節點的狀態為SIGNAL,返回false. 返回false時,也就是不需要掛起,返回true,則需要調用parkAndCheckInterrupt 掛起當前線程