【JUC】JDK1.8源碼分析之ReentrantReadWriteLock


 

重入鎖ReentrantLock是排他鎖,排他鎖在同一時刻僅有一個線程可以進行訪問,但是在大多數場景下,大部分時間都是提供讀服務,而寫服務占有的時間較少。然而讀服務不存在數據競爭問題,如果一個線程在讀時禁止其他線程讀勢必會導致性能降低。所以就提供了讀寫鎖。

讀寫鎖維護着一對鎖,一個讀鎖和一個寫鎖,讀鎖是共享鎖,寫鎖是獨占鎖。通過分離讀鎖和寫鎖,使得並發性比一般的排他鎖有了較大的提升:在同一時間可以允許多個讀線程同時訪問,但是在寫線程訪問時,所有讀線程和寫線程都會被阻塞。

讀寫鎖的主要特性:

  1. 公平性:支持公平性和非公平性。
  2. 重入性:支持重入。讀寫鎖最多支持65535個遞歸寫入鎖和65535個遞歸讀取鎖。
  3. 鎖降級:遵循獲取寫鎖、獲取讀鎖在釋放寫鎖的次序,寫鎖能夠降級成為讀鎖

讀寫鎖ReentrantReadWriteLock實現接口ReadWriteLock,該接口維護了一對相關的鎖,一個用於只讀操作,另一個用於寫入操作。只要沒有 writer,讀取鎖可以由多個 reader 線程同時保持。寫入鎖是獨占的。

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

ReadWriteLock定義了兩個方法。readLock()返回用於讀操作的鎖,writeLock()返回用於寫操作的鎖。ReentrantReadWriteLock定義如下:

/** 內部類  讀鎖 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 內部類  寫鎖 */
    private final ReentrantReadWriteLock.WriteLock writerLock;

    final Sync sync;

    /** 使用默認(非公平)的排序屬性創建一個新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /** 使用給定的公平策略創建一個新的 ReentrantReadWriteLock */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    /** 返回用於寫入操作的鎖 */
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    /** 返回用於讀取操作的鎖 */
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

    abstract static class Sync extends AbstractQueuedSynchronizer {
        /**
         * 省略其余源代碼
         */
    }
    public static class WriteLock implements Lock, java.io.Serializable{
        /**
         * 省略其余源代碼
         */
    }

    public static class ReadLock implements Lock, java.io.Serializable {
        /**
         * 省略其余源代碼
         */
    }
View Code

 ReentrantReadWriteLock與ReentrantLock一樣,其鎖主體依然是Sync,它的讀鎖、寫鎖都是依靠Sync來實現的。所以ReentrantReadWriteLock實際上只有一個鎖,只是在獲取讀取鎖和寫入鎖的方式上不一樣而已,它的讀寫鎖其實就是兩個類:ReadLock、writeLock,這兩個類都是lock實現。

讀寫狀態的設計

讀寫鎖同樣依賴自定義同步器來實現同步功能,而讀寫狀態就是其同步器的同步狀態。回想ReentrantLock中自定義同步器的實現,同步狀態表示鎖被一個線程重復獲取的次數,而讀寫鎖的自定義同步器需要在同步狀態(一個整型變量)上維護多個讀線程和一個寫線程的狀 
,使得該狀態的設計成為讀寫鎖實現的關鍵。如果在一個整型變量上維護多種狀態,就一定需要“按位切割使用”這個變量,讀寫鎖將變量切分成了兩個部分,16位表示讀,低16位表示,划分方式如下圖所示 
這里寫圖片描述
當前同步狀態表示一個線程已經獲取了寫鎖,且重進入了兩次,同時也連續獲取了兩次讀鎖。讀寫鎖是如何迅速確定讀和寫各自的狀態呢?答案是通過位運算。假設當前同步狀態值為S,寫狀態等於S&0x0000FFFF(直接將狀態state和(2^16 - 1)做與運算,其等效於將state模上2^16。將高16位全部抹去),讀狀態等於S>>>16(無符號補0右移16位)。當寫狀態增加1時,等於S+1,當讀狀態增加1時,等於S+(1<<16),也就是S+0x00010000。根據狀態的划分能得出一個推論:S不等於0時,當寫狀態(S&0x0000FFFF)等於0時,則讀狀態(S>>>16)大於0,即讀鎖已被獲取。

 

寫鎖

寫鎖就是一個支持可重入的排他鎖。

寫鎖的獲取

寫鎖的獲取最終會調用tryAcquire(int arg),該方法在內部類Sync中實現:

protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        //當前鎖個數
        int c = getState();
        //寫鎖
        int w = exclusiveCount(c);
        if (c != 0) {
            //c != 0 && w == 0 表示存在讀鎖
            //當前線程不是已經獲取寫鎖的線程
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            //超出最大范圍
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            setState(c + acquires);
            return true;
        }
        //是否需要阻塞
        if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
            return false;
        //設置獲取鎖的線程為當前線程
        setExclusiveOwnerThread(current);
        return true;
    }
View Code

該方法和ReentrantLock的tryAcquire(int arg)大致一樣,在判斷重入時增加了一項條件:讀鎖是否存在。因為要確保寫鎖的操作對讀鎖是可見的,如果在存在讀鎖的情況下允許獲取寫鎖,那么那些已經獲取讀鎖的其他線程可能就無法感知當前寫線程的操作。因此只有等讀鎖完全釋放后,寫鎖才能夠被當前線程所獲取,一旦寫鎖獲取了,所有其他讀、寫線程均會被阻塞

寫鎖的釋放

獲取了寫鎖用完了則需要釋放,WriteLock提供了unlock()方法釋放寫鎖:

public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
View Code

 寫鎖的釋放最終還是會調用AQS的模板方法release(int arg)方法,該方法首先調用tryRelease(int arg)方法嘗試釋放鎖,tryRelease(int arg)方法為讀寫鎖內部類Sync中定義了,如下:

 protected final boolean tryRelease(int releases) {
        //釋放的線程不為鎖的持有者
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        //若寫鎖的新線程數為0,則將鎖的持有者設置為null
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }
View Code

寫鎖釋放鎖的整個過程和獨占鎖ReentrantLock相似,每次釋放均是減少寫狀態,當寫狀態為0時表示 寫鎖已經完全釋放了,從而等待的其他線程可以繼續訪問讀寫鎖,獲取同步狀態,同時此次寫線程的修改對后續的線程可見。

讀鎖

讀鎖為一個可重入的共享鎖,它能夠被多個線程同時持有,在沒有其他寫線程訪問時,讀鎖總是或獲取成功。

讀鎖的獲取

讀鎖的獲取可以通過ReadLock的lock()方法:

 public void lock() {
            sync.acquireShared(1);
        }

Sync的acquireShared(int arg)定義在AQS中:

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

tryAcqurireShared(int arg)嘗試獲取讀同步狀態,該方法主要用於獲取共享式同步狀態,獲取成功返回 >= 0的返回結果,否則返回 < 0 的返回結果。

 protected final int tryAcquireShared(int unused) {
        //當前線程
        Thread current = Thread.currentThread();
        int c = getState();
        //exclusiveCount(c)計算寫鎖
        //如果存在寫鎖,且鎖的持有者不是當前線程,直接返回-1
        //存在鎖降級問題,后續闡述
        if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
            return -1;
        //讀鎖
        int r = sharedCount(c);

        /*
         * readerShouldBlock():讀鎖是否需要等待(公平鎖原則)
         * r < MAX_COUNT:持有線程小於最大數(65535)
         * compareAndSetState(c, c + SHARED_UNIT):設置讀取鎖狀態
         */
        if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
            /*
             * holdCount部分后面講解
             */
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }
View Code

讀鎖獲取的過程相對於獨占鎖而言會稍微復雜下,整個過程如下:

1.因為存在鎖降級情況,占有寫鎖的線程可以獲取讀鎖,如果存在寫鎖且鎖的持有者不是當前線程則直接返回失敗,否則繼續。鎖降級就意味着寫鎖是可以降級為讀鎖的,但是需要遵循先獲取寫鎖、獲取讀鎖在釋放寫鎖的次序。

2.依據公平性原則,判斷讀鎖是否需要阻塞,讀鎖持有線程數小於最大值(65535),且設置鎖狀態成功,執行以下代碼(對於HoldCounter下面再闡述),並返回1。如果不滿足改條件,執行fullTryAcquireShared()。

 

 final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            //鎖降級
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
            }
            //讀鎖需要阻塞
            else if (readerShouldBlock()) {
                //列頭為當前線程
                if (firstReader == current) {
                }
                //HoldCounter后面講解
                else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        return -1;
                }
            }
            //讀鎖超出最大范圍
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            //CAS設置讀鎖成功
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                //如果是第1次獲取“讀取鎖”,則更新firstReader和firstReaderHoldCount
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                }
                //如果想要獲取鎖的線程(current)是第1個獲取鎖(firstReader)的線程,則將firstReaderHoldCount+1
                else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    //更新線程的獲取“讀取鎖”的共享計數
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }
View Code

fullTryAcquireShared(Thread current)會根據“是否需要阻塞等待”,“讀取鎖的共享計數是否超過限制”等等進行處理。如果不需要阻塞等待,並且鎖的共享計數沒有超過限制,則通過CAS嘗試獲取鎖,並返回1

讀鎖的釋放

與寫鎖相同,讀鎖也提供了unlock()釋放讀鎖:

 public void unlock() {
            sync.releaseShared(1);
        }

unlcok()方法內部使用Sync的releaseShared(int arg)方法,該方法定義在AQS中:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
View Code

調用tryReleaseShared(int arg)嘗試釋放讀鎖,該方法定義在讀寫鎖的Sync內部類中:

protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        //如果想要釋放鎖的線程為第一個獲取鎖的線程
        if (firstReader == current) {
            //僅獲取了一次,則需要將firstReader 設置null,否則 firstReaderHoldCount - 1
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        }
        //獲取rh對象,並更新“當前線程獲取鎖的信息”
        else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        //CAS更新同步狀態
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
View Code

HoldCounter

在讀鎖獲取鎖和釋放鎖的過程中,我們一直都可以看到一個變量rh (HoldCounter ),該變量在讀鎖中扮演着非常重要的作用。

我們了解讀鎖的內在機制其實就是一個共享鎖,為了更好理解HoldCounter ,我們暫且認為它不是一個鎖的概率,而相當於一個計數器。一次共享鎖的操作就相當於在該計數器的操作。獲取共享鎖,則該計數器 + 1,釋放共享鎖,該計數器 - 1。只有當線程獲取共享鎖后才能對共享鎖進行釋放、重入操作。所以HoldCounter的作用就是當前線程持有共享鎖的數量,這個數量必須要與線程綁定在一起,否則操作其他線程鎖就會拋出異常。我們先看HoldCounter的定義:

static final class HoldCounter {
            int count = 0;
            final long tid = getThreadId(Thread.currentThread());
        }
View Code

HoldCounter 定義非常簡單,就是一個計數器count 和線程 id tid 兩個變量。按照這個意思我們看到HoldCounter 是需要和某給線程進行綁定了,我們知道如果要將一個對象和線程綁定僅僅有tid是不夠的,而且從上面的代碼我們可以看到HoldCounter 僅僅只是記錄了tid,根本起不到綁定線程的作用。那么怎么實現呢?答案是ThreadLocal(聯系講ThreadLocal,定義如下:

static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
View Code

通過上面代碼HoldCounter就可以與線程進行綁定了。故而,HoldCounter應該就是綁定線程上的一個計數器,而ThradLocalHoldCounter則是線程綁定的ThreadLocal。從上面我們可以看到ThreadLocal將HoldCounter綁定到當前線程上,同時HoldCounter也持有線程Id,這樣在釋放鎖的時候才能知道ReadWriteLock里面緩存的上一個讀取線程(cachedHoldCounter)是否是當前線程。這樣做的好處是可以減少ThreadLocal.get()的次數,因為這也是一個耗時操作。需要說明的是這樣HoldCounter綁定線程id而不綁定線程對象的原因是避免HoldCounter和ThreadLocal互相綁定而GC難以釋放它們(盡管GC能夠智能的發現這種引用而回收它們,但是這需要一定的代價),所以其實這樣做只是為了幫助GC快速回收對象而已。

看到這里我們明白了HoldCounter作用了,我們在看一個獲取讀鎖的代碼段:

else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
View Code

這段代碼涉及了幾個變量:firstReader 、firstReaderHoldCount、cachedHoldCounter 。我們先理清楚這幾個變量:

private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
private transient HoldCounter cachedHoldCounter;
View Code

firstReader 看名字就明白了為第一個獲取讀鎖的線程,firstReaderHoldCount為第一個獲取讀鎖的重入數,cachedHoldCounter為HoldCounter的緩存。

理清楚上面所有的變量了,HoldCounter也明白了,我們就來給上面那段代碼標明注釋,如下:

//如果獲取讀鎖的線程為第一次獲取讀鎖的線程,則firstReaderHoldCount重入數 + 1
    else if (firstReader == current) {
        firstReaderHoldCount++;
    } else {
        //非firstReader計數
        if (rh == null)
            rh = cachedHoldCounter;
        //rh == null 或者 rh.tid != current.getId(),需要獲取rh
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
            //加入到readHolds中
        else if (rh.count == 0)
            readHolds.set(rh);
        //計數+1
        rh.count++;
        cachedHoldCounter = rh; // cache for release
    }
View Code

這里解釋下為何要引入firstRead、firstReaderHoldCount。這是為了一個效率問題,firstReader是不會放入到readHolds中的,如果讀鎖僅有一個的情況下就會避免查找readHolds。

鎖降級

上開篇是LZ就闡述了讀寫鎖有一個特性就是鎖降級,鎖降級就意味着寫鎖是可以降級為讀鎖的,但是需要遵循先獲取寫鎖、獲取讀鎖在釋放寫鎖的次序。注意如果當前線程先獲取寫鎖,然后釋放寫鎖,再獲取讀鎖這個過程不能稱之為鎖降級,鎖降級一定要遵循那個次序。

在獲取讀鎖的方法tryAcquireShared(int unused)中,有一段代碼就是來判讀鎖降級的:

int c = getState();
        //exclusiveCount(c)計算寫鎖
        //如果存在寫鎖,且鎖的持有者不是當前線程,直接返回-1
        //存在鎖降級問題,后續闡述
        if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
            return -1;
        //讀鎖
        int r = sharedCount(c);
View Code

鎖降級中讀鎖的獲取釋放為必要?肯定是必要的。試想,假如當前線程A不獲取讀鎖而是直接釋放了寫鎖,這個時候另外一個線程B獲取了寫鎖,那么這個線程B對數據的修改是不會對當前線程A可見的。如果獲取了讀鎖,則線程B在獲取寫鎖過程中判斷如果有讀鎖還沒有釋放則會被阻塞,只有當前線程A釋放讀鎖后,線程B才會獲取寫鎖成功。

 

http://cmsblogs.com/?p=2213

https://www.cnblogs.com/leesf456/p/5419132.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM