死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖


問題

(1)條件鎖是什么?

(2)條件鎖適用於什么場景?

(3)條件鎖的await()是在其它線程signal()的時候喚醒的嗎?

簡介

條件鎖,是指在獲取鎖之后發現當前業務場景自己無法處理,而需要等待某個條件的出現才可以繼續處理時使用的一種鎖。

比如,在阻塞隊列中,當隊列中沒有元素的時候是無法彈出一個元素的,這時候就需要阻塞在條件notEmpty上,等待其它線程往里面放入一個元素后,喚醒這個條件notEmpty,當前線程才可以繼續去做“彈出一個元素”的行為。

注意,這里的條件,必須是在獲取鎖之后去等待,對應到ReentrantLock的條件鎖,就是獲取鎖之后才能調用condition.await()方法。

在java中,條件鎖的實現都在AQS的ConditionObject類中,ConditionObject實現了Condition接口,下面我們通過一個例子來進入到條件鎖的學習中。

使用示例

public class ReentrantLockTest {
    public static void main(String[] args) throws InterruptedException {
        // 聲明一個重入鎖
        ReentrantLock lock = new ReentrantLock();
        // 聲明一個條件鎖
        Condition condition = lock.newCondition();

        new Thread(()->{
            try {
                lock.lock();  // 1
                try {
                    System.out.println("before await");  // 2
                    // 等待條件
                    condition.await();  // 3
                    System.out.println("after await");  // 10
                } finally {
                    lock.unlock();  // 11
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        // 這里睡1000ms是為了讓上面的線程先獲取到鎖
        Thread.sleep(1000);
        lock.lock();  // 4
        try {
            // 這里睡2000ms代表這個線程執行業務需要的時間
            Thread.sleep(2000);  // 5
            System.out.println("before signal");  // 6
            // 通知條件已成立
            condition.signal();  // 7
            System.out.println("after signal");  // 8
        } finally {
            lock.unlock();  // 9
        }
    }
}

上面的代碼很簡單,一個線程等待條件,另一個線程通知條件已成立,后面的數字代表代碼實際運行的順序,如果你能把這個順序看懂基本條件鎖掌握得差不多了。

源碼分析

ConditionObject的主要屬性

public class ConditionObject implements Condition, java.io.Serializable {
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}

可以看到條件鎖中也維護了一個隊列,為了和AQS的隊列區分,我這里稱為條件隊列,firstWaiter是隊列的頭節點,lastWaiter是隊列的尾節點,它們是干什么的呢?接着看。

lock.newCondition()方法

新建一個條件鎖。

// ReentrantLock.newCondition()
public Condition newCondition() {
    return sync.newCondition();
}
// ReentrantLock.Sync.newCondition()
final ConditionObject newCondition() {
    return new ConditionObject();
}
// AbstractQueuedSynchronizer.ConditionObject.ConditionObject()
public ConditionObject() { }

新建一個條件鎖最后就是調用的AQS中的ConditionObject類來實例化條件鎖。

condition.await()方法

condition.await()方法,表明現在要等待條件的出現。

// AbstractQueuedSynchronizer.ConditionObject.await()
public final void await() throws InterruptedException {
    // 如果線程中斷了,拋出異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加節點到Condition的隊列中,並返回該節點
    Node node = addConditionWaiter();
    // 完全釋放當前線程獲取的鎖
    // 因為鎖是可重入的,所以這里要把獲取的鎖全部釋放
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 是否在同步隊列中
    while (!isOnSyncQueue(node)) {
        // 阻塞當前線程
        LockSupport.park(this);
        
        // 上面部分是調用await()時釋放自己占有的鎖,並阻塞自己等待條件的出現
        // *************************分界線*************************  //
        // 下面部分是條件已經出現,嘗試去獲取鎖
        
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    // 嘗試獲取鎖,注意第二個參數,這是上一章分析過的方法
    // 如果沒獲取到會再次阻塞(這個方法這里就不貼出來了,有興趣的翻翻上一章的內容)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清除取消的節點
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 線程中斷相關
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
// AbstractQueuedSynchronizer.ConditionObject.addConditionWaiter
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果條件隊列的尾節點已取消,從頭節點開始清除所有已取消的節點
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        // 重新獲取尾節點
        t = lastWaiter;
    }
    // 新建一個節點,它的等待狀態是CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 如果尾節點為空,則把新節點賦值給頭節點(相當於初始化隊列)
    // 否則把新節點賦值給尾節點的nextWaiter指針
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    // 尾節點指向新節點
    lastWaiter = node;
    // 返回新節點
    return node;
}
// AbstractQueuedSynchronizer.fullyRelease
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 獲取狀態變量的值,重復獲取鎖,這個值會一直累加
        // 所以這個值也代表着獲取鎖的次數
        int savedState = getState();
        // 一次性釋放所有獲得的鎖
        if (release(savedState)) {
            failed = false;
            // 返回獲取鎖的次數
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
// AbstractQueuedSynchronizer.isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
    // 如果等待狀態是CONDITION,或者前一個指針為空,返回false
    // 說明還沒有移到AQS的隊列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果next指針有值,說明已經移到AQS的隊列中了
    if (node.next != null) // If has successor, it must be on queue
        return true;
    // 從AQS的尾節點開始往前尋找看是否可以找到當前節點,找到了也說明已經在AQS的隊列中了
    return findNodeFromTail(node);
}

這里有幾個難理解的點:

(1)Condition的隊列和AQS的隊列不完全一樣;

AQS的隊列頭節點是不存在任何值的,是一個虛節點;

Condition的隊列頭節點是存儲着實實在在的元素值的,是真實節點。

(2)各種等待狀態(waitStatus)的變化;

首先,在條件隊列中,新建節點的初始等待狀態是CONDITION(-2);

其次,移到AQS的隊列中時等待狀態會更改為0(AQS隊列節點的初始等待狀態為0);

然后,在AQS的隊列中如果需要阻塞,會把它上一個節點的等待狀態設置為SIGNAL(-1);

最后,不管在Condition隊列還是AQS隊列中,已取消的節點的等待狀態都會設置為CANCELLED(1);

另外,后面我們在共享鎖的時候還會講到另外一種等待狀態叫PROPAGATE(-3)。

(3)相似的名稱;

AQS中下一個節點是next,上一個節點是prev;

Condition中下一個節點是nextWaiter,沒有上一個節點。

如果弄明白了這幾個點,看懂上面的代碼還是輕松加愉快的,如果沒弄明白,彤哥這里指出來了,希望您回頭再看看上面的代碼。

下面總結一下await()方法的大致流程:

(1)新建一個節點加入到條件隊列中去;

(2)完全釋放當前線程占有的鎖;

(3)阻塞當前線程,並等待條件的出現;

(4)條件已出現(此時節點已經移到AQS的隊列中),嘗試獲取鎖;

也就是說await()方法內部其實是先釋放鎖->等待條件->再次獲取鎖的過程。

condition.signal()方法

condition.signal()方法通知條件已經出現。

// AbstractQueuedSynchronizer.ConditionObject.signal
public final void signal() {
    // 如果不是當前線程占有着鎖,調用這個方法拋出異常
    // 說明signal()也要在獲取鎖之后執行
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 條件隊列的頭節點
    Node first = firstWaiter;
    // 如果有等待條件的節點,則通知它條件已成立
    if (first != null)
        doSignal(first);
}
// AbstractQueuedSynchronizer.ConditionObject.doSignal
private void doSignal(Node first) {
    do {
        // 移到條件隊列的頭節點往后一位
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 相當於把頭節點從隊列中出隊
        first.nextWaiter = null;
        // 轉移節點到AQS隊列中
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
// AbstractQueuedSynchronizer.transferForSignal
final boolean transferForSignal(Node node) {
    // 把節點的狀態更改為0,也就是說即將移到AQS隊列中
    // 如果失敗了,說明節點已經被改成取消狀態了
    // 返回false,通過上面的循環可知會尋找下一個可用節點
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 調用AQS的入隊方法把節點移到AQS的隊列中
    // 注意,這里enq()的返回值是node的上一個節點,也就是舊尾節點
    Node p = enq(node);
    // 上一個節點的等待狀態
    int ws = p.waitStatus;
    // 如果上一個節點已取消了,或者更新狀態為SIGNAL失敗(也是說明上一個節點已經取消了)
    // 則直接喚醒當前節點對應的線程
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    // 如果更新上一個節點的等待狀態為SIGNAL成功了
    // 則返回true,這時上面的循環不成立了,退出循環,也就是只通知了一個節點
    // 此時當前節點還是阻塞狀態
    // 也就是說調用signal()的時候並不會真正喚醒一個節點
    // 只是把節點從條件隊列移到AQS隊列中
    return true;
}

signal()方法的大致流程為:

(1)從條件隊列的頭節點開始尋找一個非取消狀態的節點;

(2)把它從條件隊列移到AQS隊列;

(3)且只移動一個節點;

注意,這里調用signal()方法后並不會真正喚醒一個節點,那么,喚醒一個節點是在啥時候呢?

還記得開頭例子嗎?倒回去再好好看看,signal()方法后,最終會執行lock.unlock()方法,此時才會真正喚醒一個節點,喚醒的這個節點如果曾經是條件節點的話又會繼續執行await()方法“分界線”下面的代碼。

結束了,仔細體會下^^

如果非要用一個圖來表示的話,我想下面這個圖可以大致表示一下(這里是用時序圖畫的,但是實際並不能算作一個真正的時序圖哈,了解就好):

ReentrantLock

總結

(1)重入鎖是指可重復獲取的鎖,即一個線程獲取鎖之后再嘗試獲取鎖時會自動獲取鎖;

(2)在ReentrantLock中重入鎖是通過不斷累加state變量的值實現的;

(3)ReentrantLock的釋放要跟獲取匹配,即獲取了幾次也要釋放幾次;

(4)ReentrantLock默認是非公平模式,因為非公平模式效率更高;

(5)條件鎖是指為了等待某個條件出現而使用的一種鎖;

(6)條件鎖比較經典的使用場景就是隊列為空時阻塞在條件notEmpty上;

(7)ReentrantLock中的條件鎖是通過AQS的ConditionObject內部類實現的;

(8)await()和signal()方法都必須在獲取鎖之后釋放鎖之前使用;

(9)await()方法會新建一個節點放到條件隊列中,接着完全釋放鎖,然后阻塞當前線程並等待條件的出現;

(10)signal()方法會尋找條件隊列中第一個可用節點移到AQS隊列中;

(11)在調用signal()方法的線程調用unlock()方法才真正喚醒阻塞在條件上的節點(此時節點已經在AQS隊列中);

(12)之后該節點會再次嘗試獲取鎖,后面的邏輯與lock()的邏輯基本一致了。

彩蛋

為什么java有自帶的關鍵字synchronized了還需要實現一個ReentrantLock呢?

首先,它們都是可重入鎖;

其次,它們都默認是非公平模式;

然后,...,呃,我們下一章繼續深入探討 ReentrantLock VS synchronized。

推薦閱讀

  1. 死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖

  2. 死磕 java同步系列之AQS起篇

  3. 死磕 java同步系列之自己動手寫一個鎖Lock

  4. 死磕 java魔法類之Unsafe解析

  5. 死磕 java同步系列之JMM(Java Memory Model)

  6. 死磕 java同步系列之volatile解析

  7. 死磕 java同步系列之synchronized解析


歡迎關注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

qrcode


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM