關於 ReentrantLock 中鎖 lock() 和解鎖 unlock() 的底層原理淺析


關於 ReentrantLock 中鎖 lock() 和解鎖 unlock() 的底層原理淺析

如下代碼,當我們在使用 ReentrantLock 進行加鎖和解鎖時,底層到底是如何幫助我們進行控制的啦?

    static Lock lock = new ReentrantLock();

    public static void main(String[] args) {

        // 使用兩個線程模擬多線程執行並發
        new Thread(() -> doBusiness(), "Thread-1").start();
        new Thread(() -> doBusiness(), "Thread-2").start();
    }

    private static void doBusiness() {
        try {
            lock.lock();
            System.out.println("需要加鎖的業務處理代碼,防止並發異常");
        } finally {
            lock.unlock();
        }
    }

帶着這樣的疑問,我們先后跟進 lock()和unlock() 源碼一探究竟

說明:

  1、在進行查看 ReentrantLock 進行 lock() 加鎖和 unlock() 解鎖源碼時,需要知道 LockSupport 類、了解自旋鎖以及鏈表相關知識。

  2、在分析過程中,假設第一個線程獲取到鎖的時候執行代碼需要很長時間才釋放鎖,及在第二個第三個線程來獲取鎖的時候,第一個線程並沒有執行完成,沒有釋放鎖資源。

  3、在分析過程中,我們假設第一個線程就是最先進來獲取鎖的線程,那么第二個第三個線程也是依次進入的,不會存在第三個線程先於第二個線程(即第三個線程如果先於第二個線程發生,那么第三個線程就是我們下面描述的第二個線程)

 

一、 lock() 方法

1、查看lock()方法源碼

    public void lock() {
        sync.lock();
    }

  從上面可以看出 ReentrantLock 的 lock() 方法調用的是 sync 這個對象的 lock() 方法,而 Sync 就是一個實現了抽象類AQS(AbstractQueuedSynchronizer) 抽象隊列同步器的一個子類,繼續跟進代碼(說明:ReentrantLock 分為公平鎖和非公平鎖,如果無參構造器創建鎖默認是非公平鎖,我們按照非公平鎖的代碼來講解)

  1.1 關於Sync子類的源碼

abstract static class Sync extends AbstractQueuedSynchronizer { 
    // 此處省略具體實現AbstractQueuedSynchronizer 類的多個方法
}

  這里需要說明的是 AbstractQueuedSynchronizer 抽象隊列同步器底層是一個通過Node實現的雙向鏈表,該抽象同步器有三個屬性 head 頭節點tail 尾節點 和 state 狀態值。

    屬性1:head——注釋英文翻譯:等待隊列(同步阻塞隊列)的頭部,懶加載,用於初始化,當調用 setHead() 方法的時候會對 head 進行修改。注:如果 head 節點存在,則 head 節點的 waitStatus 狀態值用於保證其不變成 CANCELLED(取消狀態,值為1) 狀態

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    屬性2: tail——tail節點是等待隊列(同步阻塞隊列)的尾部,懶加載,在調用 enq() 方法時會添加一個新的 node 到等待隊列的時候會修改 tail 節點。

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    屬性3:state——用於同步的狀態碼。如果 state 該值為0,則表示沒有其他線程獲取到鎖,如果該值大於0則表示已經被某線程獲取到了鎖,該值可以是2、3、4,所以使用該值可以處理重入鎖(遞歸鎖)的邏輯。

    /**
     * The synchronization state.
     */
    private volatile int state;

  1.2 上面 Sync 類使用 Node來作為雙向隊列的具體保存值和狀態的載體,Node 的具體結構如下

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node(); // 共享鎖模式(主要用於讀寫鎖中的讀鎖)
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null; // 排他鎖模式(也叫互斥鎖)

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1; // Node線程等待取消,不再參與鎖競爭,處於這種狀態的Node會被踢出隊列,被GC回收
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1; // 表明Node線程需要被喚醒,可以競爭鎖
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2; // 表示這個Node線程在條件隊列中,因為等待某個條件而被阻塞 
        /** waitStatus value to indicate the next acquireShared should unconditionally propagate */
        static final int PROPAGATE = -3; // 使用在共享模式頭Node有可能處於這種狀態, 表示鎖的下一次獲取可以無條件傳播

        volatile int waitStatus; // 默認初始狀態為0,所有新增node節點的初始狀態都是0

        volatile Node prev; // 前驅Node節點

        volatile Node next; // 后繼Node節點

        /** The thread that enqueued this node.  Initialized on construction and nulled out after use.*/
        volatile Thread thread; // 當前線程和節點進行綁定,通過構造器初始化Thread,在釋放的時候將當前線程替換原有的null值

        // 省略部分代碼

  說明:Sync 通過Node節點構建隊列,Node節點使用prev和next節點來行程雙向隊列,使用prev來關聯上一個節點,使用next來關聯下一個節點,每一個node節點和一個thread線程進行綁定,用來表示當前線程在阻塞隊列中的具體位置和狀態 waitStatus

 

2、上面的 sync.lock() 繼續跟進源碼(非公平鎖):

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else acquire(1);
    }

說明:上面代碼說明,如果 compareAndSetState(0, 1) 為 true ,則執行 setExclusiveOwnerThread(Thread.currentThread()) ,否則執行 acquire(1);

  2.1 compareAndSetState(0, 1) 底層使用unsafe類完成CAS操作,意思就是判斷當前state狀態是否為0,如果為零則將該值修改為1,並返回true;state不為0,則無法將該值修改為1,返回false。

    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

  2.2 假如第1個線程進來的時候 compareAndSetState(0, 1) 肯定執行成功,state 狀態會從0變成1,同時返回true,執行 setExclusiveOwnerThread(Thread.currentThread()) 方法:

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

  setExclusiveOwnerThread(Thread.currentThread()) 表示將當前 Sync 對象和當前線程綁定,意思是表明:當前隊列內同步器執行的線程為 thread,該 thread 獲取了鎖正在執行。

  2.3 假如進來的線程為第2個,並且第一個線程還在執行沒有釋放鎖,那么第2個線程就會執行 acquire(1)方法:

    public final void acquire(int arg) {
     // 如下:三個方法的各自作用
     // tryAcquire(arg) 當前線程是否還有機會獲取到鎖,非公平鎖的特質
     // addWaiter() 將當前線程加入隊列
     // acquireQueued() 方法中:1 判斷是否是隊首任務head節點 並且搶占鎖成功,則修改鏈表 2、否則(不是隊首head節點 或者 沒有搶占到鎖的所有節點)都進行阻塞(原理LockSupport.park())
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

  進入到該方法中發現,需要通過 !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 兩個方法判斷是否需要執行 selfInterrupt();

    (1)先執行tryAcquire(arg)這個方法進行判斷,注:tryAcquire(arg)方法體現了非公平鎖的特性,只要在加入隊列前都有機會獲取鎖。這個方法就是判斷當前線程是否在進入隊列前有機會獲取鎖。

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
       // 獲取state狀態,因為第一個線程進來的時候只要還沒有執行完就已經將state設置為1了(即:2.1步)
int c = getState();
       // 再次判斷之前獲取鎖的線程是否已經釋放鎖了
if (c == 0) {
// 如果之前的線程已經釋放鎖,那么當前線程進來就將狀態改為1,並且設置當前占用鎖的線程為自身
if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }
       // 判斷當前占用鎖的線程是不是就是我自身,如果是我自身,這將State在原值的基礎上進行加1,來處理重入鎖邏輯
else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires;
          // 判斷重入鎖次數是不是超過限制,超過限制則直接報錯
if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

    從上面的方法看,如果第二個線程進來,且第一個線程還未釋放鎖的情況下,該方法 tryAcquire(arg) 直接放回false,那么 !tryAcquire(arg) 就為true,需要判斷第二個方法 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),第二個方法先執行addWaiter(Node.EXCLUSIVE),及添加等待線程進入隊列

    (2)添加等待線程到同步阻塞隊列中

    private Node addWaiter(Node mode) {
     // 將當前線程和node節點進行綁定,設置模式為排他鎖模式 Node node
= new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail;// 第二個線程也就是第一次進來該方法的線程,tail肯定是null if (pred != null) { // 如果tail尾節點不為空,表示第3、4、5次進來的線程 node.prev = pred; // 那么就將當前進來的線程節點的 prev 節點指向之前的尾節點 if (compareAndSetTail(pred, node)) { // 通過比較並交換,如果當前尾節點在設置過程中沒有被其他線程搶先操作,那么就將當前節點設置為tail尾節點 pred.next = node; // 將以前尾節點的下一個節點指向當前節點(新的尾節點) return node; } } enq(node); // 如果為第二個線程進來,就是上面的 pred != null 成立,if 沒有執行,直接執行 enq()方法 return node; }
    private Node enq(final Node node) {
        for (;;) { // 一直循環檢查,相當於自旋鎖
            Node t = tail;
            if (t == null) { // Must initialize
                   // 第二個線程的第一次進來肯定先循環進入該方法,這時設置頭結點,該頭結點一般被稱為哨兵節點,並且頭和尾都指向該節點
if (compareAndSetHead(new Node()))
            // 第一次進來設置的頭結點和尾節點是同一個節點 tail
= head; } else {
          // 1、第二個線程在第二次循環時將進入else 方法中,將該節點掛在哨兵節點(頭結點)后,並且尾節點指向該節點,並且將該節點返回(該節點有prev信息) node.prev
= t;
          // 在if中將當前隊列的尾節點修改新增進來的 node 節點
if (compareAndSetTail(t, node)) {
            // t節點是之前鏈表的尾部節點,第一次進來head和tail是同一個節點,並將新的尾節點node 設置到之前尾節點t 后,連成一個整串 t.next
= node; return t; } } } }

    如上在執行 enq(final Node node) 結束,並且返回添加了第二個線程node節點的時候, addWaiter(Node mode) 方法會繼續向上返回

    或者

    如果是添加第3、4個線程直接走 addWaiter(Node mode) 方法中的 if 流程直接添加返回

    都將到2.3 步,執行acquireQueued(final Node node, int arg),再次貼源碼

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    (3)即下一步就會執行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法:

    注:上面的流程是將后面的線程加入到了同步阻塞隊列中,下面的方法第一個if (p == head && tryAcquire(arg))則是看同步阻塞隊列的第一條阻塞線程是否可以獲取到鎖,如果能夠獲取到鎖就修改相應鏈表結構,第二個if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()即將發生線程阻塞

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
          // 自旋鎖,如果為第二個線程,那么 p 就是 head 哨兵節點
final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {
            // 上面的 if 表明如果當前線程為同步阻塞隊列中的第一個線程,那么就再次試圖獲取鎖 tryAcquire(),如果獲取成功,則修改同步阻塞隊列 setHead(node); // 將head頭結點(哨兵節點)設置為已經獲取鎖的線程node,並將該node的Theread 設置為空 p.next
= null; // help GC 取消和之前哨兵節點的關聯,便於垃圾回收器對之前數據的回收 failed = false; return interrupted; }
          // 如果第二個線程沒有獲取到鎖(同步阻塞隊列中的第一個線程),那么就需要執行下面兩個方法,注標的方法會讓當前未獲取到鎖的線程阻塞
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
    private void setHead(Node node) {
        // 將哨兵節點往后移,並且將 thread 設置為空,取消和以前哨兵節點的關聯,並於垃圾回收器回收
        head = node;
        node.thread = null;
        node.prev = null;
    }

    shouldParkAfterFailedAcquire(p, node)這個方法將哨兵隊列的狀態設置為待喚醒狀態

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     // 該方法第一次進來 pred為哨兵節點,ws為哨兵節點的初始0狀態
     // 該方法第二次進來 pred為哨兵節點,ws為哨兵節點的狀態-1狀態
     // 該方法第三、四次進來就為鏈表的倒數第二個節點,ws為倒數第二個節點的狀態
int ws = pred.waitStatus;
    
if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */
       // 如上第二次進來的時候哨兵節點的狀態就是-1,此時返回true
return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do {
          // 如果ws及倒數第二個節點是取消狀態,那么通過雙向鏈表向前找倒數第三個,第四關節點,直到向前找到最近一個狀態不是取消的node節點,並把當前節點掛在該節點后 node.prev
= pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 將頭結點(哨兵節點)設置成待喚醒狀態,第一次進來的時候 有0——>-1 然后繼續執行返回false } return false; }
    parkAndCheckInterrupt()這個方法會讓當前線程阻塞
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // LockSupport.park()會導致當前線程阻塞,直到某個線程調用unpark()方法
        return Thread.interrupted();
    }

    那么在lock()方法執行時,只要第一個線程沒有unlock()釋放鎖,其他所有線程都會加入同步阻塞隊列中,該隊列中記錄了阻塞線程的順序,在加入同步阻塞隊列前有多次機會可以搶先執行(非公平鎖),如果沒有被執行到,那么加入同步阻塞隊列后,就只有頭部節點(哨兵節點)后的阻塞線程有機會獲取到鎖進行邏輯處理。再次查看該方法: 

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    // if 表明只有頭部節點(哨兵節點)后的節點在放入同步阻塞隊列前可以獲取鎖
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 所有線程都被阻塞在這個方法處
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }        

 

二、unlock()方法

1、unlock源碼

    public void unlock() {
        sync.release(1);
    }

  同樣是調用的同步阻塞隊列的方法 sync.release(1),跟進查看源碼:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

2、查看tryRelease()方法:

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                // 如果不是自身鎖對象調用unlock()方法的話,就報異常
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                // 如果標志位已經為0,表示重入鎖已經全部釋放,這將當前獲取鎖的線程設置為null,以便其他線程進行加鎖
                setExclusiveOwnerThread(null);
            }
            // 更新重入鎖解鎖到達的次數,如果C不為0,表示還有重入鎖unlock()沒有調用完
            setState(c);
            return free;
        }        

3、如果tryRelease()方法成功執行,表示之前獲取鎖的線程已經執行完所有需要同步的代碼(重入鎖也完全退出),那么就需要喚醒同步阻塞隊列中的第一個等待的線程(也是等待最久的線程),執行unparkSuccessor(h)方法:

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        // 先獲取頭結點(哨兵節點)的waitStatus狀態,如果小於0,則可以獲取鎖,並將waitStatus的狀態設置為0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            // 如果哨兵節點的下一個節點為null,或者狀態為1表示已經取消,則依次循環尋找(從后往前尋找)后面節點,直至找到一個waitStatus<0的節點,並將該節點設置為需要獲取鎖的節點
            s = null;
       // 注下面的 for 循環是從后往前遍歷,直到 for 循環遍歷完成找到最開頭的一個節點,並且該節點 waitStatus<0。將找到的最開頭的滿足條件的節點給到 s 並對其進行喚醒
for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 將該node節點的線程解鎖,允許它去獲取鎖,然后執行業務邏輯 LockSupport.unpark(s.thread); }

 

三、unlock()方法調用后,會到lock()方法阻塞的地方,完成喚醒工作

1、在上面方法 unparkSuccessor(Node node) 中執行完 LockSupport.unpark(s.thread) 后在同步阻塞隊列后的第一個 node 關聯的線程將被喚醒,即unlock()方法代碼執行完,將會到lock() 源碼解析的 2.3 步里,第三次貼該處源碼:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

2、在上面放大的紅色方法中,之前上面lock()源碼講了當中所有線程都被阻塞了,如下面源碼紅色標記的地方:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

3、所有未獲取到鎖的線程都在 parkAndCheckInterrupt() 方法處阻塞着,所以我們即將喚醒的哨兵節點后的第一個阻塞線程也是在該處阻塞着,在執行完 unlock() 源碼步驟第3步unparkSuccessor(Node node)中的方法,則將返回到之前阻塞線程的這個方法 parkAndCheckInterrupt()的這行代碼 LockSupport.park(this) 的下一步執行 Thread.interrupted(),因為線程沒有被打斷,所以返回false,故acquireQueued(final Node node, int arg)方法中繼續輪訓再次嘗試acquireQueued(final Node node, int arg)獲取鎖,因為第一個線程已經釋放鎖,所以第二個線程可以獲取鎖了,並在執行完后返回interrupted為false,表示線程不是被中斷的,繼續向上返回,最終lock()方法執行完成(獲取到鎖),可以繼續執行業務代碼。理,其他線程也在parkAndCheckInterrupt()這個方法中中斷着,等待被第二個線程喚醒。

 

總結:

  在第一個 A 線程 lock() 獲取到鎖后,第一個線程會在底層的同步阻塞隊列中設置鎖狀態 state 為1(如果重入鎖多次獲取 state 每次加1),並設置擁有當前鎖的線程為自身A線程,其他線程 B/C/D 來獲取鎖的時候就會比較鎖狀態是否為0,如果不為0,表示已經被獲取了鎖,再次比較獲取鎖的線程是否為自身,如果為自身則對 state 加1(滿足重入鎖的規則),否則就將當前未獲取到鎖的線程放入同步阻塞隊列中,在放入的過程中,需要設置 head 哨兵節點和 tail 尾節點,以及相應的 waitStatus 狀態,並且在放入過程中需要設置當前節點以及先關節點的 prev 和 next 節點,從而達到雙向隊列的效果,存放到阻塞隊列后,線程會被阻塞到這樣一個方法中 parkAndCheckInterrupt(),對於用戶(調用者)來說是執行到.lock()方法被阻塞,等待被喚醒。

  在第一個 A 線程執行完畢,調用 unlock() 解鎖后,unlock() 方法會從同步阻塞隊列的哨兵節點后的第一個節點獲取等待解鎖的線程B,並將其解鎖,然后就會到B阻塞的方法 parkAndCheckInterrupt() 來繼續執行,因為不是被中斷,所以Thread.interrupted()方法為false,返回false后,在acquireQueued(final Node node, int arg)方法的for(;;)處自旋調用tryAcquire(arg)嘗試獲取鎖,因為第一個線程已經釋放鎖,所以第二個線程在自旋獲取鎖的時候能夠成功,代碼繼續執行,使得 B 線程原本在 lock() 方法處阻塞的代碼能夠執行完成繼續向下執行(能夠執行表明獲取到鎖),然后執行業務邏輯。其他線程以此類推,依次發生阻塞和喚醒。

 

根據源碼總結的 lock() 和 unlock() 的原理,歡迎大神批評指正,如有剛好的見解,也歡迎大家相互交流。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM