AQS原理及應用


 To use this class as the basis of a synchronizer, redefine the
 * following methods, as applicable, by inspecting and/or modifying
 * the synchronization state using {@link #getState}, {@link
 * #setState} and/or {@link #compareAndSetState}:
 *
 * <ul>
 * <li> {@link #tryAcquire}
 * <li> {@link #tryRelease}
 * <li> {@link #tryAcquireShared}
 * <li> {@link #tryReleaseShared}
 * <li> {@link #isHeldExclusively}
 * </ul>

 上面這段話是AQS源碼的一段注解,意思是使用AQS實現一個同步器的話需要覆蓋實現上面li標簽中的這些方法,並且使用getState、setState、compareAndSetState這幾個方法來對狀態進行操作。

如果你對JDK源碼較為熟悉的話,你會發現AQS(AbstractQueuedSynchronizer)是並發過程中很常見的一個抽象類,我們常用的CountDownLatch、ReentrantLock、FutureTask(1.8不再使用AQS)、Semaphore等類都是在內部定義了一個叫做Sync的內部類,而Sync類繼承了AQS抽象類並重寫了一些必要的方法。可重入鎖ReentrantLock中,state可以用來表示當前線程獲取鎖的可重入次數;對於讀寫鎖ReentrantReadWriteLock來說,state的高16位表示獲取到的讀鎖的線程的可重入次數,低16位是寫鎖;對於Semaphore來說,state用來表示當前可用信號的個數;對於CountDownlatch來說,state用來表示計數器當前的值

 abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
    ...
    ...
}

下面說說AQS到底是用來做什么的:

AQS抽象類的內部是一個final的Node類,類中定義了一個volatile的整形狀態量state,通過這個Node類來建立一個FIFO的線程等待隊列(多線程搶占資源失敗被阻塞時會進入此隊列)。

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

而這個隊列中的每個結點對應一個線程,結點內封裝了這個線程的基本信息,狀態和等待的資源類型等。

     * <p>To enqueue into a CLH lock, you atomically splice it in as new
     * tail. To dequeue, you just set the head field.
     * <pre>
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     * </pre>
     *
     * <p>Insertion into a CLH queue requires only a single atomic
     * operation on "tail", so there is a simple atomic point of
     * demarcation from unqueued to queued. Similarly, dequeuing
     * involves only updating the "head". However, it takes a bit
     * more work for nodes to determine who their successors are,
     * in part to deal with possible cancellation due to timeouts
     * and interrupts.

這里借用網上的一張圖來說明這個隊列的框架。

AQS定義了兩種資源共享的方式,獨占和共享,Exclusive和Share,前者常用的是ReentrantLock,后者常用的是前面提到的CountDownLatch、Semaphore等。

繼承AQS,實現state標志位的獲取、釋放方式,即可實現自定義同步器,至於具體的等待隊列的維護、阻塞入隊、喚醒出隊等在AQS中已經實現好了。標志位涉及到的實現函數如下:

  • isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現它。
  • tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回true,否則返回false。

 自定義同步器

獨占鎖以ReentrantLock為例,state初始值為0,表示未鎖狀態,當一個線程執行了lock()方法之后,會調用tryAcquire方法獨占該鎖並將state+1,此后其他線程嘗試獲取鎖時便會失敗,進入隊列,知道剛才的線程釋放鎖將state值更改為0時,其他線程才有機會獲取這個獨占鎖。當然,這個鎖時可重入的,獲得鎖的線程可以繼續state+1,不過只有state為0的時候其他線程才會喚醒。

共享鎖以CountDownLatch為例,初始化線程數為n,及將state初始值設置為n,表示可以有n個線程同時獲取這個共享鎖,當有線程執行一countDown(),state就會CAS的方式減少1,知道所有線程執行完,state值為0的時候,會unpark主線程,然后主線程會從await狀態被喚醒返回,繼續執行其他指令。

一般情況自定義同步器要么是獨占方法,要么是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。需要說明的是AQS也支持同時實現獨占同步和共享同步,比如ReentrantReadWriteLock。


通常,如果一個線程沒有拿到鎖,便會被封裝成結點加入隊尾,檢查狀態之后會調用park方法進入waiting狀態,等待unpark方法或interrupt方法喚醒自己,當它被喚醒之后會檢查自己是否有資格拿到鎖,如果成功,head會指向當前節點,並檢查是否被中斷過;否則會繼續等待。

AQS是支持中斷的,比如acquireInterruptibly方法和acquireSharedInterruptibly方法。下面將書上看到的互斥鎖Mutex源碼貼在下面供大家研究,它不支持重入,只有0(未鎖定)和1(鎖定)兩種狀態:

class Mutex implements Lock, java.io.Serializable {
    // 自定義同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 判斷是否鎖定狀態
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 嘗試獲取資源,立即返回。成功則返回true,否則false。
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // 這里限定只能為1個量
            if (compareAndSetState(0, 1)) {//state為0才設置為1,不可重入!
                setExclusiveOwnerThread(Thread.currentThread());//設置為當前線程獨占資源
                return true;
            }
            return false;
        }

        // 嘗試釋放資源,立即返回。成功則為true,否則false。
        protected boolean tryRelease(int releases) {
            assert releases == 1; // 限定為1個量
            if (getState() == 0)//既然來釋放,那肯定就是已占有狀態了。只是為了保險,多層判斷!
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);//釋放資源,放棄占有狀態
            return true;
        }
    }

    // 真正同步類的實現都依賴繼承於AQS的自定義同步器!
    private final Sync sync = new Sync();

    //lock<-->acquire。兩者語義一樣:獲取資源,即便等待,直到成功才返回。
    public void lock() {
        sync.acquire(1);
    }

    //tryLock<-->tryAcquire。兩者語義一樣:嘗試獲取資源,要求立即返回。成功則為true,失敗則為false。
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    //unlock<-->release。兩者語文一樣:釋放資源。
    public void unlock() {
        sync.release(1);
    }

    //鎖是否占有狀態
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM