ReentrantLock源碼
-
JUC 指java.util.concurrent包下,一系列關於並發的類,JUC就是包名的首字母
-
CAS 比較並交換,可以看另一篇文章
-
AQS 指主要利用CAS來實現的輕量級多線程同步機制,並且不會在CPU上出現上下文切換和調度的情況
自定義鎖
如何在自己實現一個鎖?
可以定義一個屬性來判斷當前是否有其線程在運行,如果正在運行那么其他線程需要等待
如何實現? 例如有兩個線程T1和T2,都執行同一段代碼
自定義兩個方法
public void lock();
public void unlock();
public void addI(){
i++;
}
將上面的addI方法更改為下面的
public void addI(){
lock();
i++;
unlock();
}
這里忽略程序出錯導致死鎖的情況,正常解鎖需要放在finally代碼塊中
當T1進入代碼,將鎖的改為被持有的狀態
/**
* 0為未持有
* 1為被持有
*/
private volatile int i=0;
public void lock(){
//CAS修改成功返回true
while(CAS(i,1)){
return
}
}
public void unlock(){
i=0;
}
上面的偽代碼當T1進入lock方法后,因為是第一個進入的,鎖的狀態還是0,通過cas可以改為1,修改成功返回true,進入循環return到addI方法,執行i++操作,然后進入unLock方法,將狀態改為0,方法結束
假設當T1進入方法將狀態改為1,那么T2進入會一直循環CAS修改,線程一直在自旋不會走下面的代碼,直到鎖的狀態改為0,才會繼續業務代碼
那么我們就實現了一個簡單的鎖,但是這個鎖有什么缺點呢? 沒有獲取到鎖的線程會一直自旋,消耗系統資源,這個是我們不想看到的
在java中還有一個類LockSupport
,其中有一個park方法
public static void park() {
UNSAFE.park(false, 0L);
}
public native void park(boolean var1, long var2);
里面繼續調用UNSAFE類,這個類里的方法是使用C/C++實現,park方法的作用是將當前線程立即休眠,讓出CPU,直到被喚醒,還有一個喚醒的方法
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
public native void unpark(Object var1);
這個同樣也是其他語言實現,傳入需要被喚醒的線程,那么我們上面的代碼可以改造為
/**
* 0為未持有
* 1為被持有
*/
private volatile int i=0;
//存放等待獲取鎖的線程
private Thread t;
public void lock(){
//CAS修改成功返回true
if(CAS(i,1)){
return
}
//將沒有獲取到鎖的線程存放
t=Thread.currentThread()
//如果沒有獲取到鎖則進行休眠
LockSupport.park();
}
public void unlock(){
i=0;
if(t!=null){
LockSupport.unpark(t);
}
}
我們修改完后即使沒有獲取到鎖的線程也不會占用CPU的資源,但是如果出現2個以上的線程同時進行操作,那么會出現丟失線程的情況,可以再進行優化,將等待的線程存放到隊列中,就不再演示了,而ReentrantLock就是主要使用CAS,park,自旋來實現的,接下來看ReentrantLock的源碼
ReentrantLock
當初始化一個ReentrantLock使用默認構造時創建的是一個非公平鎖
public ReentrantLock() {
sync = new NonfairSync();
}
如果想創建一個公平鎖則使用有參構造
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
這篇文章先來看公平鎖的實現
public void service() {
//創建一個公平鎖
ReentrantLock reentrantLock = new ReentrantLock(true);
reentrantLock.lock();
try {
System.out.println("==這里有一堆的業務===");
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
加鎖
沒有競爭情況
public void lock() {
sync.lock();
}
調用的sync是一個ReentrantLock的內部抽象類
abstract static class Sync extends AbstractQueuedSynchronizer{
......
}
它的公平鎖的實現方法,是FairSync類中的,也是一個內部類,在ReentrantLock中,繼承了Sync類,實現lock方法
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
}
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
點進tryAcquire方法
protected final boolean tryAcquire(int acquires) {
//獲取當前執行的線程
final Thread current = Thread.currentThread();
//得到鎖的狀態
int c = getState();
//如果鎖狀態為0說明當前鎖沒有被其他線程持有
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
繼續點進hasQueuedPredecessors方法,該方法定義在AbstractQueuedSynchronizer抽象類中的
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
其中tail和head這兩個變量是在AbstractQueuedSynchronizer抽象類中定義的,用來存放等待線程頭和尾部
因為當前線程執行前鎖的狀態是未被持有的,所以還沒有初始化過隊列,那么等待隊列的頭和尾部都為null,return的第一個判斷h!=t為false,后面的&&運算符,所以直接返回
那么回到tryAcquire方法,hasQueuedPredecessors返回false,而前面有一個取反!符號,則繼續執行compareAndSetState(0, acquires)
方法,通過cas改變當前鎖的狀態為1,然后執行setExclusiveOwnerThread
方法,該方法就是簡單的賦值
protected final void setExclusiveOwnerThread(Thread thread) {
//當前持有鎖的線程
exclusiveOwnerThread = thread;
}
繼續返回到acquire
方法,為true,取反false,使用了&&阻斷符,則不會執行后面的acquireQueued
方法,直接結束lock()方法,執行自定義的業務代碼
tryAcquire方法什么時候走到 else if (current == getExclusiveOwnerThread()) 判斷呢
ReentrantLock的特性之一就是體現在這里-重入鎖
啥叫重入鎖?簡單講就是在加鎖后又加鎖
public void addI(){
ReentrantLock rLock =new ReentrantLock(true);
rLock.lock();
//執行業務==
rLock.lock();
//執行業務==
//解鎖最后加鎖的
rLock.unlock();
//解鎖最先加鎖的
rLock.unlock();
}
當線程和該鎖已經持有的線程相同時則會進入這個判斷,將鎖的狀態加1,賦值給state,下面的判斷state小於0可能是判斷溢出的問題,即數值超出int類型最大容量則為負數,一般這種情況很少見吧
存在競爭情況
那么上面是沒有其他線程競爭的情況,如果在T1加鎖后,T2,T3..來嘗試獲取鎖改怎么辦呢?->進等待隊列
這個還是tryAcquire方法的代碼,拿下來方便查看
protected final boolean tryAcquire(int acquires) {
//獲取當前執行的線程
final Thread current = Thread.currentThread();
//得到鎖的狀態
int c = getState();
//如果鎖狀態為0說明當前鎖沒有被其他線程持有
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果在T1進行完加鎖后T2來嘗試獲取鎖,因為state狀態不為0,而當前線程和鎖持有的線程又不同,則直接返回false
那么返回acquire方法中
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
Node.EXCLUSIVE 返回一個Node節點
取反為true,則執行acquireQueued方法,而acquireQueued方法中有執行了addWaiter方法,先來看addWaiter方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
使用鏈表的形式來存儲阻塞排隊的線程,來看node的內部結構
主要的三個屬性
//存放上一個節點
volatile Node prev;
//存放下一個節點
volatile Node next;
//存放當前等待的線程
volatile Thread thread;
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
當進入這個方法后,首先將AbstractQueuedSynchronizer類中的尾部節點賦值給一個臨時變量,判斷尾部是否為空,假設現在線程為T2,隊列還沒有被初始化,尾部為空,則進入enq
方法,繼續點進
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//CAS設置頭節點
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//CAS設置尾巴節點
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
還是將AbstractQueuedSynchronizer類中尾部節點賦值給臨時變量t
然后判斷t是否為空,因為隊列還沒有初始化,所以尾巴節點為空,則使用cas來設置 AbstractQueuedSynchronizer類中的頭節點,之后將設置的頭節點賦值給尾部
當執行完節點的關系如下
這時候有個疑問,怎么沒有設置傳入的Node節點呢?而是設置新new出來的Node,和參數傳入的Node節點沒有一點關系?
注意看上面的代碼for(;;)
死循環,當下次循環的時候t已經不為空了,因為上次循環給加了一個空節點,然后將傳入的Node節點的上一個賦值為t,然后通過CAS獲取AbstractQueuedSynchronizer類中的尾部節點,如果尾部節點還是為t,則更改為傳入的node對象,如果CAS失敗,即在CAS設置前被其他線程對AbstractQueuedSynchronizer類中的尾部節點進行了修改,則進行下一次for循環,直至設置成功,當操作完成后,節點結構如下圖
之后代碼返回到acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
還是一個for死循環,首先獲取上一個節點和AbstractQueuedSynchronizer類中的頭節點進行判斷,如果相同則調用tryAcquire()方法嘗試獲取鎖,因為在初始化隊列過程中可能獲取鎖執行的線程已經執行完了,並且釋放了鎖,所以這里嘗試一下獲取鎖,假設沒有獲取到鎖,則不會進入if (p == head && tryAcquire(arg)) {}代碼塊,繼續下面的判斷,進入shouldParkAfterFailedAcquire()方法,從名稱可以看到[在獲取鎖失敗后應該睡眠]
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
判斷上一個node節點的狀態,將上一個節點的Node.SIGNAL狀態的值為-1,而上面的代碼中並沒有對waitStatus的值進行更改,默認初始化為0,則進入最后的else代碼塊,通過CAS將waitStatus的值改為-1,方法返回false結束,回到acquireQueued方法中,繼續進行for循環,假設還是沒有獲取到鎖,則再次進入shouldParkAfterFailedAcquire方法中,因為上次for循環將waitStatus的值改為了-1,則這次進入了if (ws == Node.SIGNAL)
的代碼塊,返回true,返回到 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())判斷中,因為shouldParkAfterFailedAcquire方法返回了true,則繼續執行parkAndCheckInterrupt方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
當執行完parkAndCheckInterrupt方法后,T2線程就在這里進行休眠
為什么不開始就把waitStatus設置為-1呢?還要多自旋一次,有一個原因是盡量不使用park,能嘗試獲取到鎖最好
那么假設現在又來一個線程T3
public final void acquire(int arg) {
//嘗試獲取鎖肯定不會成功,則進入acquireQueued,addWaiter方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//這時tail已經是t2節點了
Node pred = tail;
//不為空進入
if (pred != null) {
//將當前節點上一個節點設置為t2
node.prev = pred;
//通過CAS來設置AQS類中的尾節點
if (compareAndSetTail(pred, node)) {
//然后設置T2的下一個節點
pred.next = node;
return node;
}
}
enq(node);
return node;
}
完成操作后節點關系如下
之后繼續執行acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取上一個節點:T2
final Node p = node.predecessor();
//T2不是頭節點,則不進入下面的代碼塊
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//之后調用shouldParkAfterFailedAcquire方法
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//同樣的代碼,第一次獲取T3的前一個節點T2,判斷T2的ws值為0,
//CAS修改后返回,外層循環再次進入這時T2的ws值為-1,返回true,方法結束
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
解鎖
假設現在T1執行unlock方法,T2,T3在隊列中
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
進入tryRelease方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
//把當前持有鎖的線程清空
setExclusiveOwnerThread(null);
}
//設置鎖的狀態
setState(c);
return free;
}
首先將狀態數值-1,判斷如果當前線程和持有鎖的線程不是同一個則拋出異常,即解鎖的線程和加鎖的不是同一個線程
判斷如果c==0,也就是沒有重入鎖的情況,將free改為true,然后進入setExclusiveOwnerThread方法
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final void setState(int newState) {
state = newState;
}
方法返回,沒有重入鎖的情況,則free為true,獲取AQS類中的頭節點,假設不為空,ws=-1,則進入unparkSuccessor(h)
方法
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
首先獲取頭結點的狀態,小於0進入代碼塊,將頭結點的鎖狀態改為0,獲取下一個節點,那么s就是t2,而t2的ws也是-1,所以直接進入最下面的代碼塊,if(s!=null),unpark(t2)線程
那么回到t2線程休眠的地方
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
//在這里醒來
return Thread.interrupted();
}
下面的是判斷線程是否被中斷過,native方法,無法看到實現了,那么假設沒有被中斷過則返回false,那么返回上一個方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//返回代碼后繼續執行這里
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
因為parkAndCheckInterrupt方法返回false,所以進不去代碼塊,那么繼續執行for,當執行if (p == head && tryAcquire(arg))
時p==head成立,而調用tryAcquire方法嘗試獲取鎖成功,因為t1已經釋放了,那么進入下面的代碼塊
if (p == head && tryAcquire(arg)) {
setHead(node);
//設置t2上一個節點,也就是空節點的下一個節點設置為null
p.next = null; // help GC p節點沒有任何引用指向了,幫助垃圾回收
failed = false;
return interrupted;
}
private void setHead(Node node) {
//將t2節點設置為頭部
head = node;
//然后將t2節點的thread設置為null
node.thread = null;
//節點的上一個節點設置為null
node.prev = null;
}
經過上面的操作后節點關系如下
如果這個節點在頭說明它正在執行代碼,而不是排隊,即使初始化時T1沒有進隊列,但是給它添加了一個空node,來代替它正在執行
例如有T2,T3在排隊,T1線程unpark后T2線程執行,上面的代碼也能說明T2會先把當前節點的線程,上下節點都設置為null,而T2線程去執行代碼去了,已經在運行過程中了
看別的博客有一段解釋:比如你去買車票,你如果是第一個這個時候售票員已經在給你服務了,你不算排隊,你后面的才算排隊
注意一點:隊列頭始終為空Node
如何保證公平
情況1
T1執行完unpark后,釋放完鎖,還沒來的及喚醒隊列中的T2,這時T3線程來嘗試獲取到鎖
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
這種情況隊列中肯定有節點排隊,如果沒有節點直接獲取到鎖也是公平的,那么有節點排隊h就不等於t,true,&&運算符繼續判斷,h的next節點也不為null,返回false
s.thread != Thread.currentThread()
如果當前來嘗試獲取鎖的對象不是在排隊的第一個(也就是頭結點的下一個節點,頭結點正在運行,不算在排隊的隊列中)也就是其他線程插隊的情況,則返回true,結果就是(true&&(false||true)) 整體返回true,外層代碼取反為false,不會嘗試CAS獲取鎖,則T3去排隊
情況2
T2嘗試獲取鎖時發現T1持有鎖,於是去初始化隊列,在初始化過程中T1執行完釋放鎖,T2執行初始化隊列代碼時間片用完,這時T3來嘗試獲取鎖
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))<------假設T2初始化隊列執行到這里CPU時間片用完
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
此時節點關系如下
那么回到hasQueuedPredecessors
方法,看最后的return
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
h頭節點為一個空node,而t為節點為null,不等於true繼續判斷,h頭結點下一個為null,整體返回true,外層代碼取反為false,則去排隊
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
遺留問題
1 初始化隊列以及后面的入隊為什么要設置空的頭節點
2 在parkAndCheckInterrupt()方法中最后調用的Thread.interrupted();一系列方法最后不改變任何東西,不明白它這個的作用,也有說是為了復用lockInterruptibly()方法,但是感覺有點牽強
太笨了看不明白,希望不吝賜教,也可以加qq群一起探討:737698533