【JUC】JDK1.8源碼分析之ReentrantReadWriteLock(七)


一、前言

  在分析了鎖框架的其他類之后,下面進入鎖框架中最后一個類ReentrantReadWriteLock的分析,它表示可重入讀寫鎖,ReentrantReadWriteLock中包含了兩種鎖,讀鎖ReadLock和寫鎖WriteLock,可以通過這兩種鎖實現線程間的同步,下面開始進行分析。

二、ReentrantReadWriteLock數據結構

  分析源碼可以知道,ReentrantReadWriteLock底層是基於ReentrantLockAbstractQueuedSynchronizer來實現的,所以,ReentrantReadWriteLock的數據結構也依托於AQS的數據結構,在前面對AQS的分析中已經指出了其數據結構,在這里不再累贅。

三、ReentrantReadWriteLock源碼分析

  3.1. 類的繼承關系  

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {}

  說明:可以看到,ReentrantReadWriteLock實現了ReadWriteLock接口,ReadWriteLock接口定義了獲取讀鎖和寫鎖的規范,具體需要實現類去實現;同時其還實現了Serializable接口,表示可以進行序列化,在源代碼中可以看到ReentrantReadWriteLock實現了自己的序列化邏輯。

  3.2. 類的內部類

  ReentrantReadWriteLock有五個內部類,五個內部類之間也是相互關聯的。內部類的關系如下圖所示。

  說明:如上圖所示,Sync繼承自AQS、NonfairSync繼承自Sync類、FairSync繼承自Sync類;ReadLock實現了Lock接口、WriteLock也實現了Lock接口。

  ① Sync類

  1. 類的繼承關系 

abstract static class Sync extends AbstractQueuedSynchronizer {}

  說明:Sync抽象類繼承自AQS抽象類,Sync類提供了對ReentrantReadWriteLock的支持。

  2. 類的內部類

  Sync類內部存在兩個內部類,分別為HoldCounter和ThreadLocalHoldCounter,其中HoldCounter主要與讀鎖配套使用,其中,HoldCounter源碼如下。

        // 計數器
        static final class HoldCounter {
            // 計數
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            // 獲取當前線程的TID屬性的值
            final long tid = getThreadId(Thread.currentThread());
        }
View Code

  說明:HoldCounter主要有兩個屬性,count和tid,其中count表示某個讀線程重入的次數,tid表示該線程的tid字段的值,該字段可以用來唯一標識一個線程。ThreadLocalHoldCounter的源碼如下  

        // 本地線程計數器
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            // 重寫初始化方法,在沒有進行set的情況下,獲取的都是該HoldCounter值
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
View Code

  說明:ThreadLocalHoldCounter重寫了ThreadLocal的initialValue方法,ThreadLocal類可以將線程與對象相關聯。在沒有進行set的情況下,get到的均是initialValue方法里面生成的那個HolderCounter對象。

  3. 類的屬性 

    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 版本序列號
        private static final long serialVersionUID = 6317671515068378041L;        
        // 高16位為讀鎖,低16位為寫鎖
        static final int SHARED_SHIFT   = 16;
        // 讀鎖單位
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        // 讀鎖最大數量
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        // 寫鎖最大數量
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        // 本地線程計數器
        private transient ThreadLocalHoldCounter readHolds;
        // 緩存的計數器
        private transient HoldCounter cachedHoldCounter;
        // 第一個讀線程
        private transient Thread firstReader = null;
        // 第一個讀線程的計數
        private transient int firstReaderHoldCount;
    }
View Code

  說明:該屬性中包括了讀鎖、寫鎖線程的最大量。本地線程計數器等。

  4. 類的構造函數  

        // 構造函數
        Sync() {
            // 本地線程計數器
            readHolds = new ThreadLocalHoldCounter();
            // 設置AQS的狀態
            setState(getState()); // ensures visibility of readHolds
        }
View Code

  說明:在Sync的構造函數中設置了本地線程計數器和AQS的狀態state。

  5. 核心函數分析

  對ReentrantReadWriteLock對象的操作絕大多數都轉發至Sync對象進行處理。下面對Sync類中的重點函數進行分析

  I. sharedCount函數

  表示占有讀鎖的線程數量,源碼如下 

static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

  說明:直接將state右移16位,就可以得到讀鎖的線程數量,因為state的高16位表示讀鎖,對應的第十六位表示寫鎖數量。

  II. exclusiveCount函數

  表示占有寫鎖的線程數量,源碼如下  

static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

  說明:直接將狀態state和(2^16 - 1)做與運算,其等效於將state模上2^16。寫鎖數量由state的低十六位表示。

  III. tryRelease函數  

        protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            // 獲取當前線程
            Thread current = Thread.currentThread();
            // 獲取狀態
            int c = getState();
            // 寫線程數量
            int w = exclusiveCount(c);
            if (c != 0) { // 狀態不為0
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread()) // 寫線程數量為0或者當前線程沒有占有獨占資源
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT) // 判斷是否超過最高寫線程數量
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                // 設置AQS狀態
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires)) // 寫線程是否應該被阻塞
                return false;
            // 設置獨占線程
            setExclusiveOwnerThread(current);
            return true;
        }
View Code

  說明:此函數用於釋放寫鎖資源,首先會判斷該線程是否為獨占線程,若不為獨占線程,則拋出異常,否則,計算釋放資源后的寫鎖的數量,若為0,表示成功釋放,資源不將被占用,否則,表示資源還被占用。其函數流程圖如下。

  IV. tryAcquire函數  

        protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            // 獲取當前線程
            Thread current = Thread.currentThread();
            // 獲取狀態
            int c = getState();
            // 寫線程數量
            int w = exclusiveCount(c);
            if (c != 0) { // 狀態不為0
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread()) // 寫線程數量為0或者當前線程沒有占有獨占資源
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT) // 判斷是否超過最高寫線程數量
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                // 設置AQS狀態
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires)) // 寫線程是否應該被阻塞
                return false;
            // 設置獨占線程
            setExclusiveOwnerThread(current);
            return true;
        }
View Code

  說明:此函數用於獲取寫鎖,首先會獲取state,判斷是否為0,若為0,表示此時沒有讀鎖線程,再判斷寫線程是否應該被阻塞,而在非公平策略下總是不會被阻塞,在公平策略下會進行判斷(判斷同步隊列中是否有等待時間更長的線程,若存在,則需要被阻塞,否則,無需阻塞),之后在設置狀態state,然后返回true。若state不為0,則表示此時存在讀鎖或寫鎖線程,若寫鎖線程數量為0或者當前線程為獨占鎖線程,則返回false,表示不成功,否則,判斷寫鎖線程的重入次數是否大於了最大值,若是,則拋出異常,否則,設置狀態state,返回true,表示成功。其函數流程圖如下

  V. tryReleaseShared函數 

        protected final boolean tryReleaseShared(int unused) {
            // 獲取當前線程
            Thread current = Thread.currentThread();
            if (firstReader == current) { // 當前線程為第一個讀線程
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1) // 讀線程占用的資源數為1
                    firstReader = null;
                else // 減少占用的資源
                    firstReaderHoldCount--;
            } else { // 當前線程不為第一個讀線程
                // 獲取緩存的計數器
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current)) // 計數器為空或者計數器的tid不為當前正在運行的線程的tid
                    // 獲取當前線程對應的計數器
                    rh = readHolds.get();
                // 獲取計數
                int count = rh.count;
                if (count <= 1) { // 計數小於等於1
                    // 移除
                    readHolds.remove();
                    if (count <= 0) // 計數小於等於0,拋出異常
                        throw unmatchedUnlockException();
                }
                // 減少計數
                --rh.count;
            }
            for (;;) { // 無限循環
                // 獲取狀態
                int c = getState();
                // 獲取狀態
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc)) // 比較並進行設置
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }
View Code

  說明:此函數表示讀鎖線程釋放鎖。首先判斷當前線程是否為第一個讀線程firstReader,若是,則判斷第一個讀線程占有的資源數firstReaderHoldCount是否為1,若是,則設置第一個讀線程firstReader為空,否則,將第一個讀線程占有的資源數firstReaderHoldCount減1;若當前線程不是第一個讀線程,那么首先會獲取緩存計數器(上一個讀鎖線程對應的計數器 ),若計數器為空或者tid不等於當前線程的tid值,則獲取當前線程的計數器,如果計數器的計數count小於等於1,則移除當前線程對應的計數器,如果計數器的計數count小於等於0,則拋出異常,之后再減少計數即可。無論何種情況,都會進入無限循環,該循環可以確保成功設置狀態state。其流程圖如下

  VI. tryAcquireShared函數 

        private IllegalMonitorStateException unmatchedUnlockException() {
            return new IllegalMonitorStateException(
                "attempt to unlock read lock, not locked by current thread");
        }
        
        // 共享模式下獲取資源
        protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            // 獲取當前線程
            Thread current = Thread.currentThread();
            // 獲取狀態
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current) // 寫線程數不為0並且占有資源的不是當前線程
                return -1;
            // 讀鎖數量
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) { // 讀線程是否應該被阻塞、並且小於最大值、並且比較設置成功
                if (r == 0) { // 讀鎖數量為0
                    // 設置第一個讀線程
                    firstReader = current;
                    // 讀線程占用的資源數為1
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) { // 當前線程為第一個讀線程
                    // 占用資源數加1
                    firstReaderHoldCount++;
                } else { // 讀鎖數量不為0並且不為當前線程
                    // 獲取計數器
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) // 計數器為空或者計數器的tid不為當前正在運行的線程的tid
                        // 獲取當前線程對應的計數器
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0) // 計數為0
                        // 設置
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }
View Code

  說明:此函數表示讀鎖線程獲取讀鎖。首先判斷寫鎖是否為0並且當前線程不占有獨占鎖,直接返回;否則,判斷讀線程是否需要被阻塞並且讀鎖數量是否小於最大值並且比較設置狀態成功,若當前沒有讀鎖,則設置第一個讀線程firstReader和firstReaderHoldCount;若當前線程線程為第一個讀線程,則增加firstReaderHoldCount;否則,將設置當前線程對應的HoldCounter對象的值。流程圖如下。


  VII. fullTryAcquireShared函數 

        final int fullTryAcquireShared(Thread current) {
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = null;
            for (;;) { // 無限循環
                // 獲取狀態
                int c = getState();
                if (exclusiveCount(c) != 0) { // 寫線程數量不為0
                    if (getExclusiveOwnerThread() != current) // 不為當前線程
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) { // 寫線程數量為0並且讀線程被阻塞
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) { // 當前線程為第一個讀線程
                        // assert firstReaderHoldCount > 0;
                    } else { // 當前線程不為第一個讀線程
                        if (rh == null) { // 計數器不為空
                            // 
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) { // 計數器為空或者計數器的tid不為當前正在運行的線程的tid
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT) // 讀鎖數量為最大值,拋出異常
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) { // 比較並且設置成功
                    if (sharedCount(c) == 0) { // 讀線程數量為0
                        // 設置第一個讀線程
                        firstReader = current;
                        // 
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }
View Code

  說明:在tryAcquireShared函數中,如果下列三個條件不滿足(讀線程是否應該被阻塞、小於最大值、比較設置成功)則會進行fullTryAcquireShared函數中,它用來保證相關操作可以成功。其邏輯與tryAcquireShared邏輯類似,不再累贅。

  而其他內部類的操作基本上都是轉化到了對Sync對象的操作,在此不再累贅。

  3.3. 類的屬性  

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    // 版本序列號    
    private static final long serialVersionUID = -6992448646407690164L;    
    // 讀鎖
    private final ReentrantReadWriteLock.ReadLock readerLock;
    // 寫鎖
    private final ReentrantReadWriteLock.WriteLock writerLock;
    // 同步隊列
    final Sync sync;
    
    private static final sun.misc.Unsafe UNSAFE;
    // 線程ID的偏移地址
    private static final long TID_OFFSET;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            // 獲取線程的tid字段的內存地址
            TID_OFFSET = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("tid"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
View Code

  說明:可以看到ReentrantReadWriteLock屬性包括了一個ReentrantReadWriteLock.ReadLock對象,表示讀鎖;一個ReentrantReadWriteLock.WriteLock對象,表示寫鎖;一個Sync對象,表示同步隊列。

  3.4. 類的構造函數

  1. ReentrantReadWriteLock()型構造函數  

    public ReentrantReadWriteLock() {
        this(false);
    }
View Code

  說明:此構造函數會調用另外一個有參構造函數。

  2. ReentrantReadWriteLock(boolean)型構造函數 

    public ReentrantReadWriteLock(boolean fair) {
        // 公平策略或者是非公平策略
        sync = fair ? new FairSync() : new NonfairSync();
        // 讀鎖
        readerLock = new ReadLock(this);
        // 寫鎖
        writerLock = new WriteLock(this);
    }
View Code

  說明:可以指定設置公平策略或者非公平策略,並且該構造函數中生成了讀鎖與寫鎖兩個對象。

  3.5 核心函數分析

  對ReentrantReadWriteLock的操作基本上都轉化為了對Sync對象的操作,而Sync的函數已經分析過,不再累贅。

四、示例

  下面給出了一個使用ReentrantReadWriteLock的示例,源代碼如下。

package com.hust.grid.leesf.reentrantreadwritelock;

import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadThread extends Thread {
    private ReentrantReadWriteLock rrwLock;
    
    public ReadThread(String name, ReentrantReadWriteLock rrwLock) {
        super(name);
        this.rrwLock = rrwLock;
    }
    
    public void run() {
        System.out.println(Thread.currentThread().getName() + " trying to lock");
        try {
            rrwLock.readLock().lock();
            System.out.println(Thread.currentThread().getName() + " lock successfully");
            Thread.sleep(5000);        
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rrwLock.readLock().unlock();
            System.out.println(Thread.currentThread().getName() + " unlock successfully");
        }
    }
}

class WriteThread extends Thread {
    private ReentrantReadWriteLock rrwLock;
    
    public WriteThread(String name, ReentrantReadWriteLock rrwLock) {
        super(name);
        this.rrwLock = rrwLock;
    }
    
    public void run() {
        System.out.println(Thread.currentThread().getName() + " trying to lock");
        try {
            rrwLock.writeLock().lock();
            System.out.println(Thread.currentThread().getName() + " lock successfully");    
        } finally {
            rrwLock.writeLock().unlock();
            System.out.println(Thread.currentThread().getName() + " unlock successfully");
        }
    }
}

public class ReentrantReadWriteLockDemo {
    public static void main(String[] args) {
        ReentrantReadWriteLock rrwLock = new ReentrantReadWriteLock();
        ReadThread rt1 = new ReadThread("rt1", rrwLock);
        ReadThread rt2 = new ReadThread("rt2", rrwLock);
        WriteThread wt1 = new WriteThread("wt1", rrwLock);
        rt1.start();
        rt2.start();
        wt1.start();
    } 
}
View Code

  運行結果(某一次):  

rt1 trying to lock
rt2 trying to lock
wt1 trying to lock
rt1 lock successfully
rt2 lock successfully
rt1 unlock successfully
rt2 unlock successfully
wt1 lock successfully
wt1 unlock successfully

  說明:程序中生成了一個ReentrantReadWriteLock對象,並且設置了兩個讀線程,一個寫線程。根據結果,可能存在如下的時序圖。

  ① rt1線程執行rrwLock.readLock().lock操作,主要的函數調用如下。

  說明:此時,AQS的狀態state為2^16 次方,即表示此時讀線程數量為1。

  ② rt2線程執行rrwLock.readLock().lock操作,主要的函數調用如下。

  說明:此時,AQS的狀態state為2 * 2^16次方,即表示此時讀線程數量為2。

  ③ wt1線程執行rrwLock.writeLock().lock操作,主要的函數調用如下。

  說明:此時,在同步隊列Sync queue中存在兩個結點,並且wt1線程會被禁止運行。

  ④ rt1線程執行rrwLock.readLock().unlock操作,主要的函數調用如下。

  說明:此時,AQS的state為2^16次方,表示還有一個讀線程。

  ⑤ rt2線程執行rrwLock.readLock().unlock操作,主要的函數調用如下。

  說明:當rt2線程執行unlock操作后,AQS的state為0,並且wt1線程將會被unpark,其獲得CPU資源就可以運行。

  ⑥ wt1線程獲得CPU資源,繼續運行,需要恢復。由於之前acquireQueued函數中的parkAndCheckInterrupt函數中被禁止的,所以,恢復到parkAndCheckInterrupt函數中,主要的函數調用如下

  說明:最后,sync queue隊列中只有一個結點,並且頭結點尾節點均指向它,AQS的state值為1,表示此時有一個寫線程。

  ⑦ wt1執行rrwLock.writeLock().unlock操作,主要的函數調用如下。

  說明:此時,AQS的state為0,表示沒有任何讀線程或者寫線程了。並且Sync queue結構與上一個狀態的結構相同,沒有變化。

五、總結

  經過分析ReentrantReadWriteLock的源碼,可知其可以實現多個線程同時讀,此時,寫線程會被阻塞。並且,寫線程獲取寫入鎖后可以獲取讀取鎖,然后釋放寫入鎖,這樣寫入鎖變成了讀取鎖。至此,並發框架中的鎖框架就已經全部介紹完成了,通過分析源碼,有了不少收獲,謝謝各位園友的觀看~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM