基於AQS實現的Java並發工具類


本文主要介紹一下基於AQS實現的Java並發工具類的作用,然后簡單談一下該工具類的實現原理。其實都是AQS的相關知識,只不過在AQS上包裝了一下而已。本文也是基於您在有AQS的相關知識基礎上,進行講解的

CountDownLatch

作用

CountDownLatch是一個同步工具類,它允許一個或多個線程一直等待,直到其他一個或者多個線程的操作執行完后再執行。

單詞Latch的中文翻譯是門閂,也就是有“門鎖”的功能,所以當門沒有打開時,N個人是不能進入屋內的,也就是N個線程是不能繼續往下運行的,支持這樣的特性可以控制線程執行任務的時機

單詞CountDown的中文翻譯是倒計時,倒計時一定是從某個值開始往下遞減,直到減到0才結束。

所以,CountDownLatch是通過一個計數器來實現的,計數器的初始化值為同步狀態數量。每當一個線程完成了自己的任務后,就會消耗一個同步狀態,計數器的值會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然后在閉鎖上等待的線程就可以恢復執行任務了。

常用API

//count初始化計數值,一旦count初始化完成后,就不可重新初始化或者修改CountDownLatch對象的內部計數器的值。
public CountDown(int count){} 
//使當前線程掛起,直到計數值為0時,才繼續往下執行。
public void await() {}; 
// 有超時的等待
public boolean await(long timeout , TimeUnit timeUnit) throws InterruptExcetion {};
public void  countDown() {} //將count值減1

常見應用場景

多線程做資源初始化,主線程先暫停等待初始化結束;每個線程初始化結束后都countDown一次,等全部線程都初始化結束后(state=0),此時主線程再繼續往下執行

實現原理

Sync(int count) {
    setState(count); // count的值表示的就是當前已經有count數量的線程獲得同步鎖了。
}

int getCount() {
    return getState();
}

protected int tryAcquireShared(int acquires) {,
    return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

public void countDown() {
    sync.releaseShared(1);
}
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

CountDownLatch其實就是AQS共享式同步狀態獲取的一種具體實現。構造方法傳入的count值就表示當前已經有count數量的線程獲得同步狀態了,然后每個調用countDown()方法的線程都是去做了一次releaseShared釋放同步狀態的操作。而await()方法則是嘗試去獲得同步狀態。由於CountDownLatch重寫了tryAcquireShared方法,只有state=0,才能獲得共享同步狀態。所以就實現了一個線程await,等待其他多個線程countDown到0,再繼續往下執行。

CyclicBarrier

作用

CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier)。CyclicBarrier的作用是讓一組線程之間相互等待,任何一個線程到達屏障點后就阻塞,直到最后一個線程到達,才都繼續往下執行。個人理解:CyclicBarrier可以看成是一道大門或者關卡,先到的線程會被阻塞在大門口,直到最后一個線程到達屏障時,大門才被打開,所有被阻塞的線程才會繼續干活。就像是朋友聚餐,只有最后一個朋友到達時,才會開吃!

循環使用指的是在大門被打開后,可以再次關閉;即再讓指定數目的線程在屏障前阻塞等待,然后再次打開大門。

常用API

//parties表示屏障前可阻塞的線程數,當阻塞的線程數到達parties時,屏障被打開,所有阻塞的線程將會被喚醒
public CyclicBarrier(int parties);

// 此構造方法不同於上面的是在屏障被打開時將優先執行barrierAction,方便處理更負責的業務場景
public CyclicBarrier(int parties, Runnable barrierAction) ;

// 等待屏障的打開
public int await() throws InterruptedException,BrokenBarrierException ;

//等待屏障的打開 超時會拋出 TimeoutException 
public int await(long timeout, TimeUnit unit) throws 
    InterruptedException,
    BrokenBarrierException,
    TimeoutException ;
    
// 將屏障重置為其初始化狀態即重置為構造函數傳入的parties值。
public void reset() 

常見應用場景

用於多線程計算數據,最后合並計算結果的場景。每個parter負責一部分計算,最后的線程barrierAction線程進行數據匯總。

實現原理

Semaphore

作用

Semaphore是基於計數的信號量,可以用來控制能同時訪問特定資源的線程數量;可以通過設定一個閾值,基於此,多個線程爭搶獲取許可信號,做完自己的操作后歸還許可信號,超過閾值后,線程申請許可信號將會被阻塞,直到有其他線程釋放許可信號。

簡單來說,Semaphore就是看門的老大爺,人滿了,就不讓進了,只有有人離開,空出來位子,才給進去。

常用API

  • 構造方法:
// 用給定的允許數量和默認的非公平設置創建Semaphore對象。
Semaphore(int permits)  
//用給定的允許數量和給定的公平設置創建一個Semaphore對象。
Semaphore(int permits , boolean fair) 
  • 常用方法
1) void acquire()
從信號量里獲取一個可用的許可,如果沒有可用的許可,那么當前線程將被禁用以進行線程調度,並且處於休眠狀態。
2) void tryAcquire()
嘗試獲取信號量,獲取失敗立刻返回
3) void release() 
釋放一個許可,將其返回給信號量
4) int availablePermits()
返回此信號量中當前可用的許可數量。
5) boolean hasQueuedThreads()
查詢是否有線程正在等待獲取。

常見應用場景

Semaphore可以用來做流量控制,特別公用資源有限的應用場景,比如數據庫連接。假設有一個需求,要讀取幾萬個文件的數據,因為都是IO密集型任務,我們可以啟動幾十個線程並發的讀取,但是如果讀到內存后,還需要進行存儲到數據庫中,而數據庫的連接數只有10幾個,這時我們必須控制只有十個線程同時獲取數據庫連接保存數據,否則會報錯無法獲取數據庫連接。這個時候,我們就可以使用Semaphore來做流控。

實現原理

protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

Semaphore也是基於AQS實現的,state值為初始化時傳入的permits信號量,Semaphore也重寫了tryAcquireShared方法,tryAcquireShared方法返回>=0,才表示獲得同步量。

有一點不同的是Semaphore實現了公平搶占和非公平搶占,公平搶占就是搶占前先判斷自己是否是同步隊列中第一個要出隊列的,不是則進入同步隊列等待。非公平搶占,則不關心同步隊列等待情況,直接嘗試獲取。

重入鎖ReentrantLock

作用

如果鎖具備可重入性,則稱作為可重入鎖。像synchronized和ReentrantLock都是可重入鎖,可重入性在我看來實際上表明了鎖的分配機制:基於線程的分配,而不是基於方法調用的分配。舉個簡單的例子,當一個線程執行到某個synchronized方法時,比如說method1,而在method1中會調用另外一個synchronized方法method2,此時線程不必重新去申請鎖,而是可以直接執行方法method2。

實現原理

ReentrantLock其實是AQS 獨占式獲取同步狀態的一種具體實現,

  1. 可重入實現原理:
    可重入需要記錄重入次數,在ReentrantLock中是用state來記錄重入次數的。一個線程嘗試獲取同步狀態時,會判斷當前線程是否是同步狀態的獨占擁有者,如果是,則將state加上請求同步量(對於鎖一般都是1),來記錄重入次數,如果不是,則進入同步隊列爭搶同步狀態。
    釋放時,也會首先判斷當前線程是否是同步狀態的獨占擁有者,不是則拋出異常。如是,則減去釋放量,減到state為0時,釋放對同步狀態的獨占,其實就是將setExclusiveOwnerThread(null);

  2. 公平鎖與非公平鎖實現原理
    和Semaphore一樣,公平鎖在嘗試爭搶同步狀態時的時候,會判斷當前線程是否是同步隊列中的第一個節點hasQueuedPredecessors(),如果不是則爭搶失敗,進入同步隊列等待。非公平鎖則直接爭搶。

讀寫鎖(ReentrantReadWriteLock)

作用

而讀寫鎖是維護了一對鎖(一個讀鎖和一個寫鎖),通過分離讀鎖和寫鎖,使得同一時刻可以允許多個讀線程訪問,但是在寫線程進行訪問時,所有的讀線程和其他寫線程均被阻塞。讀寫就是AQS中共享式爭搶同步狀態的具體實現。寫鎖就是AQS中獨占式爭搶同步狀態的具體實現。

常見使用場景

一般情況下,讀寫鎖的性能都會比排它鎖好,因為大多數場景讀是多於寫的。在讀多於寫的情況下,讀寫鎖能夠提供比排它鎖更好的並發性和吞吐量。

在常見的開發中,我們經常會定義一個共享的用作內存緩存的數據結構;比如一個大Map,緩存全部的城市Id和城市name對應關系。這個大Map絕大部分時間提供讀服務(根據城市Id查詢城市名稱等);而寫操作占有的時間很少,通常是在服務啟動時初始化,然后可以每隔一定時間再刷新緩存的數據。但是寫操作開始到結束之間,不能再有其他讀操作進來,並且寫操作完成之后的更新數據需要對后續的讀服務可見。

實現原理

這里,我們先介紹ReentrantReadWriteLock的特性:

image.png-190.6kB

  • 讀寫狀態的設計
    我們知道,在AQS內部是以單個int類型的原子變量來表示同步狀態的,而對於ReentrantReadWriteLock為了在單個int類型的變量上既維護讀狀態也維護寫狀態,所以ReentrantReadWriteLock對state進行“按位切割使用”,將變量切分成了兩個部分,高16位表示讀,低16位表示寫。

image.png-151kB

當前同步狀態表示一個線程已經獲取了寫鎖,且重進入了兩次,同時也連續獲取了兩次讀鎖。讀寫鎖是如何迅速確定讀和寫各自的狀態呢?答案是通過位運算。假設當前同步狀態值為S,寫狀態等於S&0x0000FFFF(將高16位全部抹去),讀狀態等於S>>>16(無符號補0右移16位)。當寫狀態增加1時,等於S+1,當讀狀態增加1時,等於S+(1<<16),也就是S+0x00010000。

根據狀態的划分能得出一個推論:S不等於0時,當寫狀態(S&0x0000FFFF)等於0時,則讀狀態(S>>>16)大於0,即讀鎖已被獲取。

  • 寫鎖的獲取與釋放

寫鎖是一個支持重進入的排它鎖,如果當前線程已經獲取了寫鎖,則增加寫狀態。如果當前線程在獲取寫鎖時,讀鎖已經被獲取(讀狀態不為0)或者該線程不是已經獲取寫鎖的線程,則當前線程進入等待狀態。

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

protected final boolean tryAcquire(int acquires) {
  
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // 存在讀鎖或者存在寫鎖但當前線程不是已經獲取寫鎖的線程
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    // writerShouldBlock() 是公平性的保證,在獲取寫鎖前,看看自己是否是隊列中第一個出隊列節點
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

從上面的代碼邏輯,我們知道寫鎖獲取成功的條件是c != 0 && (w == 0 || current == getExclusiveOwnerThread()) ,這是因為c!=0c = getState())表示當前有線程獲得鎖(可能是讀鎖,也可能是寫鎖),此時如果寫鎖的數量(int w = exclusiveCount(c);)也為0,不是寫鎖就是讀鎖則表示當前有讀鎖存在,則寫鎖只能進入同步隊列等待。如果寫鎖的數量大於0,因為讀寫鎖是沖突的,不可能同時存在,也就是說當前一定是寫鎖存在,此時只要出於重入性考慮,判斷寫鎖擁有者是不是自己就行。

寫鎖的釋放沒有太復雜的邏輯,只要判斷自己重入的次數都釋放完,將當前獨占鎖擁有線程改為null即可。

  • 讀鎖的獲取與釋放
    讀鎖是一個支持重進入的共享鎖,它能夠被多個線程同時獲取,在沒有其他寫線程訪問(或者寫狀態為0)時,讀鎖總會被成功地獲取,而所做的也只是(線程安全的)增加讀狀態。如果當前線程已經獲取了讀鎖,則增加讀狀態。如果當前線程在獲取讀鎖時,寫鎖已被其他線程獲取,則進入等待狀態。

獲取讀鎖的實現從Java 5到Java 6變得復雜許多,主要原因是新增了一些功能,例如getReadHoldCount()方法,作用是返回當前線程獲取讀鎖的次數。讀狀態是所有線程獲取讀鎖次數的總和,而每個線程各自獲取讀鎖的次數只能選擇保存在ThreadLocal中,由線程自身維護,這使獲取讀鎖的實現變得復雜

  protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        // 驗證是否有寫鎖存儲,如果存在且自己是哪個寫鎖擁有者,因為支持鎖降級,所以可以擁有讀鎖。否則,返回-1
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) { //注意這里加的是SHARED_UNIT,而非unused
            if (r == 0) { 
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
}    
在tryAcquireShared(int unused)方法中,如果其他線程已經獲取了寫鎖,則當前線程獲取讀鎖失敗,進入等待狀態。如果當前線程獲取了寫鎖或者寫鎖未被獲取,則當前線程(線程安全,依靠CAS保證)增加讀狀態,成功獲取讀鎖。

從上面state增加的代碼片段`compareAndSetState(c, c + SHARED_UNIT))`,我們知道每個線程的重入次數,並不是通過state的值來體現的,每輪第一個獲取讀鎖的重入次數是通過`firstReaderHoldCount`來體現的,而這輪后面獲得讀寫的線程重入次數是保存在ThreadLocal中的。這里輪的概念指的是state的值從0到n(n>0)再到0,為1輪(我自己的認知)。state的值可以體現出當前獲得讀鎖的線程總重入次數。

``` java
protected final boolean tryReleaseShared(int unused) {

    Thread current = Thread.currentThread();
    
    if (firstReader == current) {
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
```

從上面釋放鎖的代碼,讀鎖的每次釋放(線程安全的,可能有多個讀線程同時釋放讀鎖)均減少讀狀態,減少的值是(1<<16)。
  • 鎖降級
    鎖降級指的是寫鎖降級成為讀鎖。如果當前線程擁有寫鎖,然后將其釋放,最后再獲取讀鎖,這種分段完成的過程不能稱之為鎖降級。鎖降級是指把持住(當前擁有的)寫鎖,再獲取到讀鎖,隨后釋放(先前擁有的)寫鎖的過程。因為鎖降級的存在,所以獲取寫鎖的線程可以再次獲取讀鎖,但獲取讀鎖的線程不能再次獲取寫鎖。也就是說,如果你先獲取寫鎖,然后獲取讀鎖,可以成功:

    //可以這樣做
    w.lock();
    try {
    	r.lock();
    	try {
    		// do something
    	} finally {
    		r.unlock();
    	}
    } finally {
    	w.unlock();
    }
    

    而如果你先獲取讀鎖,再獲取寫鎖,你的線程將永遠無法成功:

    //線程將永遠阻塞,無法完成
    r.lock();
    try {
    	w.lock();
    	try {
    		// do something
    	} finally {
    		w.unlock();
    	}
    } finally {
    	r.unlock();
    }
    

    需要注意的是,即使存在鎖降級,也需要手動釋放寫鎖。

    因為寫鎖是獨占式的,並且寫鎖在加鎖時,需要判斷是否有讀鎖存在,如果有讀鎖存在,則不能進行寫鎖加鎖,所以一個線程在獲得讀鎖后,再嘗試進行加讀寫,這時因為有讀鎖的存在,所以永遠不能成功加上寫鎖。但是,對於先擁有寫鎖,再嘗試加讀鎖時,由於if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)在嘗試進行加鎖時,如果當前寫鎖數量大於0,會判斷當前線程是否就是寫鎖擁有者,如果是,則繼續加讀鎖,所以從寫鎖降級到讀鎖是允許的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM