前言
今天看Jraft的時候發現了很多地方都用到了讀寫鎖,所以心血來潮想要分析以下讀寫鎖是怎么實現的。
先上一個doc里面的例子:
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
//加上一個讀鎖
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
//必須在加寫鎖之前釋放讀鎖
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
//雙重檢查
if (!cacheValid) {
//設置值
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
//鎖降級,反之則不行
rwl.readLock().lock();
} finally {
//釋放寫鎖,但是仍然持有寫鎖
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
//釋放讀鎖
rwl.readLock().unlock();
}
}
}}
我們一般實例化一個ReentrantReadWriteLock,一般是調用空的構造器創建,所以默認使用的是非公平鎖
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
//默認使用的是NonfairSync
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
//分別調用writeLock和readLock會返回讀寫鎖實例
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
ReentrantReadWriteLock內部類Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
//位移量
//在讀寫鎖中,state是一個32位的int,所以用state的高16位表示讀鎖,用低16位表示寫鎖
static final int SHARED_SHIFT = 16;
//因為讀鎖是高16位,所以用1向左移動16位表示讀鎖每次鎖狀態變化的量
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//最大的可重入次數
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//用來計算低16位的寫鎖狀態
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//獲取高16位讀鎖state次數,重入次數
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
//獲取低16位寫鎖state次數,重入次數
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
//用來記錄每個線程持有的讀鎖數量
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
private transient ThreadLocalHoldCounter readHolds;
// 用於緩存,記錄"最后一個獲取讀鎖的線程"的讀鎖重入次數
private transient HoldCounter cachedHoldCounter;
// 第一個獲取讀鎖的線程(並且其未釋放讀鎖),以及它持有的讀鎖數量
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
Sync() {
// 初始化 readHolds 這個 ThreadLocal 屬性
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
....
}
- 因為int是32位的,所以在ReentrantReadWriteLock中將state分為兩部分,高16位作為讀鎖的狀態控制器,低16位作為寫鎖的狀態控制器。
- 每次要獲取讀鎖的當前狀態都需要調用sharedCount傳入當前的state,將state向右移動16位來獲取
- 要獲取低16位則需要將1左移16位減一,獲得一個低16位全是1的數,然后和傳入的state進行取與操作獲取state的低16位的值
- cachedHoldCounter里面保存了最新的讀鎖的線程和調用次數
- firstReader 和 firstReaderHoldCount 將”第一個”獲取讀鎖的線程記錄在 firstReader 屬性中,這里的第一個不是全局的概念,等這個 firstReader 當前代表的線程釋放掉讀鎖以后,會有后來的線程占用這個屬性的。
讀鎖獲取
//readLock#lock
public void lock() {
//這里會調用父類AQS的acquireShared,嘗試獲取鎖
sync.acquireShared(1);
}
//AQS#acquireShared
public final void acquireShared(int arg) {
//返回值小於 0 代表沒有獲取到共享鎖
if (tryAcquireShared(arg) < 0)
//進入到阻塞隊列,然后等待前驅節點喚醒
doAcquireShared(arg);
}
這里的tryAcquireShared是調用ReentrantReadWriteLock的內部類Sync的tryAcquireShared的方法
protected final int tryAcquireShared(int unused) {
//獲取當前線程
Thread current = Thread.currentThread();
//獲取AQS中的state屬性值
int c = getState();
//exclusiveCount方法是用來獲取寫鎖狀態,不等於0代表有寫鎖
if (exclusiveCount(c) != 0 &&
//如果不是當前線程獲取的寫鎖,那么直接返回-1
getExclusiveOwnerThread() != current)
return -1;
//獲取讀鎖的鎖定次數
int r = sharedCount(c);
// 讀鎖獲取是否需要被阻塞
if (!readerShouldBlock() &&
r < MAX_COUNT &&
//因為高16位代表共享鎖,所以CAS需要加上一個SHARED_UNIT
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
//記錄一下首次讀線程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//firstReader 重入獲取讀鎖
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
// 如果 cachedHoldCounter 緩存的不是當前線程,設置為緩存當前線程的 HoldCounter
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
// return 大於 0 的數,代表獲取到了共享鎖
return 1;
}
return fullTryAcquireShared(current);
}
- 首先會去調用exclusiveCount方法來查看寫鎖是否被占用,如果被占用,那么查看當前線程是否是占用讀鎖的線程,如果不是則返回-1。通過這里可以看出可以先占用讀鎖再占用寫鎖
- 調用readerShouldBlock方法獲取是否需要阻塞讀鎖獲取,然后檢查一下高16位讀鎖重入次數是否超過了2^16-1,最后通過CAS操作將state高16進行加1操作,如果沒有其他線程搶占就會成功
- 如果state的高16位為零,那么就設置首次讀線程和首次數次數,如果不是則校驗首次讀線程是不是當前線程,是的話將firstReaderHoldCount次數加1。如果不是首次讀線程,那么校驗一下最后一次讀線程是不是當前線程,不是的話就從readHolds中獲取,並將HoldCounter計數加1,如果最后讀線程是當前線程那么計數加1
readerShouldBlock
//NonfairSync#readerShouldBlock
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
//AQS
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
在非公平模式中readerShouldBlock會調用AQS的方法,判斷當前頭節點的下一個節點,如果不是共享節點,那么readerShouldBlock就返回true,讀鎖就會阻塞。
//FairSync#readerShouldBlock
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
//AQS
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
在公平模式中會去看看隊列里有沒有其他元素在隊列里等待獲取鎖,如果有那么讀鎖就進行阻塞
ReentrantReadWriteLock#fullTryAcquireShared
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
//檢查是否寫鎖被占用
if (exclusiveCount(c) != 0) {
//被占用,但是占用讀鎖線程不是當前線程,返回阻塞
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
//檢查讀鎖是否應該被阻塞
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
//首次讀線程是當前線程,下面直接CAS
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
//設置最后一次讀線程
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
//如果發現 count == 0,也就是說,純屬上一行代碼初始化的,那么執行 remove
readHolds.remove();
}
}
//如果最后讀取線程次數為0,那么阻塞
if (rh.count == 0)
return -1;
}
}
//如果讀鎖重入次數達到上限,拋異常
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//嘗試CAS讀鎖重入次數加1
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 這里 CAS 成功,那么就意味着成功獲取讀鎖了
// 下面需要做的是設置 firstReader 或 cachedHoldCounter
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 下面這幾行,就是將 cachedHoldCounter 設置為當前線程
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
// 返回大於 0 的數,代表獲取到了讀鎖
return 1;
}
}
}
這個方法主要是用來處理重入鎖操作的。首先校驗一下寫鎖是否被占用,如果沒有被占用則判斷當前線程是否是第一次讀線程,如果不是則判斷最后一次讀線程是不是當前線程,如果不是則從readHolds獲取,並判斷HoldCounter實例中獲取讀鎖次數如果為0,那么就不是重入。
如果可以判斷當前線程是重入的,那么則對state高16進行加1操作,操作成功,則對firstReader或cachedHoldCounter進行設置,並返回1,表示獲取到鎖。
到這里我們看完了tryAcquireShared方法,我再把acquireShared方法貼出來:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
下面看doAcquireShared方法:
private void doAcquireShared(int arg) {
//實例化一個共享節點入隊
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取當前節點的上一個前置節點
final Node p = node.predecessor();
//前置節點如果是頭節點,那么代表隊列里沒有別的節點,先調用tryAcquireShared嘗試獲取鎖
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//醒隊列中其他共享節點
setHeadAndPropagate(node, r);
p.next = null; // help GC
//響應中斷
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//設置前置節點waitStatus狀態
if (shouldParkAfterFailedAcquire(p, node) &&
//阻塞當前線程
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireShared方法中會實例化一個共享節點並入隊。如果當前節點的前置節點是頭節點,那么直接調用tryAcquireShared先獲取一次鎖,如果返回大於0,那么表示可以獲取鎖,調用setHeadAndPropagate喚醒隊列中其他的線程;如果沒有返回則會調用shouldParkAfterFailedAcquire方法將前置節點的waitStatus設值成SIGNAL,然后調用parkAndCheckInterrupt方法阻塞
AQS#setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//把node節點設值為頭節點
setHead(node);
//因為是propagate大於零才進這個方法,所以這個必進這個if
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//獲取node的下一個節點
Node s = node.next;
//判斷下一個節點是否為空,或是共享節點
if (s == null || s.isShared())
//往下看
doReleaseShared();
}
}
這個方法主要是替換頭節點為當前節點,然后調用doReleaseShared進行喚醒節點的操作
AQS#doReleaseShared
private void doReleaseShared() {
for (;;) {
Node h = head;
// 1. h == null: 說明阻塞隊列為空
// 2. h == tail: 說明頭結點可能是剛剛初始化的頭節點,
// 或者是普通線程節點,但是此節點既然是頭節點了,那么代表已經被喚醒了,阻塞隊列沒有其他節點了
// 所以這兩種情況不需要進行喚醒后繼節點
if (h != null && h != tail) {
int ws = h.waitStatus;
//后面的節點會把前置節點設置為Node.SIGNAL
if (ws == Node.SIGNAL) {
//1
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 喚醒 head 的后繼節點,也就是阻塞隊列中的第一個節點
unparkSuccessor(h);
}
else if (ws == 0 &&
//2
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//3 如果被喚醒的節點已經占領head了,那么繼續循環,否則跳出循環
if (h == head) // loop if head changed
break;
}
}
- unparkSuccessor這里會喚醒下一個節點,那么下一個節點也會調用setHeadAndPropagate進行搶占頭節點;如果同時有當前線程和被喚醒的下一個線程同時走到這里,那么只會有一個成功,另一個返回false的就不進行喚醒操作
- 這里CAS失敗的原因可能是一個新的節點入隊,然后將前置節點設值為了Node.SIGNAL,所以導致當前的CAS失敗
- 如果被喚醒的節點搶占頭節點成功,那么h == head 就不成立,那么會進行下一輪的循環,否則就是head沒有被搶占成功
AQS#unparkSuccessor
private void unparkSuccessor(Node node) {
//如果當前節點小於零,那么作為頭節點要被清除一下狀態
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 下面的代碼就是喚醒后繼節點,但是有可能后繼節點取消了等待
// 從隊尾往前找,找到waitStatus<=0的所有節點中排在最前面的
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒線程
LockSupport.unpark(s.thread);
}
到這里加讀鎖的代碼就講解完畢了
讀鎖釋放
//ReadLock
public void unlock() {
sync.releaseShared(1);
}
// Sync
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
我們先看tryReleaseShared
Sync#tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//如果當前是firstReader,那么需要進行重置或重入減一
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 判斷 cachedHoldCounter 是否緩存的是當前線程,不是的話要到 ThreadLocal 中取
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
// 這一步將 ThreadLocal remove 掉,防止內存泄漏。因為已經不再持有讀鎖了
readHolds.remove();
//unlock了幾次的話會拋異常
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
// nextc 是 state 高 16 位減 1 后的值
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 如果 nextc == 0,那就是 state 全部 32 位都為 0,也就是讀鎖和寫鎖都空了
// 此時這里返回 true 的話,其實是幫助喚醒后繼節點中的獲取寫鎖的線程
return nextc == 0;
}
}
這個讀鎖的釋放,主要就是將 hold count 減 1,如果減到 0 的話,還要將 ThreadLocal 中的 remove 掉。然后是在 for 循環中將 state 的高 16 位減 1,如果發現讀鎖和寫鎖都釋放光了,那么喚醒后繼的獲取寫鎖的線程,因為只有讀鎖是不會被阻塞的,所以等待的線程只可能是寫鎖的線程。
寫鎖的獲取
//WriteLock
public void lock() {
sync.acquire(1);
}
//sync
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//AQS
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
//獲取state的低16位
int w = exclusiveCount(c);
//不為零說明讀鎖或寫鎖被持有了
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 看下這里返回 false 的情況:
// c != 0 && w == 0: 寫鎖可用,但是有線程持有讀鎖(也可能是自己持有)
// c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其他線程持有寫鎖
// 也就是說,只要有讀鎖或寫鎖被占用,這次就不能獲取到寫鎖
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 這里不需要 CAS,仔細看就知道了,能到這里的,只可能是寫鎖重入,不然在上面的 if 就攔截了
setState(c + acquires);
return true;
}
//檢查寫鎖是否需要block
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//走到這里說明寫鎖不需要block,並且CAS成功了
setExclusiveOwnerThread(current);
return true;
}
我們來看看writerShouldBlock
//NonfairSync
final boolean writerShouldBlock() {
return false; // writers can always barge
}
//FairSync
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
如果是非公平模式,那么 lock 的時候就可以直接用 CAS 去搶鎖,搶不到再排隊
如果是公平模式,那么如果阻塞隊列有線程等待的話,就乖乖去排隊
寫鎖釋放
public void unlock() {
sync.release(1);
}
//sync
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//如果獨占鎖釋放"完全",喚醒后繼節點
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//Sync
protected final boolean tryRelease(int releases) {
//檢查一下持有所的線程是不是當前線程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//將state減1
int nextc = getState() - releases;
//查看低16位是否為0
boolean free = exclusiveCount(nextc) == 0;
if (free)
//如果為0,那么說明寫鎖釋放
setExclusiveOwnerThread(null);
//設置狀態
setState(nextc);
return free;
}
