JAVA並發(2)-ReentrantLock的見解


上節,我們講了AQS的阻塞與釋放實現原理,線程間通信(Condition)的原理。這次,我們就講講基於AQS實現的ReentrantLock(重入鎖)。

1. 介紹

ReentrantLock類圖
結合上面的ReentrantLock類圖,ReentrantLock實現了Lock接口,它的內部類Sync繼承自AQS,絕大部分使用AQS的子類需要自定義的方法存在Sync中。而ReentrantLock有公平與非公平的區別,即'是否先阻塞就先獲取資源',它的主要實現就是FairSyncNonfairSync,后面會從源碼角度看看它們的區別。

2. 源碼剖析

SyncReentrantLock控制同步的基礎。它的子類分為了公平與非公平。使用AQSstate代表獲取鎖的數量

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        ...
    }

我們可以看出內部類Sync是一個抽象類,繼承它的子類(FairSyncNonfairSync)需要實現抽象方法lock

下面我們先從非公平鎖的角度來看看獲取資源與釋放資源的原理
故事就從就兩個變量開始:

    // 獲取一個非公平的獨占鎖
    /**
    * public ReentrantLock() {
    *    sync = new ReentrantLock.NonfairSync();
    * }
    */
    private Lock lock = new ReentrantLock();
    // 獲取條件變量
    private Condition condition = lock.newCondition();

2.1 上鎖(獲取資源)

   lock.lock()
    public void lock() {
        sync.lock();
    }
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        // 獲取資源
        final void lock() {
            // 若此時沒有線程獲取到資源,直接設置當前線程獨占訪問資源。
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // AQS的方法
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            // 實現在父類Sync中
            return nonfairTryAcquire(acquires);
        }
    }

AQSacquire

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

上一節已經剖析過AQSacquire的整個流程了,就差子類如何去實現tryAcquire了。

  // Sync實現的非公平的tryAcquire
  final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 此時若沒有線程獲取到資源,當前線程就直接占用該資源
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 若當前線程已經占用了該資源,可以再次獲取該資源  ->這個行為就是可重入鎖的支撐
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

嘗試獲取資源的過程是非常簡單的,這里再貼一下acquire的流程圖
acquire流程圖

2.2 釋放資源

    lock.unlock();
    public void unlock() {
        // AQS的方法
        sync.release(1);
    }

AQSrelease

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

release的流程已經剖析過了,接下來看看tryRelease的實現

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            // 這里可以看出若沒有持有鎖,就釋放資源,就會報錯
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

tryRelease的實現也很簡單,這里再貼一下release的流程圖
release流程圖

2.3 公平鎖與非公平鎖的區別

公平鎖與非公平鎖,即'是否先阻塞就先獲取資源', ReentrantLock公平與否的控制就在tryAcquire中。下面我們看看,公平鎖的tryAcquire

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // (2.3.1)
                // sync queue中是否存在前驅結點
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

區別在代碼(2.3.1)
hasQueuedPredecessors

判斷當前線程的前面有無其他線程排隊;若當前線程在隊列頭部或者隊列為空返回false

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

結合下面的入隊代碼(enq), 我們分析hasQueuedPredecessorstrue的情況:

  1. h != t ,表示此時queue不為空; (s = h.next) == null, 表示另一個結點已經運行了下面的步驟(2),還沒來得及運行步驟(3)。簡言之,就是B線程想要獲取鎖的同時,A線程獲取鎖失敗剛好在入隊(B入隊的同時,之前占有的資源的線程,剛好釋放資源)
  2. h != t(s = h.next) != null,表示此時至少有一個結點在sync queue中;s.thread != Thread.currentThread(),這個情況比較復雜,設想一下有這三個結點 A -> B CA此時獲取到資源,而B此時因為獲取資源失敗正在sync queue阻塞,C還沒有獲取資源(還沒有執行tryAcquire)。
    時刻一:A釋放資源成功后(執行tryRelease成功),B此時還沒有成功獲取資源(C執行s = h.next時,B還在sync queue中且是老二)
    時刻二: C此時執行hasQueuedPredecessorss.thread != Thread.currentThread()成立,此時s.thread表示的是B
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node())) // (1) 第一次初始化
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) { // (2) 設置queue的tail
                    t.next = node; // (3)
                    return t;
                }
            }
        }
    }

Note that 1. because cancellations due to interrupts and timeouts may occur at any time, a true return does not guarantee that some other thread will acquire before the current thread(虛假true). 2. Likewise, it is possible for another thread to win a race to enqueue after this method has returned false, due to the queue being empty(虛假false).

這位大佬hasQueuedPredecessors進行詳細的分析,他文中解釋了虛假true以及虛假false。我這里簡單解釋一下:

  1. 虛假true, 當兩個線程都執行tryAcquire,都執行到hasQueuedPredecessors,都返回true,但是只有一個線程執行compareAndSetState(0, acquires)成功
  2. 虛假false,當一個線程A執行doAcquireInterruptibly,發生了中斷,還沒有清除掉該結點時;此時,線程B執行hasQueuedPredecessors時,返回true

3. 總結

本文介紹了利用AQS實現的鎖ReetrantLock

  • 講解了tryReleasetryAcquire的實現原理
  • 說了說鎖的公平與否的實現,是否在意當前線程是否有其他線程排隊
  • 分析了一下hasQueuedPredecessors的幾種情況

4. 參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM