並發編程之顯式鎖原理


Synchronized 關鍵字結合對象的監視器,JVM 為我們提供了一種『內置鎖』的語義,這種鎖很簡便,不需要我們關心加鎖和釋放鎖的過程,我們只需要告訴虛擬機哪些代碼塊需要加鎖即可,其他的細節會由編譯器和虛擬機自己實現。

可以將我們的『內置鎖』理解為是 JVM 的一種內置特性, 所以一個很顯著的問題就是,它不支持某些高級功能的定制,比如說,我想要這個鎖支持公平競爭,我想要根據不同的條件將線程阻塞在不同的隊列上,我想要支持定時競爭鎖,超時返回,我還想讓被阻塞的線程能夠響應中斷請求,等等等等。

這些特殊的需求是『內置鎖』滿足不了的,所以在 JDK 層面又引入了『顯式鎖』的概念,不再由 JVM 來負責加鎖和釋放鎖,這兩個動作釋放給我們程序來做,程序層面難免復雜了些,但鎖靈活性提高了,可以支持更多定制功能,但要求你對鎖具有更深層次的理解。

Lock 顯式鎖

Lock 接口位於 java.util.concurrent.locks 包下,基本定義如下:

public interface Lock {
    //獲取鎖,失敗則阻塞
    void lock();
    //響應中斷式獲取鎖
    void lockInterruptibly()
    //嘗試一次獲取鎖,成功返回true,失敗返回false,不會阻塞
    boolean tryLock();
    //定時嘗試
    boolean tryLock(long time, TimeUnit unit)
    //釋放鎖
    void unlock();
    //創建一個條件隊列
    Condition newCondition();
}

Lock 定義了顯式鎖應該具有的最基本的方法,各個子類的實現應該具有更加復雜的能力,整個 Lock 的框架如下:

image

其中,顯式鎖的實現類主要有三個,ReentrantLock 是其最主要的實現類,ReadLock 和 WriteLock 是 ReentrantReadWriteLock 內部定義的兩個內部類,他們繼承自 Lock 並實現了其定義的所有方法,精細化讀寫分離。而 ReentrantReadWriteLock 向外提供讀鎖寫鎖。

至於 LockSupport,它提供了阻塞和喚醒一個線程的能力,當然內部也是通過 Unsafe 類繼而調用操作系統底層的 API 來實現的。

AbstractQueuedSynchronizer 你可以叫它隊列同步器,也可以簡稱它為 AQS,它是我們實現鎖的一個核心,本質上就是個同步機制,記錄當前占有鎖的線程,每一個想要獲取鎖的線程都需要通過這個同步機制來判斷自己是否具備占有該鎖的條件,如果不具備則阻塞等待,否則將占有鎖,修改標志,這一點我們后續會詳細分析。

ReentrantLock 的基本理解

ReentrantLock 作為 Lock 顯式鎖的最基本實現,也是使用最頻繁的一個鎖實現類。它提供了兩個構造函數,用於支持公平競爭鎖。

public ReentrantLock()

public ReentrantLock(boolean fair)

默認無參的構造函數表示啟用非公平鎖,當然也可以通過第二個構造函數傳入 fair 參數值為 true 指明啟用公平鎖。

公平鎖和非公平鎖的區別之處在於,公平鎖在選擇下一個占有鎖的線程時,參考先到先得原則,等待時間越長的線程將具有更高的優先級。而非公平鎖則無視這種原則。

兩種策略各有利弊,公平策略可以保證每個線程都公平的競爭到鎖,但是維護公平算法本身也是一種資源消耗,每一次鎖請求的線程都直接被掛在隊列的尾部,而只有隊列頭部的線程有資格使用鎖,后面的都得排隊。

那么假設這么一種情況,A 獲得鎖正在運行,B 嘗試獲得鎖失敗被阻塞,此時 C 也嘗試獲得鎖,失敗而阻塞,雖然 C 只需要很短運行時間,它依然需要等待 B 執行結束才有機會獲得鎖來運行。

非公平鎖的前提下,A 執行結束,找到隊列首部的 B 線程,開始上下文切換,假如此時的 C 過來競爭鎖,非公平策略前提下,C 是可以獲得鎖的,並假設它迅速的執行結束了,當 B 線程被切換回來之后再去獲取鎖也不會有什么問題,結果是,C 線程在 B 線程的上下文切換過程中執行結束。顯然,非公平策略下 CPU 的吞吐量是提高的。

但是,非公平策略的鎖可能會造成某些線程飢餓,始終得不到運行,各有利弊,適時取舍。慶幸的是,我們的顯式鎖支持兩種模式的切換選擇。稍后我們將分析其中實現的細節之處。

ReentrantLock 中有以下三個內部類是比較重要的:

image

內部類 Sync 繼承自我們的 AQS 並重寫了部分方法,NonfairSync 和 FairSync 是 Sync 的兩個子類,分別對應公平鎖和非公平鎖。

為什么這么做呢?

image

類 Sync 中有一個 lock 方法,而公平策略下的 lock 方法和非公平策略下的 lock 方法應該具有不同的實現,所以這里並沒有寫死,而是交由子類去實現它。

這其實是一種典型的設計模式,『模板方法』。

關於 AQS,我們稍后做詳細的分析,這里你把它理解為一個用於記錄保存當前占有鎖線程信息和阻塞在該鎖上所有線程信息的容器即可。

接着看 ReentrantLock,你會發現,無論是 lock 方法,lockInterruptibly 方法、tryLock 或是 unlock 方法都是透傳調用 sync 的相關方法,也即 AQS 中的相關方法。

下面我們就深入源碼去分析分析這個 AQS 的實現情況。

AQS 的基本原理

AQS 就是我們的 AbstractQueuedSynchronizer,你可以把它理解為一個容器,它是一個抽象類,有一個父類 AbstractOwnableSynchronizer。這個父類的職責很簡單,有一個 Thread 類型的成員屬性,就是用來給 AQS 保存當前占有鎖的線程的。

除此之外,AQS 中還定義了一個靜態內部類 Node,是一個雙向鏈表的數據結構。AQS 中自然還對應兩個指針,隊列頭指針,尾指針。

int 類型的屬性 state 也是一個十分重要的成員,值為零表示當前鎖無任何線程持有,值為一說明有一個線程持有該鎖未釋放,大於一說明持有該鎖的線程多次重入。

AQS 中定義了很多的方法,有公共的,有私有的,這里不一一贅述,我們從 ReentrantLock 的 lock 和 unlock 入手,分析它一路調用的方法,以非公平鎖為例。

public void lock() {
    sync.lock();
}

ReentrantLock 的 lock 方法直接調用的 sync 的 lock 方法,而我們說過 sync 中定義的 lock 方法是一個抽象方法,具體實現在子類中,NonfairSync 的 lock 方法實現如下:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

邏輯很簡單,嘗試使用 CAS 更新 state 的值為 1,表示當前線程嘗試占有該鎖,如果成功,說明 state 的值原本是一,也即鎖無任何線程占用,於是將當前線程保存到父類的 Thread 字段中。

如果更新失敗,那么說明鎖已經被持有,需要掛起當前線程,於是調用 acquire 方法(AQS中的方法)。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire 被子類 Sync 重寫了,所以這里調用的是 NonfairSync 的 tryAcquire 方法。

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
    }
    return false;
}

這段代碼並不復雜是,主要邏輯是,如果 state 為零,說明剛才占有鎖的線程釋放了鎖資源,於是嘗試占有鎖,否則判斷一下占有鎖的線程是否是當前線程,也就是判斷一下是否是重入鎖操作,如果是則增加重入次數即可。

關於返回值,如果是占有鎖成功或者重入鎖成功都將返回 true,否則統一返回 false。

接着看,

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

如果 tryAcquire 方法返回 true,外層 acquire 返回並結束 lock 方法的調用,否則說明占有鎖失敗並准備將當前線程阻塞,具體的阻塞情況我們繼續分析。

addWaiter 方法用於將當前線程包裝成一個 Node 結點並添加到隊列的尾部,我們看看源代碼:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

代碼比較簡單,不再啰嗦了,這個方法最終會導致當前線程掛在等待隊列的尾部。

添加到等待隊列之后會回到 acquireQueued 方法,這個方法會做最后一次嘗試獲取鎖,如果依然失敗則調用 LockSupport 方法掛起該線程。

image

整個方法的核心邏輯被寫在了死循環之中,循環體的前半部分再一次嘗試獲取鎖,這里需要注意,head 指向的結點並不是隊列中有效的等待線程,head 的 next 指針指向的結點才是第一個有效的等待線程。

也就是說,如果那個結點的前驅結點是 head,那么它就是鎖的第一有效繼承人。

如果依然失敗了,會先調用 shouldParkAfterFailedAcquire 判斷是否應該阻塞當前線程,這個方法在大部分情況下會返回 true,在某些特殊情況下會返回 false。

然后 parkAndCheckInterrupt 將直接阻塞當前線程,調用 LockSupport 的 park 方法。整個獲取鎖的過程基本上就算結束了,接着我們如何解除阻塞。

public void unlock() {
    sync.release(1);
}

unlock 調用 AQS 的 release 方法,

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

先調用 tryRelease 嘗試性釋放,

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

如果當前線程不是擁有鎖的線程,那么直接拋出異常,這是必要的異常點判斷。

如果 c 等於零,說明自己並沒有多次重入該鎖,清空 exclusiveOwnerThread 字段即可,並修改 state 狀態。這段代碼沒有加同步邏輯的原因是,unlock 方法只能由占有鎖的線程進行調用,同一時刻只會有一個線程能夠調用成功。

假如 c 不等於零,也就是當前線程多次重入該鎖,state 雖然會被減一修改,而 tryRelease 卻會返回 false,這一點需要注意。我們再回到 release 方法,

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

可以看到,如果由於線程的多次重入 tryRelease 返回 false 了,最終導致的是我們的 unlock 方法返回 false。

換句話說,你重入多少次鎖,你就需要手動調用多少次 unlock,而只有最后一次的 unlock 方法返回的是 true,這就是原理。

而假如我們的 tryRelease 調用成功並返回 true,unparkSuccessor 方法就會去 unpark 我們的隊列首個有效的結點所對應的線程。unparkSuccessor 比較簡單,不涉及任何同步機制,這里不再贅述了。

總的來說,unlock 要比 lock 簡單很多,原因在於,unlock 不需要同步機制,只有獲得鎖的線程才能夠調用,不存在並發訪問,而 lock 方法則不一樣,會面臨大量線程同時訪問。

我們回到 acquireQueued 方法,

image

線程被喚醒后,會從上一次被阻塞的位置起重新開始執行代碼,也就是線程會蘇醒於 parkAndCheckInterrupt 方法中,

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);  //這里開始蘇醒
    return Thread.interrupted();
}

第一件事情,調用 interrupted 方法,而這個方法用於判斷當前線程在阻塞期間是否被中斷。

如果遭遇中斷,會進入 if 判斷體,記錄一下,用於方法返回。被喚醒的線程將重新從循環體首部開始,再次嘗試去競爭鎖,直到位於等待隊列中自己之前的全部結點全部出隊后,才能有機會獲取到鎖並返回中斷標志。

所以來說,在死循環中阻塞一個線程是我們一種較為常見的阻塞模式,目的就是為了方便它被喚醒之后能夠有機會重新競爭相關的鎖資源。

以上,我們完成了對 ReentrantLock 這種獨占式鎖的加鎖和釋放鎖的相關原理的一個介紹,關於讀寫分離的 ReentrantReadWriteLock 鎖,它其實是共享鎖和獨占鎖的一個結合,相對更加復雜,我們下一篇單獨來分析。

除此之外的 ReentrantLock 中其他相關的一些響應中斷的獲取鎖方法,支持超時返回的相關方法等,無一例外的依賴我們上述介紹的原理,相信大家有能力自行參透。

好了,本篇文章到此結束,大家期待下篇文章吧。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM