多線程(四) AQS底層原理分析


J.U.C 簡介

Java.util.concurrent 是在並發編程中比較常用的工具類,里面包含很多用來在並發
場景中使用的組件。比如線程池、阻塞隊列、計時器、同步器、並發集合等等。並
發包的作者是大名鼎鼎的 Doug Lea。我們在接下來的課程中,回去剖析一些經典
的比較常用的組件的設計思想

Lock

Lock 在 J.U.C 中是最核心的組件,前面我們講 synchronized 的時候說過,鎖最重
要的特性就是解決並發安全問題。為什么要以 Lock 作為切入點呢?如果有同學看
過 J.U.C 包中的所有組件,一定會發現絕大部分的組件都有用到了 Lock。所以通
過 Lock 作為切入點使得在后續的學習過程中會更加輕松。
Lock 簡介
在 Lock 接口出現之前,Java 中的應用程序對於多線程的並發安全處理只能基於
synchronized 關鍵字來解決。但是 synchronized 在有些場景中會存在一些短板,
也就是它並不適合於所有的並發場景。但是在 Java5 以后,Lock 的出現可以解決
synchronized 在某些場景中的短板,它比 synchronized 更加靈活。
Lock 的實現
Lock 本質上是一個接口,它定義了釋放鎖和獲得鎖的抽象方法,定義成接口就意
味着它定義了鎖的一個標准規范,也同時意味着鎖的不同實現。實現 Lock 接口的類有很多,以下為幾個常見的鎖實現
 
ReentrantLock:表示重入鎖,它是唯一一個實現了 Lock 接口的類。
重入鎖指的是線程在獲得鎖之后,再次獲取該鎖不需要阻塞,而是直接關聯一次計數器增加重入次數
 
ReentrantReadWriteLock:重入讀寫鎖,它實現了 ReadWriteLock 接口,在這個
類中維護了兩個鎖,一個是 ReadLock,一個是 WriteLock,他們都分別實現了 Lock接口。
讀寫鎖是一種適合讀多寫少的場景下解決線程安全問題的工具,基本原則是: 讀和讀不互斥、讀和寫互斥、寫和寫互斥。
也就是說涉及到影響數據變化的操作都會存在互斥。
 
StampedLock: stampedLock 是 JDK8 引入的新的鎖機制,可以簡單認為是讀寫鎖的一個改進版本,讀寫鎖雖然通過分離讀和寫的功能使得讀和讀之間可以完全
並發,但是讀和寫是有沖突的,如果大量的讀線程存在,可能會引起寫線程的飢餓。
stampedLock 是一種樂觀的讀策略,使得樂觀鎖完全不會阻塞寫線程
 
Lock 的類關系圖
Lock 有很多的鎖的實現,但是直觀的實現是 ReentrantLock 重入鎖

 

 Lock接口

void lock() // 如果鎖可用就獲得鎖,如果鎖不可用就阻塞直到鎖釋放
void lockInterruptibly() // 和lock()方法相似, 但阻塞的線程 可 中 斷 , 拋 出java.lang.InterruptedException 異常
boolean tryLock() // 非阻塞獲取鎖;嘗試獲取鎖,如果成功返回 true
boolean tryLock(long timeout, TimeUnit timeUnit) //帶有超時時間的獲取鎖方法
void unlock() // 釋放鎖
ReentrantLock 重入鎖
重入鎖,表示支持重新進入的鎖,也就是說,如果當前線程 t1 通過調用 lock 方法獲取了鎖之后,再次調用 lock,
是不會再阻塞去獲取鎖的,直接增加重試次數就行了。synchronized 和 ReentrantLock 都是可重入鎖。
很多同學不理解為什么鎖會存在重入的特性,那是因為對於同步鎖的理解程度還不夠,比如在下面這類
的場景中,存在多個加鎖的方法的相互調用,其實就是一種重入特性的場景。重入鎖的設計目的
比如調用 demo 方法獲得了當前的對象鎖,然后在這個方法中再去調用demo2,demo2 中的存在同一個實例鎖,這個時候當前線程
會因為無法獲得demo2 的對象鎖而阻塞,就會產生死鎖。重入鎖的設計目的是避免線程的死鎖。 
public class ReentrantDemo{
 public synchronized void demo(){
 System.out.println("begin:demo");
 demo2();
 }
 public void demo2(){
 System.out.println("begin:demo1");
 synchronized (this){
 }
 }
 public static void main(String[] args) {
 ReentrantDemo rd=new ReentrantDemo();
 new Thread(rd::demo).start();
 } }
ReentrantLock 的使用案例
public class AtomicDemo {
    private static int count=0;
    static Lock lock=new ReentrantLock();
    public static void inc(){
        lock.lock();
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        count++;
        lock.unlock();
    }
    public static void main(String[] args) throws
            InterruptedException {
        for(int i=0;i<1000;i++){
            new Thread(()->{AtomicDemo.inc();}).start();;
        }
        Thread.sleep(3000);
        System.out.println("result:"+count);
    }
}
ReentrantReadWriteLock
我們以前理解的鎖,基本都是排他鎖,也就是這些鎖在同一時刻只允許一個線程進行訪問,而讀寫所在同一時刻可以允許多個線程訪問,
但是在寫線程訪問時,所有的讀線程和其他寫線程都會被阻塞。讀寫鎖維護了一對鎖,一個讀鎖、一個寫鎖;一般情況下,讀寫鎖的性能都會比排它鎖好,
因為大多數場景讀是多於寫的。在讀多於寫的情況下,讀寫鎖能夠提供比排它鎖更好的並發性和吞吐量.
package com.lf.threaddemo;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockDemo {
    static Map<String, Object> cacheMap = new HashMap<>();
    static ReentrantReadWriteLock rwl = new
            ReentrantReadWriteLock();
    static Lock read = rwl.readLock();
    static Lock write = rwl.writeLock();

    public static final Object get(String key) {
        System.out.println("開始讀取數據");
        read.lock(); //讀鎖
        try {
            return cacheMap.get(key);
        } finally {
            read.unlock();
        }
    }

    public static final Object put(String key, Object value) {
        write.lock();
        System.out.println("開始寫數據");
        try {
            return cacheMap.put(key, value);
        } finally {
            write.unlock();
        }
    }
}
在這個案例中,通過 hashmap 來模擬了一個內存緩存,然后使用讀寫所來保證這
個內存緩存的線程安全性。當執行讀操作的時候,需要獲取讀鎖,在並發訪問的時
候,讀鎖不會被阻塞,因為讀操作不會影響執行結果。
在執行寫操作是,線程必須要獲取寫鎖,當已經有線程持有寫鎖的情況下,當前線
程會被阻塞,只有當寫鎖釋放以后,其他讀寫操作才能繼續執行。使用讀寫鎖提升
讀操作的並發性,也保證每次寫操作對所有的讀寫操作的可見性
⚫ 讀鎖與讀鎖可以共享
⚫ 讀鎖與寫鎖不可以共享(排他)
⚫ 寫鎖與寫鎖不可以共享(排他)ReentrantLock 的實

StampedLock

 StampedLock 支持三種模式,分別是:寫鎖、悲觀讀鎖和樂觀讀。其中,寫鎖、悲觀讀鎖的語義和 ReadWriteLock 的寫鎖、讀鎖的語義非常類似,

允許多個線程同時獲取悲觀讀鎖,但是只允許一個線程獲取寫鎖,寫鎖和悲觀讀鎖是互斥的。不同的是:StampedLock 里的寫鎖和悲觀讀鎖加鎖成功之后,

都會返回一個 stamp;然后解鎖的時候,需要傳入這個 stamp。相關的示例代碼如下。

final StampedLock sl = new StampedLock();
  
// 獲取/釋放悲觀讀鎖示意代碼
long stamp = sl.readLock();
try {
  //省略業務相關代碼
} finally {
  sl.unlockRead(stamp);
}

// 獲取/釋放寫鎖示意代碼
long stamp = sl.writeLock();
try {
  //省略業務相關代碼
} finally {
  sl.unlockWrite(stamp);
}

StampedLock 的性能之所以比 ReadWriteLock 還要好,其關鍵是 StampedLock 支持樂觀讀的方式。

ReadWriteLock 支持多個線程同時讀,但是當多個線程同時讀的時候,
所有的寫操作會被阻塞;而 StampedLock 提供的樂觀讀,是允許一個線程獲取寫鎖的,也就是說不是所有的寫操作都被阻塞。
注意這里,我們用的是“樂觀讀”這個詞,而不是“樂觀讀鎖”,是要提醒你,樂觀讀這個操作是無鎖的,所以相比較 ReadWriteLock 的讀鎖,樂觀讀的性能更好一些。

StampedLock 使用注意事項對於讀多寫少的場景 StampedLock 性能很好,簡單的應用場景基本上可以替代 ReadWriteLock,

但是 StampedLock 的功能僅僅是 ReadWriteLock 的子集,在使用的時候,還是有幾個地方需要注意一下。
StampedLock 在命名上並沒有增加 Reentrant,想必你已經猜測到 StampedLock 應該是不可重入的。

事實上,的確是這樣的,StampedLock 不支持重入。這個是在使用中必須要特別注意的。
另外,StampedLock 的悲觀讀鎖、寫鎖都不支持條件變量,這個也需要你注意。還有一點需要特別注意,

那就是:如果線程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上時,此時調用該阻塞線程的 interrupt() 方法,會導致 CPU 飆升。
例如下面的代碼中,線程 T1 獲取寫鎖之后將自己阻塞,線程 T2 嘗試獲取悲觀讀鎖,也會阻塞;

如果此時調用線程 T2 的 interrupt() 方法來中斷線程 T2 的話,你會發現線程 T2 所在 CPU 會飆升到 100%。

final StampedLock lock
  = new StampedLock();
Thread T1 = new Thread(()->{
  // 獲取寫鎖
  lock.writeLock();
  // 永遠阻塞在此處,不釋放寫鎖
  LockSupport.park();
});
T1.start();
// 保證T1獲取寫鎖
Thread.sleep(100);
Thread T2 = new Thread(()->
  //阻塞在悲觀讀鎖
  lock.readLock()
);
T2.start();
// 保證T2阻塞在讀鎖
Thread.sleep(100);
//中斷線程T2
//會導致線程T2所在CPU飆升
T2.interrupt();
T2.join();

所以,使用 StampedLock 一定不要調用中斷操作,如果需要支持中斷功能,一定使用可中斷的悲觀讀鎖 readLockInterruptibly() 和寫鎖 writeLockInterruptibly()。這個規則一定要記清楚

StampedLock 的使用看上去有點復雜,但是如果你能理解樂觀鎖背后的原理,使用起來還是比較流暢的。建議你認真揣摩 Java 的官方示例,這個示例基本上就是一個最佳實踐。

我們把 Java 官方示例精簡后,形成下面的代碼模板,建議你在實際工作中盡量按照這個模板來使用 StampedLock。

StampedLock 讀模板:

final StampedLock sl =   new StampedLock();
// 樂觀讀
long stamp =   sl.tryOptimisticRead();
// 讀入方法局部變量
......
// 校驗stamp
if (!sl.validate(stamp)){ 
 // 升級為悲觀讀鎖  
stamp = sl.readLock(); 
    try {   
    // 讀入方法局部變量   
     ..... 
     } finally { 
   //釋放悲觀讀鎖
       sl.unlockRead(stamp); 
 }
}
//使用方法局部變量執行業務操作
......

StampedLock 寫模板:

long stamp = sl.writeLock();
try {
  // 寫共享變量
  ......
} finally {
  sl.unlockWrite(stamp);
}

 

ReentrantLock 的實現原理

我們知道鎖的基本原理是,基於將多線程並行任務通過某一種機制實現線程的串行執行,
從而達到線程安全性的目的。在 synchronized 中,我們分析了 偏向鎖、輕量級鎖、樂觀鎖
基於樂觀鎖以及自旋鎖來優化了 synchronized 的加鎖開銷,同時在重量級鎖階段,通過線程的阻塞以及喚醒來達到線程競爭和同步的目的。
那么在 ReentrantLock 中,也一定會存在這樣的需要去解決的問題。就是在多線程競爭重入鎖時,競爭失敗的線程是如何實現阻塞以及被喚醒的呢?
 
AQS 是什么
在 Lock 中,用到了一個同步隊列 AQS,全稱 AbstractQueuedSynchronizer,它是一個同步工具也是 Lock 用來實現線程同步的核心組件。
如果你搞懂了 AQS,那么 J.U.C 中絕大部分的工具都能輕松掌握。
AQS 的兩種功能
從使用層面來說,AQS 的功能分為兩種: 獨占和共享
獨占鎖,每次只能有一個線程持有鎖,比如前面給大家演示的 ReentrantLock 就是以獨占方式實現的互斥鎖
共 享 鎖 , 允 許 多 個 線 程 同 時 獲 取 鎖 , 並 發 訪 問 共 享 資 源 , 比 如ReentrantReadWriteLock
 
AQS 的內部實現
AQS 隊列內部維護的是一個 FIFO 的雙向鏈表,這種結構的特點是每個數據結構都有兩個指針,分別指向直接的后繼節點和直接前驅節點。
所以雙向鏈表可以從任意一個節點開始很方便的訪問前驅和后繼。每個 Node 其實是由線程封裝,當線
程爭搶鎖失敗后會封裝成 Node 加入到 ASQ 隊列中去;當獲取鎖的線程釋放鎖以后,會從隊列中喚醒一個阻塞的節點(線程)。

 

 

Node 的組成

 

釋放鎖以及添加線程對於隊列的變化
當出現鎖競爭以及釋放鎖的時候,AQS 同步隊列中的節點會發生變化,首先看一下添加節點的場景。

 

 

里面會涉及到兩個變化
1. 新的線程封裝成 Node 節點追加到同步隊列中,設置 prev 節點以及修改當前節點的前置節點的 next 節點指向自己
2. 通過 CAS 講 tail 重新指向新的尾部節點
head 節點表示獲取鎖成功的節點,當頭結點在釋放同步狀態時,會喚醒后繼節點,
如果后繼節點獲得鎖成功,會把自己設置為頭結點,節點的變化過程如下

 

這個過程也是涉及到兩個變化
1. 修改 head 節點指向下一個獲得鎖的節點
2. 新的獲得鎖的節點,將 prev 的指針指向 null
設置 head 節點不需要用 CAS,原因是設置 head 節點是由獲得鎖的線程來完成
的,而同步鎖只能由一個線程獲得,所以不需要 CAS 保證,只需要把 head 節點設置為原首節點的后繼節點,並且斷開原 head 節點的 next 引用即可
 
ReentrantLock 的源碼分析
以 ReentrantLock 作為切入點,來看看在這個場景中是如何使用 AQS 來實現線程的同步的
 
ReentrantLock 的時序圖
調用 ReentrantLock 中的 lock()方法,源碼的調用過程我使用了時序圖來展現。

 

ReentrantLock.lock()
這個是 reentrantLock 獲取鎖的入口
public void lock() {
 sync.lock();
}
sync 實際上是一個抽象的靜態內部類,它繼承了 AQS 來實現重入鎖的邏輯,我們前面說過 AQS 是一個同步隊列,它能夠實現線程的阻塞以及喚醒,但它並不具備
業務功能,所以在不同的同步場景中,會繼承 AQS 來實現對應場景的功能。Sync 有兩個具體的實現類,分別是:
NofairSync:表示可以存在搶占鎖的功能,也就是說不管當前隊列上是否存在其他線程等待,新線程都有機會搶占鎖
FailSync: 表示所有線程嚴格按照 FIFO 來獲取鎖
NofairSync.lock
以非公平鎖為例,來看看 lock 中的實現
1. 非公平鎖和公平鎖最大的區別在於,在非公平鎖中我搶占鎖的邏輯是,不管有沒有線程排隊,我先上來 cas 去搶占一下
2. CAS 成功,就表示成功獲得了鎖
3. CAS 失敗,調用 acquire(1)走鎖競爭邏輯
final void lock() {
 if (compareAndSetState(0, 1))
 
setExclusiveOwnerThread(Thread.currentThread());
 else
 acquire(1);
}
CAS 的實現原理
protected final boolean compareAndSetState(int expect, int update) {
 // See below for intrinsics setup to support this
 return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
通過 cas 樂觀鎖的方式來做比較並替換,這段代碼的意思是,如果當前內存中的
state 的值和預期值 expect 相等,則替換為 update。更新成功返回 true,否則返回 false.
這個操作是原子的,不會出現線程安全問題,這里面涉及到Unsafe這個類的操作,
以及涉及到 state 這個屬性的意義。
state 是 AQS 中的一個屬性,它在不同的實現中所表達的含義不一樣,對於重入
鎖的實現來說,表示一個同步狀態。它有兩個含義的表示
1. 當 state=0 時,表示無鎖狀態
2. 當 state>0 時,表示已經有線程獲得了鎖,也就是 state=1,但是因為ReentrantLock 允許重入,
所以同一個線程多次獲得同步鎖的時候,state 會遞增,比如重入 5 次,那么 state=5。
而在釋放鎖的時候,同樣需要釋放 5 次直到 state=0其他線程才有資格獲得鎖。
 
Unsafe 類
Unsafe 類是在 sun.misc 包下,不屬於 Java 標准。但是很多 Java 的基礎類庫,包
括一些被廣泛使用的高性能開發庫都是基於 Unsafe 類開發的,比如 Netty、Hadoop、Kafka 等;
Unsafe 可認為是 Java 中留下的后門,提供了一些低層次操作,如 直接內存訪問、線程的掛起和恢復、CAS、線程同步、內存屏障
而 CAS 就是 Unsafe 類中提供的一個原子操作,第一個參數為需要改變的對象,
第二個為偏移量(即之前求出來的 headOffset 的值),第三個參數為期待的值,第
四個為更新后的值整個方法的作用是如果當前時刻的值等於預期值 var4 相等,則
更新為新的期望值 var5,如果更新成功,則返回 true,否則返回 false;
stateOffset
一個 Java 對象可以看成是一段內存,每個字段都得按照一定的順序放在這段內存
里,通過這個方法可以准確地告訴你某個字段相對於對象的起始內存地址的字節
偏移。用於在后面的 compareAndSwapInt 中,去根據偏移量找到對象在內存中的具體位置
所以 stateOffset 表示 state 這個字段在 AQS 類的內存中相對於該類首地址的偏移量。
compareAndSwapInt
unsafe.cpp 文件中,可以找到 compareAndSwarpInt 的實現
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
 UnsafeWrapper("Unsafe_CompareAndSwapInt");
 oop p = JNIHandles::resolve(obj); //將 Java 對象解析成 JVM 的 oop(普通對象指針),
 jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); //根據對象 p和地址偏移量找到地址
 return (jint)(Atomic::cmpxchg(x, addr, e)) == e; //基於 cas 比較並替換, x 表示需要更新的值,addr 表示 state 在內存中的地址,e 表示預期值
UNSAFE_END
AQS.accquire
acquire 是 AQS 中的方法,如果 CAS 操作未能成功,說明 state 已經不為 0,此時繼續 acquire(1)操作
➢ 大家思考一下,acquire 方法中的 1 的參數是用來做什么呢?
這個方法的主要邏輯是
1. 通過 tryAcquire 嘗試獲取獨占鎖,如果成功返回 true,失敗返回 false
2. 如果 tryAcquire 失敗,則會通過 addWaiter 方法將當前線程封裝成 Node 添加到 AQS 隊列尾部
3. acquireQueued,將 Node 作為參數,通過自旋去嘗試獲取鎖。 
public final void acquire(int arg) {
 if (!tryAcquire(arg) &&
 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 selfInterrupt();
}
NonfairSync.tryAcquire
這個方法的作用是嘗試獲取鎖,如果成功返回 true,不成功返回 false
它是重寫 AQS 類中的 tryAcquire 方法,並且大家仔細看一下 AQS 中 tryAcquire
方法的定義,並沒有實現,而是拋出異常。按照一般的思維模式,既然是一個不實
現的模版方法,那應該定義成 abstract,讓子類來實現呀?大家想想為什么
protected final boolean tryAcquire(int acquires) {
 return nonfairTryAcquire(acquires);
}
ReentrantLock.nofairTryAcquire
1. 獲取當前線程,判斷當前的鎖的狀態
2. 如果 state=0 表示當前是無鎖狀態,通過 cas 更新 state 狀態的值
3. 當前線程是屬於重入,則增加重入次數
final boolean nonfairTryAcquire(int acquires) {
 final Thread current = Thread.currentThread();//獲取當前執行的線程
 int c = getState();//獲得 state 的值
 if (c == 0) {//表示無鎖狀態
   if (compareAndSetState(0, acquires)) {//cas 替換 state 的值,cas 成功表示獲取鎖成功
     setExclusiveOwnerThread(current);//保存當前獲得鎖的線程,下次再來的時候不要再嘗試競爭鎖
     return true;
  }
 }
 else if (current == getExclusiveOwnerThread()) {//如果同一個線程來獲得鎖,直接增加重入次數
   int nextc = c + acquires;
   if (nextc < 0) // overflow
     throw new Error("Maximum lock count exceeded");
     setState(nextc);
    return true;
   }
 return false; 
}
AQS.addWaiter
當 tryAcquire 方法獲取鎖失敗以后,則會先調用 addWaiter 將當前線程封裝成Node.
入參 mode 表示當前節點的狀態,傳遞的參數是 Node.EXCLUSIVE,表示獨占狀態。意味着重入鎖用到了 AQS 的獨占鎖功能
1. 將當前線程封裝成 Node
2. 當前鏈表中的 tail 節點是否為空,如果不為空,則通過 cas 操作把當前線程的node 添加到 AQS 隊列
3. 如果為空或者 cas 失敗,調用 enq 將節點添加到 AQS 隊列
private Node addWaiter(Node mode) {
 Node node = new Node(Thread.currentThread(), mode);//當前線程封裝為 Node
 Node pred = tail; //tail 是 AQS 中表示同比隊列隊尾的屬性,默認null
 if (pred != null) {//tail 不為空的情況下,說明隊列中存在節點
   node.prev = pred;//把當前線程的 Node 的 prev 指向 tail
   if (compareAndSetTail(pred, node)) {//通過 cas 把 node加入到 AQS 隊列,也就是設置為 tail
     pred.next = node;//設置成功以后,把原 tail 節點的 next指向當前 node
     return node;
   }
 }
 enq(node);//tail=null,把 node 添加到同步隊列
 return node;
}
enq
enq 就是通過自旋操作把當前節點加入到隊列中
private Node enq(final Node node) {
 for (;;) {
   Node t = tail;
   if (t == null) { // Must initialize
     if (compareAndSetHead(new Node()))
     tail = head;
   } else {
     node.prev = t;
     if (compareAndSetTail(t, node)) {
     t.next = node;
       return t;
     }
   }
 }
}
圖解分析
假設 3 個線程來爭搶鎖,那么截止到 enq 方法運行結束之后,或者調用 addwaiter方法結束后,AQS 中的鏈表結構圖

 

 

AQS.acquireQueued
通過 addWaiter 方法把線程添加到鏈表后,會接着把 Node 作為參數傳遞給acquireQueued 方法,去競爭鎖
1. 獲取當前節點的 prev 節點
2. 如果 prev 節點為 head 節點,那么它就有資格去爭搶鎖,調用 tryAcquire 搶占鎖
3. 搶占鎖成功以后,把獲得鎖的節點設置為 head,並且移除原來的初始化 head節點
4. 如果獲得鎖失敗,則根據 waitStatus 決定是否需要掛起線程
5. 最后,通過 cancelAcquire 取消獲得鎖的操作
final boolean acquireQueued(final Node node, int arg) {
 boolean failed = true;
 try {
   boolean interrupted = false;
   for (;;) {
   final Node p = node.predecessor();//取當前節點的 prev 節點
   if (p == head && tryAcquire(arg)) {//果是 head 節點,說明有資格去爭搶鎖
   setHead(node);//獲取鎖成功,也就是ThreadA 已經釋放了鎖,然后設置 head 為 ThreadB 獲得執行權限
   p.next = null; //把原 head 節點從鏈表中移除
   failed = false;
   return interrupted;
 }//ThreadA 可能還沒釋放鎖,使得 ThreadB 在執行 tryAcquire 時會返回 false
  if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
   interrupted = true; //並且返回當前線程在等待過程中有沒有中斷過。
 }
 } finally {
   if (failed)
   cancelAcquire(node);
 } 
}
NofairSync.tryAcquire
這個方法在前面分析過,就是通過 state 的狀態來判斷是否處於無鎖狀態,然后在通過 cas 進行競爭鎖操作。
成功表示獲得鎖,失敗表示獲得鎖失敗
shouldParkAfterFailedAcquire
如果 ThreadA 的鎖還沒有釋放的情況下,ThreadB 和 ThreadC 來爭搶鎖肯定是會失敗,那么失敗以后會調用 shouldParkAfterFailedAcquire 方法
Node 有 5 中狀態,分別是:CANCELLED(1),SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、默認狀態(0)
CANCELLED: 在同步隊列中等待的線程等待超時或被中斷,需要從同步隊列中取消該 Node 的結點, 其結點的 waitStatus 為 CANCELLED,
即結束狀態,進入該狀態后的結點將不會再變化
SIGNAL: 只要前置節點釋放鎖,就會通知標識為 SIGNAL 狀態的后續節點的線程CONDITION: 和 Condition 有關系,后續會講解
PROPAGATE:共享模式下,PROPAGATE 狀態的線程處於可運行狀態
0:初始狀態
這個方法的主要作用是,通過 Node 的狀態來判斷,ThreadA 競爭鎖失敗以后是否應該被掛起。
1. 如果 ThreadA 的 pred 節點狀態為 SIGNAL,那就表示可以放心掛起當前線程
2. 通過循環掃描鏈表把 CANCELLED 狀態的節點移除
3. 修改 pred 節點的狀態為 SIGNAL,返回 false.
返回 false 時,也就是不需要掛起,返回 true,則需要調用 parkAndCheckInterrupt掛起當前線程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 int ws = pred.waitStatus;//前置節點的waitStatus
 if (ws == Node.SIGNAL)//如果前置節點為 SIGNAL,意味着只需要等待其他前置節點的線程被釋放,
   return true;//返回 true,意味着可以直接放心的掛起了
 if (ws > 0) {//ws 大於 0,意味着 prev 節點取消了排隊,直接移除這個節點就行
 do {
   node.prev = pred = pred.prev;
  //相當於: pred=pred.prev; node.prev=pred;
   } while (pred.waitStatus > 0); //這里采用循環,從雙向列表中移除 CANCELLED 的節點
   pred.next = node;
   } else {//利用 cas 設置 prev 節點的狀態為 SIGNAL(-1)
   compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
 }
 return false; 
}
parkAndCheckInterrupt
使用 LockSupport.park 掛起當前線程編程 WATING 狀態
Thread.interrupted,返回當前線程是否被其他線程觸發過中斷請求,也就是
thread.interrupt(); 如果有觸發過中斷請求,那么這個方法會返回當前的中斷標識
true,並且對中斷標識進行復位標識已經響應過了中斷請求。如果返回 true,意味
着在 acquire 方法中會執行 selfInterrupt()。
private final boolean parkAndCheckInterrupt() {
  LockSupport.park(this);
  return Thread.interrupted();
}
selfInterrupt: 標識如果當前線程在 acquireQueued 中被中斷過,則需要產生一
個中斷請求,原因是線程在調用 acquireQueued 方法的時候是不會響應中斷請求的
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
圖解分析
通過 acquireQueued 方法來競爭鎖,如果 ThreadA 還在執行中沒有釋放鎖的話,意味着 ThreadB 和 ThreadC 只能掛起了

 

 

LockSupport
LockSupport類是 Java6引入的一個類,提供了基本的線程同步原語。LockSupport實際上是調用了 Unsafe 類里的函數,歸結到 Unsafe 里,只有兩個函數
unpark 函數為線程提供“許可(permit)”,線程調用 park 函數則等待“許可”。這個有
點像信號量,但是這個“許可”是不能疊加的,“許可”是一次性的。
permit 相當於 0/1 的開關,默認是 0,調用一次 unpark 就加 1 變成了 1.調用一次park 會消費 permit,又會變成 0。 如果再調用一次 park 會阻塞,因為 permit 已
經是 0 了。直到 permit 變成 1.這時調用 unpark 會把 permit 設置為 1.每個線程都
有一個相關的 permit,permit 最多只有一個,重復調用 unpark 不會累積 。

 

鎖的釋放流程

如果這個時候 ThreadA 釋放鎖了,那么我們來看鎖被釋放后會產生什么效果
ReentrantLock.unlock
在 unlock 中,會調用 release 方法來釋放鎖
public final boolean release(int arg) {
 if (tryRelease(arg)) { //釋放鎖成功
 Node h = head; //得到 aqs 中 head 節點
 if (h != null && h.waitStatus != 0)//如果 head 節點不為空並且狀態!=0.調用 unparkSuccessor(h)喚醒后續節點
 unparkSuccessor(h);
 return true;
 }
 return false; 
}
ReentrantLock.tryRelease
這個方法可以認為是一個設置鎖狀態的操作,通過將 state 狀態減掉傳入的參數值
(參數是 1),如果結果狀態為 0,就將排它鎖的 Owner 設置為 null,以使得其它的線程有機會進行執行。
在排它鎖中,加鎖的時候狀態會增加 1(當然可以自己修改這個值),在解鎖的時候減掉 1,同一個鎖,
在可以重入后,可能會被疊加為 2、3、4 這些值,只有 unlock()的次數與 lock()的次數對應才會將 Owner 線程設置為空,
而且也只有這種情況下才會返回 true。
protected final boolean tryRelease(int releases) 
{
 int c = getState() - releases;
 if (Thread.currentThread() != getExclusiveOwnerThread())
 throw new IllegalMonitorStateException();
 boolean free = false;
 if (c == 0) {
 free = true;
 setExclusiveOwnerThread(null);
 }
 setState(c);
 return free;
}
unparkSuccessor
private void unparkSuccessor(Node node) {
 int ws = node.waitStatus;//獲得 head 節點的狀態
 if (ws < 0)
 compareAndSetWaitStatus(node, ws, 0);// 設置 head 節點狀態為 0
 Node s = node.next;//得到 head 節點的下一個節點
 if (s == null || s.waitStatus > 0) {
//如果下一個節點為 null 或者 status>0 表示 cancelled 狀態. //通過從尾部節點開始掃描,找到距離 head 最近的一個waitStatus<=0 的節點
 s = null;
 for (Node t = tail; t != null && t != node; t = t.prev)
 if (t.waitStatus <= 0)
 s = t;
 }
 if (s != null) //next 節點不為空,直接喚醒這個線程即可
 LockSupport.unpark(s.thread);
}
為什么在釋放鎖的時候是從 tail 進行掃描
這個問題有幾個同學問過我,我覺得有必要單獨擰出來說一下,我們再回到 enq那個方法。
在標注為紅色部分的代碼來看一個新的節點是如何加入到鏈表中的
1. 將新的節點的 prev 指向 tail
2. 通過 cas 將 tail 設置為新的節點,因為 cas 是原子操作所以能夠保證線程安全性
3. t.next=node;設置原 tail 的 next 節點指向新的節點
private Node enq(final Node node) {
 for (;;) {
   Node t = tail;
   if (t == null) { // Must initialize
     if (compareAndSetHead(new Node()))
       tail = head;
     } else {
       node.prev = t;
       if (compareAndSetTail(t, node)) {
         t.next = node;
         return t;
       }
     }
   } 
}

 

在 cas 操作之后,t.next=node 操作之前。存在其他線程調用 unlock 方法從 head開始往后遍歷,由於 t.next=node 還沒執行意味着鏈表的關系還沒有建立完整。
就會導致遍歷到 t 節點的時候被中斷。所以從后往前遍歷,一定不會存在這個問題。
圖解分析
通過鎖的釋放,原本的結構就發生了一些變化。head 節點的 waitStatus 變成了 0,ThreadB 被喚醒

 

原本掛起的線程繼續執行
通過 ReentrantLock.unlock,原本掛起的線程被喚醒以后繼續執行,應該從哪里執行大家還有印象吧。 原來被掛起的線程是在 acquireQueued 方法中,所以被喚
醒以后繼續從這個方法開始執行
AQS.acquireQueued
這個方法前面已經完整分析過了,我們只關注一下 ThreadB 被喚醒以后的執行流程。
由於 ThreadB 的 prev 節點指向的是 head,並且 ThreadA 已經釋放了鎖。所以這個時候調用 tryAcquire 方法時,可以順利獲取到鎖。
1. 把 ThreadB 節點當成 head
2. 把原 head 節點的 next 節點指向為 null 
final boolean acquireQueued(final Node node, int arg) {
 boolean failed = true;
 try {
   boolean interrupted = false;
   for (;;) {
     final Node p = node.predecessor();
     if (p == head && tryAcquire(arg)) {
       setHead(node);
       p.next = null; // help GC
       failed = false;
       return interrupted;
     }
     if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
       interrupted = true;
     }
 } finally {
   if (failed)
   cancelAcquire(node);
 } 
}
圖解分析
1. 設置新 head 節點的 prev=null
2. 設置原 head 節點的 next 節點為 null 

 

公平鎖和非公平鎖的區別

鎖的公平性是相對於獲取鎖的順序而言的,如果是一個公平鎖,那么鎖的獲取順序就應該符合請求的絕對時間順序,也就是 FIFO。
在上面分析的例子來說,只要CAS 設置同步狀態成功,則表示當前線程獲取了鎖,而公平鎖則不一樣,差異點有兩個
FairSync.tryAcquire 
final void lock() {
 acquire(1);
}
非公平鎖在獲取鎖的時候,會先通過 CAS 進行搶占,而公平鎖則不會
FairSync.tryAcquire 
protected final boolean tryAcquire(int acquires) {
 final Thread current = Thread.currentThread();
 int c = getState();
 if (c == 0) {
   if (!hasQueuedPredecessors() &&
     compareAndSetState(0, acquires)) {
     setExclusiveOwnerThread(current);
     return true;
   }
 }else if (current == getExclusiveOwnerThread()) {
   int nextc = c + acquires;
   if (nextc < 0)
     throw new Error("Maximum lock count exceeded");
     setState(nextc);
     return true;
   }
 return false; 
}
這個方法與 nonfairTryAcquire(int acquires)比較,不同的地方在於判斷條件多了hasQueuedPredecessors()方法,
也就是加入了 [同步隊列中當前節點是否有前驅節點]的判斷,如果該方法返回 true,則表示有線程比當前線程更早地請求獲取鎖,
因此需要等待前驅線程獲取並釋放鎖之后才能繼續獲取鎖。
 

Condition

在前面學習 synchronized 的時候,有講到 wait/notify 的基本使用,結合synchronized 可以實現對線程的通信。
那么這個時候我就在思考了,既然 J.U.C 里面提供了鎖的實現機制,那 J.U.C 里面有沒有提供類似的線程通信的工具呢? 於是找阿找,發現了一個 Condition 工具類。
Condition 是一個多線程協調通信的工具類,可以讓某些線程一起等待某個條件(condition),只有滿足條件時,線程才會被喚醒 。
Condition 的基本使用
ConditionWait
public class ConditionDemoWait implements Runnable{
 private Lock lock;
 private Condition condition;
 public ConditionDemoWait(Lock lock, Condition condition){
   this.lock=lock;
   this.condition=condition;
 }
 @Override
 public void run() {
   System.out.println("begin -ConditionDemoWait");
   try {
     lock.lock();
     condition.await();
     System.out.println("end - ConditionDemoWait");
   } catch (InterruptedException e) {
     e.printStackTrace();
   }finally {
     lock.unlock();
   }
 }
}
ConditionSignal 
public class ConditionDemoSignal implements Runnable{
 private Lock lock;
 private Condition condition;
 public ConditionDemoSignal(Lock lock, Condition condition){
   this.lock=lock;
   this.condition=condition;
 }
 @Override
 public void run() {
   System.out.println("begin -ConditionDemoSignal");
   try {
     lock.lock();
     condition.signal();
    System.out.println("end - ConditionDemoSignal");
   }finally {
     lock.unlock();
   }
 }
}
通過這個案例簡單實現了 wait 和 notify 的功能,當調用 await 方法后,當前線程
會釋放鎖並等待,而其他線程調用 condition 對象的 signal 或者 signalall 方法通
知並被阻塞的線程,然后自己執行 unlock 釋放鎖,被喚醒的線程獲得之前的鎖繼
續執行,最后釋放鎖。
所以,condition 中兩個最重要的方法,一個是 await,一個是 signal 方法
await:把當前線程阻塞掛起
signal:喚醒阻塞的線程

Condition 源碼分析

調用 Condition,需要獲得 Lock 鎖,所以意味着會存在一個 AQS 同步隊列,先來看 Condition.await 方法
condition.await
調用 Condition 的 await()方法(或者以 await 開頭的方法),會使當前線程進入等待隊列並釋放鎖,同時線程狀態變為等待狀態。
當從 await()方法返回時,當前線程一定獲取了 Condition 相關聯的鎖。
public final void await() throws InterruptedException {
 if (Thread.interrupted())
 throw new InterruptedException();
 Node node = addConditionWaiter(); //創建一個新的節點,節點狀態為 condition,采用的數據結構仍然是鏈表
 int savedState = fullyRelease(node); //釋放當前的鎖,得到鎖的狀態,並喚醒 AQS 隊列中的一個線程
 int interruptMode = 0;
 //如果當前節點沒有在同步隊列上,即還沒有被 signal,則將當前線程阻塞
while (!isOnSyncQueue(node)) {//判斷這個節點是否在 AQS 隊列上,第一次判斷的是 false,因為前面已經釋放鎖了
 LockSupport.park(this); // 第一次總是 park 自己,開始阻塞等待
// 線程判斷自己在等待過程中是否被中斷了,如果沒有中斷,則再次循環,會在 isOnSyncQueue 中判斷自己是否在隊列上.
// isOnSyncQueue 判斷當前 node 狀態,如果是 CONDITION 狀態,或者不在隊列上了,就繼續阻塞.
// isOnSyncQueue 判斷當前 node 還在隊列上且不是 CONDITION 狀態了,就結束循環和阻塞.
 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
 break;
 }
// 當這個線程醒來,會嘗試拿鎖, 當 acquireQueued 返回 false 就是拿到鎖了.
 // interruptMode != THROW_IE -> 表示這個線程沒有成功將 node 入隊,但 signal 執行了 enq 方法讓其入隊了.
// 將這個變量設置成 REINTERRUPT.
 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
 interruptMode = REINTERRUPT;
// 如果 node 的下一個等待者不是 null, 則進行清理,清理 Condition 隊列上的節點. 
// 如果是 null ,就沒有什么好清理的了.
 if (node.nextWaiter != null) // clean up if cancelled
 unlinkCancelledWaiters();
// 如果線程被中斷了,需要拋出異常.或者什么都不做
 if (interruptMode != 0)
 reportInterruptAfterWait(interruptMode);
}
Condition.signal
調用 Condition 的 signal()方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點之前,會將節點移到同步隊列中
public final void signal() {
 if (!isHeldExclusively()) //先判斷當前線程是否獲得了鎖
 throw new IllegalMonitorStateException();
 Node first = firstWaiter; // 拿到 Condition 隊列上第一個節點
 if (first != null)
 doSignal(first);
}
Condition.doSignal 
private void doSignal(Node first) {
 do {
   if ( (firstWaiter = first.nextWaiter) == null)// 如果第一個節點的下一個節點是 null, 那么, 最后一個節點也是 null.
   lastWaiter = null; // 將 next 節點設置成 null
   first.nextWaiter = null;
   } while (!transferForSignal(first) &&(first = firstWaiter) != null);
}
 
AQS.transferForSignal
該方法先是 CAS 修改了節點狀態,如果成功,就將這個節點放到 AQS 隊列中,
然后喚醒這個節點上的線程。此時,那個節點就會在 await 方法中蘇醒
final boolean transferForSignal(Node node) {
 /*
 * If cannot change waitStatus, the node has been cancelled.
 */
 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
 return false;
 Node p = enq(node);
 int ws = p.waitStatus;
// 如果上一個節點的狀態被取消了, 或者嘗試設置上一個節點的狀態為 SIGNAL 失敗了(SIGNAL 表示: 他的 next 節點需要停止阻塞),
 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
 LockSupport.unpark(node.thread); // 喚醒輸入節點上的線程.
 return true;
}
Condition 總結
阻塞: await()方法中,在線程釋放鎖資源之后,如果節點不在 AQS 等待隊列,則阻塞當前線程,如果在等待隊列,則自旋等待嘗試獲取鎖。
釋放: signal()后,節點會從 condition 隊列移動到 AQS 等待隊列,則進入正常鎖的獲取流程。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM