CAS算法(compare and swap)
CAS算法是一種有名的無鎖算法。無鎖編程,即不使用鎖的情況下實現多線程之間的變量同步,也就是在沒有線程被阻塞的情況下實現變量的同步,所以也叫非阻塞同步(Non-blocking Synchronization)。CAS算法涉及到三個操作數
- 需要讀寫的內存值V
- 進行比較的值A
- 擬寫入的新值B
當且僅當 V 的值等於 A時,CAS通過原子方式用新值B來更新V的值,否則不會執行任何操作(比較和替換是一個原子操作)。一般情況下是一個自旋操作,即不斷的重試。
自旋鎖
自旋鎖是指當一個線程在獲取鎖的時候,如果鎖已經被其他線程獲取,那么該線程將循環等待,然后不斷地判斷是否能夠被成功獲取,知直到獲取到鎖才會退出循環。
獲取鎖的線程一直處於活躍狀態,但是並沒有執行任何有效的任務,使用這種鎖會造成 busy-waiting 。
它是為實現保護共享資源而提出的一種鎖機制。其實,自旋鎖與互斥鎖比較類似,它們都是為了解決某項資源的互斥使用。無論是互斥鎖,還是自旋鎖,在任何時刻,最多只能由一個保持者,也就說,在任何時刻最多只能有一個執行單元獲得鎖。但是兩者在調度機制上略有不同。對於互斥鎖,如果資源已經被占用,資源申請者只能進入睡眠狀態。但是自旋鎖不會引起調用者睡眠,如果自旋鎖已經被別的執行單元保持,調用者就一直循環在那里看是否該自旋鎖的保持者已經釋放了鎖,“自旋”一詞就是因此而得名。
golang實現自旋鎖
1
2
3
4
5
6
7
8
9
10
11
12
13
|
type spinLock uint32
func (sl *spinLock) Lock() {
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
runtime.Gosched()
}
}
func (sl *spinLock) Unlock() {
atomic.StoreUint32((*uint32)(sl), 0)
}
func NewSpinLock() sync.Locker {
var lock spinLock
return &lock
}
|
可重入的自旋鎖和不可重入的自旋鎖
文章開始的時候的那段代碼,仔細分析一下就可以看出,它是不支持重入的,即當一個線程第一次已經獲取到了該鎖,在鎖釋放之前又一次重新獲取該鎖,第二次就不能成功獲取到。由於不滿足CAS,所以第二次獲取會進入while循環等待,而如果是可重入鎖,第二次也是應該能夠成功獲取到的。
而且,即使第二次能夠成功獲取,那么當第一次釋放鎖的時候,第二次獲取到的鎖也會被釋放,而這是不合理的。
為了實現可重入鎖,我們需要引入一個計數器,用來記錄獲取鎖的線程數
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
type
spinLock struct {
owner
int
count
int
}
func (sl
*
spinLock) Lock() {
me :
=
GetGoroutineId()
if
spinLock .owner
=
=
me {
/
/
如果當前線程已經獲取到了鎖,線程數增加一,然后返回
sl.count
+
+
return
}
/
/
如果沒獲取到鎖,則通過CAS自旋
for
!atomic.CompareAndSwapUint32((
*
uint32)(sl),
0
,
1
) {
runtime.Gosched()
}
}
func (sl
*
spinLock) Unlock() {
if
rl.owner !
=
GetGoroutineId() {
panic(
"illegalMonitorStateError"
)
}
if
sl.count >
0
{
/
/
如果大於
0
,表示當前線程多次獲取了該鎖,釋放鎖通過count減一來模擬
sl.count
-
-
}
else
{
/
/
如果count
=
=
0
,可以將鎖釋放,這樣就能保證獲取鎖的次數與釋放鎖的次數是一致的了。
atomic.StoreUint32((
*
uint32)(sl),
0
)
}
}
func GetGoroutineId()
int
{
defer func() {
if
err :
=
recover(); err !
=
nil {
fmt.Println(
"panic recover:panic info:%v"
, err) }
}()
var buf [
64
]byte
n :
=
runtime.Stack(buf[:], false)
idField :
=
strings.Fields(strings.TrimPrefix(string(buf[:n]),
"goroutine "
))[
0
]
id
, err :
=
strconv.Atoi(idField)
if
err !
=
nil {
panic(fmt.Sprintf(
"cannot get goroutine id: %v"
, err))
}
return
id
}
func NewSpinLock() sync.Locker {
var lock spinLock
return
&lock
}
|
自旋鎖的其他變種
1. TicketLock
TicketLock主要解決的是公平性的問題。
思路:每當有線程獲取鎖的時候,就給該線程分配一個遞增的id,我們稱之為排隊號,同時,鎖對應一個服務號,每當有線程釋放鎖,服務號就會遞增,此時如果服務號與某個線程排隊號一致,那么該線程就獲得鎖,由於排隊號是遞增的,所以就保證了最先請求獲取鎖的線程可以最先獲取到鎖,就實現了公平性。
可以想象成銀行辦理業務排隊,排隊的每一個顧客都代表一個需要請求鎖的線程,而銀行服務窗口表示鎖,每當有窗口服務完成就把自己的服務號加一,此時在排隊的所有顧客中,只有自己的排隊號與服務號一致的才可以得到服務。
2. CLHLock
CLH鎖是一種基於鏈表的可擴展、高性能、公平的自旋鎖,申請線程只在本地變量上自旋,它不斷輪詢前驅的狀態,如果發現前驅釋放了鎖就結束自旋,獲得鎖。
3. MCSLock
MCSLock則是對本地變量的節點進行循環。
4. CLHLock 和 MCSLock
都是基於鏈表,不同的是CLHLock是基於隱式鏈表,沒有真正的后續節點屬性,MCSLock是顯示鏈表,有一個指向后續節點的屬性。
將獲取鎖的線程狀態借助節點(node)保存,每個線程都有一份獨立的節點,這樣就解決了TicketLock多處理器緩存同步的問題。
自旋鎖與互斥鎖
- 自旋鎖與互斥鎖都是為了實現保護資源共享的機制。
- 無論是自旋鎖還是互斥鎖,在任意時刻,都最多只能有一個保持者。
- 獲取互斥鎖的線程,如果鎖已經被占用,則該線程將進入睡眠狀態;獲取自旋鎖的線程則不會睡眠,而是一直循環等待鎖釋放。
總結:
- 自旋鎖:線程獲取鎖的時候,如果鎖被其他線程持有,則當前線程將循環等待,直到獲取到鎖。
- 自旋鎖等待期間,線程的狀態不會改變,線程一直是用戶態並且是活動的(active)。
- 自旋鎖如果持有鎖的時間太長,則會導致其它等待獲取鎖的線程耗盡CPU。
- 自旋鎖本身無法保證公平性,同時也無法保證可重入性。
- 基於自旋鎖,可以實現具備公平性和可重入性質的鎖。
- TicketLock:采用類似銀行排號叫好的方式實現自旋鎖的公平性,但是由於不停的讀取serviceNum,每次讀寫操作都必須在多個處理器緩存之間進行緩存同步,這會導致繁重的系統總線和內存的流量,大大降低系統整體的性能。
- CLHLock和MCSLock通過鏈表的方式避免了減少了處理器緩存同步,極大的提高了性能,區別在於CLHLock是通過輪詢其前驅節點的狀態,而MCS則是查看當前節點的鎖狀態。
- CLHLock在NUMA架構下使用會存在問題。在沒有cache的NUMA系統架構中,由於CLHLock是在當前節點的前一個節點上自旋,NUMA架構中處理器訪問本地內存的速度高於通過網絡訪問其他節點的內存,所以CLHLock在NUMA架構上不是最優的自旋鎖。