AQS詳解,並發編程的半壁江山


千呼萬喚始出來,終於寫到AQS這個一章了,其實為了寫這一章,前面也是做了很多的鋪墊,比如之前的

深度理解volatile關鍵字 線程之間的協作(等待通知模式) JUC 常用4大並發工具類 CAS 原子操作 顯示鎖 了解LockSupport工具類

這些文章其實都是為了讓大家理解AQS而寫的鋪墊,就像吃東西需要一口一口的吃一樣

AQS概述及其實現類:

  AQS,是AbstractQuenedSynchronizer的縮寫,中文名稱為抽象的隊列式同步器,是java並發編程這一塊的半壁江山,這個類存在於在java.util.concurrent.locks包,AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現都依賴於它,比如之前寫的顯示鎖ReentrantLock,,讀寫鎖ReentrantReadWriteLock,JUC的四大並發工具類中的Semaphore,CountDownLatch,線程池暫時還沒寫之后再寫

 

 

 在JDK1.7之前,FutureTask,應該也是繼承了AQS來實現的,但是1.8之后就改變了

 

 

 但是實現思想應該沒有太大改變,,所以說AQS是並發編程的半壁江山

 

核心思想:

  如果被請求的共享資源空閑,則將當前請求資源的線程設置為有效的工作線程,並將共享資源設置為鎖定狀態,如果被請求的共享資源被占用,那么就需要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH隊列鎖實現的,即將暫時獲取不到鎖的線程加入到隊列中。

CLH(Craig,Landin,and Hagersten)隊列是一個虛擬的雙向隊列,虛擬的雙向隊列即不存在隊列實例,僅存在節點之間的關聯關系。
AQS是將每一條請求共享資源的線程封裝成一個CLH鎖隊列的一個結點(Node),來實現鎖的分配。

其實在我理解來說,AQS就是基於CLH隊列,用volatile修飾共享變量state,來保證變量的可見性,線程通過CAS去改變狀態符,保證狀態的原子性,成功則獲取鎖成功,失敗則進入等待隊列,等待被喚醒。

注意:AQS是自旋鎖:在等待喚醒的時候,經常會使用自旋(while(!cas()))的方式,不停地嘗試獲取鎖,直到被其他線程獲取成功

 

框架:

 

 

通過這個圖得知,AQS維護了一個volatile int state和一個FIFO線程等待隊列,多線程爭用資源被阻塞的時候就會進入這個隊列。state就是共享資源,其訪問方式有如下三種:
getState();setState();compareAndSetState();

AQS 定義了兩種資源共享方式:
1.Exclusive:獨占,只有一個線程能執行,如ReentrantLock
2.Share:共享,多個線程可以同時執行,如Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier

不同的自定義的同步器爭用共享資源的方式也不同。

AQS底層使用了模板方法模式

同步器的設計是基於模板方法模式的,如果不了解的可以去看看模板方法設計模式,之前在寫設計模式的六大設計原則的時候也說了,看看設計模式有助於理解源碼,如果需要自定義同步器一般的方式是這樣:

  1. 使用者繼承AbstractQueuedSynchronizer並重寫指定的方法。
  2. 將AQS組合在自定義同步組件的實現中,並調用其模板方法,而這些模板方法會調用使用者重寫的方法,就類似於我定義了一個骨架,你填充東西一樣

自定義同步器在實現的時候只需要實現共享資源state的獲取和釋放方式即可,至於具體線程等待隊列的維護,AQS已經在頂層實現好了。自定義同步器實現的時候主要實現下面幾種方法:

  • isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現它。
  • tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回true,否則返回false。

  以ReentrantLock為例,state初始化為0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨占該鎖並將state+1。此后,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機會獲取該鎖。當然,釋放鎖之前,A線程自己是可以重復獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多么次,這樣才能保證state是能回到零態的。

  再以CountDownLatch以例,任務分為N個子線程去執行,state也初始化為N(注意N要與線程個數一致)。這N個子線程是並行執行的,每個子線程執行完后countDown()一次,state會CAS減1。等到所有子線程都執行完后(即state=0),會unpark()主調用線程,然后主調用線程就會從await()函數返回,繼續后余動作。

  一般來說,自定義同步器要么是獨占方法,要么是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支持自定義同步器同時實現獨占和共享兩種方式,如ReentrantReadWriteLock。

  在acquire() acquireShared()兩種方式下,線程在等待隊列中都是忽略中斷的,acquireInterruptibly()/acquireSharedInterruptibly()是支持響應中斷的。

繼承AQS,手寫獨占式可重入鎖:

  說了那么多,但是說一千道一萬不如自己手寫試試,接下來看代碼

package org.dance.day4.aqs;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 采用主類實現Lock接口,內部類繼承AQS,封裝細節
 * 自定義鎖
 * @author ZYGisComputer
 */
public class CustomerLock implements Lock {

    private final Sync sync = new Sync();

    /**
     * 采用內部類來繼承AQS,封裝細節
     *  實現獨占鎖,通過控制state狀態開表示鎖的狀態
     *      state:1 代表鎖已被占用
     *      state:0 代表鎖可以被占用
     */
    private static class Sync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            if(compareAndSetState(0,1)){
                // 當前線程獲取到鎖
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }else{
                return false;
            }
        }

        @Override
        protected boolean tryRelease(int arg) {
            // 如果狀態為沒人占用,還去釋放,就報錯
            if(getState()==0){
                throw new UnsupportedOperationException();
            }
            // 把鎖的占用者制空
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        /**
         * 判斷線程是否占用資源
         * @return
         */
        @Override
        protected boolean isHeldExclusively() {
            return getState()==1;
        }

        /**
         * 獲取Condition接口
         * @return
         */
        public Condition getCondition(){
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.getCondition();
    }
}

工具類:

package org.dance.tools;

import java.util.concurrent.TimeUnit;

/**
 * 類說明:線程休眠輔助工具類
 */
public class SleepTools {

    /**
     * 按秒休眠
     * @param seconds 秒數
     */
    public static final void second(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
        }
    }

    /**
     * 按毫秒數休眠
     * @param seconds 毫秒數
     */
    public static final void ms(int seconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(seconds);
        } catch (InterruptedException e) {
        }
    }
}

測試類:

package org.dance.day4.aqs;


import org.dance.tools.SleepTools;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 *類說明:測試手寫鎖
 */
public class TestMyLock {
    public static void main(String[] args) {
        TestMyLock testMyLock = new TestMyLock();
        testMyLock.test();
    }
    public void test() {
        // 先使用ReentrantLock 然后替換為我們自己的Lock
        final Lock lock = new ReentrantLock();

        class Worker extends Thread {
            @Override
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        SleepTools.second(1);
                        System.out.println(Thread.currentThread().getName());
                        SleepTools.second(1);
                    } finally {
                        lock.unlock();
                    }
                    SleepTools.second(2);
                }
            }
        }
        // 啟動10個子線程
        for (int i = 0; i < 10; i++) {
            Worker w = new Worker();
            w.setDaemon(true);
            w.start();
        }
        // 主線程每隔1秒換行
        for (int i = 0; i < 10; i++) {
            SleepTools.second(1);
            System.out.println();
        }
    }
}

執行結果:

Thread-0



Thread-1

Thread-2



Thread-3

Thread-4

通過結果可以看出來每次都是只有一個線程在執行的,線程的鎖獲取沒有問題,接下來換我們自己的鎖

final Lock lock = new CustomerLock();

再次執行測試

執行結果:

Thread-0


Thread-1


Thread-2


Thread-3


Thread-4

由此可見,這個手寫的鎖,和ReentrantLock是一樣的效果,是不是感覺也挺簡單的,也沒有多少行代碼

那么獨占鎖,被一個線程占用着,其他線程去了哪里?不要走開接下來進入AQS的源碼看看

 

理論:

 

 

 

 在AQS中的數據結構是采用同步器+一個雙向循環鏈表的數據結構,來存儲等待的節點的,因為雙向鏈表是沒有頭的,但是為了保證喚醒的操作,同步器中的head標志了鏈表中的一個節點為頭節點,也就是將要喚醒的,也標識了一個尾節點

 

 

結點狀態waitStatus,需要保證可見性,用volatile修飾

      這里我們說下Node。Node結點是對每一個等待獲取資源的線程的封裝,其包含了需要同步的線程本身及其等待狀態,如是否被阻塞、是否等待喚醒、是否已經被取消等。變量waitStatus則表示當前Node結點的等待狀態,共有5種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。

  • CANCELLED(1):表示當前結點已取消調度。當timeout或被中斷(響應中斷的情況下),會觸發變更為此狀態,進入該狀態后的結點將不會再變化。

  • SIGNAL(-1):表示后繼結點在等待當前結點喚醒。后繼結點入隊時,會將前繼結點的狀態更新為SIGNAL。

  • CONDITION(-2):表示結點等待在Condition上,當其他線程調用了Condition的signal()方法后,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。

  • PROPAGATE(-3):共享模式下,前繼結點不僅會喚醒其后繼結點,同時也可能會喚醒后繼的后繼結點。

  • 0:新結點入隊時的默認狀態。

注意,負值表示結點處於有效等待狀態,而正值表示結點已被取消。所以源碼中很多地方用>0、<0來判斷結點的狀態是否正常

同步隊列中節點的增加和移除

 

 

 通過圖可以看出來,在增加 尾節點的時候需要通過CAS設置,因為可能是多個線程同時設置,但是移除首節點的時候是不需要的,因為這個操作是由同步器操作的,並且首節點只有一個

獨占式同步狀態的獲取與釋放

 

 

 AQS的從線程的獲取同步狀態到,對同步隊列的維護,到釋放,的流程圖就是這樣的,有興趣看源碼的自己去跟一下,就是主要實現的模板方法,

注意:其實在這個給大家提個醒,看源碼的時候,找核心的看,找主要的看,不要一行一行的扣着看,沒有意義,還有就是調用過程復雜,體會核心流程就可以

 

之前寫了<<Lock接口之Condition接口>>這一章,然后在這里寫一下Condition接口在AQS里面的實現吧,因為不管自己寫鎖也好,默認鎖的實現也好,用的Condition都是AQS默認寫好的

Condition實現分析:

 

 

 

 一個鎖是可以有多個Condition的,每個Condition都包含一個自己的等待隊列,不同於Object屬於同一個對象等待,他存在一個單鏈表結構的等待隊列,清晰的知道要喚醒自己的等待隊列中的節點,所以采用signal方法而不是signalall

當然采用的類還是Node類當然單鏈表其實就是沒有上一個節點的引用而已

等待隊列和同步隊列采用的是相同的類,只不過是實現的數據機構確是不一樣的而已

最終一個鎖的實例化會成為上圖中第二個圖的這種形式,Demo也就是之前寫的<<Lock接口之Condition接口>>中的用的鎖最終形成的結構及時就是維持了一個同步隊列和兩個等待隊列,鎖用於控制並發,而兩個隊列用於控制地點變化和公里數變化的不同的等待通知模式

節點在隊列中的移動

 

 

 就是在當前線程await的時候從同步隊列移除后加入到等待隊列尾部,而喚醒就是從等待隊列移除后加入到同步隊列尾部,兩個隊列相互轉換的過程,之所以采用同一個類,就是為了方便的在不同隊列中相互轉化

當然這也是為什么不推薦使用SignalAll方法的原因,因為如果一個等待隊列中有很多的線程在等待,全部喚醒后,最多且只能有一個線程獲取到同步狀態,其他線程全部要被加入到同步隊列的末尾,而且也可能當前的同步狀態被別人持有,一個線程也獲取不到,全部都要被加入同步隊列中,所以不推薦使用SignalAll,推薦是用Signal

其實也可以想象,比如wait和notify/notifyAll 在寫<<線程之間的協作(等待通知模式)>>這篇文章的時候的最后一個問題也可以大概想象一下,應該也是維持了一個同步隊列,但是等待隊列應該是只有一個,所以,被喚醒的是第一個等待的節點,但是它沒有辦法保證要被喚醒的節點一定是在頭一個,只能喚醒全部的節點,來保證需要喚醒的線程一定被喚醒,大概也是這樣的一個節點的移動,根據網絡文章的描述,應該八九不離十

根據猜測,結合上方的Condition接口分析,所以說,在wait,notify/notifyAll中推薦使用notifyAll,防止第一個節點不是需要喚醒的節點,造成喚醒錯誤,但是Condition是知道的,被喚醒的一定是需要喚醒的,不會喚醒錯誤,所以說,推薦使用signal

能看到這里的證明你真的很愛這個行業,你是最棒的!加油

回顧Lock的實現

ReentrantLock

其實在上面手寫的鎖,是有一些缺陷的,因為判斷的是不是等於1,所以他是一個不支持可重入的,一旦重入,就會造成死鎖,自己鎖住自己,但是ReentrantLock就不會

他支持鎖的可重入,並且支持鎖的公平和非公平

 

 通過源碼可以看到,他是通過狀態的累加完成的鎖的可重入,當然前提是已經拿到鎖的線程,會有這樣一個判斷

 

 所以可想而知,釋放的時候,每次釋放就遞減,最終等於0的時候完成鎖的釋放

 

 在實現公平鎖的時候,就是判斷當前節點是否有前期節點,是不是第一個,如果有,不是第一個,抱歉你不能搶鎖

 

 可想而知在非公平鎖中就是不判斷而已

因為不需要判斷,並且是誰搶到鎖,鎖就是誰的,所以說非公平鎖比公平鎖效率高

 

ReentrantReadWriteLock

在讀寫鎖中,一個狀態如何 保存兩個狀態呢?采用位數分割

 

 應該有知道 int是32位的,他把32位一分為二,采用低位保存寫的狀態,高位保存讀的狀態

寫鎖,應該都知道,只能同時被一個線程持有,所以重入的話,也比較好保存

但是讀鎖不一樣,可以被多個線程同時持有,是共享鎖,並且重入的次數是不一樣的,那么該則么保存呢?采用高位只保存被多少線程持有

 

 采用每個持有鎖的線程中的一個HoldCounter對象保存,使用ThreadLocalHoldCounter繼承ThreadLocal來保存線程變量,區別不同線程

讀寫鎖的升級和降級

讀寫鎖支持寫鎖降級為讀鎖,但是不支持讀鎖升級為寫鎖,為了保證線程安全和數據可見性,因為在寫鎖執行期間,讀鎖是被阻塞的,所以說寫鎖降級為讀鎖是沒有問題的,但是如果是讀鎖升級為寫鎖,在其他線程使用完寫鎖的時候,讀鎖是看不見的,為了保證線程安全,所以不支持讀鎖升級成寫鎖

 

到此AQS就寫完了,因為AQS涉及的知識太多,能看到現在的也都是大神了,恭喜你們,掌握了並發編程的半壁江上,為了自己的夢想更近了一步,加油,因為知識點多,所以大家多看幾遍,不理解的可以百度,也可以評論區提問

 

作者:彼岸舞

時間:2020\11\18

內容關於:並發編程

本文來源於網絡,只做技術分享,一概不負任何責任

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM