Java顯式鎖學習總結之五:ReentrantReadWriteLock源碼分析


概述

我們在介紹AbstractQueuedSynchronizer的時候介紹過,AQS支持獨占式同步狀態獲取/釋放、共享式同步狀態獲取/釋放兩種模式,對應的典型應用分別是ReentrantLock和Semaphore,AQS還可以混合兩種模式使用,讀寫鎖ReentrantReadWriteLock就是如此。

設想以下情景:我們在系統中有一個多線程訪問的緩存,多個線程都可以對緩存進行讀或寫操作,但是讀操作遠遠多於寫操作,要求寫操作要線程安全,且寫操作執行完成要求對當前的所有讀操作馬上可見。

分析上面的需求:因為有多個線程可能會執行寫操作,因此多個線程的寫操作必須同步串行執行;而寫操作執行完成要求對當前的所有讀操作馬上可見,這就意味着當有線程正在讀的時候,要阻塞寫操作,當正在執行寫操作時,要阻塞讀操作。一個簡單的實現就是將數據直接加上互斥鎖,同一時刻不管是讀還是寫線程,都只能有一個線程操作數據。但是這樣的問題就是如果當前只有N個讀線程,沒有寫線程,這N個讀線程也要傻呵呵的排隊讀,盡管其實是可以安全並發提高效率的。因此理想的實現是:

當有寫線程時,則寫線程獨占同步狀態。

當沒有寫線程時只有讀線程時,則多個讀線程可以共享同步狀態。

讀寫鎖就是為了實現這種效果而生。

使用示例

我們先來看一下讀寫鎖怎么使用,這里我們基於hashmap(本身線程不安全)做一個多線程並發安全的緩存:

public class ReadWriteCache {
    private static Map<String, Object> data = new HashMap<>();
    private static ReadWriteLock lock = new ReentrantReadWriteLock(false);
    private static Lock rlock = lock.readLock();
    private static Lock wlock = lock.writeLock();

    public static Object get(String key) {
        rlock.lock();
        try {
            return data.get(key);
        } finally {
            rlock.unlock();
        }
    }

    public static Object put(String key, Object value) {
        wlock.lock();
        try {
            return data.put(key, value);
        } finally {
            wlock.unlock();
        }
    }

}

限於篇幅我們只實現2個方法,get和put。從代碼可以看出,我們先創建一個  ReentrantReadWriteLock 對象,構造函數 false 代表是非公平的(非公平的含義和ReentrantLock相同)。然后通過readLock、writeLock方法分別獲取讀鎖和寫鎖。在做讀操作的時候,也就是get方法,我們要先獲取讀鎖;在做寫操作的時候,即put方法,我們要先獲取寫鎖。

通過以上代碼,我們就構造了一個線程安全的緩存,達到我們之前說的:寫線程獨占同步狀態,多個讀線程可以共享同步狀態。

源碼分析

我們先來看下 ReentrantReadWriteLock 類的整體結構:

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** Performs all synchronization mechanics */
    final Sync sync;

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }


    abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
public static class ReadLock implements Lock, java.io.Serializable {}
public static class WriteLock implements Lock, java.io.Serializable {} }

可以看到,在公平鎖與非公平鎖的實現上,與ReentrantLock一樣,也是有一個繼承AQS的內部類Sync,然后NonfairSync和FairSync都繼承Sync,通過構造函數傳入的布爾值決定要構造哪一種Sync實例。

讀寫鎖比ReentrantLock多出了兩個內部類:ReadLock和WriteLock, 用來定義讀鎖和寫鎖,然后在構造函數中,會構造一個讀鎖和一個寫鎖實例保存到成員變量 readerLock 和 writerLock。我們在上面的示例中使用到的 readLock() 和 writeLock() 方法就是返回這兩個成員變量保存的鎖實例。

我們在Sync類中可以看到下列代碼:

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT); //每次要讓共享鎖+1,就應該讓state加 1<<16
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;  //每種鎖的最大重入數量
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** Returns the number of shared holds represented in count  */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds represented in count  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

可以看到主要是幾個位移操作,通過上面的整體結構,我們知道了在讀寫鎖內保存了讀鎖和寫鎖的兩個實例。之前在ReentrantLock中,我們知道鎖的狀態是保存在Sync實例的state字段中的(繼承自父類AQS),現在有了讀寫兩把鎖,然而可以看到還是只有一個Sync實例,那么一個Sync實例的state是如何同時保存兩把鎖的狀態的呢?答案就是用了位分隔:

state字段是32位的int,讀寫鎖用state的低16位保存寫鎖(獨占鎖)的狀態;高16位保存讀鎖(共享鎖)的狀態。

因此要獲取獨占鎖當前的重入數量,就是 state & ((1 << 16) -1) (即 exclusiveCount 方法)

要獲取共享鎖當前的重入數量,就是 state >>> 16 (即 sharedCount 方法)

下面我們具體看寫鎖和讀鎖的實現。

寫鎖

看下WriteLock類中的lock和unlock方法:

        public void lock() {
            sync.acquire(1);
        }

        public void unlock() {
            sync.release(1);
        }

可以看到就是調用的獨占式同步狀態的獲取與釋放,因此真實的實現就是Sync的 tryAcquire和 tryRelease。

寫鎖的獲取

看下tryAcquire:

        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c); //獲取獨占鎖的重入數
            if (c != 0) {
                // 當前state不為0,此時:如果寫鎖狀態為0說明讀鎖此時被占用返回false;如果寫鎖狀態不為0且寫鎖沒有被當前線程持有返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded"); //寫鎖重入數溢出
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
//到這里了說明state為0,嘗試直接cas。writerShouldBlock是為了實現公平或非公平策略的
if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }

邏輯很簡單,直接看注釋就能理解。

寫鎖的釋放

看下tryRelease:

        protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();  //非獨占模式直接拋異常
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free) 
                setExclusiveOwnerThread(null); //如果獨占模式重入數為0了,說明獨占模式被釋放
            setState(nextc);  //不管獨占模式是否被釋放,更新獨占重入數
            return free;
        }

邏輯很簡單,直接看注釋就能理解。

讀鎖

類似於寫鎖,讀鎖的lock和unlock的實際實現對應Sync的 tryAcquireShared 和 tryReleaseShared方法。

讀鎖的獲取

        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1; //如果獨占模式被占且不是當前線程持有,則獲取失敗
            int r = sharedCount(c);
//如果公平策略沒有要求阻塞且重入數沒有到達最大值,則直接嘗試CAS更新state
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
//更新成功后會在firstReaderHoldCount中或readHolds(ThreadLocal類型的)的本線程副本中記錄當前線程重入數(淺藍色代碼),這是為了實現jdk1.6中加入的getReadHoldCount()方法的,這個方法能獲取當前線程重入共享鎖的次數(state中記錄的是多個線程的總重入次數),加入了這個方法讓代碼復雜了不少,但是其原理還是很簡單的:如果當前只有一個線程的話,還不需要動用ThreadLocal,直接往firstReaderHoldCount這個成員變量里存重入數,當有第二個線程來的時候,就要動用ThreadLocal變量readHolds了,每個線程擁有自己的副本,用來保存自己的重入數。
if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); //用來處理CAS沒成功的情況,邏輯和上面的邏輯是類似的,就是加了無限循環 }

 下面這個方法就不用細說了,和上面的處理邏輯類似,加了無限循環用來處理CAS失敗的情況。

        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

讀鎖的釋放

 

        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
//淺藍色代碼也是為了實現jdk1.6中加入的getReadHoldCount()方法,在更新當前線程的重入數。
if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; }
//這里是真正的釋放同步狀態的邏輯,就是直接同步狀態-SHARED_UNIT,然后CAS更新,沒啥好說的
for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }

 

補充內容

通過上面的源碼分析,我們可以發現一個現象:

在線程持有讀鎖的情況下,該線程不能取得寫鎖(因為獲取寫鎖的時候,如果發現當前的讀鎖被占用,就馬上獲取失敗,不管讀鎖是不是被當前線程持有)

在線程持有寫鎖的情況下,該線程可以繼續獲取讀鎖(獲取讀鎖時如果發現寫鎖被占用,只有寫鎖沒有被當前線程占用的情況才會獲取失敗)

仔細想想,這個設計是合理的:因為當線程獲取讀鎖的時候,可能有其他線程同時也在持有讀鎖,因此不能把獲取讀鎖的線程“升級”為寫鎖;而對於獲得寫鎖的線程,它一定獨占了讀寫鎖,因此可以繼續讓它獲取讀鎖,當它同時獲取了寫鎖和讀鎖后,還可以先釋放寫鎖繼續持有讀鎖,這樣一個寫鎖就“降級”為了讀鎖。

綜上:

一個線程要想同時持有寫鎖和讀鎖,必須先獲取寫鎖再獲取讀鎖;

寫鎖可以“降級”為讀鎖;

讀鎖不能“升級”為寫鎖。

總結

讀寫鎖還是很實用的,因為一般場景下,數據的並發操作都是讀多於寫,在這種情況下,讀寫鎖能夠提供比排它鎖更好的並發性。

在讀寫鎖的實現方面,本來以為會比較復雜,結果看完源碼的感受也是快刀切西瓜,看來AQS的設計真的很棒,在AQS的基礎上構建的組件實現都很簡單。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM