JAVA AQS源碼分析


JAVA AQS的全稱為(AbstractQueuedSynchronizer),用於JAVA多線程的開發,從名稱我們也可以看出,它實現了同步的隊列,而這個隊列是指線程隊列。AQS類在java.util.concurrent.locks下面。

AQS和CAS作為JAVA5之后非常重要的特性,能在並發應用中提高程序性能,具體要就實際情況使用,因為JVM也在一直優化synchronized關鍵字,在JAVA7之后其性能也趨於穩定,不會隨着線程數增加而導致性能驟降(具體可以取網上搜索對比數據)。

總之,一般情況下還是建議用synchronized

CAS(CompareAndSet)是最小粒度的操作,保證了原子性,通過硬件指令集實現。簡單來說,CAS有3個操作數,內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改為B,否則返回V。

基於此,我們才能完成非阻塞同步操作(當然還有一些其他原子命令,例如FAI,LL/SC等),目的就是用樂觀鎖來換取性能的提升。

為什么要說CAS呢?因為AQS也是基於CAS實現的,下面進入正題,我們通過源碼來具體分析下AQS的實現:

--------------------------------------------------------------------------------------------------------------------------------------------------------------------

 AQS的包結構如下                            

繼承的子類有

我們從ReentrantLock來分析AQS的實現原理:

ReentrantLock是一個可重入的排他鎖。可重入指當前擁有鎖的對象可重復進入同步區域,防止重復操作,例如網頁登陸時重復點擊按鈕。排他鎖就是指該鎖(Lock)是互斥鎖,只允許一個對象擁有鎖。

先看看ReentrantLock的類結構

可以看到有3個內部類Sync,NonfairSync和FairSync。其中NonfairSync和FairSync均繼承Sync,而Sync繼承了AQS。NonfairSync和FairSync的區別在於當一個線程釋放了鎖的時候,隊列里的其他線程是否按照FIFO的規則去獲取鎖的。

換句話說,FairSync能夠保證先到的線程先拿到鎖(有一個特殊情況,就是隊列里的線程在unpark到獲取鎖的過程中有新的還未加入到隊列中的線程獲取到鎖,不過這種情況發生的概率很小,基本不用考慮),而NonfairSync不保證

 

下面我們看看ReentrantLock是如何實現上鎖的,這是lock函數:

    public void lock() {
        sync.lock();
    }

 

 sync的lock函數為抽象的,由子類實現,這里只給出FairSync的實現

    final void lock() {
        acquire(1);
    }

 

acquire就是AQS提供的接口

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

 

這里涉及到4個新方法,讓我們慢慢來分析:

首先,acquire有一個參數arg,是用於判斷鎖持有的次數,也就是重入的次數,當鎖持有者需要釋放鎖的時候,則要將鎖的state減去arg的值。在上面可以看到,調用acquire時傳入的參數為1。

1. tryAcquire:AQS里tryAcquire的實現如下

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

明顯是不對的,只拋出了一個異常,所以應該是子類覆蓋了,那么看看FairSync的實現

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

getState()是從ReentrantLock獲取的狀態,表示鎖當前是否被持有,為0時表示沒有線程持有鎖。此時當前線程會去爭取鎖的持有權。

首先判斷隊列中是否有排在當前線程之前的線程,有的話放棄爭搶鎖。hasQueuedPredecessors是AQS里的方法:

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

tail記錄隊列里的尾節點,head就是頭結點。這個函數就是判斷當前線程是否是下一個可持鎖的線程

然后,當當前線程滿足條件時,就通過CAS設置c的值,同時通過AbstractOwnableSynchronizer類提供的setExclusiveOwnerThread接口將當前線程鎖住,用於防止volatile字段被其他線程修改(這里是看注釋后的個人理解)

如果c != 0,同時鎖持有者為當前線程,那么這個請求就是重入請求了,將c+=acquire。如果上訴兩個條件都不滿足,返回false。

2. addWaiter:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

根據注釋可以知道,首先快速插入隊列,其實就是判斷隊列是否為空,否則就要通過enq(node)將當前節點添加至隊列中,為什么這樣會慢些?我們看看enq函數

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

這個方法可以好好看看:用for無限循環,考慮下多線程同時加入隊列,通過CAS保證設置當前節點到尾節點。這里可能隊列里的線程已經執行完任務釋放鎖了,所以還需要重新判斷隊列是否為空,因此其執行效率當然會有影響。

否則,設置前置節點,加入隊列。用圖例來幫助理解:

由於本身沒有鎖,可以有多個線程進來,如果有多個線程並發進入這個if判定區域,可能就會同時存在多個這樣的數據結構,在各自形成數據結構后,多個線程都會去做compareAndSetHead(h)的動作,也就是嘗試將這個臨時h節點設置為head,

顯然並發時只有一個線程會成功,因此成功的那個線程會執行tail = node的操作,整個AQS的鏈表就成為:

3. acquireQueued:節點加入隊列之后,就通過該函數去等待獲取鎖

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

同樣是for無限循環,判斷當前節點之前是否沒有線程節點了,如果是,就去爭搶鎖。用上面給出的子類tryAcquire函數,成功的話設置相關參數,這里也解釋了釋放指針,幫助垃圾回收(GC)。

如果爭搶失敗,判斷是否需要阻塞當前線程

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

waitState參數有以下幾個:

  1. SIGNAL:當前線程持有鎖
  2. CANCELLED:超時獲取被中斷
  3. CONDITION
  4. PROPAGATE
  5. 0:不屬於以上任何一種情況

后面兩個沒怎么看,大致就是需要滿足多個條件,PROPAGATE用於shared鎖隊列。

回到上面的方法:因為AQS支持中斷等待,所以如果線程中斷了爭搶鎖(CANCELLED),那么就不需要阻塞,直接返回。acquireQueued方法沒有返回,而是設置一個interrupt參數為true而已,線程爭搶鎖失敗的話繼續休眠等待,而AQS里的doAcquireInterruptibly()發現爭搶失敗的話就直接throw new InterruptedException()。在ReentrantLock里需要調用ReentrantLock.lockInterruptibly()就會實現中斷返回。否則AQS會嘗試將當前線程狀態設置成SIGNAL,失敗就循環繼續嘗試

下面是parkAndCheckInterrupt

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

LockSupport.park通過Unsafe.park實現阻塞,它會設置一個AQS的blocker,讓隊列里的線程阻塞在一個地方,然后返回線程中斷的判斷

4. selfInterrupt

    private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

acquireQueued會返回boolean值表示線程是否中斷,如果未中斷,就調用Thread.interrupt()中斷線程

---------------------------------------------------------------------------------------------------------------------

以上是鎖的實現原理,當tryAcquire()成功之后,線程獲取鎖,執行任務,執行完畢之后,會調用AQS的release方法:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease是子類Sync的實現:

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

其他就不多說了,比較簡單明晰。就看看unparkSuccessor怎么做的

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

通過CAS修改節點的waitStatus,然后將后續節點去除,這里會去遍歷后續節點,判斷其是否狀態為CANCELLED,將所有非CANCELLED的節點喚醒。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM