昨天有說過后面講ReentrantLock,今天我們這篇幅就全局的講解下,我們在Lock出來前,解決並發問題沒得選只能用Synchronized。
一.ReentrantLock PK synchronized
(1)synchronized是獨占鎖,加鎖和解鎖的過程自動進行,易於操作,但不夠靈活。ReentrantLock也是獨占鎖,加鎖和解鎖的過程需要手動進行,不易操作,但非常靈活。
(2)synchronized可重入,因為加鎖和解鎖自動進行,不必擔心最后是否釋放鎖;ReentrantLock也可重入,但加鎖和解鎖需要手動進行,且次數需一樣,否則其他線程無法獲得鎖。
(3)synchronized不可響應中斷,一個線程獲取不到鎖就一直等着;ReentrantLock可以相應中斷。
ReentrantLock好像比synchronized關鍵字沒好太多,我們再去看看synchronized所沒有的,一個最主要的就是ReentrantLock還可以實現公平鎖機制。什么叫公平鎖呢?也就是在鎖上等待時間最長的線程將獲得鎖的使用權。通俗的理解就是誰排隊時間最長誰先執行獲取鎖。
Lock接口的一些方法:

- lock():是最常用的獲取鎖的方法,若鎖被其他線程獲取,則等待(阻塞)。
- lockInterruptibly():獲取鎖,如果鎖可用則線程繼續執行;如果鎖不可用則線程進入阻塞狀態,此時可以在其它線程執行時調用這個線程的interrupt方法打斷它的阻塞狀態。
- tryLock():嘗試非阻塞地獲取鎖,立即返回。獲取成功返回true;獲取失敗返回false,但不會阻塞。 (這個方法比synchronized好)
- tryLock(long time, TimeUnit unit):阻塞嘗試鎖。參數代表時長,在指定時長內嘗試鎖。
- unlock():如果沒有獲取鎖標記就放鎖,會拋出異常。
二. Lock實現類介紹
public class ReentrantLockDemo {
private static int count=0;
//重入鎖(如何實現的?)
static Lock lock=new ReentrantLock();
public static void inc(){
lock.lock(); //獲得鎖(互斥鎖) ThreadA 獲得了鎖
try {
Thread.sleep(1);
count++;
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();//釋放鎖 ThreadA釋放鎖 state=1-1=0
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000; i++) {
new Thread(()-> ReentrantLockDemo.inc()).start();
}
Thread.sleep(4000);
System.out.println("result:"+count);
}
}
2.ReentrantReadWriteLock(重入讀寫鎖)
讀多寫少的情況下,讀和讀不互斥,讀和寫互斥,寫和寫互斥
public class ReentrantReadWriteLockDemo {
static Map<String,Object> cacheMap=new HashMap<>();
static ReentrantReadWriteLock rwl=new ReentrantReadWriteLock();
static Lock read=rwl.readLock();
static Lock write=rwl.writeLock();
public static Object get(String key){
read.lock(); //讀鎖 ThreadA 阻塞
try{
return cacheMap.get(key);
}finally {
read.unlock(); //釋放讀鎖
}
}
public static Object write(String key,Object value){
write.lock(); //Other Thread 獲得了寫鎖
try{
return cacheMap.put(key,value);
}finally {
write.unlock();
}
}
}
三.思考鎖的實現
關於鎖我們講了很多,也寫了很多案例,下面我們就底層是怎么實現鎖的機制來進行一個猜想設計然后帶着我們的猜想去看大佬們的源碼是不是和我們的猜想一樣:
1.首先鎖的互斥的原理是多個線程訪問同一個共享資源只有一個能進去訪問,我們這里要分析鎖的互斥特性是怎么實現的:要實現互斥性首先我們要有一個共享變量,然后在設計時用一個狀態來標記共享資源的狀態(例如0,1)
2.沒有搶占到鎖的線程怎么玩,沒有搶到鎖的線程就要阻塞等待,想到等待就很容易想起前面篇幅講的wait(等待、喚醒),但是這里不是用wait因為wait/notify不能喚醒指定的線程,所以我們想到了另一個方案,LockSupport.park()
3.等待中的線程是怎么存儲的,這里面想到的是雙向鏈表
4.公平和非公平(能否插隊)
5.鎖的重入的特性(識別是否是同一個線程)重入次數可以用數字累加
下面我們就lock.lock(); 是怎么實現的進行深入分析下:l
我們在多線程訪問lock.lock()方法時如果獲取lock權限的線程就可以向下執行,沒有獲取權限的線程就會阻塞,這個方向是大方向

下面我們就lock.lock()方法里面做了什么事情,首先看到他調用了sync.lock();

我們看下類的關系圖,其中ReentantLock是Lock的一個實現我們從下面關系圖片中可以看出ReentrantLockK中定義了一個sync

我們可以看到Sync是一個靜態的抽像內部類,他繼承了AbstractQueuedSynchronizer

我 們回退到sync.lock();方法,他實現了兩種鎖,一種是共平鎖一種是非公平鎖,類關系圖如下

在sync.lock()中默認是非公平鎖,那么我們在sync.lock()中進入NonfairSync方法中,首先他進來第一件事是搶占資源,在這里的判斷compareAndSetState保證了多線程下的原子性,這里的compareAndSetState判斷是采用了樂觀鎖機制來進行加鎖,在很多源碼中都有用到CAS操作,其中expect是預期值,update是更改值,這個操作是直接跟內存交互,這樣做的好處是保證只有一個線程能進入,進入后操作setExclusiveOwnerThread(Thread.currentThread());保存當前線程

我們進入他的判斷方法共享資源compareAndSetState中看下他是怎么修改預期值的,stateOffset是當前state屬性成員在內存中的偏移量,他會通過內存中的偏移量去拿到內存中的值 和我們的預期值對比,如果相等就修改,這里面設計的好處是直接跟內存交互,不讓我們java代碼操作,可以在java層面解決多線程問題


上面圖片是線程搶占成功的邏輯,其它線程搶占失敗就走下面acquire(1)的邏輯了,這個acquire邏輯是由AQS來實現的;
- ! tryAcquire(arg)
- addWaiter 將未獲得鎖的線程加入到隊列
- acquireQueued(); 去搶占鎖或者阻塞.

我們先看下tryAcquire(arg)的實現,我們選擇它NonfairSync實現

這下面的邏輯是繼續去搶占鎖的邏輯,
final boolean nonfairTryAcquire(int acquires) {
//獲取當前線程
final Thread current = Thread.currentThread();
//判斷其狀態
int c = getState();
//條件成立表示無鎖,
if (c == 0) {
//無鎖的操作一定要變成CAS操作,因為修改本身存在原子性問題
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 這里面是判斷重入的,判斷當前線程和我們有鎖的線程是否相等
else if (current == getExclusiveOwnerThread()) {
//如果相等就加一個次數
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 因為當前是有鎖狀態,所以不用再用CAS操作
setState(nextc);
return true;
}
return false;
}
條件! tryAcquire(arg)不成立就會進入acquireQueued(addWaiter(Node.EXCLUSIVE), arg))判斷中來,將未獲得鎖的線程加入到隊列;addWaiter是做一個鏈表然后加入acquireQueued中進行循環的判斷;Node.EXCLUSIVE表示節點互斥的一個特性;我們進入addWaiter方法
private Node addWaiter(Node mode) {
//進來第一件事是先構造一個節點,這個節點會先把當前線程和mode(表示獨占)傳進來,如果有多個線程沒有搶到鎖那就有多個線程進入這個方法,也就代表了有多個Node節點
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//這里會拿到一個tail節點,tail表示尾部節點,一般鏈表都會有一個頭節點Head和尾節點Tail,這一步的頭尾節點還沒有初始化,還是空指向
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//第一次進來尾節點一定是空的,所以第一次進來是走enq方法
enq(node);
return node;
}
我們進入enq(node)方法
private Node enq(final Node node) {
//通過自旋的方式進行FOR循環
for (;;) {
//得到一個尾節點,此時尾節點還是空
Node t = tail;
if (t == null) { // Must initialize
//初始化一個空的Node節點,這個compareAndSetHead只有在空的情況下才會替換,CAS保證只有一個線程能替換成功
if (compareAndSetHead(new Node()))
//將頭和尾都指向這個剛剛初始化的空節點,到這一步的時序圖如圖一;這一步完成后初始化就完成了,然后進入下一次循環t就不為空了走else邏輯
tail = head;
} else {
//node表示當前進來的線程,我們假設是B線程進來了,此時因為t不為空了,所以當前線程的prev指向空的Node節點
node.prev = t;
if (compareAndSetTail(t, node)) {
//操作尾部節點t.next表示上一個節點的指向指向當前節點,這樣一個雙向鏈表就形成了,在多個for循環后的時序圖就如圖二
t.next = node;
return t;
}
}
}
}

圖一

圖二
addWaiter(Node.EXCLUSIVE), arg)代碼執行完成后,他會把參數返回添加到acquireQueued里面去,我們進入acquireQueued,這里面一定會做的一件事就是阻塞列表中的線程
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//又是自旋
for (;;) {
// 假設這里面的node是我們線程B的話,他的predecessor()方法可以點進去看下,會發現是當前線程的prev,由上面時序圖會發現其實就是Head節點
final Node p = node.predecessor();
//如果頭節點是head節點就會去搶占一次鎖,成功就獲得鎖,失敗走下面
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//是否要掛起一個線程,我們進入shouldParkAfterFailedAcquire方法
if (shouldParkAfterFailedAcquire(p, node) &&
//parkAndCheckInterrupt是掛起(阻塞)
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
阻塞狀態是沒有必要去搶占鎖的,下面就是通過判斷是不是偏鎖狀態來決定要不要去釋放鎖,如果是偏鎖就釋放鎖
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//等待狀態,如果線程出現異常會出來偏鎖狀態
int ws = pred.waitStatus;
//SIGNAL是喚醒狀態成立就可以放心掛起(-1)
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//偏鎖狀態ws會大於o
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
//將取消狀態的移除節點
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//替換節點狀態改成SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

前面的掛起完成后代表lock.lock()方法執行完成了,接下來我們就講下lock.unlock()釋放鎖的過程,這時候釋放鎖是線程A來釋放鎖,我們來看lock.unlock()的ReentrantLock實現
public final boolean release(int arg) {
//進入tryRelease方法
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//重置信息完成后會通過下面方法進行喚醒阻塞線程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//將state恢復原有值
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
//如果剛好c==0就釋放線程並把線程清空,如圖三
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

圖三
我們進入unparkSuccessor方法中
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
//如果成立
if (ws < 0)
//先恢復成初始狀態
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//獲取下一個節點
Node s = node.next;
//如果下個節點為空,則除去無效節點
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒下一個節點,喚醒后的線程又要搶占鎖又會進入前面的acquireQueued方法進行自旋,搶占失敗的線程又要掛起
//喚醒完成后喚醒的線會去執行代碼程序
LockSupport.unpark(s.thread);
}

