AQS底層原理分析


J.U.C 簡介

Java.util.concurrent 是在並發編程中比較常用的工具類,里面包含很多用來在並發場景中使用的組件。比如線程池、阻塞隊列、計時器、同步器、並發集合等等。並發包的作者是大名鼎鼎的 Doug Lea。我們在接下來剖析一些經典的比較常用的組件的設計思想。

Lock

Lock 在 J.U.C 中是最核心的組件,前面我們講 synchronized 的時候說過,鎖最重 要的特性就是解決並發安全問題。為什么要以 Lock 作為切入點呢?如果有看 過 J.U.C 包中的所有組件,一定會發現絕大部分的組件都有用到了 Lock。所以通 過 Lock 作為切入點使得在后續的學習過程中會更加輕松。

Lock 簡介

在 Lock 接口出現之前,Java 中的應用程序對於多線程的並發安全處理只能基於 synchronized 關鍵字來解決。但是 synchronized 在有些場景中會存在一些短板, 也就是它並不適合於所有的並發場景。但是在 Java5 以后,Lock 的出現可以解決 synchronized 在某些場景中的短板,它比 synchronized 更加靈活。

Lock 的實現

Lock 本質上是一個接口,它定義了釋放鎖和獲得鎖的抽象方法,定義成接口就意 味着它定義了鎖的一個標准規范,也同時意味着鎖的不同實現。實現 Lock 接口的類有很多,以下為幾個常見的鎖實現:
ReentrantLock:表示重入鎖,它是唯一一個實現了 Lock 接口的類。重入鎖指的是 線程在獲得鎖之后,再次獲取該鎖不需要阻塞,而是直接關聯一次計數器增加重入 次數。
ReentrantReadWriteLock:重入讀寫鎖,它實現了 ReadWriteLock 接口,在這個 類中維護了兩個鎖,一個是 ReadLock,一個是 WriteLock,他們都分別實現了 Lock 接口。讀寫鎖是一種適合讀多寫少的場景下解決線程安全問題的工具,基本原則 是: 讀和讀不互斥、讀和寫互斥、寫和寫互斥。也就是說涉及到影響數據變化的 操作都會存在互斥。
StampedLock: stampedLock 是 JDK8 引入的新的鎖機制,可以簡單認為是讀寫 鎖的一個改進版本,讀寫鎖雖然通過分離讀和寫的功能使得讀和讀之間可以完全
並發,但是讀和寫是有沖突的,如果大量的讀線程存在,可能會引起寫線程的飢餓。 stampedLock 是一種樂觀的讀策略,使得樂觀鎖完全不會阻塞寫線程。
Lock 的類關系圖

Lock 有很多的鎖的實現,但是直觀的實現是 ReentrantLock 重入鎖

void lock() // 如果鎖可用就 獲得鎖,如果鎖不可用就阻塞 直到鎖釋放
void lockInterruptibly() // 和 lock()方法相似, 但阻塞的線 程 可 中 斷 ,             拋 出j ava.lang.InterruptedExcepti on 異常
boolean tryLock() // 非阻塞 獲取鎖;嘗試獲取鎖,如果成功 返回 true
boolean tryLock(long timeout, TimeUnit timeUnit) //帶有超時時間的獲
            取鎖方法
void unlock() // 釋放鎖

 

ReentrantLock 重入鎖

重入鎖,表示支持重新進入的鎖,也就是說,如果當前線程 t1 通過調用 lock 方 法獲取了鎖之后,再次調用 lock,是不會再阻塞去獲取鎖的,直接增加重試次數 就行了。synchronized 和 ReentrantLock 都是可重入鎖。不理解為什么鎖會存在重入的特性,那是因為對於同步鎖的理解程度還不夠,比如在下面這類 的場景中,存在多個加鎖的方法的相互調用,其實就是一種重入特性的場景。

重入鎖的設計目的

比如調用 demo 方法獲得了當前的對象鎖,然后在這個方法中再去調用 demo2,demo2 中的存在同一個實例鎖,這個時候當前線程會因為無法獲得
demo2 的對象鎖而阻塞,就會產生死鎖。重入鎖的設計目的是避免線程的死 鎖。
public class ReentrantDemo{
  public synchronized void demo(){
    System.out.println("begin:demo");
    demo2();
  }
  public void demo2(){
    System.out.println("begin:demo1");
    synchronized (this){
    }
  }
  public static void main(String[] args) {
    ReentrantDemo rd=new ReentrantDemo();
    new Thread(rd::demo).start();
}
}
ReentrantLock 的使用案例
public class AtomicDemo {
  private static int count=0;
  static Lock lock=new ReentrantLock();
  public static void inc(){
    lock.lock();
    try {
      Thread.sleep(1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    count++;
    lock.unlock();
  }
  public static void main(String[] args) throws InterruptedException {
    for(int i=0;i<1000;i++){
      new Thread(()->{AtomicDemo.inc();}).start();;
    }
    Thread.sleep(3000);System.out.println("result:"+count);
  }
}

 

ReentrantReadWriteLock

我們以前理解的鎖,基本都是排他鎖,也就是這些鎖在同一時刻只允許一個線程進 行訪問,而讀寫所在同一時刻可以允許多個線程訪問,但是在寫線程訪問時,所有 的讀線程和其他寫線程都會被阻塞。讀寫鎖維護了一對鎖,一個讀鎖、一個寫鎖; 一般情況下,讀寫鎖的性能都會比排它鎖好,因為大多數場景讀是多於寫的。在讀 多於寫的情況下,讀寫鎖能夠提供比排它鎖更好的並發性和吞吐量.
public class LockDemo {
  static Map<String,Object> cacheMap=new HashMap<>();
  static ReentrantReadWriteLock rwl=new ReentrantReadWriteLock();
  static Lock read=rwl.readLock();
  static Lock write=rwl.writeLock();
  public static final Object get(String key) {
    System.out.println("開始讀取數據");
    read.lock(); //讀鎖
    try {
      return cacheMap.get(key);
    }finally {       read.unlock();     }   }   public static final Object put(String key,Object value){     write.lock();     System.out.println("開始寫數據");     try{       return cacheMap.put(key,value);     }finally {       write.unlock();     }   } }

 

在這個案例中,通過 hashmap 來模擬了一個內存緩存,然后使用讀寫所來保證這 個內存緩存的線程安全性。當執行讀操作的時候,需要獲取讀鎖,在並發訪問的時 候,讀鎖不會被阻塞,因為讀操作不會影響執行結果。
在執行寫操作是,線程必須要獲取寫鎖,當已經有線程持有寫鎖的情況下,當前線 程會被阻塞,只有當寫鎖釋放以后,其他讀寫操作才能繼續執行。使用讀寫鎖提升 讀操作的並發性,也保證每次寫操作對所有的讀寫操作的可見性
⚫ 讀鎖與讀鎖可以共享
⚫ 讀鎖與寫鎖不可以共享(排他)
⚫ 寫鎖與寫鎖不可以共享(排他)

ReentrantLock 的實現原理

我們知道鎖的基本原理是,基於將多線程並行任務通過某一種機制實現線程的串 行執行,從而達到線程安全性的目的。在 synchronized 中,我們分析了偏向鎖、
輕量級鎖、樂觀鎖。基於樂觀鎖以及自旋鎖來優化了 synchronized 的加鎖開銷, 同時在重量級鎖階段,通過線程的阻塞以及喚醒來達到線程競爭和同步的目的。
那么在 ReentrantLock 中,也一定會存在這樣的需要去解決的問題。就是在多線程 競爭重入鎖時,競爭失敗的線程是如何實現阻塞以及被喚醒的呢?

AQS 是什么

在 Lock 中,用到了一個同步隊列 AQS,全稱 AbstractQueuedSynchronizer,它 是一個同步工具也是 Lock 用來實現線程同步的核心組件。如果你搞懂了 AQS,那 么 J.U.C 中絕大部分的工具都能輕松掌握。
AQS 的兩種功能
從使用層面來說,AQS 的功能分為兩種:獨占和共享。
獨占鎖:每次只能有一個線程持有鎖,比如前面給大家演示的 ReentrantLock 就是 以獨占方式實現的互斥鎖;
共享鎖:允 許 多 個 線 程 同 時 獲 取 鎖 , 並 發 訪 問 共 享 資 源 , 比 如 ReentrantReadWriteLock

AQS 的內部實現

AQS 隊列內部維護的是一個 FIFO 的雙向鏈表,這種結構的特點是每個數據結構都有兩個指針,分別指向直接的后繼節點和直接前驅節點。所以雙向鏈表可以從任
意一個節點開始很方便的訪問前驅和后繼。每個 Node 其實是由線程封裝,當線 程爭搶鎖失敗后會封裝成 Node 加入到 ASQ 隊列中去;當獲取鎖的線程釋放鎖以
后,會從隊列中喚醒一個阻塞的節點(線程)。
Node 的組成
 static final class Node {
      static final Node SHARED = new Node();
      static final Node EXCLUSIVE = null;

      static final int CANCELLED =  1;
      static final int SIGNAL    = -1;
      static final int CONDITION = -2;
      static final int PROPAGATE = -3;
    volatile int waitStatus;
      volatile Node prev;//前驅節點
      volatile Node next;//后驅節點
      volatile Thread thread;//當前線程
      Node nextWaiter;//存儲在condition 隊列中的后繼節點
      final boolean isShared() {//是否為共享鎖
            return nextWaiter == SHARED;
      }
      final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
      }
      Node() {    // Used to establish initial head or SHARED marker
      }
      Node(Thread thread, Node mode) {     // Used by addWaiter 將線程構造一個節點,添加隊列
            this.nextWaiter = mode;
            this.thread = thread;
      }
      Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
      }
    }

 

釋放鎖以及添加線程對於隊列的變化

當出現鎖競爭以及釋放鎖的時候,AQS 同步隊列中的節點會發生變化,首先看一下添加節點的場景。
這里會涉及到兩個變化
1. 新的線程封裝成 Node 節點追加到同步隊列中,設置 prev 節點以及修改當前節 點的前置節點的 next 節點指向自己
2. 通過 CAS 講 tail 重新指向新的尾部節點
head 節點表示獲取鎖成功的節點,當頭結點在釋放同步狀態時,會喚醒后繼節點, 如果后繼節點獲得鎖成功,會把自己設置為頭結點,節點的變化過程如下
這個過程也是涉及到兩個變化
1. 修改 head 節點指向下一個獲得鎖的節點
2. 新的獲得鎖的節點,將 prev 的指針指向 null
設置 head 節點不需要用 CAS,原因是設置 head 節點是由獲得鎖的線程來完成的,而同步鎖只能由一個線程獲得,所以不需要 CAS 保證,只需要把 head 節點設置為原首節點的后繼節點,並且斷開原 head 節點的 next 引用即可

ReentrantLock 的源碼分析

以 ReentrantLock 作為切入點,來看看在這個場景中是如何使用 AQS 來實現線程 的同步的。

ReentrantLock 的時序圖

調用 ReentrantLock 中的 lock()方法,源碼的調用過程我使用了時序圖來展現。

 
 
ReentrantLock.lock()
這個是 reentrantLock 獲取鎖的入口
public void lock() {
  sync.lock();
}
sync 實際上是一個抽象的靜態內部類,它繼承了 AQS 來實現重入鎖的邏輯,我們前面說過 AQS 是一個同步隊列,它能夠實現線程的阻塞以及喚醒,但它並不具備 業務功能,所以在不同的同步場景中,會繼承 AQS 來實現對應場景的功能, Sync 有兩個具體的實現類,分別是:
NofairSync:表示可以存在搶占鎖的功能,也就是說不管當前隊列上是否存在其他 線程等待,新線程都有機會搶占鎖
FailSync: 表示所有線程嚴格按照 FIFO 來獲取鎖
NofairSync.lock
以非公平鎖為例,來看看 lock 中的實現
1. 非公平鎖和公平鎖最大的區別在於,在非公平鎖中我搶占鎖的邏輯是,不管有 沒有線程排隊,我先上來 cas 去搶占一下
2. CAS 成功,就表示成功獲得了鎖
3. CAS 失敗,調用 acquire(1)走鎖競爭邏輯
final void lock() {
  if (compareAndSetState(0, 1))
    setExclusiveOwnerThread(Thread.currentThread());
  else
    acquire(1);
}

 

CAS 的實現原理

protected final boolean compareAndSetState(int expect, int update) {
  // See below for intrinsics setup to support this   

  return unsafe.compareAndSwapInt(this,stateOffset, expect, update);

}

通過 cas 樂觀鎖的方式來做比較並替換,這段代碼的意思是,如果當前內存中的 state 的值和預期值 expect 相等,則替換為 update。更新成功返回 true,否則返
回 false. 這個操作是原子的,不會出現線程安全問題,這里面涉及到Unsafe這個類的操作, 以及涉及到 state 這個屬性的意義。
state 是 AQS 中的一個屬性,它在不同的實現中所表達的含義不一樣,對於重入 鎖的實現來說,表示一個同步狀態。它有兩個含義的表示
1. 當 state=0 時,表示無鎖狀態
2. 當 state>0 時,表示已經有線程獲得了鎖,也就是 state=1,但是因為 ReentrantLock 允許重入,所以同一個線程多次獲得同步鎖的時候,state 會遞增, 比如重入 5 次,那么 state=5。而在釋放鎖的時候,同樣需要釋放 5 次直到 state=0 其他線程才有資格獲得鎖
Unsafe 類
Unsafe 類是在 sun.misc 包下,不屬於 Java 標准。但是很多 Java 的基礎類庫,包 括一些被廣泛使用的高性能開發庫都是基於 Unsafe 類開發的,比如 Netty、 Hadoop、Kafka 等; Unsafe 可認為是 Java 中留下的后門,提供了一些低層次操作,如直接內存訪問、 線程的掛起和恢復、CAS、線程同步、內存屏障。 而 CAS 就是 Unsafe 類中提供的一個原子操作,第一個參數為需要改變的對象, 第二個為偏移量(即之前求出來的 headOffset 的值),第三個參數為期待的值,第 四個為更新后的值。整個方法的作用是如果當前時刻的值等於預期值 var4 相等,則, 更新為新的期望值 var5,如果更新成功,則返回 true,否則返回 false;
stateOffset
一個 Java 對象可以看成是一段內存,每個字段都得按照一定的順序放在這段內存 里,通過這個方法可以准確地告訴你某個字段相對於對象的起始內存地址的字節 偏移。用於在后面的 compareAndSwapInt 中,去根據偏移量找到對象在內存中的 具體位置 所以 stateOffset 表示 state 這個字段在 AQS 類的內存中相對於該類首地址的偏移
compareAndSwapInt
在 unsafe.cpp 文件中,可以找到 compareAndSwarpInt 的實現
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj); //將 Java 對象解析成 JVM 的 oop(普通對象指針),
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); //根據對象 p和地址偏移量找到地址
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e; //基於 cas 比較並替換, x 表示需要更新的值,addr 表示 state 在內存中的地址,e 表示預期值
UNSAFE_END

 

AQS.accquire
acquire 是 AQS 中的方法,如果 CAS 操作未能成功,說明 state 已經不為 0,此 時繼續 acquire(1)操作
➢ 大家思考一下,acquire 方法中的 1 的參數是用來做什么呢? 這個方法的主要邏輯是
1. 通過 tryAcquire 嘗試獲取獨占鎖,如果成功返回 true,失敗返回 false
2. 如果 tryAcquire 失敗,則會通過 addWaiter 方法將當前線程封裝成 Node 添加
到 AQS 隊列尾部
3. acquireQueued,將 Node 作為參數,通過自旋去嘗試獲取鎖。
public final void acquire(int arg) {
  if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}

  

NonfairSync.tryAcquire
這個方法的作用是嘗試獲取鎖,如果成功返回 true,不成功返回 false, 它是重寫 AQS 類中的 tryAcquire 方法,並且大家仔細看一下 AQS 中 tryAcquire 方法的定義,並沒有實現,而是拋出異常。按照一般的思維模式,既然是一個不實 現的模版方法,那應該定義成 abstract,讓子類來實現呀?大家想想為什么
protected final boolean tryAcquire(int acquires) {
  return nonfairTryAcquire(acquires);
}

 

ReentrantLock.nofairTryAcquire
1. 獲取當前線程,判斷當前的鎖的狀態
2. 如果 state=0 表示當前是無鎖狀態,通過 cas 更新 state 狀態的值
3. 當前線程是屬於重入,則增加重入次數
final boolean nonfairTryAcquire(int acquires) {
  final Thread current = Thread.currentThread();//獲取當前執行的線程
  int c = getState();//獲得 state 的值
  if (c == 0) {//表示無鎖狀態
    if (compareAndSetState(0, acquires)) {//cas 替換 state 的值,cas 成功表示獲取鎖成功
      setExclusiveOwnerThread(current);//保存當前獲得鎖的線程,下次再來的時候不要再嘗試競爭鎖       return true;     }   }   else if (current == getExclusiveOwnerThread()) {//如果同一個線程來獲得鎖,直接增加重入次數     int nextc = c + acquires;     if (nextc < 0) // overflow       throw new Error("Maximum lock count exceeded");     setState(nextc);     return true;   }   return false; }
AQS.addWaiter
當 tryAcquire 方法獲取鎖失敗以后,則會先調用 addWaiter 將當前線程封裝成 Node. 入參 mode 表示當前節點的狀態,傳遞的參數是 Node.EXCLUSIVE,表示獨占狀 態。意味着重入鎖用到了 AQS 的獨占鎖功能
1. 將當前線程封裝成 Node
2. 當前鏈表中的 tail 節點是否為空,如果不為空,則通過 cas 操作把當前線程的node 添加到 AQS 隊列
3. 如果為空或者 cas 失敗,調用 enq 將節點添加到 AQS 隊列
private Node addWaiter(Node mode) {
  Node node
= new Node(Thread.currentThread(), mode);//當前線程封裝為 Node   Node pred = tail; //tail 是 AQS 中表示同比隊列隊尾的屬性,默認null   if (pred != null) {//tail 不為空的情況下,說明隊列中存在節點     node.prev = pred;//把當前線程的 Node 的 prev 指向 tail     if (compareAndSetTail(pred, node)) {//通過 cas 把 node加入到 AQS 隊列,也就是設置為 tail       pred.next = node;//設置成功以后,把原 tail 節點的 next指向當前 node       return node;     }   }   enq(node);//tail=null或者compareAndSetTail(pred, node)=false,把 node 添加到同步隊列  
  return node;
}
enq
enq 就是通過自旋操作把當前節點加入到隊列中
private Node enq(final Node node) {
  for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
    if (compareAndSetHead(new Node()))
      tail = head;
    } else {
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}

 

圖解分析
假設 3 個線程來爭搶鎖,那么截止到 enq 方法運行結束之后,或者調用 addwaiter 方法結束后,AQS 中的鏈表結構圖
AQS.acquireQueued
通過 addWaiter 方法把線程添加到鏈表后,會接着把 Node 作為參數傳遞給 acquireQueued 方法,去競爭鎖
1. 獲取當前節點的 prev 節點
2. 如果 prev 節點為 head 節點,那么它就有資格去爭搶鎖,調用 tryAcquire 搶占
3. 搶占鎖成功以后,把獲得鎖的節點設置為 head,並且移除原來的初始化 head 節點
4. 如果獲得鎖失敗,則根據 waitStatus 決定是否需要掛起線程
5. 最后,通過 cancelAcquire 取消獲得鎖的操作
final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor();//取當前節點的 prev 節點
      if (p == head && tryAcquire(arg)) {//果是 head 節點,說明有資格去爭搶鎖
        setHead(node);//獲取鎖成功,也就是ThreadA 已經釋放了鎖,然后設置 head 為 ThreadB 獲得執行權限
        p.next = null; //把原 head 節點從鏈表中移除
        failed = false;
        return interrupted;
      }
      //ThreadA 可能還沒釋放鎖,使得 ThreadB 在執行 tryAcquire 時會返回 false
      if (shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt())
        interrupted
= true; //並且返回當前線程在等待過程中有沒有中斷過。     }   } finally {     if (failed)       cancelAcquire(node);   } }

 

NofairSync.tryAcquire
這個方法在前面分析過,就是通過 state 的狀態來判斷是否處於無鎖狀態,然后在 通過 cas 進行競爭鎖操作。成功表示獲得鎖,失敗表示獲得鎖失敗
shouldParkAfterFailedAcquire
如果 ThreadA 的鎖還沒有釋放的情況下,ThreadB 和 ThreadC 來爭搶鎖肯定是會 失敗,那么失敗以后會調用 shouldParkAfterFailedAcquire 方法
Node 有 5 中狀態,分別是:CANCELLED(1),SIGNAL(-1)、CONDITION(- 2)、PROPAGATE(-3)、默認狀態(0)
CANCELLED: 在同步隊列中等待的線程等待超時或被中斷,需要從同步隊列中取 消該 Node 的結點, 其結點的 waitStatus 為 CANCELLED,即結束狀態,進入該狀 態后的結點將不會再變化
SIGNAL: 只要前置節點釋放鎖,就會通知標識為 SIGNAL 狀態的后續節點的線程
CONDITION: 和 Condition 有關系,后續會講解
PROPAGATE:共享模式下,PROPAGATE 狀態的線程處於可運行狀態  0:初始狀態,
這個方法的主要作用是,通過 Node 的狀態來判斷,ThreadA 競爭鎖失敗以后是 否應該被掛起。
1. 如果 ThreadA 的 pred 節點狀態為 SIGNAL,那就表示可以放心掛起當前線程
2. 通過循環掃描鏈表把 CANCELLED 狀態的節點移除
3. 修改 pred 節點的狀態為 SIGNAL,返回 false.
返回 false 時,也就是不需要掛起,返回 true,則需要調用 parkAndCheckInterrupt掛起當前線程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  int ws = pred.waitStatus;//前置節點的waitStatus
  if (ws == Node.SIGNAL)//如果前置節點為 SIGNAL,意味着只需要等待其他前置節點的線程被釋放,
    return true;//返回 true,意味着可以直接放心的掛起了
  if (ws > 0) {//ws 大於 0,意味着 prev 節點取消了排隊,直接移除這個節點就行
  do {
    node.prev = pred = pred.prev;//相當於: pred=pred.prev;
    node.prev=pred;
  } while (pred.waitStatus > 0); //這里采用循環,從雙向列表中移除 CANCELLED 的節點
    pred.next = node;
  } else {//利用 cas 設置 prev 節點的狀態為 SIGNAL(-1)
    compareAndSetWaitStatus(pred, ws,Node.SIGNAL);
  }
  return false;
}
parkAndCheckInterrupt
使用 LockSupport.park 掛起當前線程編程 WATING 狀態
Thread.interrupted,返回當前線程是否被其他線程觸發過中斷請求,也就是 thread.interrupt(); 如果有觸發過中斷請求,那么這個方法會返回當前的中斷標識 true,並且對中斷標識進行復位標識已經響應過了中斷請求。如果返回 true,意味 着在 acquire 方法中會執行 selfInterrupt()。
private final boolean parkAndCheckInterrupt() {
  LockSupport.park(this);
  return Thread.interrupted();
}
selfInterrupt: 標識如果當前線程在 acquireQueued 中被中斷過,則需要產生一 個中斷請求,原因是線程在調用 acquireQueued 方法的時候是不會響應中斷請求
static void selfInterrupt() {
  Thread.currentThread().interrupt();
}
圖解分析
通過 acquireQueued 方法來競爭鎖,如果 ThreadA 還在執行中沒有釋放鎖的話, 意味着 ThreadB 和 ThreadC 只能掛起了。

LockSupport

LockSupport類是Java6引入的一個類,提供了基本的線程同步原語。LockSupport 實際上是調用了 Unsafe 類里的函數,歸結到 Unsafe 里,只有兩個函數, unpark 函數為線程提供“許可(permit)”,線程調用 park 函數則等待“許可”。這個有 點像信號量,但是這個“許可”是不能疊加的,“許可”是一次性的。
permit 相當於 0/1 的開關,默認是 0,調用一次 unpark 就加 1 變成了 1.調用一次park 會消費 permit,又會變成 0。 如果再調用一次 park 會阻塞,因為 permit 已 經是 0 了。直到 permit 變成 1.這時調用 unpark 會把 permit 設置為 1.每個線程都 有一個相關的 permit,permit 最多只有一個,重復調用 unpark 不會累積

鎖的釋放流程

如果這個時候 ThreadA 釋放鎖了,那么我們來看鎖被釋放后會產生什么效果。

ReentrantLock.unlock

在 unlock 中,會調用 release 方法來釋放鎖
public final boolean release(int arg) {
  if (tryRelease(arg)) { //釋放鎖成功
    Node h = head; //得到 aqs 中 head 節點
    if (h != null && h.waitStatus != 0)//如果 head 節點不為空並且狀態!=0.調用 unparkSuccessor(h)喚醒后續節點
       unparkSuccessor(h);
    return true;
  }
  return false;
}
ReentrantLock.tryRelease
這個方法可以認為是一個設置鎖狀態的操作,通過將 state 狀態減掉傳入的參數值 (參數是 1),如果結果狀態為 0,就將排它鎖的 Owner 設置為 null,以使得其它的線程有機會進行執行。 在排它鎖中,加鎖的時候狀態會增加 1(當然可以自己修改這個值),在解鎖的時 候減掉 1,同一個鎖,在可以重入后,可能會被疊加為 2、3、4 這些值,只有 unlock() 的次數與 lock()的次數對應才會將 Owner 線程設置為空,而且也只有這種情況下 才會返回 true。
protected final boolean tryRelease(int releases){
  int c = getState() - releases;
  if (Thread.currentThread() !=getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
  boolean free = false;
  if (c == 0) {
    free = true;
    setExclusiveOwnerThread(null);
  }
  setState(c);
  return free;
}

unparkSuccessor

 private void unparkSuccessor(Node node) {
  int ws = node.waitStatus;//獲得 head 節點的狀態
  if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);// 設置 head 節點狀態為 0
  Node s = node.next;//得到 head 節點的下一個節點
  if (s == null || s.waitStatus > 0) {
    //如果下一個節點為 null 或者 status>0 表示 cancelled 狀態.
    //通過從尾部節點開始掃描,找到距離 head 最近的一個waitStatus<=0 的節點
    s = null;
    for (Node t = tail; t != null && t != node; t =t.prev) 
      if (t.waitStatus <= 0)
        s = t;
  }
   if (s != null) //next 節點不為空,直接喚醒這個線程即可
      LockSupport.unpark(s.thread);
}

 

為什么在釋放鎖的時候是從 tail 進行掃描

我覺得有必要單獨擰出來說一下,我們再回到 enq 那個方法、。在標注為紅色部分的代碼來看一個新的節點是如何加入到鏈表中的
1. 將新的節點的 prev 指向 tail
2. 通過 cas 將 tail 設置為新的節點,因為 cas 是原子操作所以能夠保證線程安全性
3. t.next=node;設置原 tail 的 next 節點指向新的節點
private Node enq(final Node node) {
  for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;       }
    }
  }
}

 

在 cas 操作之后,t.next=node 操作之前。存在其他線程調用 unlock 方法從 head 開始往后遍歷,由於 t.next=node 還沒執行意味着鏈表的關系還沒有建立完整。
就會導致遍歷到 t 節點的時候被中斷。所以從后往前遍歷,一定不會存在這個問 題。
圖解分析
通過鎖的釋放,原本的結構就發生了一些變化。head 節點的 waitStatus 變成了 0, ThreadB 被喚醒

原本掛起的線程繼續執行

通過 ReentrantLock.unlock,原本掛起的線程被喚醒以后繼續執行,應該從哪里執 行大家還有印象吧。 原來被掛起的線程是在 acquireQueued 方法中,所以被喚
醒以后繼續從這個方法開始執行
AQS.acquireQueued
這個方法前面已經完整分析過了,我們只關注一下 ThreadB 被喚醒以后的執行流 程。
由於 ThreadB 的 prev 節點指向的是 head,並且 ThreadA 已經釋放了鎖。所以這 個時候調用 tryAcquire 方法時,可以順利獲取到鎖
1. 把 ThreadB 節點當成 head
2. 把原 head 節點的 next 節點指向為 null
final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
    if (shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt())
      interrupted = true;
    }
  } finally {
    if (failed)
    cancelAcquire(node);
  } }

 

圖解分析
1. 設置新 head 節點的 prev=null
2. 設置原 head 節點的 next 節點為 null

公平鎖和非公平鎖的區別

鎖的公平性是相對於獲取鎖的順序而言的,如果是一個公平鎖,那么鎖的獲取順序 就應該符合請求的絕對時間順序,也就是 FIFO。 在上面分析的例子來說,只要 CAS 設置同步狀態成功,則表示當前線程獲取了鎖,而公平鎖則不一樣,差異點 有兩個
FairSync.tryAcquire
final void lock() {
  acquire(1);
}
非公平鎖在獲取鎖的時候,會先通過 CAS 進行搶占,而公平鎖則不會

FairSync.tryAcquire

protected final boolean tryAcquire(int acquires) {
  final Thread current = Thread.currentThread();
  int c = getState();
  if (c == 0) {
    if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
      setExclusiveOwnerThread(current);
      return true;
    }
  }
  else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0)
      throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
  }
  return false;
}
這個方法與 nonfairTryAcquire(int acquires)比較,不同的地方在於判斷條件多了 hasQueuedPredecessors()方法,也就是加入了[同步隊列中當前節點是否有前驅節點]的判斷,如果該方法返回 true,則表示有線程比當前線程更早地請求獲取鎖, 因此需要等待前驅線程獲取並釋放鎖之后才能繼續獲取鎖。

Condition

在前面學習 synchronized 的時候,有講到 wait/notify 的基本使用,結合 synchronized 可以實現對線程的通信。那么這個時候我就在思考了,既然 J.U.C 里 面提供了鎖的實現機制,那 J.U.C 里面有沒有提供類似的線程通信的工具呢? 於 是找阿找,發現了一個 Condition 工具類。 Condition 是一個多線程協調通信的工具類,可以讓某些線程一起等待某個條件
(condition),只有滿足條件時,線程才會被喚醒。

Condition 的基本使用

ConditionWait
public class ConditionDemoWait implements Runnable{
  private Lock lock;
  private Condition condition;
  public ConditionDemoWait(Lock lock, Condition condition){
    this.lock=lock;
    this.condition=condition;
  }
  @Override   
public void run() {     System.out.println("begin -ConditionDemoWait");     try {       lock.lock();       condition.await();       System.out.println("end - ConditionDemoWait");     } catch (InterruptedException e) {       e.printStackTrace();     }finally {       lock.unlock();     }   } }

 

ConditionSignal

public class ConditionDemoSignal implements Runnable{
  private Lock lock;
  private Condition condition;
  public ConditionDemoSignal(Lock lock, Condition condition){
    this.lock=lock;
    this.condition=condition;   }   @Override   public void run() {     System.out.println("begin -ConditionDemoSignal");     try {       lock.lock();       condition.signal();       System.out.println("end - ConditionDemoSignal");     }finally {       lock.unlock();     }   } }
通過這個案例簡單實現了 wait 和 notify 的功能,當調用 await 方法后,當前線程 會釋放鎖並等待,而其他線程調用 condition 對象的 signal 或者 signalall 方法通
知並被阻塞的線程,然后自己執行 unlock 釋放鎖,被喚醒的線程獲得之前的鎖繼 續執行,最后釋放鎖。
所以,condition 中兩個最重要的方法,一個是 await,一個是 signal 方法
await:把當前線程阻塞掛起
signal:喚醒阻塞的線程

Condition 源碼分析

調用 Condition,需要獲得 Lock 鎖,所以意味着會存在一個 AQS 同步隊列,先來 看 Condition.await 方法
condition.await
調用 Condition 的 await()方法(或者以 await 開頭的方法),會使當前線程進入等 待隊列並釋放鎖,同時線程狀態變為等待狀態。當從 await()方法返回時,當前線 程一定獲取了 Condition 相關聯的鎖
public final void await() throws InterruptedException {
    if (Thread.interrupted())
      throw new InterruptedException();
    Node node = addConditionWaiter(); //創建一個新的節點,節點狀態為 condition,采用的數據結構仍然是鏈表
    int savedState = fullyRelease(node); //釋放當前的鎖,得到鎖的狀態,並喚醒 AQS 隊列中的一個線程
    int interruptMode = 0;
    //如果當前節點沒有在同步隊列上,即還沒有被 signal,則將當前線程阻塞
    while (!isOnSyncQueue(node)) {//判斷這個節點是否在 AQS 隊列上,第一次判斷的是 false,因為前面已經釋放鎖了
      LockSupport.park(this); // 第一次總是 park 自己,開始阻塞等待
      // 線程判斷自己在等待過程中是否被中斷了,如果沒有中斷,則再次循環,會在 isOnSyncQueue 中判斷自己是否在隊列上.
      // isOnSyncQueue 判斷當前 node 狀態,如果是 CONDITION 狀態,或者不在隊列上了,就繼續阻塞.
      // isOnSyncQueue 判斷當前 node 還在隊列上且不是 CONDITION 狀態了,就結束循環和阻塞.       if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)         break;     }     // 當這個線程醒來,會嘗試拿鎖, 當 acquireQueued 返回 false 就是拿到鎖了.     // interruptMode != THROW_IE -> 表示這個線程沒有成功將 node 入隊,但 signal 執行了 enq 方法讓其入隊了.     // 將這個變量設置成 REINTERRUPT.     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)       interruptMode = REINTERRUPT;     // 如果 node 的下一個等待者不是 null, 則進行清理,清理 Condition 隊列上的節點.     // 如果是 null ,就沒有什么好清理的了.     if (node.nextWaiter != null) // clean up if cancelled       unlinkCancelledWaiters();     // 如果線程被中斷了,需要拋出異常.或者什么都不做     if (interruptMode != 0)       reportInterruptAfterWait(interruptMode); }

 

Condition.signal
調用 Condition 的 signal()方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點之前,會將節點移到同步隊列中
public final void signal() {
  if (!isHeldExclusively()) //先判斷當前線程是否獲得了鎖
    throw new IllegalMonitorStateException();
  Node first = firstWaiter; // 拿到 Condition 隊列上第一個節點
  if (first != null)
    doSignal(first);
}

Condition.doSignal

private void doSignal(Node first) {
  do {
    if ( (firstWaiter = first.nextWaiter) == null)// 如果第一個節點的下一個節點是 null, 那么, 最后一個節點也是 null.
      lastWaiter = null; // 將 next 節點設置成 null
      first.nextWaiter = null;
  } while (!transferForSignal(first) && (first = firstWaiter) != null);
  }

 

該方法先是 CAS 修改了節點狀態,如果成功,就將這個節點放到 AQS 隊列中, 然后喚醒這個節點上的線程。此時,那個節點就會在 await 方法中蘇醒
final boolean transferForSignal(Node node) {
  /*
  * If cannot change waitStatus, the node has been cancelled.
  */
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
  Node p = enq(node);
  int ws = p.waitStatus;
  // 如果上一個節點的狀態被取消了, 或者嘗試設置上一個節點的狀態為 SIGNAL失敗了(SIGNAL 表示: 他的 next 節點需要停止阻塞),
  if (ws > 0 || !compareAndSetWaitStatus(p, ws,Node.SIGNAL))
    LockSupport.unpark(node.thread); // 喚醒輸入節點上的線程.
  return true;
}

 

AQS.transferForSignal

該方法先是 CAS 修改了節點狀態,如果成功,就將這個節點放到 AQS 隊列中, 然后喚醒這個節點上的線程。此時,那個節點就會在 await 方法中蘇醒
final boolean transferForSignal(Node node) {
  /*
  * If cannot change waitStatus, the node has been cancelled.*/
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
  Node p = enq(node);
  int ws = p.waitStatus;
  // 如果上一個節點的狀態被取消了, 或者嘗試設置上一個節點的狀態為 SIGNAL失敗了(SIGNAL 表示: 他的 next 節點需要停止阻塞),
  if (ws > 0 || !compareAndSetWaitStatus(p, ws,Node.SIGNAL))
    LockSupport.unpark(node.thread); // 喚醒輸入節點上的線程.
  return true;
}

 

Condition 總結

阻塞:await()方法中,在線程釋放鎖資源之后,如果節點不在 AQS 等待隊 列,則阻塞當前線程,如果在等待隊列,則自旋等待嘗試獲取鎖
釋放:signal()后,節點會從 condition 隊列移動到 AQS 等待隊列,則進入 正常鎖的獲取流程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM