Java並發(7)- 你真的了解ReentrantReadWriteLock嗎?


引言

在前幾篇文章中了解了ReentrantLock、Semaphore與CountDownLatch后,J.U.C包中基於AQS實現的並發工具類還剩一個比較重要的:讀寫鎖ReentrantReadWriteLock。讀寫鎖在Java面試過程中是一個經常性考的題目,他涉及到的知識點比較多,導致很多人不能透徹的理解他。舉幾個面試中常見的問題:

  • ReentrantReadWriteLock和ReentrantLock的區別?
  • 你能用ReentrantReadWriteLock實現一個簡單的緩存管理嗎?
  • 你能自己實現一個簡單的讀寫鎖嗎?
  • ReentrantReadWriteLock會發生寫飢餓的情況嗎?如果發生,有沒有比較好的解決辦法?

上面的問題大家都能回答出來嗎?如果都能很好的回答,那么這篇文章也許對你沒有太大幫助。如果不能,別着急,下面就來一一分析解答上面的幾個問題。

1. ReentrantReadWriteLock和ReentrantLock的區別?

這個問題大家應該都會回答:ReentrantLock是獨占鎖,ReentrantReadWriteLock是讀寫鎖。那么這就引申出下一個問題:ReentrantReadWriteLock中的讀鎖和寫鎖分別是獨占還是共享的?他們之間的關系又是什么樣的?要了解透徹這兩個問題,分析源碼是再好不過的了。

1.1 ReentrantReadWriteLock中的讀鎖和寫鎖的實現方式

使用ReentrantReadWriteLock讀寫鎖的方式,會調用readLock()和writeLock()兩個方法,看下他們的源碼:

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

可以看到用到了WriteLock和ReadLock兩個靜態內部類,他們對鎖的實現如下:

public static class ReadLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.acquireShared(1); //共享
    }

    public void unlock() {
        sync.releaseShared(1); //共享
    }
}

public static class WriteLock implements Lock, java.io.Serializable {
    public void lock() {
        sync.acquire(1); //獨占
    }

    public void unlock() {
        sync.release(1); //獨占
    }
}

abstract static class Sync extends AbstractQueuedSynchronizer {}

看到這里發現了ReentrantReadWriteLock和ReentrantLock的一個相同點和不同點,相同的是使用了同一個關鍵實現AbstractQueuedSynchronizer,不同的是ReentrantReadWriteLock使用了兩個鎖分別實現了AQS,而且WriteLock和ReentrantLock一樣,使用了獨占鎖。而ReadLock和Semaphore一樣,使用了共享鎖。再往下的內容估計看過前面幾篇文章的都很熟悉了,獨占鎖通過state變量的0和1兩個狀態來控制是否有線程占有鎖,共享鎖通過state變量0或者非0來控制多個線程訪問。在上面的代碼中,ReadLock和WriteLock使用了同一個AQS,那么在ReentrantReadWriteLock中又是怎么控制讀鎖和寫鎖關系的呢?

1.2 ReadLock和WriteLock共享變量

讀寫鎖定義為:一個資源能夠被多個讀線程訪問,或者被一個寫線程訪問,但是不能同時存在讀寫線程。

通過這句話很容易聯想到通過兩個不同的變量來控制讀寫,當獲取到讀鎖時對讀變量+1,當獲取懂啊寫變量時對寫變量+1。但AQS中並沒有為ReadLock和WriteLock添加額外的變量,還是通過一個state來實現的。那是怎么做到讀寫分離的呢?來看看下面這段代碼:1。但AQS中並沒有為ReadLock和WriteLock添加額外的變量,還是通過一個state來實現的。那是怎么做到讀寫分離的呢?來看看下面這段代碼:

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count  */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

這段代碼在Sync靜態內部類中,這里有兩個關鍵方法sharedCount和exclusiveCount,通過名字可以看出sharedCount是共享鎖的數量,exclusiveCount是獨占鎖的數量。共享鎖通過對c像右位移16位獲得,獨占鎖通過和16位的1與運算獲得。舉個例子,當獲取讀鎖的線程有3個,寫鎖的線程有1個(當然這是不可能同時有的),state就表示為0000 0000 0000 0011 0000 0000 0000 0001,高16位代表讀鎖,通過向右位移16位(c >>> SHARED_SHIFT)得倒10進制的3,通過和0000 0000 0000 0000 1111 1111 1111 1111與運算(c & EXCLUSIVE_MASK),獲得10進制的1。弄懂了着幾個方法,就明白了為什么通過一個state實現了讀寫共享。

這當中還有一個問題,由於16位最大全1表示為65535,所以讀鎖和寫鎖最多可以獲取65535個。

1.3 WriteLock和ReentrantLock獲取鎖的區別

上面說過,WriteLock也是獨占鎖,那么他和ReentrantLock有什么區別呢?最大的區別就在獲取鎖時WriteLock不僅需要考慮是否有其他寫鎖占用,同時還要考慮是否有其他讀鎖,而ReentrantLock只需要考慮自身是否被占用就行了。來看下WriteLock獲取鎖的源代碼:

public void lock() {
    sync.acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) && //嘗試獲取獨占鎖
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //獲取失敗后排隊
        selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {

    Thread current = Thread.currentThread();
    int c = getState();  //獲取共享變量state
    int w = exclusiveCount(c); //獲取寫鎖數量
    if (c != 0) { //有讀鎖或者寫鎖
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread()) //寫鎖為0(證明有讀鎖),或者持有寫鎖的線程不為當前線程
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);  //當前線程持有寫鎖,為重入鎖,+acquires即可
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires)) //CAS操作失敗,多線程情況下被搶占,獲取鎖失敗。CAS成功則獲取鎖成功
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

這段代碼是不是很熟悉?和ReentrantLock中獲取鎖的代碼很相似,差別在於其中調用了exclusiveCount方法來獲取是否存在寫鎖,然后通過c != 0和w == 0判斷了是否存在讀鎖。acquireQueued和addWaiter就不詳細解說了,需要了解的可以查看前面ReentrantLock的文章。
到這里大家應該對ReentrantReadWriteLock和ReentrantLock的區別應該做到心里有數了吧。

1.4 ReadLock和Semaphore獲取鎖的區別

WriteLock是獨占模式,我們比較了它和ReentrantLock獨占鎖獲取鎖的區別,這里我們再看看ReadLock在獲取鎖上有什么不同呢?先看下面的源代碼:

protected final int tryAcquireShared(int unused) {

	Thread current = Thread.currentThread();
	int c = getState();
	if (exclusiveCount(c) != 0 &&
		getExclusiveOwnerThread() != current) //寫鎖不等於0的情況下,驗證是否是當前寫鎖嘗試獲取讀鎖
		return -1;
	int r = sharedCount(c);  //獲取讀鎖數量
	if (!readerShouldBlock() && //讀鎖不需要阻塞
		r < MAX_COUNT &&  //讀鎖小於最大讀鎖數量
		compareAndSetState(c, c + SHARED_UNIT)) { //CAS操作嘗試設置獲取讀鎖 也就是高位加1
		if (r == 0) {  //當前線程第一個並且第一次獲取讀鎖,
			firstReader = current;
			firstReaderHoldCount = 1;
		} else if (firstReader == current) { //當前線程是第一次獲取讀鎖的線程
			firstReaderHoldCount++;
		} else { // 當前線程不是第一個獲取讀鎖的線程,放入線程本地變量
			HoldCounter rh = cachedHoldCounter;
			if (rh == null || rh.tid != getThreadId(current))
				cachedHoldCounter = rh = readHolds.get();
			else if (rh.count == 0)
				readHolds.set(rh);
			rh.count++;
		}
		return 1;
	}
	return fullTryAcquireShared(current);
}ß

在上面的代碼中嘗試獲取讀鎖的過程和獲取寫鎖的過程也很相似,不同在於讀鎖只要沒有寫鎖占用並且不超過最大獲取數量都可以嘗試獲取讀鎖,而寫鎖不僅需要考慮讀鎖是否占用,也要考慮寫鎖是否占用。上面的代碼中firstReader,firstReaderHoldCount以及cachedHoldCounter都是為readHolds(ThreadLocalHoldCounter)服務的,用來記錄每個讀鎖獲取線程的獲取次數,方便獲取當前線程持有鎖的次數信息。在ThreadLocal基礎上添加了一個Int變量來統計次數,可以通過他們的實現來理解:

static final class ThreadLocalHoldCounter
	extends ThreadLocal<HoldCounter> { //ThreadLocal變量ß
	public HoldCounter initialValue() {
		return new HoldCounter();
	}
}

static final class HoldCounter {
	int count = 0;           //當前線程持有鎖的次數
	// Use id, not reference, to avoid garbage retention
	final long tid = getThreadId(Thread.currentThread());   //當前線程ID
}

2. 你能用ReentrantReadWriteLock實現一個簡單的緩存嗎?

先來分析一個簡單緩存需要滿足的功能,這里我們為了實現簡單,不考慮緩存過期策略等復雜因素。

  • 緩存主要提供兩個功能:讀和寫。
  • 讀時如果緩存中存在數據,則立即返回數據。
  • 讀時如果緩存中不存在數據,則需要從其他途徑獲取數據,同時寫入緩存。
  • 在寫入緩存的同時,為了避免其他線程同時獲取這個緩存中不存在的數據,需要阻塞其他讀線程。
    下面我們就來通過ReentrantReadWriteLock實現上述功能:
public static void ReentrantReadWriteLockCacheSystem() {

	//這里為了實現簡單,將緩存大小設置為4。
	Map<String, String> cacheMap = new HashMap<>(4); 
	ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

	for (int i = 0; i < 20; i++) { //同時開啟20個線程訪問緩存

		final String key = String.valueOf(i % 4); 
		Thread thread = new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					//①讀取緩存時獲取讀鎖
					readWriteLock.readLock().lock();  
					//獲取讀鎖后通過key獲取緩存中的值
					String valueStr = cacheMap.get(key); 
					//緩存值不存在
					if (valueStr == null) { 
						//③釋放讀鎖后再嘗試獲取寫鎖
						readWriteLock.readLock().unlock();  
						try {
							//④獲取寫鎖來寫入不存在的key值,
							readWriteLock.writeLock().lock();  
							valueStr = cacheMap.get(key);
							if (valueStr == null) {
								valueStr = key + " --- value";
								cacheMap.put(key, valueStr); //寫入值
								System.out.println(Thread.currentThread().getName() + " --------- put " + valueStr);
							}
							// ⑥鎖降級,避免被其他寫線程搶占后再次更新值,保證這一次操作的原子性
							readWriteLock.readLock().lock(); 
							System.out.println(Thread.currentThread().getName() + " --------- get new " + valueStr);
						} finally {
							readWriteLock.writeLock().unlock(); //⑤釋放寫鎖
						}
						
					} else {
						System.out.println(Thread.currentThread().getName() + " ------ get cache value");
					}
				} finally {
					readWriteLock.readLock().unlock();  //②釋放讀鎖
				}
			}
		}, String.valueOf(i));
		thread.start();
	}
}

首先線程會嘗試去獲取數據,需要獲取讀鎖①,如果存在值,則直接讀取並釋放讀鎖②。如果不存在值,則首先釋放已經獲取的讀鎖③,然后嘗試獲取寫鎖④。獲取到寫鎖之后,再次檢查值,因為此時可能存在其他寫鎖已經更新值,這時只需要讀取,然后釋放寫鎖⑤。如果還是沒有值,則通過其他途徑獲取值並更新然后獲取讀鎖⑥,這一步鎖降級操作是為了直接搶占讀鎖,避免釋放寫鎖之后再次獲取讀鎖時被其他寫線程搶占,這樣保證了這一次讀取數據的原子性。之后再執行⑤釋放寫鎖和②釋放讀鎖。

執行后輸出結果如下,每次執行可能輸出不同:

//1 --------- put 1 --- value
//1 --------- get new 1 --- value
//0 --------- put 0 --- value
//0 --------- get new 0 --- value
//9 ------ get cache value
//4 ------ get cache value
//2 --------- put 2 --- value
//2 --------- get new 2 --- value
//11 --------- put 3 --- value
//11 --------- get new 3 --- value
//5 ------ get cache value
//13 ------ get cache value
//6 ------ get cache value
//8 ------ get cache value
//7 ------ get cache value
//3 --------- get new 3 --- value
//10 ------ get cache value
//12 ------ get cache value
//14 ------ get cache value
//15 ------ get cache value
//16 ------ get cache value
//17 ------ get cache value
//18 ------ get cache value
//19 ------ get cache value

3. 你能自己實現一個簡單的讀寫鎖嗎?

經過上面對讀寫鎖原理的初步分析和使用,現在你能自己實現一個簡單的讀寫鎖了嗎?這里列出了一步步實現一個簡單的讀寫鎖的過程,你可以按這個步驟自己先實現一遍。

  • 1 定義一個讀寫鎖共享變量state
  • 2 state高16位為讀鎖數量,低16位為寫鎖數量。盡量模擬ReentrantReadWriteLock的實現
  • 3 獲取讀鎖時先判斷是否有寫鎖,有則等待,沒有則將讀鎖數量加1
  • 4 釋放讀鎖數量減1,通知所有等待線程
  • 5 獲取寫鎖時需要判斷讀鎖和寫鎖是否都存在,有則等待,沒有則將寫鎖數量加1
  • 6 釋放寫鎖數量減1,通知所有等待線程
    我給出的實現代碼如下:
public class MyReadWriteLock {

	private int state = 0; //1. 定義一個讀寫鎖共享變量state
	
	//2. state高16位為讀鎖數量
	private int GetReadCount() { 
		return state >>> 16;
	}
	
	//2. 低16位為寫鎖數量
	private int GetWriteCount() { 
		return state & ((1 << 16) - 1);
	}
	
	//3. 獲取讀鎖時先判斷是否有寫鎖,有則等待,沒有則將讀鎖數量加1
	public synchronized void lockRead() throws InterruptedException{  
		
		while (GetWriteCount() > 0) {
			wait();
		}
		
		System.out.println("lockRead ---" + Thread.currentThread().getName()); 
		state = state + (1 << 16);
	}
	
	//4. 釋放讀鎖數量減1,通知所有等待線程
	public synchronized void unlockRead() {  
		state = state - ((1 << 16));
		notifyAll();
	}
	
	//5. 獲取寫鎖時需要判斷讀鎖和寫鎖是否都存在,有則等待,沒有則將寫鎖數量加1
	public synchronized void lockWriters() throws InterruptedException{  
		
		while (GetReadCount() > 0 || GetWriteCount() > 0) {
			wait();
		}
		System.out.println("lockWriters ---" + Thread.currentThread().getName());
		state++;
	}
	
	//6. 釋放寫鎖數量減1,通知所有等待線程
	public synchronized void unlockWriters(){  
		
		state--;
		notifyAll();
	}
}

4. 讀寫鎖會發生寫飢餓的情況嗎?如果發生,有沒有比較好的解決辦法?

在讀寫的過程中,寫操作一般是優先的,不能因為讀操作過多,寫操作一直等待的狀況發生,這樣會導致數據一直得不到更新,發生寫飢餓。現在大家思考一下上面我們實現的簡單讀寫鎖,是否能做到這點呢?答案很明顯,在讀寫線程都wait的情況下,通過notifyAll並不能保證寫優先執行。那在這個例子中怎么改進這一點呢?

這里我通過添加一個中間變量來達到這個目的,這個中間變量在獲取寫鎖之前先記錄一個寫請求,這樣一旦notifyAll,優先檢查是否有寫請求,如果有,則讓寫操作優先執行,具體代碼如下:

public class MyReadWriteLock {

	private int state = 0; //1. 定義一個讀寫鎖共享變量state
	private int writeRequest = 0; //記錄寫請求數量
	
	//2. state高16位為讀鎖數量
	private int GetReadCount() { 
		return state >>> 16;
	}
	
	//2. 低16位為寫鎖數量
	private int GetWriteCount() { 
		return state & ((1 << 16) - 1);
	}
	
	//3. 獲取讀鎖時先判斷是否有寫鎖,有則等待,沒有則將讀鎖數量加1
	public synchronized void lockRead() throws InterruptedException{  
		//寫鎖數量大於0或者寫請求數量大於0的情況下都優先執行寫
		while (GetWriteCount() > 0 || writeRequest > 0) { 
			wait();
		}
		
		System.out.println("lockRead ---" + Thread.currentThread().getName()); 
		state = state + (1 << 16);
	}
	
	//4. 釋放讀鎖數量減1,通知所有等待線程
	public synchronized void unlockRead() {  
		state = state - ((1 << 16));
		notifyAll();
	}
	
	//5. 獲取寫鎖時需要判斷讀鎖和寫鎖是否都存在,有則等待,沒有則將寫鎖數量加1
	public synchronized void lockWriters() throws InterruptedException{  
		
		writeRequest++; //寫請求+1
		while (GetReadCount() > 0 || GetWriteCount() > 0) {
			wait();
		}
		writeRequest--; //獲取寫鎖后寫請求-1
		System.out.println("lockWriters ---" + Thread.currentThread().getName());
		state++;
	}
	
	//6. 釋放寫鎖數量減1,通知所有等待線程
	public synchronized void unlockWriters(){  
		
		state--;
		notifyAll();
	}
}

大家可以測試下上面的代碼看看是不是寫請求都優先執行了呢?現在我們把這個問題放到ReentrantReadWriteLock中來考慮,顯而易見,ReentrantReadWriteLock也會發生寫請求飢餓的情況,因為寫請求一樣會排隊,不管是公平鎖還是非公平鎖,在有讀鎖的情況下,都不能保證寫鎖一定能獲取到,這樣只要讀鎖一直占用,就會發生寫飢餓的情況。那么JDK就沒有提供什么好辦法來解決這個問題嗎?當然是有的,那就是JDK8中新增的改進讀寫鎖---StampedLock,在下一篇文章中將會對StampedLock進行詳細講解。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM