讀鎖的調用,最終委派給其內部類 Sync extends AbstractQueuedSynchronizer
/** * 獲取讀鎖,如果寫鎖不是由其他線程持有,則獲取並立即返回; * 如果寫鎖被其他線程持有,阻塞,直到讀鎖被獲得。 */ public void lock() { sync.acquireShared(1); } /** * 以共享模式獲取對象,忽略中斷。通過至少先調用一次 tryAcquireShared(int) 來實現此方法,並在成功時返回。 * 否則將線程加入隊列,在阻塞和運行之間切換,重復調用tryAcquireShared直到成功 *( possibly repeatedly blocking and unblocking, invoking tryAcquireShared until success) */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg);
tryAcquireShared方法的邏輯:
整體思路如下,具體實現細節,請參考下面的源碼分析
一 寫鎖被占用的情況
1 寫鎖被其他線程占用,獲取失敗
2 寫鎖被自己占用,表示“鎖降級”,進入【循環cas取讀鎖】流程
二 寫鎖沒有被占用的情況
1 根據讀鎖獲取策略判斷是否阻塞(readerShouldBlock方法)
(1) 公平鎖策略:如果當前線程不是同步隊列中的第一個節點,則阻塞
(2) 非公平鎖策略:為了防止寫線程飢餓,如果同步隊列中的第一個節點是寫線程,則阻塞當前線程。
2 如果需要阻塞,分以下2種情況:
(1)當前線程是讀鎖重入,則不需要阻塞,進入【循環cas取讀鎖】流程
(2)當前線程不是讀鎖重入,則獲取失敗
3 如果不需要阻塞,進入【循環cas取讀鎖】流程
三 【循環cas取讀鎖】
1 tryAcquireShared方法先進行了一次cas取鎖操作,如果獲取失敗,則調用fullTryAcquireShared方法,循環獲取。
2 什么是cas?請參考CAS
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); //c是鎖狀態為:高位16位表示共享鎖的數量,低位16位表示獨占鎖的數量
int c = getState(); //exclusiveCount(c) 取低16位的值,也就是寫鎖狀態位:不等於0表示寫鎖被占用
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) //寫鎖被其他線程占用,獲取讀鎖失敗
return -1; //取高16位的值,讀鎖狀態位
int r = sharedCount(c); //readerShouldBlock()根據讀鎖獲取策略,返回是否阻塞當前讀鎖獲取操作。后面會詳細說明此方法
if (!readerShouldBlock() && r < MAX_COUNT &&
//cas修改高16位的讀鎖狀態,即獲取讀鎖
compareAndSetState(c, c + SHARED_UNIT)) { //首次獲取讀鎖
if (r == 0) { //緩存首次獲取讀鎖的線程,及其讀鎖重入次數
firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { //cachedHoldCounter是最后獲取鎖的線程的讀鎖重入次數
HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) //readHolds是緩存了當前線程的讀鎖重入次數的ThreadLocal //當前線程自然是最后獲取鎖的線程,故將當前線程的holdCounter賦給cachedHoldCounter
cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) //緩存當前線程的holdCounter //fullTryAcquireShared()方法中,
//獲取讀鎖失敗的線程會執行:readHolds.remove(),故此時需要重新設置
readHolds.set(rh); rh.count++; } return 1; } //首次獲取讀鎖失敗后,重試獲取
return fullTryAcquireShared(current); }
/** * Full version of acquire for reads, that handles CAS misses * and reentrant reads not dealt with in tryAcquireShared. */
final int fullTryAcquireShared(Thread current) { //rh表示當前線程的鎖計數器
HoldCounter rh = null; for (;;) { int c = getState(); //寫鎖被占用
if (exclusiveCount(c) != 0) { //如果其他線程占用,讀鎖獲取失敗。如果當前線程占用,表示“鎖降級”。
if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { //重入鎖不需要阻塞。 // Make sure we're not acquiring read lock reentrantly
if (firstReader == current) { // assert firstReaderHoldCount > 0; //當前線程就是第一個獲取讀鎖的線程,那么此時當然是重入鎖。
} else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) { rh = readHolds.get(); if (rh.count == 0) //線程阻塞之前,清空readHolds
readHolds.remove(); } } if (rh.count == 0) //當前線程的鎖計數器為0,非重入鎖,需要阻塞。
return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //cas設置讀鎖狀態位
if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { //緩存首次獲取讀鎖的線程,以及鎖計數器
firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); else if (rh.count == 0) //鎖計數器放入ThreadLocal
readHolds.set(rh); rh.count++; //此時rh就是最后獲取讀鎖的線程的鎖計數器
cachedHoldCounter = rh; // cache for release
} return 1; } } }
/** * 非公平鎖的讀鎖獲取策略 */ final boolean readerShouldBlock() { //為了防止寫線程飢餓 //如果同步隊列中的第一個線程是以獨占模式獲取鎖(寫鎖), //那么當前獲取讀鎖的線程需要阻塞,讓隊列中的第一個線程先執行 return apparentlyFirstQueuedIsExclusive(); } /** * 公平鎖的讀鎖獲取策略 */ final boolean readerShouldBlock() { //如果當前線程不是同步隊列頭結點的next節點(head.next) //則阻塞當前線程 return hasQueuedPredecessors(); }