概述
我們在介紹AbstractQueuedSynchronizer的時候介紹過,AQS支持獨占式同步狀態獲取/釋放、共享式同步狀態獲取/釋放兩種模式,對應的典型應用分別是ReentrantLock和Semaphore,AQS還可以混合兩種模式使用,讀寫鎖ReentrantReadWriteLock就是如此。
設想以下情景:我們在系統中有一個多線程訪問的緩存,多個線程都可以對緩存進行讀或寫操作,但是讀操作遠遠多於寫操作,要求寫操作要線程安全,且寫操作執行完成要求對當前的所有讀操作馬上可見。
分析上面的需求:因為有多個線程可能會執行寫操作,因此多個線程的寫操作必須同步串行執行;而寫操作執行完成要求對當前的所有讀操作馬上可見,這就意味着當有線程正在讀的時候,要阻塞寫操作,當正在執行寫操作時,要阻塞讀操作。一個簡單的實現就是將數據直接加上互斥鎖,同一時刻不管是讀還是寫線程,都只能有一個線程操作數據。但是這樣的問題就是如果當前只有N個讀線程,沒有寫線程,這N個讀線程也要傻呵呵的排隊讀,盡管其實是可以安全並發提高效率的。因此理想的實現是:
當有寫線程時,則寫線程獨占同步狀態。
當沒有寫線程時只有讀線程時,則多個讀線程可以共享同步狀態。
讀寫鎖就是為了實現這種效果而生。
使用示例
我們先來看一下讀寫鎖怎么使用,這里我們基於hashmap(本身線程不安全)做一個多線程並發安全的緩存:
public class ReadWriteCache { private static Map<String, Object> data = new HashMap<>(); private static ReadWriteLock lock = new ReentrantReadWriteLock(false); private static Lock rlock = lock.readLock(); private static Lock wlock = lock.writeLock(); public static Object get(String key) { rlock.lock(); try { return data.get(key); } finally { rlock.unlock(); } } public static Object put(String key, Object value) { wlock.lock(); try { return data.put(key, value); } finally { wlock.unlock(); } } }
限於篇幅我們只實現2個方法,get和put。從代碼可以看出,我們先創建一個 ReentrantReadWriteLock 對象,構造函數 false 代表是非公平的(非公平的含義和ReentrantLock相同)。然后通過readLock、writeLock方法分別獲取讀鎖和寫鎖。在做讀操作的時候,也就是get方法,我們要先獲取讀鎖;在做寫操作的時候,即put方法,我們要先獲取寫鎖。
通過以上代碼,我們就構造了一個線程安全的緩存,達到我們之前說的:寫線程獨占同步狀態,多個讀線程可以共享同步狀態。
源碼分析
我們先來看下 ReentrantReadWriteLock 類的整體結構:
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { private final ReentrantReadWriteLock.ReadLock readerLock; /** Inner class providing writelock */ private final ReentrantReadWriteLock.WriteLock writerLock; /** Performs all synchronization mechanics */ final Sync sync; public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
public static class ReadLock implements Lock, java.io.Serializable {}
public static class WriteLock implements Lock, java.io.Serializable {} }
可以看到,在公平鎖與非公平鎖的實現上,與ReentrantLock一樣,也是有一個繼承AQS的內部類Sync,然后NonfairSync和FairSync都繼承Sync,通過構造函數傳入的布爾值決定要構造哪一種Sync實例。
讀寫鎖比ReentrantLock多出了兩個內部類:ReadLock和WriteLock, 用來定義讀鎖和寫鎖,然后在構造函數中,會構造一個讀鎖和一個寫鎖實例保存到成員變量 readerLock 和 writerLock。我們在上面的示例中使用到的 readLock() 和 writeLock() 方法就是返回這兩個成員變量保存的鎖實例。
我們在Sync類中可以看到下列代碼:
static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); //每次要讓共享鎖+1,就應該讓state加 1<<16 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //每種鎖的最大重入數量 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
可以看到主要是幾個位移操作,通過上面的整體結構,我們知道了在讀寫鎖內保存了讀鎖和寫鎖的兩個實例。之前在ReentrantLock中,我們知道鎖的狀態是保存在Sync實例的state字段中的(繼承自父類AQS),現在有了讀寫兩把鎖,然而可以看到還是只有一個Sync實例,那么一個Sync實例的state是如何同時保存兩把鎖的狀態的呢?答案就是用了位分隔:
state字段是32位的int,讀寫鎖用state的低16位保存寫鎖(獨占鎖)的狀態;高16位保存讀鎖(共享鎖)的狀態。
因此要獲取獨占鎖當前的重入數量,就是 state & ((1 << 16) -1) (即 exclusiveCount 方法)
要獲取共享鎖當前的重入數量,就是 state >>> 16 (即 sharedCount 方法)
下面我們具體看寫鎖和讀鎖的實現。
寫鎖
看下WriteLock類中的lock和unlock方法:
public void lock() { sync.acquire(1); } public void unlock() { sync.release(1); }
可以看到就是調用的獨占式同步狀態的獲取與釋放,因此真實的實現就是Sync的 tryAcquire和 tryRelease。
寫鎖的獲取
看下tryAcquire:
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //獲取獨占鎖的重入數 if (c != 0) { // 當前state不為0,此時:如果寫鎖狀態為0說明讀鎖此時被占用返回false;如果寫鎖狀態不為0且寫鎖沒有被當前線程持有返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //寫鎖重入數溢出 // Reentrant acquire setState(c + acquires); return true; }
//到這里了說明state為0,嘗試直接cas。writerShouldBlock是為了實現公平或非公平策略的 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
邏輯很簡單,直接看注釋就能理解。
寫鎖的釋放
看下tryRelease:
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //非獨占模式直接拋異常 int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); //如果獨占模式重入數為0了,說明獨占模式被釋放 setState(nextc); //不管獨占模式是否被釋放,更新獨占重入數 return free; }
邏輯很簡單,直接看注釋就能理解。
讀鎖
類似於寫鎖,讀鎖的lock和unlock的實際實現對應Sync的 tryAcquireShared 和 tryReleaseShared方法。
讀鎖的獲取
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //如果獨占模式被占且不是當前線程持有,則獲取失敗 int r = sharedCount(c);
//如果公平策略沒有要求阻塞且重入數沒有到達最大值,則直接嘗試CAS更新state if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
//更新成功后會在firstReaderHoldCount中或readHolds(ThreadLocal類型的)的本線程副本中記錄當前線程重入數(淺藍色代碼),這是為了實現jdk1.6中加入的getReadHoldCount()方法的,這個方法能獲取當前線程重入共享鎖的次數(state中記錄的是多個線程的總重入次數),加入了這個方法讓代碼復雜了不少,但是其原理還是很簡單的:如果當前只有一個線程的話,還不需要動用ThreadLocal,直接往firstReaderHoldCount這個成員變量里存重入數,當有第二個線程來的時候,就要動用ThreadLocal變量readHolds了,每個線程擁有自己的副本,用來保存自己的重入數。 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); //用來處理CAS沒成功的情況,邏輯和上面的邏輯是類似的,就是加了無限循環 }
下面這個方法就不用細說了,和上面的處理邏輯類似,加了無限循環用來處理CAS失敗的情況。
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
讀鎖的釋放
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread();
//淺藍色代碼也是為了實現jdk1.6中加入的getReadHoldCount()方法,在更新當前線程的重入數。 if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; }
//這里是真正的釋放同步狀態的邏輯,就是直接同步狀態-SHARED_UNIT,然后CAS更新,沒啥好說的 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
補充內容
通過上面的源碼分析,我們可以發現一個現象:
在線程持有讀鎖的情況下,該線程不能取得寫鎖(因為獲取寫鎖的時候,如果發現當前的讀鎖被占用,就馬上獲取失敗,不管讀鎖是不是被當前線程持有)
在線程持有寫鎖的情況下,該線程可以繼續獲取讀鎖(獲取讀鎖時如果發現寫鎖被占用,只有寫鎖沒有被當前線程占用的情況才會獲取失敗)
仔細想想,這個設計是合理的:因為當線程獲取讀鎖的時候,可能有其他線程同時也在持有讀鎖,因此不能把獲取讀鎖的線程“升級”為寫鎖;而對於獲得寫鎖的線程,它一定獨占了讀寫鎖,因此可以繼續讓它獲取讀鎖,當它同時獲取了寫鎖和讀鎖后,還可以先釋放寫鎖繼續持有讀鎖,這樣一個寫鎖就“降級”為了讀鎖。
綜上:
一個線程要想同時持有寫鎖和讀鎖,必須先獲取寫鎖再獲取讀鎖;
寫鎖可以“降級”為讀鎖;
讀鎖不能“升級”為寫鎖。
總結
讀寫鎖還是很實用的,因為一般場景下,數據的並發操作都是讀多於寫,在這種情況下,讀寫鎖能夠提供比排它鎖更好的並發性。
在讀寫鎖的實現方面,本來以為會比較復雜,結果看完源碼的感受也是快刀切西瓜,看來AQS的設計真的很棒,在AQS的基礎上構建的組件實現都很簡單。