sync.Mutex
Go中使用sync.Mutex類型實現mutex(排他鎖、互斥鎖)。在源代碼的sync/mutex.go文件中,有如下定義:
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}
這沒有任何非凡的地方。和mutex相關的所有事情都是通過sync.Mutex類型的兩個方法sync.Lock()和sync.Unlock()函數來完成的,前者用於獲取sync.Mutex鎖,后者用於釋放sync.Mutex鎖。sync.Mutex一旦被鎖住,其它的Lock()操作就無法再獲取它的鎖,只有通過Unlock()釋放鎖之后才能通過Lock()繼續獲取鎖。
也就是說,已有的鎖會導致其它申請Lock()操作的goroutine被阻塞,且只有在Unlock()的時候才會解除阻塞。
另外需要注意,sync.Mutex不區分讀寫鎖,只有Lock()與Lock()之間才會導致阻塞的情況,如果在一個地方Lock(),在另一個地方不Lock()而是直接修改或訪問共享數據,這對於sync.Mutex類型來說是允許的,因為mutex不會和goroutine進行關聯。如果想要區分讀、寫鎖,可以使用sync.RWMutex類型,見后文。
在Lock()和Unlock()之間的代碼段稱為資源的臨界區(critical section),在這一區間內的代碼是嚴格被Lock()保護的,是線程安全的,任何一個時間點都只能有一個goroutine執行這段區間的代碼。
以下是使用sync.Mutex的一個示例,稍后是非常詳細的分析過程。
package main
import (
"fmt"
"sync"
"time"
)
// 共享變量
var (
m sync.Mutex
v1 int
)
// 修改共享變量
// 在Lock()和Unlock()之間的代碼部分是臨界區
func change(i int) {
m.Lock()
time.Sleep(time.Second)
v1 = v1 + 1
if v1%10 == 0 {
v1 = v1 - 10*i
}
m.Unlock()
}
// 訪問共享變量
// 在Lock()和Unlock()之間的代碼部分是是臨界區
func read() int {
m.Lock()
a := v1
m.Unlock()
return a
}
func main() {
var numGR = 21
var wg sync.WaitGroup
fmt.Printf("%d", read())
// 循環創建numGR個goroutine
// 每個goroutine都執行change()、read()
// 每個change()和read()都會持有鎖
for i := 0; i < numGR; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
change(i)
fmt.Printf(" -> %d", read())
}(i)
}
wg.Wait()
}
第一次執行結果:
0 -> 1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 -> 8 -> 9 -> -100 -> -99
-> -98 -> -97 -> -96 -> -95 -> -94 -> -93 -> -92 -> -91 -> -260 -> -259
第二次執行結果:注意其中的-74和-72之間跨了一個數
0 -> 1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 -> 8 -> 9 -> -80 -> -79
-> -78 -> -77 -> -76 -> -75 -> -74 -> -72 -> -71 -> -230 -> -229 -> -229
上面的示例中,change()、read()都會申請鎖,並在准備執行完函數時釋放鎖,它們如何修改數據、訪問數據本文不多做解釋。需要詳細解釋的是main()中的for循環部分。
在for循環中,會不斷激活新的goroutine(共21個)執行匿名函數,在每個匿名函數中都會執行change()和read(),意味着每個goroutine都會申請兩次鎖、釋放兩次鎖,且for循環中沒有任何Sleep延遲,這21個goroutine幾乎是一瞬間同時激活的。
但由於change()和read()中都申請鎖,對於這21個goroutine將要分別執行的42個critical section,Lock()保證了在某一時間點只有其中一個goroutine能訪問其中一個critical section。當釋放了一個critical section,其它的Lock()將爭奪互斥鎖,也就是所謂的競爭現象(race condition)。因為競爭的存在,這42個critical section被訪問的順序是隨機的,完全無法保證哪個critical section先被訪問。
對於前9個被調度到的goroutine,無論是哪個goroutine取得這9個change(i)中的critical section,都只是對共享變量v1做加1運算,但當第10個goroutine被調度時,由於v1加1之后得到10,它滿足if條件,會執行v1 = v1 - i*10
,但這個i可能是任意0到numGR之間的值(因為無法保證並發的goroutine的調度順序),這使得v1的值從第10個goroutine開始出現隨機性。但從第10到第19個goroutine被調度的過程中,也只是對共享變量v1做加1運算,這些值是可以根據第10個數推斷出來的,到第20個goroutine,又再次隨機。依此類推。
此外,每個goroutine中的read()也都會參與鎖競爭,所以並不能保證每次change(i)之后會隨之執行到read(),可能goroutine 1的change()執行完后,會跳轉到goroutine 3的change()上,這樣一來,goroutine 1的read()就無法讀取到goroutine 1所修改的v1值,而是訪問到其它goroutine中修改后的值。所以,前面的第二次執行結果中出現了一次數據跨越。只不過執行完change()后立即執行read()的幾率比較大,所以多數時候輸出的數據都是連續的。
總而言之,Mutex保證了每個critical section安全,某一時間點只有一個goroutine訪問到這部分,但也因此而出現了隨機性。
如果Lock()后忘記了Unlock(),將會永久阻塞而出現死鎖。如果
適合sync.Mutex的數據類型
其實,對於內置類型的共享變量來說,使用sync.Mutex和Lock()、Unlock()來保護也是不合理的,因為它們自身不包含Mutex屬性。真正合理的共享變量是那些包含Mutex屬性的struct類型。例如:
type mytype struct {
m sync.Mutex
var int
}
x := new(mytype)
這時只要想保護var變量,就先x.m.Lock(),操作完var后,再x.m.Unlock()。這樣就能保證x中的var字段變量一定是被保護的。
sync.RWMutex
Go中使用sync.RWMutex類型實現讀寫互斥鎖rwmutex。在源代碼的sync/rwmutex.go文件中,有如下定義:
// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
//
// A RWMutex must not be copied after first use.
//
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // 寫鎖需要等待讀鎖釋放的信號量
readerSem uint32 // 讀鎖需要等待寫鎖釋放的信號量
readerCount int32 // 讀鎖后面掛起了多少個寫鎖申請
readerWait int32 // 已釋放了多少個讀鎖
}
上面的注釋和源代碼說明了幾點:
- RWMutex是基於Mutex的,在Mutex的基礎之上增加了讀、寫的信號量,並使用了類似引用計數的讀鎖數量
- 讀鎖與讀鎖兼容,讀鎖與寫鎖互斥,寫鎖與寫鎖互斥,只有在鎖釋放后才可以繼續申請互斥的鎖:
- 可以同時申請多個讀鎖
- 有讀鎖時申請寫鎖將阻塞,有寫鎖時申請讀鎖將阻塞
- 只要有寫鎖,后續申請讀鎖和寫鎖都將阻塞
此類型有幾個鎖和解鎖的方法:
func (rw *RWMutex) Lock()
func (rw *RWMutex) RLock()
func (rw *RWMutex) RLocker() Locker
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) Unlock()
其中:
- Lock()和Unlock()用於申請和釋放寫鎖
- RLock()和RUnlock()用於申請和釋放讀鎖
- 一次RUnlock()操作只是對讀鎖數量減1,即減少一次讀鎖的引用計數
- 如果不存在寫鎖,則Unlock()引發panic,如果不存在讀鎖,則RUnlock()引發panic
- RLocker()用於返回一個實現了Lock()和Unlock()方法的Locker接口
此外,無論是Mutex還是RWMutex都不會和goroutine進行關聯,這意味着它們的鎖申請行為可以在一個goroutine中操作,釋放鎖行為可以在另一個goroutine中操作。
由於RLock()和Lock()都能保證數據不被其它goroutine修改,所以在RLock()與RUnlock()之間的,以及Lock()與Unlock()之間的代碼區都是critical section。
以下是一個示例,此示例中同時使用了Mutex和RWMutex,RWMutex用於讀、寫,Mutex只用於讀。
package main
import (
"fmt"
"os"
"sync"
"time"
)
var Password = secret{password: "myPassword"}
type secret struct {
RWM sync.RWMutex
M sync.Mutex
password string
}
// 通過rwmutex寫
func Change(c *secret, pass string) {
c.RWM.Lock()
fmt.Println("Change with rwmutex lock")
time.Sleep(3 * time.Second)
c.password = pass
c.RWM.Unlock()
}
// 通過rwmutex讀
func rwMutexShow(c *secret) string {
c.RWM.RLock()
fmt.Println("show with rwmutex",time.Now().Second())
time.Sleep(1 * time.Second)
defer c.RWM.RUnlock()
return c.password
}
// 通過mutex讀,和rwMutexShow的唯一區別在於鎖的方式不同
func mutexShow(c *secret) string {
c.M.Lock()
fmt.Println("show with mutex:",time.Now().Second())
time.Sleep(1 * time.Second)
defer c.M.Unlock()
return c.password
}
func main() {
// 定義一個稍后用於覆蓋(重寫)的函數
var show = func(c *secret) string { return "" }
// 通過變量賦值的方式,選擇並重寫showFunc函數
if len(os.Args) != 2 {
fmt.Println("Using sync.RWMutex!",time.Now().Second())
show = rwMutexShow
} else {
fmt.Println("Using sync.Mutex!",time.Now().Second())
show = mutexShow
}
var wg sync.WaitGroup
// 激活5個goroutine,每個goroutine都查看
// 根據選擇的函數不同,showFunc()加鎖的方式不同
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("Go Pass:", show(&Password),time.Now().Second())
}()
}
// 激活一個申請寫鎖的goroutine
go func() {
wg.Add(1)
defer wg.Done()
Change(&Password, "123456")
}()
// 阻塞,直到所有wg.Done
wg.Wait()
}
Change()函數申請寫鎖,並睡眠3秒后修改數據,然后釋放寫鎖。
rwMutexShow()函數申請讀鎖,並睡眠一秒后取得數據,並釋放讀鎖。注意,rwMutexShow()中的print和return是相隔一秒鍾的。
mutexShow()函數申請Mutex鎖,和RWMutex互不相干。和rwMutexShow()唯一不同之處在於申請的鎖不同。
main()中,先根據命令行參數數量決定運行哪一個show()。之所以能根據函數變量來賦值,是因為先定義了一個show()函數,它的函數簽名和rwMutexShow()、mutexShow()的簽名相同,所以可以相互賦值。
for循環中激活了5個goroutine並發運行,for瞬間激活5個goroutine后,繼續執行main()代碼會激活另一個用於申請寫鎖的goroutine。這6個goroutine的執行順序是隨機的。
如果show選中的函數是rwMutexShow(),則5個goroutine要申請的RLock()鎖和寫鎖是沖突的,但5個RLock()是兼容的。所以,只要某個時間點調度到了寫鎖的goroutine,剩下的讀鎖goroutine都會從那時開始阻塞3秒。
除此之外,還有一個不嚴格准確,但在時間持續長短的理論上來說能保證的一個規律:當修改數據結束后,各個剩下的goroutine都申請讀鎖,因為申請后立即print輸出,然后睡眠1秒,但1秒時間足夠所有剩下的goroutine申請完讀鎖,使得show with rwmutex
輸出是連在一起,輸出的Go Pass: 123456
又是連在一起的。
某次結果如下:
Using sync.RWMutex! 58
show with rwmutex 58
Change with rwmutex lock
Go Pass: myPassword 59
show with rwmutex 2
show with rwmutex 2
show with rwmutex 2
show with rwmutex 2
Go Pass: 123456 3
Go Pass: 123456 3
Go Pass: 123456 3
Go Pass: 123456 3
如果show選中的函數是mutexShow(),則讀數據和寫數據互不沖突,但讀和讀是沖突的(因為Mutex的Lock()是互斥的)。
某次結果如下:
Using sync.Mutex! 30
Change with rwmutex lock
show with mutex: 30
Go Pass: myPassword 31
show with mutex: 31
Go Pass: myPassword 32
show with mutex: 32
Go Pass: 123456 33
show with mutex: 33
show with mutex: 34
Go Pass: 123456 34
Go Pass: 123456 35
用Mutex還是用RWMutex
Mutex和RWMutex都不關聯goroutine,但RWMutex顯然更適用於讀多寫少的場景。僅針對讀的性能來說,RWMutex要高於Mutex,因為rwmutex的多個讀可以並存。