ReentrantLock實現原理分析


原文出處http://www.yund.tech/zdetail.html?type=1&id=ef94715a2838f06ab03b8621c23d1613    

作者:jstarseven  


 

ReentrantLock主要利用CAS+CLH隊列來實現。它支持公平鎖和非公平鎖,兩者的實現類似。

  • CAS:Compare and Swap,比較並交換。CAS有3個操作數:內存值V、預期值A、要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改為B,否則什么都不做。該操作是一個原子操作,被廣泛的應用在Java的底層實現中。在Java中,CAS主要是由sun.misc.Unsafe這個類通過JNI調用CPU底層指令實現。

  • CLH隊列:帶頭結點的雙向非循環鏈表(如下圖所示):

圖片描述

ReentrantLock的基本實現可以概括為:先通過CAS嘗試獲取鎖。如果此時已經有線程占據了鎖,那就加入CLH隊列並且被掛起。當鎖被釋放之后,排在CLH隊列隊首的線程會被喚醒,然后CAS再次嘗試獲取鎖。在這個時候,如果:

        1.非公平鎖:如果同時還有另一個線程進來嘗試獲取,那么有可能會讓這個線程搶先獲取;

        2. 公平鎖:如果同時還有另一個線程進來嘗試獲取,當它發現自己不是在隊首的話,就會排到隊尾,由隊首的線程獲取到鎖。

ReentrantLock是java concurrent包提供的一種鎖實現。不同於synchronized,ReentrantLock是從代碼層面實現同步的。 
這里寫圖片描述 
圖1 reentrantLock的類層次結構圖

Lock定義了鎖的接口規范。 
ReentrantLock實現了Lock接口。 
AbstractQueuedSynchronizer中以隊列的形式實現線程之間的同步。 
ReentrantLock的方法都依賴於AbstractQueuedSynchronizer的實現。

Lock接口定義了如下方法: 
這里寫圖片描述
圖2 lock接口規范

1、lock()方法的實現 
進入lock()方法,發現其內部調用的是sync.lock();

    public void lock() {
        sync.lock();
    }

sync是在ReentrantLock的構造函數中實現的。其中fair參數的不同可實現公平鎖和非公平鎖。由於在鎖釋放的階段,鎖處於無線程占有的狀態,此時其他線程和在隊列中等待的線程都可以搶占該鎖,從而出現公平鎖和非公平鎖的區別。 
非公平鎖:當鎖處於無線程占有的狀態,此時其他線程和在隊列中等待的線程都可以搶占該鎖。 
公平鎖:當鎖處於無線程占有的狀態,在其他線程搶占該鎖的時候,都需要先進入隊列中等待。 
本文以非公平鎖NonfairSync的sync實例進行分析。

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = (fair)? new FairSync() : new NonfairSync();
    }

由圖1可知,NonfairSync繼承自Sync,因此也繼承了AbstractQueuedSynchronizer中的所有方法實現。接着進入NonfairSync的lock()方法。

 final void lock() {
            // 利用cas置狀態位,如果成功,則表示占有鎖成功
            if (compareAndSetState(0, 1))
                // 記錄當前線程為鎖擁有者
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

在lock方法中,利用cas實現ReentrantLock的狀態置位(cas即compare and swap,它是CPU的指令,因此賦值操作都是原子性的)。如果成功,則表示占有鎖成功,並記錄當前線程為鎖擁有者。當占有鎖失敗,則調用acquire(1)方法繼續處理。

    public final void acquire(int arg) {
        //嘗試獲得鎖,如果失敗,則加入到隊列中進行等待
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire()是AbstractQueuedSynchronizer的方法。它首先會調用tryAcquire()去嘗試獲得鎖,如果獲得鎖失敗,則將當前線程加入到CLH隊列中進行等待。tryAcquire()方法在NonfairSync中有實現,但最終調用的還是Sync中的nonfairTryAcquire()方法。

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 獲得狀態
            int c = getState();
            // 如果狀態為0,則表示該鎖未被其他線程占有
            if (c == 0) {
                // 此時要再次利用cas去嘗試占有鎖
                if (compareAndSetState(0, acquires)) {
                    // 標記當前線程為鎖擁有者
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果當前線程已經占有了,則state + 1,記錄占有次數
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 此時無需利用cas去賦值,因為該鎖肯定被當前線程占有
                setState(nextc);
                return true;
            }
            return false;
        }

在nonfairTryAcquire()中,首先會去獲得鎖的狀態,如果為0,則表示鎖未被其他線程占有,此時會利用cas去嘗試將鎖的狀態置位,並標記當前線程為鎖擁有者;如果鎖的狀態大於0,則會判斷鎖是否被當前線程占有,如果是,則state + 1,這也是為什么lock()的次數要和unlock()次數對等;如果占有鎖失敗,則返回false。 
在nonfairTryAcquire()返回false的情況下,會繼續調用acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,將當前線程加入到隊列中繼續嘗試獲得鎖。

  private Node addWaiter(Node mode) {
        // 創建當前線程的節點
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 如果尾節點不為空
        if (pred != null) {
            // 則將當前線程的節點加入到尾節點之后,成為新的尾節點
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }

        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        // CAS方法有可能失敗,因此要循環調用,直到當前線程的節點加入到隊列中
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                Node h = new Node(); // Dummy header,頭節點為虛擬節點
                h.next = node;
                node.prev = h;
                    if (compareAndSetHead(h)) {
                    tail = node;  
                    return h;
                }
            }
            else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

 

addWaiter()是AbstactQueuedSynchronizer的方法,會以節點的形式來標記當前線程,並加入到尾節點中。enq()方法是在節點加入到尾節點失敗的情況下,通過for(;;)循環反復調用cas方法,直到節點加入成功。由於enq()方法是非線程安全的,所以在增加節點的時候,需要使用cas設置head節點和tail節點。此時添加成功的結點狀態為Node.EXCLUSIVE。 
在節點加入到隊列成功之后,會接着調用acquireQueued()方法去嘗試獲得鎖。

 final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                // 獲得前一個節點
                final Node p = node.predecessor();
                // 如果前一個節點是頭結點,那么直接去嘗試獲得鎖
                // 因為其他線程有可能隨時會釋放鎖,沒必要Park等待
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

在acquireQueued()方法中,會利用for (;;)一直去獲得鎖,如果前一個節點為head節點,則表示可以直接嘗試去獲得鎖了,因為占用鎖的線程隨時都有可能去釋放鎖並且該線程是被unpark喚醒的CLH隊列中的第一個節點,獲得鎖成功后返回。 
如果該線程的節點在CLH隊列中比較靠后或者獲得鎖失敗,即其他線程依然占用着鎖,則會接着調用shouldParkAfterFailedAcquire()方法來阻塞當前線程,以讓出CPU資源。在阻塞線程之前,會執行一些額外的操作以提高CLH隊列的性能。由於隊列中前面的節點有可能在等待過程中被取消掉了,因此當前線程的節點需要提前,並將前一個節點置狀態位為SIGNAL,表示可以阻塞當前節點。因此該函數在判斷到前一個節點為SIGNAL時,直接返回true即可。此處雖然存在對CLH隊列的同步操作,但由於局部變量節點肯定是不一樣的,所以對CLH隊列操作是線程安全的。由於在compareAndSetWaitStatus(pred, ws, Node.SIGNAL)執行之前可能發生pred節點搶占鎖成功或pred節點被取消掉,因此此處需要返回false以允許該節點可以搶占鎖。 
當shouldParkAfterFailedAcquire()返回true時,會進入parkAndCheckInterrupt()方法。parkAndCheckInterrupt()方法最終調用safe.park()阻塞該線程,以免該線程在等待過程中無線循環消耗cpu資源。至此,當前線程便被park了。那么線程何時被unpark,這將在unlock()方法中進行。 
這里有一個小細節需要注意,在線程被喚醒之后,會調用Thread.interrupted()將線程中斷狀態置位為false,然后記錄下中斷狀態並返回上層函數去拋出異常。我想這樣設計的目的是為了可以讓該線程可以完成搶占鎖的操作,從而可以使當前節點稱為CLH的虛擬頭節點。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park
             */
            return true;

        if (ws > 0) {
            // 如果前面的節點是CANCELLED狀態,則一直提前
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        } 
        return false;
    }

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(false, 0L);
        setBlocker(t, null);
    }

2、unlock()方法的實現 
同lock()方法,unlock()方法依然調用的是sync.release(1)。

 public final boolean release(int arg) {
        // 釋放鎖
        if (tryRelease(arg)) {
            Node h = head;
            // 此處有個疑問,為什么需要判斷h.waitStatus != 0
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

可以看到,tryRelease()方法實現了鎖的釋放,邏輯上即是將鎖的狀態置為0。當釋放鎖成功之后,通常情況下不需要喚醒隊列中線程,因此隊列中總是有一個線程處於活躍狀態。

總結: 
         ReentrantLock的鎖資源以state狀態描述,利用CAS則實現對鎖資源的搶占,並通過一個CLH隊列阻塞所有競爭線程,在后續則逐個喚醒等待中的競爭線程。ReentrantLock繼承AQS完全從代碼層面實現了java的同步機制,相對於synchronized,更容易實現對各類鎖的擴展。同時,AbstractQueuedSynchronizer中的Condition配合ReentrantLock使用,實現了wait/notify的功能。

 

 

 


 -END-


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM