ReentrantLock實現原理深入探究


前言

這篇文章被歸到Java基礎分類中,其實真的一點都不基礎。網上寫ReentrantLock的使用、ReentrantLock和synchronized的區別的文章很多,研究ReentrantLock並且能講清楚ReentrantLock的原理的文章很少,本文就來研究一下ReentrantLock的實現原理。研究ReentrantLock的實現原理需要比較好的Java基礎以及閱讀代碼的能力,有些朋友看不懂沒關系,可以以后看,相信你一定會有所收獲。

最后說一句,ReentrantLock是基於AQS實現的,這在下面會講到,AQS的基礎又是CAS,如果不是很熟悉CAS的朋友,可以看一下這篇文章Unsafe與CAS

 

AbstractQueuedSynchronizer

ReentrantLock實現的前提就是AbstractQueuedSynchronizer,簡稱AQS,是java.util.concurrent的核心,CountDownLatch、FutureTask、Semaphore、ReentrantLock等都有一個內部類是這個抽象類的子類。先用兩張表格介紹一下AQS。第一個講的是Node,由於AQS是基於FIFO隊列的實現,因此必然存在一個個節點,Node就是一個節點,Node里面有:

屬    性 定    義
Node SHARED = new Node() 表示Node處於共享模式
Node EXCLUSIVE = null 表示Node處於獨占模式
int CANCELLED = 1 因為超時或者中斷,Node被設置為取消狀態,被取消的Node不應該去競爭鎖,只能保持取消狀態不變,不能轉換為其他狀態,處於這種狀態的Node會被踢出隊列,被GC回收
int SIGNAL = -1 表示這個Node的繼任Node被阻塞了,到時需要通知它
 int CONDITION = -2 表示這個Node在條件隊列中,因為等待某個條件而被阻塞 
int PROPAGATE = -3 使用在共享模式頭Node有可能處於這種狀態, 表示鎖的下一次獲取可以無條件傳播
 int waitStatus 0,新Node會處於這種狀態 
 Node prev 隊列中某個Node的前驅Node 
 Node next 隊列中某個Node的后繼Node 
Thread thread 這個Node持有的線程,表示等待鎖的線程
Node nextWaiter 表示下一個等待condition的Node

看完了Node,下面再看一下AQS中有哪些變量和方法:

屬性/方法 含    義
Thread exclusiveOwnerThread 這個是AQS父類AbstractOwnableSynchronizer的屬性,表示獨占模式同步器的當前擁有者
Node 上面已經介紹過了,FIFO隊列的基本單位
Node head FIFO隊列中的頭Node
Node tail FIFO隊列中的尾Node
int state 同步狀態,0表示未鎖
int getState() 獲取同步狀態
setState(int newState) 設置同步狀態
boolean compareAndSetState(int expect, int update)  利用CAS進行State的設置 
 long spinForTimeoutThreshold = 1000L 線程自旋等待的時間 
Node enq(final Node node)  插入一個Node到FIFO隊列中 
Node addWaiter(Node mode) 為當前線程和指定模式創建並擴充一個等待隊列
void setHead(Node node) 設置隊列的頭Node
void unparkSuccessor(Node node) 如果存在的話,喚起Node持有的線程
void doReleaseShared() 共享模式下做釋放鎖的動作
void cancelAcquire(Node node) 取消正在進行的Node獲取鎖的嘗試
boolean shouldParkAfterFailedAcquire(Node pred, Node node) 在嘗試獲取鎖失敗后是否應該禁用當前線程並等待
void selfInterrupt() 中斷當前線程本身
boolean parkAndCheckInterrupt() 禁用當前線程進入等待狀態並中斷線程本身
boolean acquireQueued(final Node node, int arg) 隊列中的線程獲取鎖
tryAcquire(int arg) 嘗試獲得鎖(由AQS的子類實現它
tryRelease(int arg) 嘗試釋放鎖(由AQS的子類實現它
isHeldExclusively() 是否獨自持有鎖
acquire(int arg) 獲取鎖
release(int arg) 釋放鎖
compareAndSetHead(Node update) 利用CAS設置頭Node
compareAndSetTail(Node expect, Node update) 利用CAS設置尾Node
compareAndSetWaitStatus(Node node, int expect, int update) 利用CAS設置某個Node中的等待狀態

上面列出了AQS中最主要的一些方法和屬性。整個AQS是典型的模板模式的應用,設計得十分精巧,對於FIFO隊列的各種操作在AQS中已經實現了,AQS的子類一般只需要重寫tryAcquire(int arg)和tryRelease(int arg)兩個方法即可

 

ReentrantLock的實現

ReentrantLock中有一個抽象類Sync:

private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
}

ReentrantLock根據傳入構造方法的布爾型參數實例化出Sync的實現類FairSync和NonfairSync,分別表示公平的Sync和非公平的Sync。由於ReentrantLock我們用的比較多的是非公平鎖,所以看下非公平鎖是如何實現的。假設線程1調用了ReentrantLock的lock()方法,那么線程1將會獨占鎖,整個調用鏈十分簡單:

第一個獲取鎖的線程就做了兩件事情:

1、設置AbstractQueuedSynchronizer的state為1

2、設置AbstractOwnableSynchronizer的thread為當前線程

這兩步做完之后就表示線程1獨占了鎖。然后線程2也要嘗試獲取同一個鎖,在線程1沒有釋放鎖的情況下必然是行不通的,所以線程2就要阻塞。那么,線程2如何被阻塞?看下線程2的方法調用鏈,這就比較復雜了:

調用鏈看到確實非常長,沒關系,結合代碼分析一下,其實ReentrantLock沒有那么復雜,我們一點點來扒代碼:

 1 final void lock() {
 2     if (compareAndSetState(0, 1))
 3         setExclusiveOwnerThread(Thread.currentThread());
 4     else
 5         acquire(1);
 6 }

首先線程2嘗試利用CAS去判斷state是不是0,是0就設置為1,當然這一步操作肯定是失敗的,因為線程1已經將state設置成了1,所以第2行必定是false,因此線程2走第5行的acquire方法:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

從字面上就很好理解這個if的意思,先走第一個判斷條件嘗試獲取一次鎖,如果獲取的結果為false即失敗,走第二個判斷條件添加FIFO等待隊列。所以先看一下tryAcquire方法做了什么,這個方法最終調用到的是Sync的nonfairTryAcquire方法:

 1 final boolean nonfairTryAcquire(int acquires) {
 2     final Thread current = Thread.currentThread();
 3     int c = getState();
 4     if (c == 0) {
 5         if (compareAndSetState(0, acquires)) {
 6             setExclusiveOwnerThread(current);
 7             return true;
 8         }
 9     }
10     else if (current == getExclusiveOwnerThread()) {
11         int nextc = c + acquires;
12         if (nextc < 0) // overflow
13             throw new Error("Maximum lock count exceeded");
14         setState(nextc);
15         return true;
16     }
17     return false;
18 }

由於state是volatile的,所以state對線程2具有可見性,線程2拿到最新的state,再次判斷一下能否持有鎖(可能線程1同步代碼執行得比較快,這會兒已經釋放了鎖),不可以就返回false。

注意一下第10~第16行,這段代碼的作用是讓某個線程可以多次調用同一個ReentrantLock,每調用一次給state+1,由於某個線程已經持有了鎖,所以這里不會有競爭,因此不需要利用CAS設置state(相當於一個偏向鎖)。從這段代碼可以看到,nextc每次加1,當nextc<0的時候拋出error,那么同一個鎖最多能重入Integer.MAX_VALUE次,也就是2147483647。

然后就走到if的第二個判斷里面了,先走AQS的addWaiter方法:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

先創建一個當前線程的Node,模式為獨占模式(因為傳入的mode是一個NULL),再判斷一下隊列上有沒有節點,沒有就創建一個隊列,因此走enq方法:

 1 private Node enq(final Node node) {
 2     for (;;) {
 3         Node t = tail;
 4         if (t == null) { // Must initialize
 5             Node h = new Node(); // Dummy header
 6             h.next = node;
 7             node.prev = h;
 8             if (compareAndSetHead(h)) {
 9                 tail = node;
10                 return h;
11             }
12         }
13         else {
14             node.prev = t;
15             if (compareAndSetTail(t, node)) {
16                 t.next = node;
17                 return t;
18             }
19         }
20     }
21 }

這個方法其實畫一張圖應該比較好理解,形成一個隊列之后應該是這樣的:

每一步都用圖表示出來了,由於線程2所在的Node是第一個要等待的Node,因此FIFO隊列上肯定沒有內容,tail為null,走的就是第4行~第10行的代碼邏輯。這里用了CAS設置頭Node,當然有可能線程2設置頭Node的時候CPU切換了,線程3已經把頭Node設置好了形成了上圖所示的一個隊列,這時線程2再循環一次獲取tail,由於tail是volatile的,所以對線程2可見,線程2看見tail不為null,就走到了13行的else里面去往尾Node后面添加自身。整個過程下來,形成了一個雙向隊列。最后走AQS的acquireQueued(node, 1):

 1 final boolean acquireQueued(final Node node, int arg) {
 2     try {
 3         boolean interrupted = false;
 4         for (;;) {
 5             final Node p = node.predecessor();
 6             if (p == head && tryAcquire(arg)) {
 7                 setHead(node);
 8                 p.next = null; // help GC
 9                 return interrupted;
10             }
11             if (shouldParkAfterFailedAcquire(p, node) &&
12                 parkAndCheckInterrupt())
13                 interrupted = true;
14         }
15     } catch (RuntimeException ex) {
16         cancelAcquire(node);
17         throw ex;
18     }
19 }

此時再做判斷,由於線程2是雙向隊列的真正的第一個Node(前面還有一個h),所以第5行~第10行再次判斷一下線程2能不能獲取鎖(可能這段時間內線程1已經執行完了把鎖釋放了,state從1變為了0),如果還是不行,先調用AQS的shouldParkAfterFailedAcquire(p, node)方法:

 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2     int s = pred.waitStatus;
 3     if (s < 0)
 4         /*
 5          * This node has already set status asking a release
 6          * to signal it, so it can safely park
 7          */
 8         return true;
 9     if (s > 0) {
10         /*
11          * Predecessor was cancelled. Skip over predecessors and
12          * indicate retry.
13          */
14     do {
15     node.prev = pred = pred.prev;
16     } while (pred.waitStatus > 0);
17     pred.next = node;
18 }
19     else
20         /*
21          * Indicate that we need a signal, but don't park yet. Caller
22          * will need to retry to make sure it cannot acquire before
23          * parking.
24          */
25          compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
26     return false;
27 }

吐槽一下先,這段代碼的代碼格式真糟糕(看來JDK的開發大牛們也有寫得不好的地方),這個waitStatus是h的waitStatus,很明顯是0,所以此時把h的waitStatus設置為Noed.SIGNAL即-1並返回false。既然返回了false,上面的acquireQueued的11行if自然不成立,再走一次for循環,還是先嘗試獲取鎖,不成功,繼續走shouldParkAfterFailedAcquire,此時waitStatus為-1,小於0,走第三行的判斷,返回true。然后走acquireQueued的11行if的第二個判斷條件parkAndCheckInterrupt:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    unsafe.park(false, 0L);
    setBlocker(t, null);
}

最后一步,調用LockSupport的park方法阻塞住了當前的線程。至此,使用ReentrantLock讓線程1獨占鎖、線程2進入FIFO隊列並阻塞的完整流程已經整理出來了。

lock()的操作明了之后,就要探究一下unlock()的時候代碼又做了什么了,接着看下一部分。

 

unlock()的時候做了什么

就不畫流程圖了,直接看一下代碼流程,比較簡單,調用ReentrantLock的unlock方法:

public void unlock() {
    sync.release(1);
}

走AQS的release:

1 public final boolean release(int arg) {
2     if (tryRelease(arg)) {
3         Node h = head;
4         if (h != null && h.waitStatus != 0)
5            unparkSuccessor(h);
6         return true;
7     }
8     return false;
9 }

先調用Sync的tryRelease嘗試釋放鎖:

 1 protected final boolean tryRelease(int releases) {
 2     int c = getState() - releases;
 3     if (Thread.currentThread() != getExclusiveOwnerThread())
 4         throw new IllegalMonitorStateException();
 5     boolean free = false;
 6     if (c == 0) {
 7         free = true;
 8         setExclusiveOwnerThread(null);
 9     }
10     setState(c);
11     return free;
12 }

首先,只有當c==0的時候才會讓free=true,這和上面一個線程多次調用lock方法累加state是對應的,調用了多少次的lock()方法自然必須調用同樣次數的unlock()方法才行,這樣才把一個鎖給全部解開。

當一條線程對同一個ReentrantLock全部解鎖之后,AQS的state自然就是0了,AbstractOwnableSynchronizer的exclusiveOwnerThread將被設置為null,這樣就表示沒有線程占有鎖,方法返回true。代碼繼續往下走,上面的release方法的第四行,h不為null成立,h的waitStatus為-1,不等於0也成立,所以走第5行的unparkSuccessor方法:

 1 private void unparkSuccessor(Node node) {
 2     /*
 3      * Try to clear status in anticipation of signalling.  It is
 4      * OK if this fails or if status is changed by waiting thread.
 5      */
 6     compareAndSetWaitStatus(node, Node.SIGNAL, 0);
 7 
 8     /*
 9      * Thread to unpark is held in successor, which is normally
10      * just the next node.  But if cancelled or apparently null,
11      * traverse backwards from tail to find the actual
12      * non-cancelled successor.
13      */
14     Node s = node.next;
15     if (s == null || s.waitStatus > 0) {
16         s = null;
17         for (Node t = tail; t != null && t != node; t = t.prev)
18             if (t.waitStatus <= 0)
19                 s = t;
20    }
21     if (s != null)
22         LockSupport.unpark(s.thread);
23 }

s即h的下一個Node,這個Node里面的線程就是線程2,由於這個Node不等於null,所以走21行,線程2被unPark了,得以運行。有一個很重要的問題是:鎖被解了怎樣保證整個FIFO隊列減少一個Node呢?這是一個很巧妙的設計,又回到了AQS的acquireQueued方法了:

 1 final boolean acquireQueued(final Node node, int arg) {
 2     try {
 3         boolean interrupted = false;
 4         for (;;) {
 5             final Node p = node.predecessor();
 6             if (p == head && tryAcquire(arg)) {
 7                 setHead(node);
 8                 p.next = null; // help GC
 9                 return interrupted;
10             }
11             if (shouldParkAfterFailedAcquire(p, node) &&
12                 parkAndCheckInterrupt())
13                 interrupted = true;
14         }
15     } catch (RuntimeException ex) {
16         cancelAcquire(node);
17         throw ex;
18     }
19 }

被阻塞的線程2是被阻塞在第12行,注意這里並沒有return語句,也就是說,阻塞完成線程2依然會進行for循環。然后,阻塞完成了,線程2所在的Node的前驅Node是p,線程2嘗試tryAcquire,成功,然后線程2就成為了head節點了,把p的next設置為null,這樣原頭Node里面的所有對象都不指向任何塊內存空間,h屬於棧內存的內容,方法結束被自動回收,這樣隨着方法的調用完畢,原頭Node也沒有任何的引用指向它了,這樣它就被GC自動回收了。此時,遇到一個return語句,acquireQueued方法結束,后面的Node也是一樣的原理。

這里有一個細節,看一下setHead方法:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

setHead方法里面的前驅Node是Null,也沒有線程,那么為什么不用一個在等待的線程作為Head Node呢?

因為一個線程隨時有可能因為中斷而取消,而取消的話,Node自然就要被GC了,那GC前必然要把頭Node的后繼Node變為一個新的頭而且要應對多種情況,這樣就很麻煩。用一個沒有thread的Node作為頭,相當於起了一個引導作用,因為head沒有線程,自然也不會被取消。

再看一下上面unparkSuccessor的14行~20行,就是為了防止head的下一個node被取消的情況,這樣,就從尾到頭遍歷,找出離head最近的一個node,對這個node進行unPark操作。

 

ReentrantLock其他方法的實現

如果能理解ReentrantLock的實現方式,那么你會發現ReentrantLock中其余一些方法的實現還是很簡單的,從JDK API關於ReentrantLock方法的介紹這部分,舉幾個例子:

1、int getHoldCount()

final int getHoldCount() {
    return isHeldExclusively() ? getState() : 0;
}

獲取ReentrantLock的lock()方法被調用了幾次,就是state的當前值

2、Thread getOwner()

final Thread getOwner() {
    return getState() == 0 ? null : getExclusiveOwnerThread();
}

獲取當前占有鎖的線程,就是AbstractOwnableSynchronizer中exclusiveOwnerThread的值

3、Collection<Thread> getQueuedThreads()

public final Collection<Thread> getQueuedThreads() {
    ArrayList<Thread> list = new ArrayList<Thread>();
    for (Node p = tail; p != null; p = p.prev) {
        Thread t = p.thread;
        if (t != null)
            list.add(t);
    }
    return list;
}

從尾到頭遍歷一下,添加進ArrayList中

4、int getQueuedLength()

public final int getQueueLength() {
    int n = 0;
    for (Node p = tail; p != null; p = p.prev) {
        if (p.thread != null)
            ++n;
    }
    return n;
}

從尾到頭遍歷一下,累加n。當然這個方法和上面那個方法可能是不准確的,因為遍歷的時候可能別的線程又往隊列尾部添加了Node。

其余方法也都差不多,可以自己去看一下。

 

遺留問題

ReentrantLock的流程基本已經理清楚了,現在還有一個遺留問題:我們知道ReentrantLock是可以指定公平鎖或是非公平鎖,那么到底是怎么樣的代碼差別導致公平鎖和非公平鎖的產生的呢

說實話,這個問題,我自己到現在還沒有完全想通。之后會持續跟進這個問題,一旦想明白了,會第一時間更新此文或者是新發一篇文章來專門講述公平ReentrantLock和非公平ReentrantLock在代碼上的差別。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM