詳見:http://blog.yemou.net/article/query/info/tytfjhfascvhzxcyt206
讀寫鎖 ReadWriteLock讀寫鎖維護了一對相關的鎖,一個用於只讀操作,一個用於寫入操作。只要沒有writer,讀取鎖可以由多個reader線程同時保持。寫入鎖是獨占的。
互斥鎖一次只允許一個線程訪問共享數據,哪怕進行的是只讀操作;讀寫鎖允許對共享數據進行更高級別的並發訪問:對於寫操作,一次只有一個線程(write線程)可以修改共享數據,對於讀操作,允許任意數量的線程同時進行讀取。
與互斥鎖相比,使用讀寫鎖能否提升性能則取決於讀寫操作期間讀取數據相對於修改數據的頻率,以及數據的爭用——即在同一時間試圖對該數據執行讀取或寫入操作的線程數。
讀寫鎖適用於讀多寫少的情況。
可重入讀寫鎖 ReentrantReadWriteLock
屬性ReentrantReadWriteLock 也是基於 AbstractQueuedSynchronizer 實現的,它具有下面這些屬性(來自Java doc文檔):
* 獲取順序:此類不會將讀取者優先或寫入者優先強加給鎖訪問的排序。
* 非公平模式(默認):連續競爭的非公平鎖可能無限期地推遲一個或多個reader或writer線程,但吞吐量通常要高於公平鎖。
* 公平模式:線程利用一個近似到達順序的策略來爭奪進入。當釋放當前保持的鎖時,可以為等待時間最長的單個writer線程分配寫入鎖,如果有一組等待時間大於所有正在等待的writer線程的reader,將為該組分配讀者鎖。
* 試圖獲得公平寫入鎖的非重入的線程將會阻塞,除非讀取鎖和寫入鎖都自由(這意味着沒有等待線程)。
* 重入:此鎖允許reader和writer按照 ReentrantLock 的樣式重新獲取讀取鎖或寫入鎖。在寫入線程保持的所有寫入鎖都已經釋放后,才允許重入reader使用讀取鎖。
writer可以獲取讀取鎖,但reader不能獲取寫入鎖。
* 鎖降級:重入還允許從寫入鎖降級為讀取鎖,實現方式是:先獲取寫入鎖,然后獲取讀取鎖,最后釋放寫入鎖。但是,從讀取鎖升級到寫入鎖是不可能的。
* 鎖獲取的中斷:讀取鎖和寫入鎖都支持鎖獲取期間的中斷。
* Condition 支持:寫入鎖提供了一個 Condition 實現,對於寫入鎖來說,該實現的行為與 ReentrantLock.newCondition() 提供的Condition 實現對 ReentrantLock 所做的行為相同。當然,此 Condition 只能用於寫入鎖。
讀取鎖不支持 Condition,readLock().newCondition() 會拋出 UnsupportedOperationException。
* 監測:此類支持一些確定是讀取鎖還是寫入鎖的方法。這些方法設計用於監視系統狀態,而不是同步控制。
實現AQS 回顧在之前的文章已經提到,AQS以單個 int 類型的原子變量來表示其狀態,定義了4個抽象方法( tryAcquire(int)、tryRelease(int)、tryAcquireShared(int)、tryReleaseShared(int),前兩個方法用於獨占/排他模式,后兩個用於共享模式 )留給子類實現,用於自定義同步器的行為以實現特定的功能。
對於 ReentrantLock,它是可重入的獨占鎖,內部的 Sync 類實現了 tryAcquire(int)、tryRelease(int) 方法,並用狀態的值來表示重入次數,加鎖或重入鎖時狀態加 1,釋放鎖時狀態減 1,狀態值等於 0 表示鎖空閑。
對於 CountDownLatch,它是一個關卡,在條件滿足前阻塞所有等待線程,條件滿足后允許所有線程通過。內部類 Sync 把狀態初始化為大於 0 的某個值,當狀態大於 0 時所有wait線程阻塞,每調用一次 countDown 方法就把狀態值減 1,減為 0 時允許所有線程通過。利用了AQS的共享模式。
現在,要用AQS來實現 ReentrantReadWriteLock。
一點思考問題
* AQS只有一個狀態,那么如何表示 多個讀鎖 與 單個寫鎖 呢?
* ReentrantLock 里,狀態值表示重入計數,現在如何在AQS里表示每個讀鎖、寫鎖的重入次數呢?
* 如何實現讀鎖、寫鎖的公平性呢?
一點提示
* 一個狀態是沒法既表示讀鎖,又表示寫鎖的,不夠用啊,那就辦成兩份用了,客家話說一個飯粒咬成兩半吃,狀態的高位部分表示讀鎖,低位表示寫鎖,由於寫鎖只有一個,所以寫鎖的重入計數也解決了,這也會導致寫鎖可重入的次數減小。
* 由於讀鎖可以同時有多個,肯定不能再用辦成兩份用的方法來處理了,但我們有 ThreadLocal,可以把線程重入讀鎖的次數作為值存在 ThreadLocal 里。
* 對於公平性的實現,可以通過AQS的等待隊列和它的抽象方法來控制,在狀態值的另一半里存儲當前持有讀鎖的線程數。如果讀線程申請讀鎖,當前寫鎖重入次數不為 0 時,則等待,否則可以馬上分配;如果是寫線程申請寫鎖,當前狀態為 0 則可以馬上分配,否則等待。
源碼分析現在來看看具體的實現源碼。
辦成兩份AQS 的狀態是32位(int 類型)的,辦成兩份,讀鎖用高16位,表示持有讀鎖的線程數(sharedCount),寫鎖低16位,表示寫鎖的重入次數 (exclusiveCount)。狀態值為 0 表示鎖空閑,sharedCount不為 0 表示分配了讀鎖,exclusiveCount 不為 0 表示分配了寫鎖,sharedCount和exclusiveCount 肯定不會同時不為 0。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
abstract
static
class
Sync
extends
AbstractQueuedSynchronizer {
static
final
int
SHARED_SHIFT =
16
;
// 由於讀鎖用高位部分,所以讀鎖個數加1,其實是狀態值加 2^16
static
final
int
SHARED_UNIT = (
1
<< SHARED_SHIFT);
// 寫鎖的可重入的最大次數、讀鎖允許的最大數量
static
final
int
MAX_COUNT = (
1
<< SHARED_SHIFT) -
1
;
// 寫鎖的掩碼,用於狀態的低16位有效值
static
final
int
EXCLUSIVE_MASK = (
1
<< SHARED_SHIFT) -
1
;
// 讀鎖計數,當前持有讀鎖的線程數
static
int
sharedCount(
int
c) {
return
c >>> SHARED_SHIFT; }
// 寫鎖的計數,也就是它的重入次數
static
int
exclusiveCount(
int
c) {
return
c & EXCLUSIVE_MASK; }
}
|
讀鎖重入計數
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
abstract
static
class
Sync
extends
AbstractQueuedSynchronizer {
/**
* 每個線程特定的 read 持有計數。存放在ThreadLocal,不需要是線程安全的。
*/
static
final
class
HoldCounter {
int
count =
0
;
// 使用id而不是引用是為了避免保留垃圾。注意這是個常量。
final
long
tid = Thread.currentThread().getId();
}
/**
* 采用繼承是為了重寫 initialValue 方法,這樣就不用進行這樣的處理:
* 如果ThreadLocal沒有當前線程的計數,則new一個,再放進ThreadLocal里。
* 可以直接調用 get。
* */
static
final
class
ThreadLocalHoldCounter
extends
ThreadLocal<HoldCounter> {
public
HoldCounter initialValue() {
return
new
HoldCounter();
}
}
/**
* 保存當前線程重入讀鎖的次數的容器。在讀鎖重入次數為 0 時移除。
*/
private
transient
ThreadLocalHoldCounter readHolds;
/**
* 最近一個成功獲取讀鎖的線程的計數。這省卻了ThreadLocal查找,
* 通常情況下,下一個釋放線程是最后一個獲取線程。這不是 volatile 的,
* 因為它僅用於試探的,線程進行緩存也是可以的
* (因為判斷是否是當前線程是通過線程id來比較的)。
*/
private
transient
HoldCounter cachedHoldCounter;
/**
* firstReader是這樣一個特殊線程:它是最后一個把 共享計數 從 0 改為 1 的
* (在鎖空閑的時候),而且從那之后還沒有釋放讀鎖的。如果不存在則為null。
* firstReaderHoldCount 是 firstReader 的重入計數。
*
* firstReader 不能導致保留垃圾,因此在 tryReleaseShared 里設置為null,
* 除非線程異常終止,沒有釋放讀鎖。
*
* 作用是在跟蹤無競爭的讀鎖計數時非常便宜。
*
* firstReader及其計數firstReaderHoldCount是不會放入 readHolds 的。
*/
private
transient
Thread firstReader =
null
;
private
transient
int
firstReaderHoldCount;
Sync() {
readHolds =
new
ThreadLocalHoldCounter();
setState(getState());
// 確保 readHolds 的內存可見性,利用 volatile 寫的內存語義。
}
}
|
寫鎖獲取與釋放寫鎖的獲取與釋放通過 tryAcquire 和 tryRelease 方法實現,源碼文件里有這么一段說明:tryRelease 和 tryAcquire 可能被Conditions 調用。因此可能出現參數里包含在條件等待和用 tryAcquire 重新獲取到鎖的期間內已經釋放的 讀和寫 計數。
這說明看起來像是在 tryAcquire 里設置狀態時要考慮方法參數(acquires)的高位部分,其實是不需要的。由於寫鎖是獨占的,acquires 表示的只能是寫鎖的計數,如果當前線程成功獲取寫鎖,只需要簡單地把當前狀態加上 acquires 的值即可,tryRelease 里直接減去其參數值即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
protected
final
boolean
tryAcquire(
int
acquires) {
Thread current = Thread.currentThread();
int
c = getState();
int
w = exclusiveCount(c);
if
(c !=
0
) {
// 狀態不為0,表示鎖被分配出去了。
// (Note: if c != 0 and w == 0 then shared count != 0)
// c != 0 and w == 0 表示分配了讀鎖
// w != 0 && current != getExclusiveOwnerThread() 表示其他線程獲取了寫鎖。
if
(w ==
0
|| current != getExclusiveOwnerThread())
return
false
;
// 寫鎖重入
// 檢測是否超過最大重入次數。
if
(w + exclusiveCount(acquires) > MAX_COUNT)
throw
new
Error(
"Maximum lock count exceeded"
);
// 更新寫鎖重入次數,寫鎖在低位,直接加上 acquire 即可。
// Reentrant acquire
setState(c + acquires);
return
true
;
}
// writerShouldBlock 留給子類實現,用於實現公平性策略。
// 如果允許獲取寫鎖,則用 CAS 更新狀態。
if
(writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return
false
;
// 不允許獲取鎖 或 CAS 失敗。
// 獲取寫鎖超過,設置獨占線程。
setExclusiveOwnerThread(current);
return
true
;
}
protected
final
boolean
tryRelease(
int
releases) {
if
(!isHeldExclusively())
// 是否是當前線程持有寫鎖
throw
new
IllegalMonitorStateException();
// 這里不考慮高16位是因為高16位肯定是 0。
int
nextc = getState() - releases;
boolean
free = exclusiveCount(nextc) ==
0
;
if
(free)
setExclusiveOwnerThread(
null
);
// 寫鎖完全釋放,設置獨占線程為null。
setState(nextc);
return
free;
}
|
讀鎖獲取與釋放// 參數變為 unused 是因為讀鎖的重入計數是內部維護的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
protected
final
int
tryAcquireShared(
int
unused) {
Thread current = Thread.currentThread();
int
c = getState();
// 這個if語句是說:持有寫鎖的線程可以獲取讀鎖。
if
(exclusiveCount(c) !=
0
&&
// 已分配了寫鎖
getExclusiveOwnerThread() != current)
// 且當前線程不是持有寫鎖的線程
return
-
1
;
int
r = sharedCount(c);
// 取讀鎖計數
if
(!readerShouldBlock() &&
// 由子類根據其公平策略決定是否允許獲取讀鎖
r < MAX_COUNT &&
// 讀鎖數量還沒達到最大值
// 嘗試獲取讀鎖。注意讀線程計數的單位是 2^16
compareAndSetState(c, c + SHARED_UNIT)) {
// 成功獲取讀鎖
// 注意下面對firstReader的處理:firstReader是不會放到readHolds里的
// 這樣,在讀鎖只有一個的情況下,就避免了查找readHolds。
if
(r ==
0
) {
// 是 firstReader,計數不會放入 readHolds。
firstReader = current;
firstReaderHoldCount =
1
;
}
else
if
(firstReader == current) {
// firstReader 重入
firstReaderHoldCount++;
}
else
{
// 非 firstReader 讀鎖重入計數更新
HoldCounter rh = cachedHoldCounter;
// 首先訪問緩存
if
(rh ==
null
|| rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else
if
(rh.count ==
0
)
readHolds.set(rh);
rh.count++;
}
return
1
;
}
// 獲取讀鎖失敗,放到循環里重試。
return
fullTryAcquireShared(current);
}
final
int
fullTryAcquireShared(Thread current) {
HoldCounter rh =
null
;
for
(;;) {
int
c = getState();
if
(exclusiveCount(c) !=
0
) {
if
(getExclusiveOwnerThread() != current)
// 寫鎖被分配,非寫鎖線程獲取讀鎖,失敗
return
-
1
;
// 否則,當前線程持有寫鎖,在這里阻塞將導致死鎖。
}
else
if
(readerShouldBlock()) {
// 寫鎖空閑 且 公平策略決定 線程應當被阻塞
// 下面的處理是說,如果是已獲取讀鎖的線程重入讀鎖時,
// 即使公平策略指示應當阻塞也不會阻塞。
// 否則,這也會導致死鎖的。
if
(firstReader == current) {
// assert firstReaderHoldCount > 0;
}
else
{
if
(rh ==
null
) {
rh = cachedHoldCounter;
if
(rh ==
null
|| rh.tid != current.getId()) {
rh = readHolds.get();
if
(rh.count ==
0
)
readHolds.remove();
}
}
// 需要阻塞且是非重入(還未獲取讀鎖的),獲取失敗。
if
(rh.count ==
0
)
return
-
1
;
}
}
// 寫鎖空閑 且 公平策略決定線程可以獲取讀鎖
if
(sharedCount(c) == MAX_COUNT)
// 讀鎖數量達到最多
throw
new
Error(
"Maximum lock count exceeded"
);
if
(compareAndSetState(c, c + SHARED_UNIT)) {
// 申請讀鎖成功,下面的處理跟tryAcquireShared是類似的。
if
(sharedCount(c) ==
0
) {
firstReader = current;
firstReaderHoldCount =
1
;
}
else
if
(firstReader == current) {
firstReaderHoldCount++;
}
else
{
// 設定最后一次獲取讀鎖的緩存
if
(rh ==
null
)
rh = cachedHoldCounter;
if
(rh ==
null
|| rh.tid != current.getId())
rh = readHolds.get();
else
if
(rh.count ==
0
)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh;
// 緩存起來用於釋放
}
return
1
;
}
}
}
protected
final
boolean
tryReleaseShared(
int
unused) {
Thread current = Thread.currentThread();
// 清理firstReader緩存 或 readHolds里的重入計數
if
(firstReader == current) {
// assert firstReaderHoldCount > 0;
if
(firstReaderHoldCount ==
1
)
firstReader =
null
;
else
firstReaderHoldCount--;
}
else
{
HoldCounter rh = cachedHoldCounter;
if
(rh ==
null
|| rh.tid != current.getId())
rh = readHolds.get();
int
count = rh.count;
if
(count <=
1
) {
// 完全釋放讀鎖
readHolds.remove();
if
(count <=
0
)
throw
unmatchedUnlockException();
}
--rh.count;
// 主要用於重入退出
}
// 循環在CAS更新狀態值,主要是把讀鎖數量減 1
for
(;;) {
int
c = getState();
int
nextc = c - SHARED_UNIT;
if
(compareAndSetState(c, nextc))
// 釋放讀鎖對其他讀線程沒有任何影響,
// 但可以允許等待的寫線程繼續,如果讀鎖、寫鎖都空閑。
return
nextc ==
0
;
}
}
公平性策略公平與非公平策略是由 Sync 的子類 FairSync 和 NonfairSync 實現的。
/**
* 這個非公平策略的同步器是寫鎖優先的,申請寫鎖時總是不阻塞。
*/
static
final
class
NonfairSync
extends
Sync {
private
static
final
long
serialVersionUID = -8159625535654395037L;
final
boolean
writerShouldBlock() {
return
false
;
// 寫線程總是可以突入
}
final
boolean
readerShouldBlock() {
/* 作為一個啟發用於避免寫線程飢餓,如果線程臨時出現在等待隊列的頭部則阻塞,
* 如果存在這樣的,則是寫線程。
*/
return apparentlyFirstQueuedIsExclusive();
}
}
/**
* 公平的 Sync,它的策略是:如果線程准備獲取鎖時,
* 同步隊列里有等待線程,則阻塞獲取鎖,不管是否是重入
* 這也就需要tryAcqire、tryAcquireShared方法進行處理。
*/
static
final
class
FairSync
extends
Sync {
private
static
final
long
serialVersionUID = -2274990926593161451L;
final
boolean
writerShouldBlock() {
return
hasQueuedPredecessors();
}
final
boolean
readerShouldBlock() {
return
hasQueuedPredecessors();
}
}
|
現在用奇數表示申請讀鎖的讀線程,偶數表示申請寫鎖的寫線程,每個數都表示一個不同的線程,存在下面這樣的申請隊列,假設開始時鎖空閑:
1 3 5 0 7 9 2 4
讀線程1申請讀鎖時,鎖是空閑的,馬上分配,讀線程3、5申請時,由於已分配讀鎖,它們也可以馬上獲取讀鎖。
假設此時有線程11申請讀鎖,由於它不是讀鎖重入,只能等待。而線程1再次申請讀鎖是可以的,因為它的重入。
寫線程0申請寫鎖時,由於分配了讀鎖,只能等待,當讀線程1、3、5都釋放讀鎖后,線程0可以獲取寫鎖。
線程0釋放后,線程7、9獲取讀鎖,它們釋放后,線程2獲取寫鎖,此時線程4必須等待線程2釋放。
線程4在線程2釋放寫鎖后獲取寫鎖,它釋放寫鎖后,鎖恢復空閑。