先前,筆者和大家一起了解了ReentrantReadWriteLock的寫鎖實現,其實寫鎖本身實現的邏輯很少,基本上還是復用AQS內部的等待隊列思想。下面,我們來看看ReentrantReadWriteLock的讀鎖實現。
當調用讀鎖的lock()方法時,會調用到Sync的父類AQS實現的acquireShared(int arg)方法,在這個方法又會調用子類實現的tryAcquireShared(arg)方法嘗試獲取讀鎖,如果返回大於等於0,則代表獲取讀鎖成功,返回結果小於0代表獲取讀鎖失敗。則會執行doAcquireShared(arg)方法進入等待隊列。
下面我們來看看在tryAcquireShared(arg)方法中是如何嘗試獲取讀鎖的,tryAcquireShared(arg)方法首先會獲取讀寫鎖當前狀態c,如果exclusiveCount(c)的結果不為0則代表有線程占有寫鎖,會接着判斷寫鎖的獨占線程是否是當前請求讀鎖的線程,如果不是則進入<1>處的分支返回獲取讀鎖失敗。只有寫鎖不被其他線程持有,或者占有寫鎖的線程請求讀鎖,才可以跳過分支<1>前進到代碼<2>處。
在代碼<2>處會獲取目前讀鎖被獲取的次數,之后會執行下readerShouldBlock()判斷當前請求讀鎖的線程是否應該阻塞,Sync並沒有實現這個方法,而是交由子類公平鎖和非公平鎖實現,這個方法的實現一般是判斷讀寫鎖的等待隊列中是否有線程,如果是公平鎖的實現,只要隊列中有等待讀寫鎖的線程,就會判定需要阻塞當前讀線程,如果是非公平鎖的實現,就會判斷當前隊列中最前面需要鎖的線程是讀線程還是寫線程,如果是讀線程則就不阻塞,大家可以一起共享讀鎖,如果是寫線程則需要阻塞。
之后會判斷獲取讀鎖的次數是否已到達MAX_COUNT,如果沒有到達才會執行CAS嘗試對獲取讀鎖的次數+1,由於獲取讀鎖的次數是存儲在state的前16位,所以這里會加上SHARED_UNIT,並且這里我們也看到tryAcquireShared(int unused)方法給傳入的獲取次數用的變量命名是unused,讀鎖在這里並不使用外部傳入的獲取次數,因為這個獲取次數可能大於1,會出現當前獲取讀鎖的次數+1(SHARED_UNIT)剛好到MAX_COUNT,+2(2*SHARED_UNIT)超過MAX_COUNT。
如果判斷當前請求讀鎖的線程不阻塞,當前獲取讀鎖的次數小於MAX_COUNT且CAS對獲取讀鎖+1成功,則會進入<3>處的分支,如果原先讀鎖被獲取的次數為0,即r為0,代表當前線程是第一個獲取讀鎖的線程,之前說過第一個獲取讀鎖的線程會做一個優化,Sync的字段firstReader用於指向第一個獲取讀鎖的線程,firstReaderHoldCount用於統計第一個線程獲取讀鎖的次數,因為可能出現線程在獲取讀鎖后又重新獲取的情況。當判斷線程是當前第一個獲取讀鎖的線程,會進入<4>處的分支,將firstReader指向當前線程,firstReaderHoldCount賦值為1,代表當前線程獲取一次讀鎖。如果原先讀鎖被獲取的次數不為0,且當前獲取讀鎖的線程為第一個獲取讀鎖的線程,則代表讀鎖被重入,這里會進入<5>處的分支對firstReaderHoldCount+1。
如果<4>、<5>兩個條件都不滿足,原先讀鎖的獲取次數不為0,且當前獲取讀鎖的線程不是第一個獲取讀鎖的線程,則會進入<6>處的分支,這里會先獲取cachedHoldCounter指向的HoldCounter對象,cachedHoldCounter會指向最后一個獲取讀鎖線程對應的HoldCounter對象(除了第一個獲取讀鎖的線程外),HoldCounter對象用於存儲不同線程獲取讀鎖的次數。如果rh為null,或者rh的線程id不是當前當前的線程id,這里會進入<7>處,獲取線程局部變量HoldCounter對象,並賦值給cachedHoldCounter。
之后判斷rh指向的HoldCounter對象獲取鎖的次數是否為0,如果為0會調用ThreadLocal.set(T value)保存進線程的局部變量,大家思考下為什么這里要調用ThreadLocal.set(T value)保存局部變量呢?如果當前讀鎖已經有線程在共享,當有新的線程獲取到讀鎖后會調用readHolds.get()方法執行ThreadLocalHoldCounter.initialValue()方法生成一個HoldCounter對象,雖然此時HoldCounter對象的獲取讀鎖次數count為0,但此時這個對象也緩存在線程的局部變量中了,為什么這里還要調用ThreadLocal.set(T value)保存局部變量呢?這是因為在線程釋放讀鎖的時候,如果判斷rh.count為0,會將線程對應的HoldCounter對象從線程局部變量中移除。這里可能出現cachedHoldCounter指向最后一個獲取讀鎖的線程的HoldCounter對象,線程釋放讀鎖后將HoldCounter對象從局部變量中移除,但此時cachedHoldCounter依舊指向原先線程對應HoldCounter對象,並且線程在釋放讀鎖后又重新獲取讀鎖,且期間沒有其他線程獲取讀鎖,所以這里判斷cachedHoldCounter指向的對象的線程id和當前線程id相同,就不會再調用readHolds.get()生成新的HoldCounter對象,而是復用舊的HoldCounter對象,如果HoldCounter為0,除了是新生成的,也有可能是上面所說的情況,這里會重新保存HoldCounter對象到線程的局部變量中。之后對HoldCounter對象的count字段+1表示獲取一次讀鎖,最后返回1表示獲取讀鎖成功。
上面的流程僅針對獲取讀鎖較為順利的情況,但在高並發場景下,很多事情都難以預料,比如在<3>處的readerShouldBlock() 方法返回線程應該阻塞,或者獲取讀鎖的次數已經到達MAX_COUNT,又或者執行CAS的時候失敗,有別的線程先當前線程獲取了讀鎖或者寫鎖,當前讀寫鎖的狀態已經和原始的狀態c不同了。只要出現這些情況都無法進入<3>處的分支。如果線程應該阻塞、讀鎖獲取次數到達上限,又或者寫鎖被占有這些條件倒也罷了,如果僅僅是有線程先當前線程獲取了讀鎖改變了讀寫鎖狀態state,導致<3>處的CAS失敗從而無法獲取讀鎖,則顯得有些憋屈,因為讀鎖是允許共享的。因此,這里還會在執行<9>處的fullTryAcquireShared(current)避免出現CAS未命中的情況。
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
//...
abstract static class Sync extends AbstractQueuedSynchronizer {
//...
abstract boolean readerShouldBlock();
//...
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)//<1>
return -1;
int r = sharedCount(c);//<2>
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {//<3>
if (r == 0) {//<4>
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {//<5>
firstReaderHoldCount++;
} else {//<6>
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))//<7>
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)//<8>
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);//<9>
}
//...
}
//...
public static class ReadLock implements Lock, java.io.Serializable {
//...
public void lock() {
sync.acquireShared(1);
}
//...
}
//...
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
//...
}
fullTryAcquireShared(Thread current)是對嘗試獲取讀鎖的再一重保障,實現思路其實也與上面的tryAcquireShared(int unused)很類似,只是多了一個輪詢直到判斷獲取讀鎖成功,或者讀寫鎖狀態不允許獲取讀鎖。
這里依舊是在<1>處判斷當前是否有線程占有寫鎖,如果寫鎖被占有且獨占寫鎖的線程不是當前請求讀鎖的線程則退出。再判斷寫鎖沒有被占有的前提下,再判斷讀線程是否應該被阻塞,一般只有兩種情況會進入到分支<2>,當前有線程正在共享讀寫鎖的讀鎖,如果公平鎖發現目前等待隊列中有線程,這里會判斷請求讀鎖的線程應該阻塞,如果是非公平鎖,則判斷等待隊列中是否有線程,如果有的話等待時長最久的線程是否請求寫鎖,如果是的話則要阻塞當前請求讀鎖的線程,如果不是當前請求讀鎖的線程可以和其他線程一起共享讀鎖。所以這個分支是針對目前已經有線程共享讀鎖,且等待隊列中有線程,又有新的線程來請求讀鎖做的判斷。如果出現這種情況則進入到分支<2>處,這里會判斷下請求線程對應的HoldCounter對象的獲取讀鎖次數是否為0,正常情況應該為0,會把HoldCounter對象從局部變量中移除,之后判斷獲取讀鎖次數為0,則返回-1表示獲取讀鎖失敗。正常情況下第一個獲取讀鎖的線程是不會進入到分支<2>,而除了第一個獲取讀鎖的線程外,其他已經獲取讀鎖的線程如果又重入讀鎖,是有會進入到分支<2>的,但這里會判斷不是第一個線程,於是跳過分支<3>會進入分支<4>,又因為已經獲取到讀鎖只是重入讀鎖的線程對應的獲取讀鎖次數不為0,所以對應的HoldCounter對象不會被移除,也不會判斷獲取讀鎖次數為0而返回。
如果判斷當前線程不應阻塞,或者當前線程應當阻塞后又發現當前線程早已獲取到讀鎖,會繼而執行到<5>處的代碼,判斷如果讀鎖被獲取的次數已達上限則報錯。如果未達上限,則會執行<6>處的CAS對讀鎖的獲取次數+1(SHARED_UNIT),如果CAS成功則會增加當前線程對讀鎖的獲取次數。如果<6>處的CAS失敗,可能有別的線程先當前線程修改了讀寫鎖的狀態,這里會重新開始一輪新的循環,直到成功獲取到讀鎖,或者判斷有別的線程占有了寫鎖。
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
//...
abstract static class Sync extends AbstractQueuedSynchronizer {
//...
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {//<1>
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {//<2>
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {//<3>
// assert firstReaderHoldCount > 0;
} else {//<4>
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)//<5>
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {//<6>
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
//...
}
//...
}
下面我們再來看看公平鎖和非公平鎖是如何判斷是否應該阻塞當前請求讀鎖的線程, 首先可以看到公平鎖的實現非常簡單,就僅僅是判斷隊列中是否有等待線程,哪怕這些線程都是讀線程,可以一起共享讀鎖,公平鎖也會要求當前請求讀鎖的線程先入隊等待,直到它前一個線程獲取讀鎖后喚醒自己。而非公平鎖則是先判斷隊列中頭節點的后繼節點是否為null,如果非空再判斷是否是讀鎖,如果是寫鎖那當前請求讀鎖的線程只能先乖乖入隊,如果當前線程和頭節點的后繼節點同為讀線程,就判斷不阻塞,當前線程可以嘗試獲取讀鎖。非公平鎖相較公平鎖,多了一種高並發下隊列中的線程被無限期推遲的可能,如果頭節點的后繼節點是寫線程倒也好說,讀線程只能乖乖入隊,不會延期寫線程獲取寫鎖,但如果后繼節點為讀線程,且不斷有新的讀線程成功獲取讀鎖,那么后繼節點的讀線程將被延期,因為每次嘗試用CAS修改讀寫鎖的狀態都會失敗,這里的延期也包括后繼節點之后的所有節點,不管是共享節點還是獨占節點。
static final class FairSync extends Sync {
//...
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
static final class NonfairSync extends Sync {
//...
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
//...
}
在了解讀鎖是如何加鎖后,我們來看看如何釋放讀鎖。當調用讀鎖(ReadLock)的unlock()方法時,會調用Sync的父類AQS實現的releaseShared(int arg)方法,在這個方法又會先調用子類實現的tryReleaseShared(arg)釋放讀鎖,如果釋放成功后,再調用doReleaseShared()嘗試喚醒后繼節點。下面我們就來了解下Sync類實現的tryReleaseShared(int unused),看看這個方法是如何釋放讀鎖的。
在Sync的tryReleaseShared(int unused)方法中,會先判斷當前釋放讀鎖的線程是否是第一個獲取讀鎖的線程,如果是的話則進入<1>處的分支,判斷第一個線程獲取讀鎖的次數,如果大於1,代表第一個線程在第一次獲取讀鎖后又重入讀鎖,這里簡單地對獲取讀鎖的次數-1,如果判斷釋放的時候獲取次數是1,則代表第一個線程將完全釋放讀鎖,這里會置空firstReader的指向,但不會將firstReaderHoldCount清零,因為當所有讀線程都完全釋放讀鎖后,如果再有新的讀線程請求讀鎖,會重新對firstReader、firstReaderHoldCount賦值。
如果釋放讀鎖的線程不是第一個線程,會先獲取cachedHoldCounter對象,即上一個獲取讀鎖線程對應的HoldCounter對象,判斷HoldCounter對象對應的線程是否是當前線程,如果是的話則不需要去線程局部變量查找與之對應的HoldCounter,如果不是則需要查找,在確定好線程對應的HoldCounter對象后,如果判斷線程對鎖的獲取次數是1,則代表當前線程將完全釋放讀鎖,這里會將HoldCounter對象從局部變量移除,再判斷HoldCounter對象的獲取次數是否為0,如果為0則代表當前線程沒有先獲取讀鎖就先釋放讀鎖,這里會拋出unmatchedUnlockException異常。之后會對線程獲取鎖的次數-1。
最后會用CAS的方式對讀寫鎖的讀鎖被獲取數量-1(SHARED_UNIT),這里可能存在讀線程並發釋放讀鎖的情況,所以這里可能存在CAS失敗的情況,如果這里失敗則會一直輪詢直到CAS成功,如果CAS成功則判斷當前狀態是否有線程占有讀鎖或者寫鎖,如果CAS成功后讀寫鎖的狀態為0,代表當前無讀鎖也無寫鎖,則會返回讀鎖被完全釋放。
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
//...
abstract static class Sync extends AbstractQueuedSynchronizer {
//...
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {//<1>
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {//<2>
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {//<3>
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
//...
}
//...
public static class ReadLock implements Lock, java.io.Serializable {
//...
public void unlock() {
sync.releaseShared(1);
}
//...
}
//...
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//...
}
讀鎖的tryReadLock()方法只以非公平的方式獲取讀寫鎖,不管其本身實現是公平鎖還是非公平鎖,這里會直接調用Sync實現的tryReadLock()方法,在這個方法中會先判斷當前是否有線程占有寫鎖,如果有線程占有寫鎖,且當前請求讀鎖的線程和占有寫鎖的線程不是同一個,這里會返回獲取讀寫鎖失敗。否則會判斷當前讀鎖被獲取次數是否已達上限MAX_COUNT,達到上限則報錯。如果讀鎖被獲取次數未達上限,才可以用CAS的方式對讀鎖的獲取次數+1(SHARED_UNIT),如果CAS失敗,代表當前有其他線程一起獲取讀鎖,狀態c已經被改變,會重新開始新的一輪嘗試獲取讀鎖流程。如果CAS成功則會增加線程對讀鎖的引用次數,之所以有這個HoldCounter對象,或者用firstReaderHoldCount字段統計第一個線程引用所的次數,主要是為了確保在線程執行釋放讀鎖的時候,線程一定是之前獲取過讀鎖的線程。如果讀鎖不能保證釋放讀鎖的線程一定是之前獲取過讀鎖的線程,則會出現線程A獲取了讀鎖但尚未釋放,此時線程B未獲取讀鎖但直接釋放讀鎖,讀寫鎖狀態回到0,可由讀線程或者寫線程獲取,線程C獲取寫鎖,那就出現了本章最開始講的,一個線程在訪問資源,另一個線程在修改資源,這是非常危險的。
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
//...
abstract static class Sync extends AbstractQueuedSynchronizer {
//...
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
//...
}
//...
public static class ReadLock implements Lock, java.io.Serializable {
//...
public boolean tryLock() {
return sync.tryReadLock();
}
}
//...
}
最后,我們簡單看下讀鎖的tryLock(long timeout, TimeUnit unit)方法,這個方法會調用到AQS實現的tryAcquireSharedNanos(int arg, long nanosTimeout),相信大家看到這個方法的實現不會陌生,首先會調用子類實現的tryAcquireShared(arg)嘗試獲取讀鎖,如果獲取失敗,則會調用doAcquireSharedNanos(arg, nanosTimeout)嘗試將當前請求讀鎖的線程掛起。這兩個方法先前已經講過,這里就不再贅述。
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
//...
public static class ReadLock implements Lock, java.io.Serializable {
//...
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//...
}
//...
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
//...
}
至此,讀寫鎖的源碼解析到此結束,這里很多AQS的方法之前筆者在ReentrantLock和Semaphore的源碼解析已經講過,請在看此章之前一定一定一定要先去看或者兩個章節的源碼解析。如果有徹底理解這兩個章節,大家就會知道其實不管是可重入互斥鎖、信號量、可重入讀寫鎖本身實現的業務並不多,它們的核心思想就是把線程視為一個個可入隊的節點,只是這些節點有的會獨占互斥鎖或者寫鎖,有的可以和別的節點一起共享一個鎖,通過用不同的角度看待AQS,可以實現適用於不同場景下的並發類。
