ReentrantLock——可重入鎖的實現原理


一、 概述

  本文首先介紹Lock接口、ReentrantLock的類層次結構以及鎖功能模板類AbstractQueuedSynchronizer的簡單原理,然后通過分析ReentrantLock的lock方法和unlock方法,來解釋ReentrantLock的內部原理,最后做一個總結。本文不涉及ReentrantLock中的條件變量。

  Lock接口,是對控制並發的工具的抽象。它比使用synchronized關鍵詞更靈活,並且能夠支持條件變量。它是一種控制並發的工具,一般來說,它控制對某種共享資源的獨占。也就是說,同一時間內只有一個線程可以獲取這個鎖並占用資源。其他線程想要獲取鎖,必須等待這個線程釋放鎖。在Java實現中的ReentrantLock就是這樣的鎖。另外一種鎖,它可以允許多個線程讀取資源,但是只能允許一個線程寫入資源,ReadWriteLock就是這樣一種特殊的鎖,簡稱讀寫鎖。下面是對Lock接口的幾個方法的總體描述:

方法名稱 描述
lock 獲取鎖,如果鎖無法獲取,那么當前的線程就變為不可被調度,直到鎖被獲取到
lockInterruptibly 獲取鎖,除非當前線程被中斷。如果獲取到了鎖,那么立即返回,如果獲取不到,那么當前線程變得不可被調度,一直休眠直到下面兩件事情發生:

 

1、當前線程獲取到了鎖

2、其他的線程中斷了當前的線程

tryLock 如果調用的時候能夠獲取鎖,那么就獲取鎖並且返回true,如果當前的鎖無法獲取到,那么這個方法會立刻返回false
tryLcok(long time,TimeUnit unit) 在指定時間內嘗試獲取鎖如果可以獲取鎖,那么獲取鎖並且返回true,如果當前的鎖無法獲取,那么當前的線程變得不可被調度,直到下面三件事之一發生:

 

1、當前線程獲取到了鎖

2、當前線程被其他線程中斷

3、指定的等待時間到了

 

unlock 釋放當前線程占用的鎖
newCondition 返回一個與當前的鎖關聯的條件變量。在使用這個條件變量之前,當前線程必須占用鎖。調用Condition的await方法,會在等待之前原子地釋放鎖,並在等待被喚醒后原子的獲取鎖

  接下來,我們將圍繞lock和unlock這兩個方法,來介紹整個ReentrantLock是怎么工作的。在介紹ReentrantLock之前,我們首先來看一下ReentrantLock的類層次結構以及和它密切相關的AbstractQueuedSynchronizer。

1.2、ReentrantLock類層次結構

  ReentrantLock實現了Lock接口,內部有三個內部類,Sync、NonfairSync、FairSync,Sync是一個抽象類型,它繼承AbstractQueuedSynchronizer,這個AbstractQueuedSynchronizer是一個模板類,它實現了許多和鎖相關的功能,並提供了鈎子方法供用戶實現,比如tryAcquire,tryRelease等。Sync實現了AbstractQueuedSynchronizer的tryRelease方法。NonfairSync和FairSync兩個類繼承自Sync,實現了lock方法,然后分別公平搶占和非公平搶占針對tryAcquire有不同的實現。

1.3、AbstractQueuedSynchronizer

  首先,AbstractQueuedSynchronizer繼承自AbstractOwnableSynchronizer,AbstractOwnableSynchronizer的實現很簡單,它表示獨占的同步器,內部使用變量  exclusiveOwnerThread表示獨占的線程。

  其次,AbstractQueuedSynchronizer內部使用CLH鎖隊列來將並發執行變成串行執行。整個隊列是一個雙向鏈表。每個CLH鎖隊列的節點,會保存前一個節點和后一個節點的引用,當前節點對應的線程,以及一個狀態。這個狀態用來表明該線程是否應該block。當節點的前一個節點被釋放的時候,當前節點就被喚醒,成為頭部。新加入的節點會放在隊列尾部。

二、 非公平鎖的lock方法

2.1、lock方法流程圖

 

 

2.2、lock方法詳細描述

  1、在初始化ReentrantLock的時候,如果我們不傳參數是否公平,那么默認使用非公平鎖,也就是NonfairSync。

  2、當我們調用ReentrantLock的lock方法的時候,實際上是調用了NonfairSync的lock方法,這個方法先用CAS操作,去嘗試搶占該鎖。如果成功,就把當前線程設置在這個鎖上,表示搶占成功。如果失敗,則調用acquire模板方法,等待搶占。代碼如下:

static final class NonfairSync extends Sync {
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
 
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
}

  3、調用acquire(1)實際上使用的是AbstractQueuedSynchronizer的acquire方法,它是一套鎖搶占的模板,總體原理是先去嘗試獲取鎖,如果沒有獲取成功,就在CLH隊列中增加一個當前線程的節點,表示等待搶占。然后進入CLH隊列的搶占模式,進入的時候也會去執行一次獲取鎖的操作,如果還是獲取不到,就調用LockSupport.park將當前線程掛起。那么當前線程什么時候會被喚醒呢?當持有鎖的那個線程調用unlock的時候,會將CLH隊列的頭節點的下一個節點上的線程喚醒,調用的是LockSupport.unpark方法。acquire代碼比較簡單,具體如下:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

  3.1、acquire方法內部先使用tryAcquire這個鈎子方法去嘗試再次獲取鎖,這個方法在NonfairSync這個類中其實就是使用了nonfairTryAcquire,具體實現原理是先比較當前鎖的狀態是否是0,如果是0,則嘗試去原子搶占這個鎖(設置狀態為1,然后把當前線程設置成獨占線程),如果當前鎖的狀態不是0,就去比較當前線程和占用鎖的線程是不是一個線程,如果是,會去增加狀態變量的值,從這里看出可重入鎖之所以可重入,就是同一個線程可以反復使用它占用的鎖。如果以上兩種情況都不通過,則返回失敗false。代碼如下:

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

  3.2、tryAcquire一旦返回false,就會則進入acquireQueued流程,也就是基於CLH隊列的搶占模式:

  3.2.1、首先,在CLH鎖隊列尾部增加一個等待節點,這個節點保存了當前線程,通過調用addWaiter實現,這里需要考慮初始化的情況,在第一個等待節點進入的時候,需要初始化一個頭節點然后把當前節點加入到尾部,后續則直接在尾部加入節點就行了。

private Node addWaiter(Node mode) {
        // 初始化一個節點,這個節點保存當前線程
        Node node = new Node(Thread.currentThread(), mode);
        // 當CLH隊列不為空的視乎,直接在隊列尾部插入一個節點
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 當CLH隊列為空的時候,調用enq方法初始化隊列
        enq(node);
        return node;
}
 
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // 初始化節點,頭尾都指向一個空節點
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {// 考慮並發初始化
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
}

  3.2.2、將節點增加到CLH隊列后,進入acquireQueued方法。

  首先,外層是一個無限for循環,如果當前節點是頭節點的下個節點,並且通過tryAcquire獲取到了鎖,說明頭節點已經釋放了鎖,當前線程是被頭節點那個線程喚醒的,這時候就可以將當前節點設置成頭節點,並且將failed標記設置成false,然后返回。至於上一個節點,它的next變量被設置為null,在下次GC的時候會清理掉。

如果本次循環沒有獲取到鎖,就進入線程掛起階段,也就是shouldParkAfterFailedAcquire這個方法。

  代碼如下:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

  3.2.3、如果嘗試獲取鎖失敗,就會進入shouldParkAfterFailedAcquire方法,會判斷當前線程是否掛起,如果前一個節點已經是SIGNAL狀態,則當前線程需要掛起。如果前一個節點是取消狀態,則需要將取消節點從隊列移除。如果前一個節點狀態是其他狀態,則嘗試設置成SIGNAL狀態,並返回不需要掛起,從而進行第二次搶占。完成上面的事后進入掛起階段。

  代碼如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //
            return true;
        if (ws > 0) {
            //
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

  3.2.4、當進入掛起階段,會進入parkAndCheckInterrupt方法,則會調用LockSupport.park(this)將當前線程掛起。代碼:

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
}

三、 非公平鎖的unlock方法

3.1、unlock方法的活動圖

3.2、unlock方法詳細描述

  1、調用unlock方法,其實是直接調用AbstractQueuedSynchronizer的release操作。

  2、進入release方法,內部先嘗試tryRelease操作,主要是去除鎖的獨占線程,然后將狀態減一,這里減一主要是考慮到可重入鎖可能自身會多次占用鎖,只有當狀態變成0,才表示完全釋放了鎖。

  3、一旦tryRelease成功,則將CHL隊列的頭節點的狀態設置為0,然后喚醒下一個非取消的節點線程。

  4、一旦下一個節點的線程被喚醒,被喚醒的線程就會進入acquireQueued代碼流程中,去獲取鎖。

  具體代碼如下:

  unlock代碼:

public void unlock() {
        sync.release(1);
}

  release方法代碼:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

  Sync中通用的tryRelease方法代碼:

protected final boolean tryRelease(int releases) {
     int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
          free = true;
          setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
 }

  unparkSuccessor代碼:

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null) 
            LockSupport.unpark(s.thread);
}

四、 公平鎖和非公平鎖的區別

  公平鎖和非公平鎖,在CHL隊列搶占模式上都是一致的,也就是在進入acquireQueued這個方法之后都一樣,它們的區別在初次搶占上有區別,也就是tryAcquire上的區別,下面是兩者內部調用關系的簡圖:

NonfairSync
lock —> compareAndSetState
                | —> setExclusiveOwnerThread
      —> accquire
             | —> tryAcquire
                           |—>nonfairTryAcquire
                |—> acquireQueued

FairSync
lock —> acquire
               | —> tryAcquire
                           |—>!hasQueuePredecessors
                           |—>compareAndSetState
                           |—>setExclusiveOwnerThread
               |—> acquireQueued

  真正的區別就是公平鎖多了hasQueuePredecessors這個方法,這個方法用於判斷CHL隊列中是否有節點,對於公平鎖,如果CHL隊列有節點,則新進入競爭的線程一定要在CHL上排隊,而非公平鎖則是無視CHL隊列中的節點,直接進行競爭搶占,這就有可能導致CHL隊列上的節點永遠獲取不到鎖,這就是非公平鎖之所以不公平的原因。

五、 總結

  線程使用ReentrantLock獲取鎖分為兩個階段,第一個階段是初次競爭,第二個階段是基於CHL隊列的競爭。在初次競爭的時候是否考慮隊列節點直接區分出了公平鎖和非公平鎖。在基於CHL隊列的鎖競爭中,依靠CAS操作保證原子操作,依靠LockSupport來做線程的掛起和喚醒,使用隊列來保證並發執行變成了串行執行,從而消除了並發所帶來的問題。總體來說,ReentrantLock是一個比較輕量級的鎖,而且使用面向對象的思想去實現了鎖的功能,比原來的synchronized關鍵字更加好理解。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM