概述
ReentrantReadWriteLock是Lock的另一種實現方式,我們已經知道了ReentrantLock是一個排他鎖,同一時間只允許一個線程訪問,而ReentrantReadWriteLock允許多個讀線程同時訪問,但不允許寫線程和讀線程、寫線程和寫線程同時訪問。相對於排他鎖,提高了並發性。在實際應用中,大部分情況下對共享數據(如緩存)的訪問都是讀操作遠多於寫操作,這時ReentrantReadWriteLock能夠提供比排他鎖更好的並發性和吞吐量。
讀寫鎖內部維護了兩個鎖,一個用於讀操作,一個用於寫操作。所有 ReadWriteLock實現都必須保證 writeLock操作的內存同步效果也要保持與相關 readLock的聯系。也就是說,成功獲取讀鎖的線程會看到寫入鎖之前版本所做的所有更新。
ReentrantReadWriteLock支持以下功能:
1)支持公平和非公平的獲取鎖的方式;
2)支持可重入。讀線程在獲取了讀鎖后還可以獲取讀鎖;寫線程在獲取了寫鎖之后既可以再次獲取寫鎖又可以獲取讀鎖;
3)還允許從寫入鎖降級為讀取鎖,其實現方式是:先獲取寫入鎖,然后獲取讀取鎖,最后釋放寫入鎖。但是,從讀取鎖升級到寫入鎖是不允許的;
4)讀取鎖和寫入鎖都支持鎖獲取期間的中斷;
5)Condition支持。僅寫入鎖提供了一個 Conditon 實現;讀取鎖不支持 Conditon ,readLock().newCondition() 會拋出 UnsupportedOperationException。
使用
示例一:利用重入來執行升級緩存后的鎖降級
1 class CachedData { 2 Object data; 3 volatile boolean cacheValid; //緩存是否有效 4 ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); 5 6 void processCachedData() { 7 rwl.readLock().lock(); //獲取讀鎖 8 //如果緩存無效,更新cache;否則直接使用data 9 if (!cacheValid) { 10 // Must release read lock before acquiring write lock 11 //獲取寫鎖前須釋放讀鎖 12 rwl.readLock().unlock(); 13 rwl.writeLock().lock(); 14 // Recheck state because another thread might have acquired 15 // write lock and changed state before we did. 16 if (!cacheValid) { 17 data = ... 18 cacheValid = true; 19 } 20 // Downgrade by acquiring read lock before releasing write lock 21 //鎖降級,在釋放寫鎖前獲取讀鎖 22 rwl.readLock().lock(); 23 rwl.writeLock().unlock(); // Unlock write, still hold read 24 } 25 26 use(data); 27 rwl.readLock().unlock(); //釋放讀鎖 28 } 29 }
示例二:使用 ReentrantReadWriteLock 來提高 Collection 的並發性
通常在 collection 數據很多,讀線程訪問多於寫線程並且 entail 操作的開銷高於同步開銷時嘗試這么做。
1 class RWDictionary { 2 private final Map<String, Data> m = new TreeMap<String, Data>(); 3 private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); 4 private final Lock r = rwl.readLock(); //讀鎖 5 private final Lock w = rwl.writeLock(); //寫鎖 6 7 public Data get(String key) { 8 r.lock(); 9 try { return m.get(key); } 10 finally { r.unlock(); } 11 } 12 public String[] allKeys() { 13 r.lock(); 14 try { return m.keySet().toArray(); } 15 finally { r.unlock(); } 16 } 17 public Data put(String key, Data value) { 18 w.lock(); 19 try { return m.put(key, value); } 20 finally { w.unlock(); } 21 } 22 public void clear() { 23 w.lock(); 24 try { m.clear(); } 25 finally { w.unlock(); } 26 } 27 }
實現原理
ReentrantReadWriteLock 也是基於AQS實現的,它的自定義同步器(繼承AQS)需要在同步狀態(一個整型變量state)上維護多個讀線程和一個寫線程的狀態,使得該狀態的設計成為讀寫鎖實現的關鍵。如果在一個整型變量上維護多種狀態,就一定需要“按位切割使用”這個變量,讀寫鎖將變量切分成了兩個部分,高16位表示讀,低16位表示寫。
域
ReentrantReadWriteLock含有兩把鎖readerLock和writerLock,其中ReadLock和WriteLock都是內部類。
1 /** Inner class providing readlock */ 2 private final ReentrantReadWriteLock.ReadLock readerLock; 3 /** Inner class providing writelock */ 4 private final ReentrantReadWriteLock.WriteLock writerLock; 5 /** Performs all synchronization mechanics */ 6 final Sync sync;
寫鎖的獲取與釋放(WriteLock)
寫鎖是一個可重入的獨占鎖,使用AQS提供的獨占式獲取同步狀態的策略。
(一)獲取寫鎖
1 //獲取寫鎖 2 public void lock() { 3 sync.acquire(1); 4 } 5 6 //AQS實現的獨占式獲取同步狀態方法 7 public final void acquire(int arg) { 8 if (!tryAcquire(arg) && 9 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 10 selfInterrupt(); 11 } 12 13 //自定義重寫的tryAcquire方法 14 protected final boolean tryAcquire(int acquires) { 15 /* 16 * Walkthrough: 17 * 1. If read count nonzero or write count nonzero 18 * and owner is a different thread, fail. 19 * 2. If count would saturate, fail. (This can only 20 * happen if count is already nonzero.) 21 * 3. Otherwise, this thread is eligible for lock if 22 * it is either a reentrant acquire or 23 * queue policy allows it. If so, update state 24 * and set owner. 25 */ 26 Thread current = Thread.currentThread(); 27 int c = getState(); 28 int w = exclusiveCount(c); //取同步狀態state的低16位,寫同步狀態 29 if (c != 0) { 30 // (Note: if c != 0 and w == 0 then shared count != 0) 31 //存在讀鎖或當前線程不是已獲取寫鎖的線程,返回false 32 if (w == 0 || current != getExclusiveOwnerThread()) 33 return false; 34 //判斷同一線程獲取寫鎖是否超過最大次數,支持可重入 35 if (w + exclusiveCount(acquires) > MAX_COUNT) // 36 throw new Error("Maximum lock count exceeded"); 37 // Reentrant acquire 38 setState(c + acquires); 39 return true; 40 } 41 //此時c=0,讀鎖和寫鎖都沒有被獲取 42 if (writerShouldBlock() || 43 !compareAndSetState(c, c + acquires)) 44 return false; 45 setExclusiveOwnerThread(current); 46 return true; 47 }
從源代碼可以看出,獲取寫鎖的步驟如下:
1)判斷同步狀態state是否為0。如果state!=0,說明已經有其他線程獲取了讀鎖或寫鎖,執行2);否則執行5)。
2)判斷同步狀態state的低16位(w)是否為0。如果w=0,說明其他線程獲取了讀鎖,返回false;如果w!=0,說明其他線程獲取了寫鎖,執行步驟3)。
3)判斷獲取了寫鎖是否是當前線程,若不是返回false,否則執行4);
4)判斷當前線程獲取寫鎖是否超過最大次數,若超過,拋異常,反之更新同步狀態(此時當前線程以獲取寫鎖,更新是線程安全的),返回true。
5)此時讀鎖或寫鎖都沒有被獲取,判斷是否需要阻塞(公平和非公平方式實現不同),如果不需要阻塞,則CAS更新同步狀態,若CAS成功則返回true,否則返回false。如果需要阻塞則返回false。
writerShouldBlock() 表示當前線程是否應該被阻塞。NonfairSync和FairSync中有不同是實現。
1 //FairSync中需要判斷是否有前驅節點,如果有則返回false,否則返回true。遵循FIFO 2 final boolean writerShouldBlock() { 3 return hasQueuedPredecessors(); 4 } 5 6 //NonfairSync中直接返回false,可插隊。 7 final boolean writerShouldBlock() { 8 return false; // writers can always barge 9 }
(二)釋放寫鎖
1 //寫鎖釋放 2 public void unlock() { 3 sync.release(1); 4 } 5 6 //AQS提供獨占式釋放同步狀態的方法 7 public final boolean release(int arg) { 8 if (tryRelease(arg)) { 9 Node h = head; 10 if (h != null && h.waitStatus != 0) 11 unparkSuccessor(h); 12 return true; 13 } 14 return false; 15 } 16 17 //自定義重寫的tryRelease方法 18 protected final boolean tryRelease(int releases) { 19 if (!isHeldExclusively()) 20 throw new IllegalMonitorStateException(); 21 int nextc = getState() - releases; //同步狀態減去releases 22 //判斷同步狀態的低16位(寫同步狀態)是否為0,如果為0則返回true,否則返回false. 23 //因為支持可重入 24 boolean free = exclusiveCount(nextc) == 0; 25 if (free) 26 setExclusiveOwnerThread(null); 27 setState(nextc); //以獲取寫鎖,不需要其他同步措施,是線程安全的 28 return free; 29 }
讀鎖的獲取與釋放(ReadLock)
讀鎖是一個可重入的共享鎖,采用AQS提供的共享式獲取同步狀態的策略。
(一)獲取讀鎖
1 public void lock() { 2 sync.acquireShared(1); 3 } 4 5 //使用AQS提供的共享式獲取同步狀態的方法 6 public final void acquireShared(int arg) { 7 if (tryAcquireShared(arg) < 0) 8 doAcquireShared(arg); 9 } 10 11 //自定義重寫的tryAcquireShared方法,參數是unused,因為讀鎖的重入計數是內部維護的 12 protected final int tryAcquireShared(int unused) { 13 /* 14 * Walkthrough: 15 * 1. If write lock held by another thread, fail. 16 * 2. Otherwise, this thread is eligible for 17 * lock wrt state, so ask if it should block 18 * because of queue policy. If not, try 19 * to grant by CASing state and updating count. 20 * Note that step does not check for reentrant 21 * acquires, which is postponed to full version 22 * to avoid having to check hold count in 23 * the more typical non-reentrant case. 24 * 3. If step 2 fails either because thread 25 * apparently not eligible or CAS fails or count 26 * saturated, chain to version with full retry loop. 27 */ 28 Thread current = Thread.currentThread(); 29 int c = getState(); 30 //exclusiveCount(c)取低16位寫鎖。存在寫鎖且當前線程不是獲取寫鎖的線程,返回-1,獲取讀鎖失敗。 31 if (exclusiveCount(c) != 0 && 32 getExclusiveOwnerThread() != current) 33 return -1; 34 int r = sharedCount(c); //取高16位讀鎖, 35 //readerShouldBlock()用來判斷當前線程是否應該被阻塞 36 if (!readerShouldBlock() && 37 r < MAX_COUNT && //MAX_COUNT為獲取讀鎖的最大數量,為16位的最大值 38 compareAndSetState(c, c + SHARED_UNIT)) { 39 //firstReader是不會放到readHolds里的, 這樣,在讀鎖只有一個的情況下,就避免了查找readHolds。 40 if (r == 0) { // 是 firstReader,計數不會放入 readHolds。 41 firstReader = current; 42 firstReaderHoldCount = 1; 43 } else if (firstReader == current) { //firstReader重入 44 firstReaderHoldCount++; 45 } else { 46 // 非 firstReader 讀鎖重入計數更新 47 HoldCounter rh = cachedHoldCounter; //讀鎖重入計數緩存,基於ThreadLocal實現 48 if (rh == null || rh.tid != current.getId()) 49 cachedHoldCounter = rh = readHolds.get(); 50 else if (rh.count == 0) 51 readHolds.set(rh); 52 rh.count++; 53 } 54 return 1; 55 } 56 //第一次獲取讀鎖失敗,有兩種情況: 57 //1)沒有寫鎖被占用時,嘗試通過一次CAS去獲取鎖時,更新失敗(說明有其他讀鎖在申請) 58 //2)當前線程占有寫鎖,並且有其他寫鎖在當前線程的下一個節點等待獲取寫鎖,除非當前線程的下一個節點被取消,否則fullTryAcquireShared也獲取不到讀鎖 59 return fullTryAcquireShared(current); 60 }
從源代碼可以看出,獲取讀鎖的大致步驟如下:
1)通過同步狀態低16位判斷,如果存在寫鎖且當前線程不是獲取寫鎖的線程,返回-1,獲取讀鎖失敗;否則執行步驟2)。
2)通過readerShouldBlock判斷當前線程是否應該被阻塞,如果不應該阻塞則嘗試CAS同步狀態;否則執行3)。
3)第一次獲取讀鎖失敗,通過fullTryAcquireShared再次嘗試獲取讀鎖。
readerShouldBlock方法用來判斷當前線程是否應該被阻塞,NonfairSync和FairSync中有不同是實現。
1 //FairSync中需要判斷是否有前驅節點,如果有則返回false,否則返回true。遵循FIFO 2 final boolean readerShouldBlock() { 3 return hasQueuedPredecessors(); 4 } 5 final boolean readerShouldBlock() { 6 return apparentlyFirstQueuedIsExclusive(); 7 } 8 //當head節點不為null且head節點的下一個節點s不為null且s是獨占模式(寫線程)且s的線程不為null時,返回true。 9 //目的是不應該讓寫鎖始終等待。作為一個啟發式方法用於避免可能的寫線程飢餓,這只是一種概率性的作用,因為如果有一個等待的寫線程在其他尚未從隊列中出隊的讀線程后面等待,那么新的讀線程將不會被阻塞。 10 final boolean apparentlyFirstQueuedIsExclusive() { 11 Node h, s; 12 return (h = head) != null && 13 (s = h.next) != null && 14 !s.isShared() && 15 s.thread != null; 16 }
fullTryAcquireShared方法
1 final int fullTryAcquireShared(Thread current) { 2 /* 3 * This code is in part redundant with that in 4 * tryAcquireShared but is simpler overall by not 5 * complicating tryAcquireShared with interactions between 6 * retries and lazily reading hold counts. 7 */ 8 HoldCounter rh = null; 9 for (;;) { 10 int c = getState(); 11 //如果當前線程不是寫鎖的持有者,直接返回-1,結束嘗試獲取讀鎖,需要排隊去申請讀鎖 12 if (exclusiveCount(c) != 0) { 13 if (getExclusiveOwnerThread() != current) 14 return -1; 15 // else we hold the exclusive lock; blocking here 16 // would cause deadlock. 17 //如果需要阻塞,說明除了當前線程持有寫鎖外,還有其他線程已經排隊在申請寫鎖,故,即使申請讀鎖的線程已經持有寫鎖(寫鎖內部再次申請讀鎖,俗稱鎖降級)還是會失敗,因為有其他線程也在申請寫鎖,此時,只能結束本次申請讀鎖的請求,轉而去排隊,否則,將造成死鎖。 18 } else if (readerShouldBlock()) { 19 // Make sure we're not acquiring read lock reentrantly 20 if (firstReader == current) { 21 //如果當前線程是第一個獲取了寫鎖,那其他線程無法申請寫鎖 22 // assert firstReaderHoldCount > 0; 23 } else { 24 //從readHolds中移除當前線程的持有數,然后返回-1,然后去排隊獲取讀鎖。 25 if (rh == null) { 26 rh = cachedHoldCounter; 27 if (rh == null || rh.tid != current.getId()) { 28 rh = readHolds.get(); 29 if (rh.count == 0) 30 readHolds.remove(); 31 } 32 } 33 if (rh.count == 0) 34 return -1; 35 } 36 } 37 if (sharedCount(c) == MAX_COUNT) 38 throw new Error("Maximum lock count exceeded"); 39 if (compareAndSetState(c, c + SHARED_UNIT)) { 40 //示成功獲取讀鎖,后續就是更新readHolds等內部變量, 41 if (sharedCount(c) == 0) { 42 firstReader = current; 43 firstReaderHoldCount = 1; 44 } else if (firstReader == current) { 45 firstReaderHoldCount++; 46 } else { 47 if (rh == null) 48 rh = cachedHoldCounter; 49 if (rh == null || rh.tid != current.getId()) 50 rh = readHolds.get(); 51 else if (rh.count == 0) 52 readHolds.set(rh); 53 rh.count++; 54 cachedHoldCounter = rh; // cache for release 55 } 56 return 1; 57 } 58 } 59 }
(二)釋放讀鎖
1 public void unlock() { 2 sync.releaseShared(1); 3 } 4 5 public final boolean releaseShared(int arg) { 6 if (tryReleaseShared(arg)) { 7 doReleaseShared(); 8 return true; 9 } 10 return false; 11 } 12 13 protected final boolean tryReleaseShared(int unused) { 14 Thread current = Thread.currentThread(); 15 //更新計數 16 if (firstReader == current) { 17 // assert firstReaderHoldCount > 0; 18 if (firstReaderHoldCount == 1) 19 firstReader = null; 20 else 21 firstReaderHoldCount--; 22 } else { 23 HoldCounter rh = cachedHoldCounter; 24 if (rh == null || rh.tid != current.getId()) 25 rh = readHolds.get(); 26 int count = rh.count; 27 if (count <= 1) { 28 readHolds.remove(); 29 if (count <= 0) 30 throw unmatchedUnlockException(); 31 } 32 --rh.count; 33 } 34 //自旋CAS,減去1<<16 35 for (;;) { 36 int c = getState(); 37 int nextc = c - SHARED_UNIT; 38 if (compareAndSetState(c, nextc)) 39 // Releasing the read lock has no effect on readers, 40 // but it may allow waiting writers to proceed if 41 // both read and write locks are now free. 42 return nextc == 0; 43 } 44 }
參考資料
(java並發鎖ReentrantReadWriteLock讀寫鎖源碼分析)http://blog.csdn.net/prestigeding/article/details/53286756
《Java並發編程的藝術》