概述
並發編程中,ReentrantLock
的使用是比較多的,包括之前講的LinkedBlockingQueue
和ArrayBlockQueue
的內部都是使用的ReentrantLock
,談到它又不能的不說AQS,AQS的全稱是AbstractQueuedSynchronizer
,這個類也是在java.util.concurrent.locks
下面,提供了一個FIFO的隊列,可以用於構建鎖的基礎框架,內部通過原子變量state
來表示鎖的狀態,當state
大於0的時候表示鎖被占用,如果state等於0時表示沒有占用鎖,ReentrantLock
是一個重入鎖,表現在state
上,如果持有鎖的線程重復獲取鎖時,它會將state
狀態進行遞增,也就是獲得一個信號量,當釋放鎖時,同時也是釋放了信號量,信號量跟隨減少,如果上一個線程還沒有完成任務,則會進行入隊等待操作。
本文分析內容主要是針對jdk1.8版本
約束:文中圖片的ref-xxx代表引用地址
圖片中的內容prve更正為prev,由於文章不是一天寫的所以有些圖片更正了有些沒有。
AQS主要字段
/**
* 頭節點指針,通過setHead進行修改
*/
private transient volatile Node head;
/**
* 隊列的尾指針
*/
private transient volatile Node tail;
/**
* 同步器狀態
*/
private volatile int state;
AQS需要子類實現的方法
AQS是提供了並發的框架,它內部提供一種機制,它是基於模板方法的實現,整個類中沒有任何一個abstract的抽象方法,取而代之的是,需要子類去實現的那些方法通過一個方法體拋出UnsupportedOperationException異常來讓子類知道,告知如果沒有實現模板的方法,則直接拋出異常。
方法名 | 方法描述 |
---|---|
tryAcquire | 以獨占模式嘗試獲取鎖,獨占模式下調用acquire,嘗試去設置state的值,如果設置成功則返回,如果設置失敗則將當前線程加入到等待隊列,直到其他線程喚醒 |
tryRelease | 嘗試獨占模式下釋放狀態 |
tryAcquireShared | 嘗試在共享模式獲得鎖,共享模式下調用acquire,嘗試去設置state的值,如果設置成功則返回,如果設置失敗則將當前線程加入到等待隊列,直到其他線程喚醒 |
tryReleaseShared | 嘗試共享模式下釋放狀態 |
isHeldExclusively | 是否是獨占模式,表示是否被當前線程占用 |
AQS是基於FIFO隊列實現的,那么隊列的Node節點又是存放的什么呢?
Node字段信息
字段名 | 類型 | 默認值 | 描述 |
---|---|---|---|
SHARED | Node | new Node() | 一個標識,指示節點使用共享模式等待 |
EXCLUSIVE | Nodel | Null | 一個標識,指示節點使用獨占模式等待 |
CANCELLED |
int | 1 | 節點因超時或被中斷而取消時設置狀態為取消狀態 |
SIGNAL |
int | -1 | 當前節點的后節點被park,當前節點釋放時,必須調用unpark通知后面節點,當后面節點競爭時,會將前面節點更新為SIGNAL |
CONDITION |
int | -2 | 標識當前節點已經處於等待中,通過條件進行等待的狀態 |
PROPAGATE |
int | -3 | 共享模式下釋放節點時設置的狀態,被標記為當前狀態是表示無限傳播下去 |
0 |
int | 不屬於上面的任何一種狀態 | |
waitStatus | int | 0 | 等待狀態,默認初始化為0,表示正常同步等待, |
pre | Node | Null | 隊列中上一個節點 |
next | Node | Null | 隊列中下一個節點 |
thread | Thread | Null | 當前Node操作的線程 |
nextWaiter | Node | Null | 指向下一個處於阻塞的節點 |
通過上面的內容我們可以看到waitStatus其實是有5個狀態的,雖然這里面0並不是什么字段,但是他是waitStatus狀態的一種,表示不是任何一種類型的字段,上面也講解了關於AQS中子類實現的方法,AQS提供了獨占模式和共享模式兩種,但是ReentrantLock
實現的是獨占模式的方式,下面來通過源碼的方式解析ReentrantLock
。
ReentrantLock源碼分析
首先在源碼分析之前我們先來看一下ReentrantLock的類的繼承關系,如下圖所示:
可以看到ReentrantLock
繼承自Lock
接口,它提供了一些獲取鎖和釋放鎖的方法,以及條件判斷的獲取的方法,通過實現它來進行鎖的控制,它是顯示鎖,需要顯示指定起始位置和終止位置,Lock
接口的方法介紹:
方法名稱 | 方法描述 |
---|---|
lock | 用來獲取鎖,如果鎖已被其他線程獲取,則進行等待。 |
tryLock | 表示用來嘗試獲取鎖,如果獲取成功,則返回true,如果獲取失敗(即鎖已被其他線程獲取),則返回false,也就說這個方法無論如何都會立即返回。在拿不到鎖時不會一直在那等待 |
tryLock(long time, TimeUnit unit) | 和tryLock()類似,區別在於它在拿不到鎖時會等待一定的時間,在時間期限之內如果還拿不到鎖,就返回false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則返回true |
lockInterruptibly | 獲取鎖,如果獲取鎖失敗則進行等到,如果等待的線程被中斷會相應中斷信息。 |
unlock | 釋放鎖的操作 |
newCondition | 獲取Condition對象,該組件和當前的鎖綁定,當前線程只有獲得了鎖,才能調用該組件wait()方法,而調用后,當前線程釋放鎖。 |
ReentrantLock也實現了上面接口的內容,前面講解了很多理論行的內容,接下來我們以一個簡單的例子來進行探討
public class ReentrantLockDemo {
public static void main(String[] args) throws Exception {
AddDemo runnalbeDemo = new AddDemo();
Thread thread = new Thread(runnalbeDemo::add);
thread.start();
Thread thread1 = new Thread(runnalbeDemo::add);
thread1.start();
Thread.sleep(1000);
System.out.println(runnalbeDemo.getCount());
}
private static class AddDemo {
private final AtomicInteger count = new AtomicInteger();
private final ReentrantLock reentrantLock = new ReentrantLock();
private void add() {
try {
reentrantLock.lock();
count.getAndIncrement();
} finally {
// reentrantLock.unlock();
}
}
int getCount() {
return count.get();
}
}
}
- 首先聲明內部類AddDemo,AddDemo的主要作用是將原子變量count進行遞增的操作
- AddDemo內部聲明了ReentrantLock對象進行同步操作
- AddDemo的add方法,進行遞增操作,細心地同學發現,使用了lock方法獲取鎖,但是沒有釋放鎖,這里面沒有釋放鎖可以更讓我們清晰的分析內部結構的變化。
- 主線程開啟了兩個線程進行同步進行遞增的操作,最后讓線程休眠一會輸出累加的最后結果。
ReentrantLock
內部提供了兩種AQS的實現,一種公平模式,一種是非公平模式,如果沒有特別指定在構造器中,默認是非公平的模式,我們可以看一下無參的構造函數。
public ReentrantLock() {
sync = new NonfairSync();
}
當調用有參構造函數時,指定使用哪種模式來進行操作,參數為布爾類型,如果指定為false的話代表非公平模式,如果指定為true的話代表的是公平模式,如下所示:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
我們使用的是非公平模式,后面再來進行分析公平模式,上面也講到了分為兩種模式,這兩種模式為FairSync
和NonfairSync
兩個內部靜態類不可變類,不能被繼承和實例化,這兩個類是我們今天分析的重點,為什么說是重點呢,這里講的內容是有關於AQS的,而FairSync
和NonfairSync
實現了抽象內部類Sync
,Sync
實現了AbstractQueuedSynchronizer
這個類,這個類就是我們說的AQS也是主要同步操作的類,下面我們來看一下公平模式和非公平模式下類的繼承關系,如下圖所示:
非公平模式:
公平模式:
通過上面兩個繼承關系UML來看其實無差別,差別在於內部實現的原理不一樣,回到上面例子中使用的是非公平模式,那先以非公平模式來進行分析,
假設第一個線程啟動調用AddDemo的add方法時,首先執行的事reentrantLock.lock()
方法,這個lock方法調用了sync.lock()
,sync就是我們上面提到的兩種模式的對象,來看一下源碼內容:
public void lock() {
sync.lock();
}
內部調用了sync.lock()
,其實是調用了NonfairSync
對象的lock
方法,也就是下面的方法內容。
/**
* 非公平模式鎖
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 執行鎖動作,先進行修改狀態,如果鎖被占用則進行請求申請鎖,申請鎖失敗則將線程放到隊列中
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 繼承自AQS的tryAcquire方法,嘗試獲取鎖操作,這個方法會被AQS的acquire調用
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
我們看到lock
方法首先先對state
狀態進行修改操作,如果鎖沒有被占用則獲取鎖,並設置當前線程獨占鎖資源,如果嘗試獲取鎖失敗了,則進行acqurie
方法的調用,例子中第一個線程當嘗試獲取鎖是內部state
狀態為0
,進行修改操作的時候,發現鎖並沒有被占用,則獲得鎖,此時我們來看一下內部變化的情況,如下圖所示:
此時只是將state
的狀態更新為1
,表示鎖已經被占用了,獨占鎖資源的線程是Thread0
,也就是exclusiveOwnerThread
的內容,頭節點和尾節點都沒有被初始化,當第二個線程嘗試去獲取鎖的時候,發現鎖已經被占用了,因為上一個線程並沒有釋放鎖,所以第二線程直接獲取鎖時獲取失敗則進入到acquire
方法中,這個方法是AbstractQueuedSynchronizer
中的方法acquire
,先來看一下具體的實現源碼如下所示:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我個人理解acquire
方法不間斷的嘗試獲取鎖,如果鎖沒有獲取到則現將節點加入到隊列中,並將當前線程設置為獨占鎖資源,也就是獨占了鎖的意思,別的線程不能擁有鎖,然后如果當前節點的前節點是頭節點話,再去嘗試爭搶鎖,則設置當前節點為頭節點,並將原頭節點的下一個節點設置為null,幫助GC回收它,如果不是頭節點或爭搶鎖不成功,則會現將前面節點的狀態設置直到設置為SIGNAL
為止,代表下面有節點被等待了等待上一個線程發來的信號,然后就掛起當前線程。
我們接下來慢慢一步一步的分析,我們先來看一下NonfairSync
中的tryAcquire
,如下所示:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
它調用的是他的父類方法,也就是ReentrantLock
下Sync
中的nonfairTryAcquire
方法,這個方法主要就是去申請鎖的操作,來看一下具體源碼:
final boolean nonfairTryAcquire(int acquires) { //首先是一個被final修飾的方法
final Thread current = Thread.currentThread(); //獲取當前線程
int c = getState(); //獲取state的狀態值
if (c == 0) { //如果狀態等於0代表線程沒有被占用
if (compareAndSetState(0, acquires)) { //cas修改state值
setExclusiveOwnerThread(current); //設置當前線程為獨占模式
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//如果state狀態不等於0則先判斷是否是當前線程占用鎖,如果是則進行下面的流程。
int nextc = c + acquires; //這個地方就說明重入鎖的原理,如果擁有鎖的是當前線程,則每次獲取鎖state值都會跟隨遞增
if (nextc < 0) // overflow //溢出了
throw new Error("Maximum lock count exceeded");
setState(nextc); //直接設置state值就可以不需要CAS
return true;
}
return false; //都不是就返回false
}
通過源碼我們可以看到其實他是有三種操作邏輯:
- 如果
state
為0,則代表鎖沒有被占用,嘗試去修改state狀態,並且將當前線程設置為獨占鎖資源,表示獲得鎖成功 - 如果
state
大於0並且擁有鎖的線程和當前申請鎖的線程一致,則代表重入了鎖,state
值會進行遞增,表示獲得鎖成功 - 如果
state
大於0並且擁有鎖的線程和當前申請鎖的線程不一致則直接返回false,代表申請鎖失敗
當第二個線程去爭搶鎖的時候,state值已經設置為1了也就是已經被第一個線程占用了鎖,所以這里它會返回false,而通過acquire
方法內容可以看到if語句中是!tryAcquire(arg)
,也就是!false=ture
,它會進行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法,這個方法里面又有一個addWaiter
方法,從方法語義上能看到是添加等待隊列的操作,方法的參數代表的是模式,Node.EXCLUSIVE
表示的是在獨占模式下等待,我們先來看一下addWaiter
里面是如何進行操作,如下所示:
private Node addWaiter(Node mode) {
//首先生成當前線程擁有的節點
Node node = new Node(Thread.currentThread(), mode);
// 下面的內容是嘗試快速進行插入末尾的操作,在沒有其他線程同時操作的情況
Node pred = tail; //獲取尾節點
if (pred != null) { //尾節點不為空,代表隊列不為空
node.prev = pred; //尾節點設置為當前節點的前節點
if (compareAndSetTail(pred, node)) {//修改尾節點為當前節點
pred.next = node; //原尾節點的下一個節點設置為當前節點
return node; //返回node節點
}
}
enq(node); //如果前面入隊失敗,這里進行循環入隊操作,直到入隊成功
return node;
}
前面代碼中可以看到,它有一個快速入隊的操作,如果快速入隊失敗則進行死循環進行入隊操作,當然我們上面例子中發現隊列其實是為空的,也就是pred==null,不能進行快速入隊操作,則進入到enq
進行入隊操作,下面看一下enq
方法實現,如下所示:
private Node enq(final Node node) {
for (;;) { //死循環進行入隊操作,直到入隊成功
Node t = tail; //獲取尾節點
if (t == null) { // Must initialize //判斷尾節點為空,則必須先進行初始化
if (compareAndSetHead(new Node()))//生成一個Node,並將當前Node作為頭節點
tail = head; //head和tail同時指向上面Node節點
} else {
node.prev = t; //設置入隊的當前節點的前節點設置為尾節點
if (compareAndSetTail(t, node)) { //將當前節點設置為尾節點
t.next = node; //修改原有尾節點的下一個節點為當前節點
return t; //返回最新的節點
}
}
}
}
通過上面入隊操作,可以清晰的了解入隊操作其實就是Node節點的prev節點和next節點之前的引用,運行到這里我們應該能看到入隊的狀態了,如下圖所示:
如上圖可以清晰的看到,此時擁有鎖的線程是Thread0,而當前線程是Threa1,頭節點為初始化的節點,Ref-707
引用地址所在的Node節點操作當前操作的節點信息,入隊操作后並沒有完成,而是繼續往下進行,此時則進行acquireQueued
這個方法,這個方法是不間斷的去獲取已經入隊隊列中的前節點的狀態,如果前節點的狀態為大於0,則代表當前節點被取消了,會一直往前面的節點進行查找,如果節點狀態小於0並且不等於SIGNAL
則將其設置為SIGNAL
狀態,設置成功后將當前線程掛起,掛起線程后也有可能會反復喚醒掛起操作,原因后面會講到。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //取消節點標志位
try {
boolean interrupted = false; //中斷標志位
for (;;) {
final Node p = node.predecessor(); //獲取前節點
if (p == head && tryAcquire(arg)) { //這里的邏輯是如果前節點為頭結點並且獲取到鎖則進行頭結點變換
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //設置waitStatus狀態
parkAndCheckInterrupt()) //掛起線程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); //取消操作
}
}
前面的源碼可以看到它在acquireQueued
中對已經入隊的節點進行嘗試鎖的獲取,如果鎖獲得就修改頭節點的指針,如果不是頭節點或者爭搶鎖失敗時,此時會進入到shouldParkAfterFailedAcquire
方法,這個方法是獲取不到鎖時需要停止繼續無限期等待鎖,其實就是內部的操作邏輯也很簡單,就是如果前節點狀態為0
時,需要將前節點修改為SIGNAL
,如果前節點大於0
則代表前節點已經被取消了,應該移除隊列,並將前前節點作為當前節點的前節點,一直循環直到前節點狀態修改為SIGNAL
或者前節點被釋放鎖,當前節點獲取到鎖停止循環。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 此節點已經設置了狀態,要求對當前節點進行掛起操作
*/
return true;
if (ws > 0) {
/*
* 如果前節點被取消,則將取消節點移除隊列操作
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus=0或者PROPAGATE時,表示當前節點還沒有被掛起停止,需要等待信號來通知節點停止操作。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
上面的方法其實很容易理解就是等待掛起信號,如果前節點的狀態為0或PROPAGATE則將前節點修改為SIGNAL
,則代表后面前節點釋放鎖后會通知下一個節點,也就是說喚醒下一個可以喚醒的節點繼續爭搶所資源,如果前節點被取消了那就繼續往前尋找不是被取消的節點,這里不會找到前節點為null的情況,因為它默認會有一個空的頭結點,也就是上圖內容,此時的隊列狀態是如何的我們看一下,這里它會進來兩次,以為我們上圖可以看到當前節點前節點是Ref-724
此時waitStatus=0
,他需要先將狀態更改為SIGNAL
也就是運行最有一個else語句,此時又會回到外面的for循環中,由於方法返回的是false則不會運行parkAndCheckInterrupt
方法,而是又循環了一次,此時發現當前節點爭搶鎖又失敗了,然后此時隊列的狀態如下圖所示:
再次進入到方法之后發現前驅節點的waitStatus=-1,表示當前節點需要進行掛起等到,此時返回的結果是true,則會運行parkAndCheckInterrupt
方法,這個方法很簡單就是將當前線程進行掛起操作,如下所示:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //掛起線程
return Thread.interrupted(); //判斷是否被中斷,獲取中斷標識
}
park
掛起線程並且響應中斷信息,其實我們從這里就能發現一個問題,Thread.interrupted方法是用來獲取是否被中斷的標志,如果被中斷則返回true,如果沒有被中斷則返回false,當當前節點被中斷后,其實就會返回true,返回true這里並沒有結束,而是跳到調用地方,也就是acquireQueued
方法內部:
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
以一個案例來進行分析:
public class ReentrantLockDemo {
public static void main(String[] args) throws Exception {
AddDemo runnalbeDemo = new AddDemo();
Thread thread = new Thread(runnalbeDemo::add);
thread.start();
Thread thread1 = new Thread(runnalbeDemo::add);
thread1.start();
Thread thread2 = new Thread(runnalbeDemo::add);
thread2.start();
Thread.sleep(10000);
thread1.interrupt();
System.out.println(runnalbeDemo.getCount());
}
private static class AddDemo {
private final AtomicInteger count = new AtomicInteger();
private final ReentrantLock reentrantLock = new ReentrantLock();
private final Condition condition = reentrantLock.newCondition();
private void add() {
try {
reentrantLock.lock();
count.getAndIncrement();
} finally {
// reentrantLock.unlock();
}
}
int getCount() {
return count.get();
}
}
}
通過上面的例子可以發現,thread1調用中斷方法interrupt(),當調用第一次方法的時候,它會進入到parkAndCheckInterrupt
方法,然后線程響應中斷,最后返回true,最后返回到acquireQueued
方法內部,整個if語句為true,則開始設置interrupted=true,僅僅是設置了等於true,但是這離還會進入下一輪的循環,假如說上次的線程沒有完成任務,則沒有獲取到鎖,還是會進入到shouldParkAfterFailedAcquire
由於已經修改了上一個節點的waitStatus=-1,直接返回true,然后再進入到parkAndCheckInterrupt
又被掛起線程,但是如果上步驟操作他正搶到鎖,則會返回ture,外面也會清除中斷標志位,從這里可以清楚地看到acquire
方法是一個不間斷獲得鎖的操作,可能重復阻塞和解除阻塞操作。
上面阻塞隊列的內容已經講完了,接下來我們看一下unlock都為我們做了什么工作:
public void unlock() {
sync.release(1);
}
我們可以看到他直接調用了獨占模式的release
方法,看一下具體源碼:
public final boolean release(int arg) {
if (tryRelease(arg)) { //調用ReentrantLock中的Sync里面的tryRelease方法
Node h = head; //獲取頭節點
if (h != null && h.waitStatus != 0) //頭節點不為空且狀態不為0時進行unpark方法
unparkSuccessor(h); //喚醒下一個未被取消的節點
return true;
}
return false;
}
release方法,首先先進行嘗試去釋放鎖,如果釋放鎖仍然被占用則直接返回false,如果嘗試釋放鎖時,發現鎖已經釋放,當前線程不在占用鎖資源時,則會進入的下面進行一些列操作后返回true,接下來我們先來看一下ReentrantLock
的Sync
下的tryRelease
方法,如下所示:
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //獲取state狀態,標志信息減少1
if (Thread.currentThread() != getExclusiveOwnerThread()) //線程不一致拋出異常
throw new IllegalMonitorStateException();
boolean free = false; //是否已經釋放鎖
if (c == 0) { //state=0時表示鎖已經釋放
free = true; //將標志free設置為true
setExclusiveOwnerThread(null); //取消獨占鎖信息
}
setState(c); //設置鎖標志信息
return free;
}
看上面的源碼,表示首先先獲取state
狀態,如果state
狀態減少1之后和0不相等則代表有重入鎖,則表示當前線程還在占用所資源,直到線程釋放鎖返回ture標識,還是以上例子為主(此時AddDemo
中的unlock
不在被注釋),分析其現在的隊列中的狀態
釋放鎖后,進入到if語句中,判斷當前頭節點不為空且waitStatus!=0
,通過上圖也可以發現頭節點為-1,則進入到unparkSuccessor
方法內:
private void unparkSuccessor(Node node) {
/*
* 獲取節點的waitStatus狀態
*/
int ws = node.waitStatus;
// 如果小於0則設置為0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 喚醒下一個節點,喚醒下一個節點之前需要判斷節點是否存在或已經被取消了節點,如果沒有節點則不需喚醒操作,如果下一個節點被取消了則一直一個沒有被取消的節點。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
可以看到它是現將頭節點的狀態更新為0,然后再喚醒下一個節點,如果下一個節點為空則直接返回不喚醒任何節點,如果下一個節點被取消了,那么它會從尾節點往前進行遍歷,遍歷與頭節點最近的沒有被取消的節點進行喚醒操作,在喚醒前看一下隊列狀態:
然后喚醒節點后他會進入到parkAndCheckInterrupt
方法里面,再次去執行下面的方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //取消節點標志位
try {
boolean interrupted = false; //中斷標志位
for (;;) {
final Node p = node.predecessor(); //獲取前節點
if (p == head && tryAcquire(arg)) { //這里的邏輯是如果前節點為頭結點並且獲取到鎖則進行頭結點變換
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //設置waitStatus狀態
parkAndCheckInterrupt()) //掛起線程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); //取消操作
}
}
此時獲取p==head成立,並且可以正搶到所資源,所以它會進入到循環體內,進行設置頭結點為當前節點,前節點的下一個節點設置為null,返回中斷標志,看一下此時隊列情況,如下圖所示:
AbstractQueuedSynchronizer
的獨占模式其實提供了三種不同的形式進行獲取鎖操作,看一下下表所示:
方法名稱 | 方法描述 | 對應調用的內部方法 |
---|---|---|
acquire | 以獨占模式進行不間斷的獲取鎖 | tryAcquire,acquireQueued |
acquireInterruptibly | 以獨占模式相應中斷的方式獲取鎖,發生中斷拋出異常 | tryAcquire,doAcquireInterruptibly |
tryAcquireNanos | 以獨占模式相應中斷的方式並且在指定時間內獲取鎖,會阻塞一段時間,如果還未獲得鎖直接返回,發生中斷拋出異常 | tryAcquire,doAcquireNanos |
通過上面圖可以發現,他都會調用圖表一中需要用戶實現的方法,ReentrantLock
實現了獨占模式則內部實現的是tryAcquire
和tryRelease
方法,用來嘗試獲取鎖和嘗試釋放鎖的操作,其實上面內容我們用的是ReentrantLock
中的lock
方法作為同步器,細心的朋友會發現,這個lock
,方法是ReentrantLock實現的,它內部調用了acquire
方法,實現了不間斷的獲取鎖機制,ReentrantLock
中還有一個lockInterruptibly
方法,它內部直接調用的是AbstractQueuedSynchronizer
的acquireInterruptibly
方法,兩個之間的區別在於,兩者都會相應中斷信息,前者不會做任何處理還會進入等待狀態,而后者則拋出異常終止操作,
這里為了詳細看清楚它內部關系我這里用張圖來進行闡述,如下所示:
- 左側代表的事ReentrantLock,右側代表的AQS
- 左側內部黃色區域代表
NonfairSync
- 圖中1和2代表AQS調用其他方法的過程
接下來我們來看一下源碼信息:
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
發現他調用的Sync
類中的acquireInterruptibly
方法,但其實這個方法是AQS中的方法,源碼如下所示:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) //判斷線程是否被中斷
throw new InterruptedException(); //中斷則拋出異常
if (!tryAcquire(arg)) //嘗試獲取鎖
doAcquireInterruptibly(arg); //進行添加隊列,並且修改前置節點狀態,且響應中斷拋出異常
}
通過上面的源碼,它也調用了子類實現的tryAcquire
方法,這個方法和我們上文提到的tryAcquire
是一樣,ReentrantLock
下的NonfairSync
下的tryAcquire
方法,這里這個方法就不多說了詳細請看上文內容,這里主要講一下doAcquireInterruptibly
這個方法:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE); //將節點添加到隊列尾部
boolean failed = true; //失敗匹配機制
try {
for (;;) {
final Node p = node.predecessor(); //獲取前節點
if (p == head && tryAcquire(arg)) { //如果前節點為頭節點並且獲得了鎖
setHead(node); //設置當前節點為頭節點
p.next = null; // help GC //頭節點的下一個節點設置為null
failed = false; //匹配失敗變為false
return;
}
if (shouldParkAfterFailedAcquire(p, node) && //將前節點設置為-1,如果前節點為取消節點則往前一直尋找直到修改為-1為止。
parkAndCheckInterrupt()) //掛起線程返回是否中斷
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
其實這個方法和acquireQueued
區別在於以下幾點:
acquireQueued
是在方法內部添加節點到隊列尾部,而doAcquireInterruptibly
是在方法內部進行添加節點到尾部,這個區別點並不是很重要- 重點是
acquireQueued
響應中斷,但是他不會拋出異常,而后者會拋出異常throw new InterruptedException()
分析到這里我們來用前面的例子來進行模擬一下中中斷的操作,詳細代碼如下所示:
public class ReentrantLockDemo {
public static void main(String[] args) throws Exception {
AddDemo runnalbeDemo = new AddDemo();
Thread thread = new Thread(runnalbeDemo::add);
thread.start();
Thread.sleep(500);
Thread thread1 = new Thread(runnalbeDemo::add);
thread1.start();
Thread.sleep(500);
Thread thread2 = new Thread(runnalbeDemo::add);
thread2.start();
Thread.sleep(500);
Thread thread3 = new Thread(runnalbeDemo::add);
thread3.start();
Thread.sleep(10000);
thread1.interrupt();
System.out.println(runnalbeDemo.getCount());
}
private static class AddDemo {
private final AtomicInteger count = new AtomicInteger();
private final ReentrantLock reentrantLock = new ReentrantLock();
private final Condition condition = reentrantLock.newCondition();
private void add() {
try {
reentrantLock.lockInterruptibly();
count.getAndIncrement();
} catch (Exception ex) {
System.out.println("線程被中斷了");
} finally {
// reentrantLock.unlock();
}
}
int getCount() {
return count.get();
}
}
}
上面的例子其實和前面提到的例子沒有什么太大的差別主要的差別是將lock
替換為lockInterruptibly
,其次就是在三個線程后面講線程1進行中斷操作,這里入隊的操作不在多說,因為操作內容和上面大致相同,下面是四個個線程操作完成的狀態信息:
如果線程等待的過程中拋出異常,則當前線程進入到finally中的時候failed為true,因為修改該字段只有獲取到鎖的時候才會修改為false,進來之后它會運行cancelAcquire
來進行取消當前節點,下面我們先來分析下源碼內容:
private void cancelAcquire(Node node) {
// 如果節點為空直接返回,節點不存在直接返回
if (node == null)
return;
// 設置節點所在的線程為空,清除線程操作
node.thread = null;
// 獲取當前節點的前節點
Node pred = node.prev;
// 如果前節點是取消節點則跳過前節點,一直尋找一個不是取消節點為止
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 獲取頭節點下一個節點
Node predNext = pred.next;
// 這里直接設置為取消節點狀態,沒有使用CAS原因是因為直接設置只有其他線程可以跳過取消的節點
node.waitStatus = Node.CANCELLED;
// 如果當前節點為尾節點,並且設置尾節點為找到的合適的前節點時,修改前節點的下一個節點為null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果不是尾節點,則說明是中間節點,則需要通知后續節點,嘿,伙計你被喚醒了。
int ws;
if (pred != head && //前節點不是頭結點
((ws = pred.waitStatus) == Node.SIGNAL || // 前節點的狀態為SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) //或者前節點狀態小於0而且修改前節點狀態為SIGNAL成功
&& pred.thread != null) { //前節點線程不為空
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//喚醒下一個不是取消的節點
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
- 首先找到當前節點的前節點,如果前節點為取消節點則一直往前尋找一個節點。
- 取消的是尾節點,則直接將前節點的下一個節點設置為null
- 如果取消的是頭節點的下一個節點,且不是尾節點的情況時,它是喚醒下一個節點,喚醒之前並沒有將其移除隊列,而是在喚醒下一個節點的時候,
shouldParkAfterFailedAcquire
里面將取消的節點移除隊列,喚醒之后,當前節點的下一個節點也設置成自己,幫助GC回收它。 - 如果取消節點是中間的節點,則直接將其前節點的下一個節點設置為取消節點的下下個節點即可。
第一種情況如果我們取消的節點是前節點是頭節點,此時線程1的節點應該是被中斷操作,此時進入到cancelAcquire
之后會進入else語句中,然后進去到unparkSuccessor
方法,當進入到這個方法之前我們看一下狀態變化:
我們發現線程1的Node節點的waitStatus變為1也就是Node.CANCELLED
節點,然后運行unparkSuccessor
方法,該方法上面就已經講述了其中的源碼,這里就不在貼源碼了,就是要喚醒下一個沒有被取消的節點,這里是Ref-695
這個線程,當Ref-695
被喚醒之后它會繼續運行下面的內容:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //再一次循環發現還是沒有爭搶到鎖
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && //再一次循環之后有運行到這里了
parkAndCheckInterrupt()) //這里被喚醒了,又要進行循環操作了
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
發現再一次循環操作后,還是沒有正搶到鎖,這時候還是會運行shouldParkAfterFailedAcquire
方法,這個方法內部發現前節點的狀態是Node.CANCELLED
這時候它會在內部先將節點給干掉,也就是這個代碼:
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
最后還是會被掛起狀態,因為沒有釋放鎖操作,最后移除的節點如下所示:
如果取消的事尾節點,也就是線程3被中斷操作,這個是比較簡單的直接將尾節點刪除即可,其中會走如下代碼:
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
}
如果取消的節點是中間的節點,通過上例子中則是取消線程2,其實它內部只是將線程取消線程的前節點的下一個節點指向了取消節點的下節點,如下圖所示:
結束語
這章節分析的主要是ReentrantLock
的內部原理,本來公平模式和非公平模式想放在一起來寫,無奈發現篇幅有點長了,所以就分開進行寫,這樣讀取來不會那么費勁,內部還有條件內容等待下章節分析,如果有分析不到位的請大家指正。