ReentrantLock之公平鎖源碼分析


  本文分析的ReentrantLock所對應的Java版本為JDK8。

  在閱讀本文前,讀者應該知道什么是CAS、自旋。

本文大綱

  1.ReentrantLock公平鎖簡介
  2.AQS
  3.lock方法
  4.unlock方法

1. ReentrantLock公平鎖簡介

  ReentrantLock是JUC(java.util.concurrent)包中Lock接口的一個實現類,它是基於AbstractQueuedSynchronizer(下文簡稱AQS)來實現鎖的功能。ReentrantLock的內部類Sync繼承了AbstractQueuedSynchronizer,Sync又有FairSync和NonFairSync兩個子類。FairSync實現了公平鎖相關的操作,NonFairSync實現了非公平鎖相關的操作。它們之間的關系如下:

 

  公平鎖的公平之處主要體現在,對於一個新來的線程,如果鎖沒有被占用,它會判斷等待隊列中是否還有其它的等待線程,如果有的話,就加入等待隊列隊尾,否則就去搶占鎖。

  下面這段代碼展示了公平鎖的使用方法:

private final Lock lock = new ReentrantLock(true); // 參數true代表創建公平鎖

public void method() {
     lock.lock();  // block until condition holds
     try {
       // ... method body
     } finally {
       lock.unlock();
     }
}

2. AQS

  下面簡單介紹一下AQS中的Node內部類和幾個重要的成員變量。

2.1 Node

  AQS中,維護了一個Node內部類,用於包裝我們的線程。我們需要關注Node中的如下屬性:

  •   pre:當前節點的前驅節點。
  •   next:當前節點的后繼節點。
  •   thread:thread表示被包裝的線程。
  •   waitStatus:waitStatus是一個int整型,可以被賦予如下幾種值:
static final int CANCELLED =  1; // 線程被取消
static final int SIGNAL  = -1; // 后繼節點中的線程需要被喚醒
static final int CONDITION = -2; // 暫不關注
static final int PROPAGATE = -3; // 暫不關注

  另外,當一個新的Node被創建時,waitStatus是0。

2.2 head

  head指向隊列中的隊首元素,可以理解為當前持有鎖的線程。

2.3 tail

  tail指向隊列中的隊尾元素。

2.4 state

  state表示在ReentrantLock中可以理解為鎖的狀態,0表示當前鎖沒有被占用,大於0的數表示鎖被當前線程重入的次數。例如,當state等於2時,表示當前線程在這把鎖上進入了兩次。

2.5 exclusiveOwnerThread

  表示當前占用鎖的線程。

2.6 等待隊列

  下圖簡單展示了AQS中的等待隊列:

3. lock方法

  有了上面的AQS的基礎知識后,我們就可以展開對ReentrantLock公平鎖的分析了,先從lock方法入手。

  ReentrantLock中的lock方法很簡單,只是調用了Sync類(本文研究公平鎖,所以應該是FairSync類)中的lock方法:

public void lock() {
    sync.lock();
}

  我們跟到FairSync的lock方法,代碼也很簡單,調用了AQS中的acquire方法:

final void lock() {
    acquire(1);
}

  acquire方法:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && // 調用tryAcquire嘗試去獲取鎖,如果獲取成功,則方法結束
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果獲取鎖失敗,執行acquireQueued方法,將把當前線程排入隊尾
        selfInterrupt();
}

  tryAcquire方法:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState(); // 獲取鎖的狀態
    if (c == 0) { // 如果狀態是0,表示鎖沒有被占用
        if (!hasQueuedPredecessors() && // 判斷是隊列中是否有排隊中的線程
            compareAndSetState(0, acquires)) { // 隊列中沒有排隊的線程,則嘗試用CAS去獲取一下鎖
            setExclusiveOwnerThread(current); // 獲取鎖成功,則將當前占有鎖的線程設置為當前線程
            return true;
        }
    }
    // 鎖被占用、隊列中有排隊的線程或者當前線程在獲取鎖的時候失敗將執行下面的代碼
    else if (current == getExclusiveOwnerThread()) { // 當前線程是否是占有鎖的線程
        int nextc = c + acquires; // 是的話,表示當前線程是重入這把鎖,將鎖的狀態進行加1
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded"); // 鎖的重入次數超過int能夠表示最大的值,拋出異常
        setState(nextc); // 設置鎖的狀態
        return true;
    }
    return false; // 沒有獲取到鎖
}

  hasQueuedPredecessors方法:

public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t && // 隊列中的隊首和隊尾元素不相同
        ((s = h.next) == null || s.thread != Thread.currentThread()); // 隊列中的第二個元素不為null,且第二個元素中的線程不是當前線程。這里如果返回true,說明隊列中至少存在tail、head兩個節點,就會執行acquireQueued將當前線程加入隊尾
}

  如果tryAcquire沒有獲取到鎖,將執行:

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

  我們先分析addWaiter方法:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); // 將當前線程包裝成Node,mode參數值為null,表示獨占模式
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred; // 如果隊列中的尾節點不為空,將當前node的前驅節點設置為之前隊列中的tail
        if (compareAndSetTail(pred, node)) { // 用CAS把當前node設置為隊尾元素
            pred.next = node; // 成功的話,則將之前隊尾元素的后繼節點設置為當前節點。如果這里不清楚的話,請結合前面講等待隊列的那張圖進行理解。
            return node;
        }
    }
    enq(node); // 隊尾節點為空,或者用CAS設置隊尾元素失敗,則用自旋的方式入隊
    return node;
}

  enq方法:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            if (compareAndSetHead(new Node())) // 隊尾元素為空,創建一個空的Node,並設置為隊首
                tail = head; // 設置隊首和隊尾為同一個空Node,進入下一次循環
        } else {
            node.prev = t; // 如果隊列中的尾節點不為空,將當前node的前驅節點設置為之前隊列中的tail
            if (compareAndSetTail(t, node)) { // 用CAS把當前node設置為隊尾元素
                t.next = node; // 成功的話,則將之前隊尾元素的后繼節點設置為當前節點
                return t;
            }
        }
    }
}

   下面這張圖反應了上面enq方法的處理流程:

  經過上面的方法,當前node已經加入等待隊列的隊尾,接下來將執行acquireQueued方法:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor(); // 獲取node的前驅節點
            if (p == head && tryAcquire(arg)) { // 如果node的前驅是head,它將去嘗試獲取鎖(tryAcquire方法在前面已經分析過)
                setHead(node); // 獲取成功,則將node設置為head
                p.next = null; // 將之前的head的后繼節點置空
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && // 當前node的前驅不是head,將為當前node找到一個能夠將其喚醒的前驅節點;或者當前node的前驅是head,但是獲取鎖失敗
                parkAndCheckInterrupt()) // 將當前線程掛起
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

  shouldParkAfterFailedAcquire方法的作用就是找到一個能夠喚醒當前node的節點:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // 開始時是0
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true; // 前驅節點的狀態是-1,會喚醒后繼節點,可以將線程掛起
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev; // 前驅節點中的線程被取消,那就需要一直循環直到找到一個沒有被設置為取消狀態的前驅節點
        } while (pred.waitStatus > 0);
        pred.next = node; // 從后向前找,將第一個非取消狀態的節點,設置這個節點的后繼節點設置為當前node
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // waitStatus是0或者-3的時候,這時waitStatus都將被設置為-1
                                                        // 即后繼節點需要前驅節點喚醒
    }
    return false; // 上層代碼再進行一次循環,下次進入此方法時,將進入第一個if條件
}

  找到了合適的前驅節點,parkAndCheckInterrupt方法當前線程掛起:

private final boolean parkAndCheckInterrupt() { // 將線程掛起,等待前驅節點的喚醒
    LockSupport.park(this);
    return Thread.interrupted();
}

 4. unlock方法

  ReentrantLock的unlock方法調用AQS中的release方法:

public void unlock() {
    sync.release(1); // 調用AQS的release方法
}

  release方法:

public final boolean release(int arg) {
    if (tryRelease(arg)) { // 嘗試去釋放鎖
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); // 釋放鎖成功,head不為空,並且head的waitStatus不為0的情況下,將喚醒后繼節點
        return true;
    }
    return false;
}

  tryRelease方法:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // 將鎖的狀態減1
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException(); // 准備釋放鎖的線程不是持有鎖的線程,拋出異常
    boolean free = false;
    if (c == 0) {
        free = true; // 鎖的狀態是0,說明不存在重入的情況了,可以直接釋放了
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

  鎖釋放成功,將喚醒后繼節點,unparkSuccessor方法:

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus; // 注意,這個node是head節點
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0); // 當前node的狀態是小於0,將其狀態設置為0

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next; // head節點的后繼節點
    if (s == null || s.waitStatus > 0) {
        s = null; // 執行到這表示head的后繼節點是1,處於取消的狀態
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t; // 從等待隊列的隊尾向前找,找到倒序的最后一個處於非取消狀態的節點
    }
    if (s != null)
        LockSupport.unpark(s.thread); // 喚醒head后面的處於非取消狀態的第一個(正序)節點
}

  到此,全文結束,大家看代碼的時候結合圖來理解會容易很多。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM