簡單看看讀寫鎖ReentantReadWriteLock


  前面我們看了可重入鎖ReentrantLock,其實這個鎖只適用於寫多讀少的情況,就是多個線程去修改一個數據的時候,適合用這個鎖,但是如果多個線程都去讀一個數據,還用這個鎖的話會降低效率,因為同一時刻只能是一個線程去讀取!

  本次我們看看讀寫鎖ReentantReadWriteLock,這個鎖采用了讀寫分離的策略,分成了讀鎖和寫鎖,多個線程可以同時獲取讀鎖;

 

一.簡單使用讀寫鎖

  啥也別問,問就是先會用了再說,還記得前面用ReentrantLock實現了一個線程安全的List嗎?我們可以使用讀寫鎖稍微改造一下就好了;

package com.example.demo.study;

import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Study0204 {
    // 線程不安全的List
    private ArrayList<String> list = new ArrayList<String>();
    // 讀寫鎖
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // 獲取讀鎖
    private final Lock readLock = lock.readLock();
    // 獲取寫鎖
    private final Lock writeLock = lock.writeLock();

    // 往集合中添加元素,寫鎖
    public void add(String str) {
        writeLock.lock();
        try {
            list.add(str);
        } finally {
            writeLock.unlock();
        }
    }

    // 刪除集合中的元素,寫鎖
    public void remove(String str) {
        writeLock.lock();
        try {
            list.remove(str);
        } finally {
            writeLock.unlock();
        }
    }

    // 根據索引獲取集合中某個元素,讀鎖
    public String get(int index) {
        readLock.lock();
        try {
            return list.get(index);
        } finally {
            readLock.unlock();
        }
    }

}

 

二.讀寫鎖的結構

  這里最核心的還是用了AQS,可以看到里面還是有一個Sync這個內部工具類,然后還有兩個內部工具類,一個是讀鎖ReadLock,一個是寫鎖WriteLock

 

  我們還能看到Sync這個類就是繼承AQS,然后有NonfairSync和FairSync這兩個類去繼承Sync,到這里結構還是和ReentrantLock是一樣的;

 

  我們再看看讀鎖和寫鎖,可以看出來就是實現了Lock這個接口,然后通過傳進去的sync對象去實現Lock中的所有方法

 

  大概的結構就是這樣的,我們可以使用下面這個圖顯示出來,其實ReentrantReadWriteLock最重要的就是三個類:

    一個是Sync工具類用於操作AQS阻塞隊列和state的值,而且有基於Sync實現的公平策略和非公平策略;

    一個是寫鎖,實現了Lock接口,內部有個Sync字段,在Lock的實現方法中就是調用Sync對象的方法去實現的

    另外一個是讀鎖,和寫鎖一樣,實現了Lock接口,內部有個Sync字段,在Lock的實現方法也都是調用Sync對象的方法實現

 

 二.分析Sync 

  上篇博客中我們知道了在ReentrantLock中的state表示的是鎖的可重入次數,而且state是AQS中定義的int類型,那么在讀寫鎖這里有兩個狀態是怎么表示呢?

  總有一些人會想到一些花里胡哨的東西,還別說,挺管用的,由於state是int類型,共有32位,我們可以一分為二,前面的16位叫做高16位,表示獲取讀鎖的次數,后面的叫做的低16位,表示寫鎖的可重入次數,具體的,我們可以看看Sync類的屬性,主要是涉及到基本的二進制運算,有興趣的可以研究一下;

abstract static class Sync extends AbstractQueuedSynchronizer {

    //這個可以說是讀鎖(共享鎖)移動的位數
    static final int SHARED_SHIFT   = 16;
    //讀鎖狀態單位值,這里就是將1有符號左移16位,1用二進制表示為:00000000 00000000 00000000 00000001
    //左移16位之后:00000000 00000001 00000000 00000000,也就是2的16次方,就是65536
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    //讀鎖的線程最大數65535個
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

    //寫鎖(排它鎖)掩碼,這里用二進制表示 00000000 00000000 11111111 11111111
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    //返回讀鎖的個數,這里也就是將state無符號右移16位,那么有效數字肯定就是高16位,轉成十進制后就是獲取讀鎖的次數
    static int sharedCount(int c)    { 
        return c >>> SHARED_SHIFT; 
    }

    //返回寫鎖的個數,這里就是將state和上面的寫鎖掩碼做按位與運算,高16位被置為0,有效數字位第16位,轉成十進制就是寫鎖的可重入次數
    static int exclusiveCount(int c) { 
        return c & EXCLUSIVE_MASK;
    }

    //省略很多代碼
}

 

  其中,Sync中還有幾個比較重要的屬性如下,不懂不要緊,后面用到了再回頭看看就好;

 
 //記錄第一個獲取讀鎖的線程
 private transient Thread firstReader = null;

 //記錄第一個獲取讀鎖的線程繼續獲取讀鎖的可重入次數
 private transient int firstReaderHoldCount;

 //記錄最后一個獲取讀鎖的線程獲取讀鎖的可重入次數,HoldCounter類如下
 private transient HoldCounter cachedHoldCounter;

static final class HoldCounter {
    int count = 0;
    final long tid = getThreadId(Thread.currentThread());
}

//記錄除去第一個獲取讀鎖的線程外,獲取的讀鎖的可重入次數,ThreadLocalHoldCounter類如下
private transient ThreadLocalHoldCounter readHolds;

static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
  
        

 

三.寫鎖的獲取和釋放

  寫鎖在獲取的時候有一個前提:沒有其他線程持有寫鎖或者讀鎖,當前線程才能獲取寫鎖,否則就把當前線程丟到阻塞隊列里去了,記住,不能一邊讀還一邊寫

  1.lock方法:

  寫鎖主要是ReentantReadWriteLock中的內部類WriteLock類實現的,這是一個獨占鎖,同一時刻只能有一個線程可以獲取該鎖,時刻重入的;

public void lock() {
    sync.acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
    
    Thread current = Thread.currentThread();
    int c = getState();
    //這個方法的是實現static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; },就是將state的低十六位轉為十進制,
    //也就是寫鎖的可重入次數
    int w = exclusiveCount(c);
    //state不為0,說明讀鎖或者寫鎖被占用了
    if (c != 0) {
        //如果w==0,而c!=0,說明c的高16為不為0,即有線程獲取了讀鎖,此時寫鎖是不能獲取的,注意,別人在讀的時候是不能寫入的呀!返回false
        //如果w!=0表示有線程獲取了寫鎖,但是占用鎖的線程不是當前線程,那么線程獲取寫鎖失敗,返回false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        //到這里說明寫鎖可以獲取成功,那么就要判斷寫鎖的可重入次數是否大於65535
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //將state加一
        setState(c + acquires);
        return true;
    }
    //能到這里來,說明c==0,也就是說讀鎖和寫鎖都在空閑着,下面我們要看看公平策略下和非公平下的writerShouldBlock實現
    if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

 

  我們要看看最后的那里的if語句,其中writerShouldBlock的實現,能到這里說明讀鎖和寫鎖都在空閑這,可以隨時去獲取;

  非公平策略下始終返回的是false,於是會走到compareAndSetState(c, c + acquires),這里是用CAS嘗試獲取寫鎖,獲取失敗的話就返回發了;獲取成功的話就走到setExclusiveOwnerThread(current);設置占用讀鎖的線程是當前線程;

 

  公平策略下的話,這個方法前面好像說過,就是判斷當前線程節點前面有沒有前驅節點,如果有的話那就肯定獲取失敗啊,要讓前驅節點先獲取,於是在上面最后的if那里直接返回false;如果這里判斷沒有前驅節點,這里返回true,那么上面就會走到最后setExclusiveOwnerThread(current)設置當前線程占有寫鎖

 

  2.tryLock方法

  這個方法和上面的lock方法一樣,注意,這里最后那里默認使用的是非公平模式;

public boolean tryLock( ) {
    return sync.tryWriteLock();
}

final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    //這里默認使用的是非公平模式
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

 

 

  3.unlock方法

public void unlock() {
    sync.release(1);
}
//這個是AQS中的方法,說過tryRelease留給具體子類去實現的,重點看看怎么實現的
public final boolean release(int arg) {
if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
        unparkSuccessor(h);
    return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
    //isHeldExclusively方法在下面,因為是當前線程調用的release方法,要判斷當前線程是不是持有寫鎖的線程,不是的話就拋錯了
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //state減一
    int nextc = getState() - releases;
    //獲取低十六位,看是不是等於0,如果等於0,說明此時沒有線程占用寫鎖,於是就調用setExclusiveOwnerThread(null)
    //將占用寫鎖的線程設置為null,最后更新state就行了
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

 

四.讀鎖的獲取和釋放

  結合前面的寫鎖一起說一下:

    (1).沒有其他線程持有寫鎖或者讀鎖,當前線程才能獲取寫鎖,否則就把當前線程丟到阻塞隊列里去了;當前線程獲取了寫鎖之后,其他線程不能獲取寫鎖和讀鎖;

    (2)沒有其他線程獲取寫鎖時,當前線程才能獲取讀鎖,否則就丟到阻塞隊列里去了,不能 一邊讀一邊寫;當前線程獲取了讀鎖之后,其他線程只能獲取讀鎖,不能獲取寫鎖;

    (3)當前線程獲取的寫鎖之后,還能繼續獲取寫鎖,這叫做可重入;也可以繼續獲取讀鎖,這叫做鎖降級;

    (4)當前線程獲取的讀鎖之后,還能繼續獲取讀鎖;

  1.lock方法

public void lock() {
    //acquireShared方法在AQS中
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    //tryAcquireShared實現在ReentrantReadWriteLock中的Sync中
    if (tryAcquireShared(arg) < 0)
        //這個方法在AQS中,主要是將當前線程放到阻塞隊列中
        doAcquireShared(arg);
    }
protected final int tryAcquireShared(int unused) {
    
    Thread current = Thread.currentThread();
    int c = getState();
    //這里就是判斷:如果其他線程獲取了寫鎖,那么就返回-1
    //先判斷寫鎖的可重入次數不為0,表示有線程占用寫鎖,而且還不是當前線程,那么直接返回-1
    //這里注意一下:一個線程在獲取寫鎖之后,還可以再獲取讀鎖,釋放的時候兩個所都要釋放啊!!!
    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
        return -1;
    //這個方法獲取讀鎖的次數,讀鎖的次數最多只能是65535個
    int r = sharedCount(c);
    //readerShouldBlock方法分為公平策略和非公平策略,這個方法的意思就是:當前線程獲取已經獲取讀鎖,再讀鎖被阻塞了,那么說明還有其他線程正在獲取寫鎖
    //如果返回false,說明此時沒有線程獲取寫鎖,而且這個方法分為公平策略和非公平策略
    //公平策略的話,如果當前線程節點有前驅節點就返回true,沒有前驅節點返回false;
    //非公平策略的話,判斷阻塞隊列中哨兵節點后面的那個節點是不是正在獲取寫鎖,是的話返回true,不是的話返回false
    //compareAndSetState(c, c + SHARED_UNIT)方法中,SHARED_UNIT表示65536,這個CAS表示對高16為增加1,對於整個32位來說,就是加2的16次方
    if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
        //r==0表示讀鎖空閑,於是記錄第一個讀鎖,和第一個獲取讀鎖的線程獲取讀鎖的可重入次數
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        //如果當前線程就是第一個獲取讀鎖的線程,再獲取讀鎖,這里就將可重入次數加一即可
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            //能到這里就說明讀鎖已經被其他線程占用,當前線程是最后一個是最后獲取讀鎖的線程,我們更新一下cacheHoldCounter和readHolds就行了
            //cacheHoldCounter表示最后一個獲取讀鎖的線程獲取讀鎖的可重入次數
            //readHolds記錄了除了第一個獲取讀鎖的線程外,其他線程獲取讀鎖的可重入次數
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    //能到這里說明readerShouldBlock方法返回的是true,而且當前線程在之前已經獲取了寫鎖,再獲取讀鎖,就是鎖降級!!!
    return fullTryAcquireShared(current);
}

//鎖降級操作
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        //寫鎖被其他線程占用,就返回-1
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
        //獲取讀鎖被阻塞,此時還有其他線程在獲取寫鎖,
        } else if (readerShouldBlock()) {
            if (firstReader == current) {
            } 
            else {
                //有其他線程在嘗試獲取寫鎖,結束當前線程獲取讀鎖,就更新一下readHolds就行了
                //就從readHolds中移除當前線程的持有數,然后返回-1,結束嘗試獲取鎖步驟
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        //讀鎖數量達到了最大數量就拋錯
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //CAS更新讀鎖的數量,然后更新一些變量
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            //讀鎖數量為0,就讓當前線程作為第一個獲取讀鎖的線程
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            //當前線程已經獲取過讀鎖了,就把第一個獲取讀鎖的可重入次數加一
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
            //這里前面說過了 if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

 

 

  2.tryLock方法

public boolean tryLock() {
    return sync.tryReadLock();
}

final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        //如果當前寫鎖已經被占用,獲取讀鎖失敗
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        //讀鎖不能超過最大數量
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //更新state,將高16位加一,更新成功,如果讀鎖沒有線程占有,就把當前線程更新為第一個獲取讀鎖的線程和更新第一個獲取讀鎖的可重入次數
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            //當前線程就是第一個獲取讀鎖的線程,就將可重入次數加一
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
            //到這里說明當前線程獲取讀鎖成功,雖然不是第一個獲取讀鎖的線程,於是更新一下cachedHoldCounter和readHolds
            //cachedHoldCounter:最后一個線程獲取讀鎖的可重入次數
            //readHolds:除去第一個線程,其他線程獲取讀鎖的可重入次數
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

 

 

  3.unlock方法

public void unlock() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    //實現在下面,就是嘗試釋放讀鎖,並判斷還有沒有線程占用讀鎖,沒有線程占用讀鎖,就會進入到if里面doReleaseShared方法
    if (tryReleaseShared(arg)) {
        //前面可能有一些線程在獲取寫鎖的時候,由於當前線程讀鎖沒有釋放,所以那些線程就被阻塞了
        //當前方法就是把那些線程釋放一個
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //如果當前線程是第一個獲取讀鎖的線程
    if (firstReader == current) {
        //第一個線程獲取讀鎖的可重入次數為1,就釋放,否則,可重入次數減一
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    //當前線程不是第一個獲取讀鎖的線程就更新cachedHoldCounter和readHolds    
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    //這里一個無限循環,獲取state,將高十六位減一,用CAS更新,更新成功的話,就判斷讀鎖有沒有被占用
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

 

  

五.總結

  我們用下面這個圖來總結一下ReentrantReadWriteLock,這個鎖就是利用state是32位的,高16位用於表示讀鎖的個數,低16位表示寫鎖的可重入次數,通過CAS對其進行了讀寫分離,適用於讀多寫少的場景;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM