ReentrantLock是如何基於AQS實現的


ReentrantLock是一個可重入的互斥鎖,基於AQS實現,它具有與使用 synchronized 方法和語句相同的一些基本行為和語義,但功能更強大。

lock和unlock

ReentrantLock 中進行同步操作都是從lock方法開始。lock獲取鎖,進行一系列的業務操作,結束后使用unlock釋放鎖。

    private final ReentrantLock lock = new ReentrantLock();
    public void sync(){
        lock.lock();
        try {
            // ... method body
        } finally {
          lock.unlock()
        }
    }

lock

ReentrantLock 中lock的實現是通過調用AQS的AbstractQueuedSynchronizer#acquire方法實現。

    public final void acquire(int arg) {
        //嘗試獲取鎖
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

根據之前介紹的模板方法模式,對於鎖的獲取tryAcquire是在ReentrantLock中實現的。而非公平鎖中的實際實現方法為nonfairTryAcquire。

ReentrantLock#nonfairTryAcquire

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
 
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}   

在獲取鎖的邏輯中首先是嘗試以cas方式獲取鎖,如果獲取失敗則表示鎖已經被線程持有。

再判斷持有該鎖的線程是否為當前線程,如果是當前線程就將state的值加1,在釋放鎖是也需要釋放多次。這就是可重入鎖的實現。

如果持有鎖的線程並非當前線程則這次加鎖失敗,返回false。加鎖失敗后將調用AbstractQueuedSynchronizer#acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

首先會調用addWaiter方法將該線程入隊。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

mode是指以何種模式的節點入隊,這里傳入的是Node.EXCLUSIVE(獨占鎖)。首先將當前線程包裝為node節點。然后判斷等待隊列的尾節點是否為空,如果不為空則通過cas的方式將當前節點接在隊尾。如果tail為空則執行enq方法。

AbstractQueuedSynchronizer#enq

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq方法通過for(;;)無限循環的方式將node節點設置到等待隊列的隊尾(隊列為空時head和tail都指向當前節點)。

綜上可知addWaiter方法的作用是將競爭鎖失敗的節點放到等待隊列的隊尾。

等待隊列中的節點也並不是什么都不做,這些節點也會不斷的嘗試獲取鎖,邏輯在acquireQueued中實現。

AbstractQueuedSynchronizer#acquireQueued

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

可以看到該方法也是使用for(;;)無限循環的方式來嘗試獲取鎖。首先判斷當前節點是否為頭結點的下一個節點,如果是則再次調用tryAcquire嘗試獲取鎖。當然這個過程並不是一定不停進行的,這樣的話多線程競爭下cpu切換也極耗費資源。

shouldParkAfterFailedAcquire會判斷是否對當前節點進行阻塞,阻塞之后只有當unpark后節點才會繼續假如爭奪鎖的行列。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

判斷一個節點是否需要被阻塞是通過該節點的前繼節點的狀態判斷的。

如果前繼節點狀態為singal,則表示前繼節點還在等待,當前節點需要繼續被阻塞。返回true。

如果前繼節點大於0,則表示前繼節點為取消狀態。取消狀態的節點不參與鎖的競爭,直接跳過。返回false。

如果前繼節點時其他狀態(0,PROPAGATE),不進行阻塞,表示當前節點需要重試嘗試獲取鎖。返回false。

shouldParkAfterFailedAcquire方法如果返回true,表示需要將當前節點阻塞,阻塞方法為parkAndCheckInterrupt。

AbstractQueuedSynchronizer#parkAndCheckInterrupt

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

阻塞是通過LockSupport進行阻塞,被阻塞的節點不參與鎖的競爭(不在進行循環獲取鎖),只能被unpark后才繼續競爭鎖。

而被阻塞的節點要被釋放則依賴於unlock方法。

unlock

ReentrantLock 中unlock的實現是通過調用AQS的AbstractQueuedSynchronizer#release方法實現。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

release調用tryRelease方法,tryRelease是在ReentrantLock中實現。

ReentrantLock#tryRelease

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

tryRelease方法邏輯很簡單,首先減去releases(一般為1)表示釋放一個鎖,如果釋放后state=0表示釋放鎖成功,后續等待的節點可以獲取該鎖了。如果state!=0則表示該鎖為重入鎖,需要多次釋放。

當釋放鎖成功后(state=0),會對頭結點的后繼節點進行unpark。

AbstractQueuedSynchronizer#unparkSuccessor

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
    }

unparkSuccessor見名知意適用於接觸后面節點的阻塞狀態。整個方法的邏輯就是找到傳入節點的后繼節點,將其喚醒(排除掉狀態為cancel即waitStatus > 0的節點)。

公平鎖和非公平鎖

ReentrantLock的構造方法接受一個可選的公平參數。當設置為 true 時,在多個線程的競爭時,傾向於將鎖分配給等待時間最長的線程。

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

在多個鎖競爭統一資源的環境下,AQS維護了一個等待隊列,未能獲取到鎖的線程都會被掛到該隊列中。如果使用公平鎖則會從隊列的頭結點開始獲取該資源。

而根據代碼在公平鎖和非公平鎖的實現的差別僅僅在於公平鎖多了一個檢測的方法。

公平鎖

protected final boolean tryAcquire(int acquires) {
    //...
    if (c == 0) {
        if (!hasQueuedPredecessors() //!hasQueuedPredecessors()便是比非公平鎖多出來的操作
        && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //...
    return false;
}

hasQueuedPredecessors()

public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

方法邏輯很簡單,就是如果等待隊列還有節點並且排在首位的不是當前線程所處的節點返回true表示還有等待更長時間的節點。需要等這部分節點獲取資源后才能獲取。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM