前言
前面我們為了解決go程同步的問題我們使用了channel, 但是go也提供了傳統的同步工具.
它們都在go的標准庫代碼包 sync 和 sync/atomic 中.
下面我們來看一下鎖的應用.
什么是鎖呢? 就是某個協程(線程)在訪問某個資源時先鎖住, 防止其他協程的訪問, 等訪問完畢解鎖后其他協程再來加鎖進行訪問.
這和我們生活中加鎖使用公共資源相似, 例如: 公共衛生間.
死鎖
死鎖是指兩個或者兩個以上的進程在執行過程中, 由於競爭資源或者由於彼此通信而造成的一種阻塞的現象, 若無外力作用, 它們都將無法推進下去. 此時稱系統處於死鎖狀態或系統產生了死鎖.
死鎖不是鎖的一種! 它是一種錯誤使用鎖導致的現象.
產生死鎖的幾種情況
- 單go程自己死鎖
- go程間channel訪問順序導致死鎖
- 多go程, 多channel交叉死鎖
- 將 互斥鎖、讀寫鎖與channel混用 -- 隱性死鎖(在
讀寫鎖講到)
單go程自己死鎖 示例代碼:
package main
import "fmt"
// 單go程自己死鎖
func main() {
ch := make(chan int)
ch <- 789
num := <- ch
fmt.Println(num)
}
上面這段乍一看有可能會覺得沒有什么問題, 可是仔細一看就會發現這個 ch 是一個無緩沖的channel, 當789寫入緩沖區時, 這時讀端還沒有准備好. 所以, 寫端 會發生阻塞, 后面的代碼不再運行.
所以可以得出一個結論: channel應該在至少2個及以上的go程進行通信, 否則會造成死鎖.
我們繼續看 go程間channel訪問順序導致死鎖 的例子:
package main
import "fmt"
// go程間channel訪問順序導致死鎖
func main(){
ch := make(chan int)
num := <- ch
fmt.Println("num = ", num)
go func() {
ch <- 789
}()
}
在代碼運行到 num := <- ch 時, 發生阻塞, 並且下面的代碼不會執行, 所以發生死鎖.
正確應該這樣寫:
package main
import "fmt"
func main(){
ch := make(chan int)
go func() {
ch <- 789
}()
num := <- ch
fmt.Println("num = ", num)
}
所以, 在使用channel一端讀(寫)時, 要保證另一端寫(讀)操作有機會執行.
我們再來看下 多go程, 多channel交叉死鎖 的示例代碼:
package main
import "fmt"
// 多go程, 多channel交叉死鎖
func main(){
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
for {
select {
case num := <- ch1:
ch2 <- num
}
}
}()
for {
select {
case num := <- ch2:
ch1 <- num
}
}
}
互斥鎖(互斥量)
每個資源都對應於一個可稱為"互斥鎖"的標記, 這個標記用來保證在任意時刻, 只能有一個協程(線程)訪問該資源, 其它的協程只能等待.
互斥鎖是傳統並發編程對共享資源進行訪問控制的主要手段, 它由標准庫 sync 中的 Mutex 結構體類型表示.
sync.Mutex 類型只有兩個公開的指針方法, Lock 和 Unlock.
Lock鎖定當前的共享資源, Unlock進行解鎖.
在使用互斥鎖時, 一定要注意, 對資源操作完成后, 一定要解鎖, 否則會出現流程執行異常, 死鎖等問題, 通常借助defer. 鎖定后, 立即使用 defer 語句保證互斥鎖及時解鎖. 如下所示:
var mutex sync.Mutex // 定義互斥鎖變量: mutex
func write() {
mutex.Lock()
defer mutex.Unlock()
}
我們先來回顧一下channel是怎么樣完成數據同步的.
package main
import (
"fmt"
"time"
)
var ch = make(chan int)
func printer(str string) {
for _, s := range str {
fmt.Printf("%c ", s)
time.Sleep(time.Millisecond * 300)
}
}
func person1() { // 先
printer("hello")
ch <- 666
}
func person2() { // 后
<-ch
printer("world")
}
func main() {
go person1()
go person2()
time.Sleep(5 * time.Second)
}
同樣可以使用互斥鎖來解決, 如下所示:
package main
import (
"fmt"
"sync"
"time"
)
// 使用傳統的 "鎖" 完成同步 -- 互斥鎖
var mutex sync.Mutex // 創建一個互斥鎖(互斥量), 新建的互斥鎖狀態為0 -> 未加鎖狀態. 鎖只有一把.
func printer(str string) {
mutex.Lock() // 訪問共享數據之前, 加鎖
for _, s := range str {
fmt.Printf("%c ", s)
time.Sleep(time.Millisecond * 300)
}
mutex.Unlock() // 共享數據訪問結束, 解鎖
}
func person1() {
printer("hello")
}
func person2() {
printer("world")
}
func main() {
go person1()
go person2()
time.Sleep(5 * time.Second)
}
這種鎖為建議鎖: 操作系統提供, 建議你在編程時使用.
強制鎖只會在底層操作系統自己用到, 我們在寫代碼時用不到.
person1與person2兩個go程共同訪問共享數據, 由於CPU調度隨機, 需要對 共享數據訪問順序加以限定(同步).
創建mutex(互斥鎖), 訪問共享數據之前, 加鎖; 訪問結束, 解鎖.
在person1的go程加鎖期間, person2的go程加鎖會失敗 --> 阻塞.
直至person1的go程解鎖mutext, person2從阻塞處, 恢復執行.
讀寫鎖
互斥鎖的本質是當一個goroutine訪問的時候, 其它goroutine都不能訪問. 這樣在資源同步, 避免競爭的同時, 也降低了程序的並發性能, 程序由原來的並行執行變成了串行執行.
其實, 當我們對一個不會變化的數據只做讀操作的話, 是不存在資源競爭的問題的. 因為數據是不變的, 不管怎么讀取, 多少goroutine同時讀取, 都是可以的.
所以問題不是出在讀上, 主要是修改, 也就是寫. 修改的數據要同步, 這樣其它goroutine才可以感知到. 所以真正的互斥應該是讀取和修改、修改和修改之間, 讀和讀是沒有互斥操作的必要的.
因此, 衍生出另外一種鎖, 叫做讀寫鎖.
讀寫鎖可以讓多個讀操作並發, 同時讀取, 但是對於寫操作是完全互斥的. 也就是說, 當一個goroutine進行寫操作的時候, 其它goroutine既不能進行讀操作, 也不能進行寫操作.
Go中的讀寫鎖由結構體類型 sync.RWMutex 表示. 此類型的方法集合中包含兩對方法:
一組是對寫操作的鎖定和解鎖, 簡稱為: 寫鎖定 和 寫解鎖.
func (*RWMutex) Lock()
func (*RWMutex) Unlock()
另一組表示對讀操作的鎖定和解鎖, 簡稱為: 讀鎖定 和 讀解鎖.
func (*RWMutex) RLock()
func (*RWMutex) RUnlock()
我們先來看一下沒有使用讀寫鎖的情況下會發生什么:
package main
import (
"fmt"
"math/rand"
"time"
)
func readGo(in <-chan int, idx int){
for {
num := <- in
fmt.Printf("----第%d個讀go程, 讀入: %d\n", idx, num)
}
}
func writeGo(out chan<- int, idx int){
for {
// 生成隨機數
num := rand.Intn(1000)
out <- num
fmt.Printf("第%d個寫go程, 寫入: %d\n", idx, num)
time.Sleep(time.Millisecond * 300)
}
}
func main() {
// 隨機數種子
rand.Seed(time.Now().UnixNano())
ch := make(chan int)
for i:=0; i<5; i++ {
go readGo(ch, i+1)
}
for i:=0; i<5; i++ {
go writeGo(ch, i+1)
}
time.Sleep(time.Second * 3)
}
結果(截取部分):
......
第4個寫go程, 寫入: 763
----第1個讀go程, 讀入: 998
第1個寫go程, 寫入: 238
第3個寫go程, 寫入: 998
......
第5個寫go程, 寫入: 607
第4個寫go程, 寫入: 151
----第1個讀go程, 讀入: 992
----第2個讀go程, 讀入: 151
......
通過結果我們可以知道, 當寫入 763 時, 由於創建的是無緩沖的channel, 應該先把這個數讀出來, 然后才可以繼續寫數據, 但是結果顯示, 讀到的是 998, 998 在下面才顯示寫入啊, 怎么會先讀出來呢? 出現這個情況的問題在於, 當運行到 num := <- in 時, 已經把 998 寫進去了, 但是這個時候還沒有來得及打印, 就失去了CPU, 失去CPU之后, 緩沖區中的數據就會被覆蓋掉, 這時被 763 所覆蓋.
這是第一個錯誤現象, 我們再來看一下第二個錯誤現象.
既然都是對數據進行讀操作, 相鄰的讀入應該都是相同的數, 比如說----第1個讀go程, 讀入: 992 ----第2個讀go程, 讀入: 151, 這兩個應該讀到的數都是一樣的, 但是結果顯示卻是不同的.
那么加了讀寫鎖之后, 先來看一下錯誤代碼, 大家可以想一下為什么會出現這種錯誤.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
var rwMutex sync.RWMutex
func readGo(in <-chan int, idx int){
for {
rwMutex.RLock() // 以讀模式加鎖
num := <- in
fmt.Printf("----第%d個讀go程, 讀入: %d\n", idx, num)
rwMutex.RUnlock() // 以讀模式解鎖
}
}
func writeGo(out chan<- int, idx int){
for {
// 生成隨機數
num := rand.Intn(1000)
rwMutex.Lock() // 以寫模式加鎖
out <- num
fmt.Printf("第%d個寫go程, 寫入: %d\n", idx, num)
time.Sleep(time.Millisecond * 300)
rwMutex.Unlock() // 以寫模式解鎖
}
}
func main() {
// 隨機數種子
rand.Seed(time.Now().UnixNano())
ch := make(chan int)
for i:=0; i<5; i++ {
go readGo(ch, i+1)
}
for i:=0; i<5; i++ {
go writeGo(ch, i+1)
}
time.Sleep(time.Second * 3)
}
上面代碼的結果會一直阻塞, 沒有輸出, 大家可以簡單想一下出現這種情況的原因是什么?
代碼看得仔細的應該都可以看出來, 這上面的代碼中, 比如說讀操作先搶到了CPU, 運行代碼 rwMutex.RLock() 讀加鎖, 然后運行到 num := <- in 時, 會要求寫端同時在線, 否則就會發生阻塞, 但是這時寫端不可能在線, 因為讀加鎖了. 所以就會一直在這發生阻塞.
這也就是我們之前在死鎖部分中提到的 隱性死鎖 (不報錯).
那么解決辦法有兩種: 一種是不混用, 另一種是使用條件變量(之后會講到)
我們先看一下不混用讀寫鎖與channel的解決辦法(只使用讀寫鎖, 如果只使用channel達不到想要的效果):
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
var rwMutex2 sync.RWMutex // 鎖只有一把, 兩個屬性: r w
var value int // 定義全局變量, 模擬共享數據
func readGo2(in <-chan int, idx int){
for {
rwMutex2.RLock() // 以讀模式加鎖
num := value
fmt.Printf("----第%d個讀go程, 讀入: %d\n", idx, num)
rwMutex2.RUnlock() // 以讀模式解鎖
}
}
func writeGo2(out chan<- int, idx int){
for {
// 生成隨機數
num := rand.Intn(1000)
rwMutex2.Lock() // 以寫模式加鎖
value = num
fmt.Printf("第%d個寫go程, 寫入: %d\n", idx, num)
time.Sleep(time.Millisecond * 300)
rwMutex2.Unlock() // 以寫模式解鎖
}
}
func main() {
// 隨機數種子
rand.Seed(time.Now().UnixNano())
ch := make(chan int)
for i:=0; i<5; i++ {
go readGo2(ch, i+1)
}
for i:=0; i<5; i++ {
go writeGo2(ch, i+1)
}
time.Sleep(time.Second * 3)
}
結果:
......
第5個寫go程, 寫入: 363
----第4個讀go程, 讀入: 363
----第4個讀go程, 讀入: 363
----第4個讀go程, 讀入: 363
----第4個讀go程, 讀入: 363
----第2個讀go程, 讀入: 363
第5個寫go程, 寫入: 726
----第5個讀go程, 讀入: 726
----第4個讀go程, 讀入: 726
----第2個讀go程, 讀入: 726
----第1個讀go程, 讀入: 726
----第3個讀go程, 讀入: 726
第1個寫go程, 寫入: 764
----第5個讀go程, 讀入: 764
----第2個讀go程, 讀入: 764
----第5個讀go程, 讀入: 764
----第1個讀go程, 讀入: 764
----第3個讀go程, 讀入: 764
......
處於讀鎖定狀態, 那么針對它的寫鎖定操作將永遠不會成功, 且相應的goroutine也會被一直阻塞, 因為它們是互斥的.
總結: 讀寫鎖控制下的多個寫操作之間都是互斥的, 並且寫操作與讀操作之間也都是互斥的. 但是多個讀操作之間不存在互斥關系.
從互斥鎖和讀寫鎖的源碼可以看出, 它們是同源的. 讀寫鎖的內部用互斥鎖來實現寫鎖定操作之間的互斥. 可以把讀寫鎖看作是互斥鎖的一種擴展.
條件變量
在講條件變量之前, 我們先來回顧一下之前的生產者消費者模型:
package main
import (
"fmt"
"time"
)
func producer(out chan <- int) {
for i:=0; i<5; i++ {
fmt.Println("生產者, 生產: ", i)
out <- i
}
close(out)
}
func consumer(in <- chan int) {
for num := range in {
fmt.Println("---消費者, 消費: ", num)
}
}
func main() {
ch := make(chan int)
go producer(ch)
go consumer(ch)
time.Sleep(5 * time.Second)
}
之前都是一個生產者與一個消費者, 那么如果是多個生產者與多個消費者的情況呢?
package main
import (
"fmt"
"math/rand"
"time"
)
func producer(out chan <- int, idx int) {
for i:=0; i<10; i++ {
num := rand.Intn(800)
fmt.Printf("第%d個生產者, 生產: %d\n", idx, num)
out <- num
}
}
func consumer(in <- chan int, idx int) {
for num := range in {
fmt.Printf("---第%d個消費者, 消費: %d\n", idx, num)
}
}
func main() {
ch := make(chan int)
rand.Seed(time.Now().UnixNano())
for i := 0; i < 5; i++ {
go producer(ch, i + 1)
}
for i := 0; i < 5; i++ {
go consumer(ch, i + 1)
}
time.Sleep(5 * time.Second)
}
如果是按照上面的代碼寫的話, 就又會出現之前的錯誤.
上面已經說過了, 解決這種錯誤有兩種方法: 用鎖或者用條件變量.
這次就用條件變量來解決一下.
首先, 強調一下. 條件變量本身不是鎖!! 但是經常與鎖結合使用!!
還有另外一個問題, 如果消費者比生產者多, 倉庫中就會出現沒有數據的情況. 我們需要不斷的通過循環來判斷倉庫隊列中是否有數據, 這樣會造成cpu的浪費. 反之, 如果生產者比較多, 倉庫很容易滿, 滿了就不能繼續添加數據, 也需要循環判斷倉庫滿這一事件, 同樣也會造成cpu的浪費.
我們希望當倉庫滿時, 生產者停止生產, 等待消費者消費; 同理, 如果倉庫空了, 我們希望消費者停下來等待生產者生產. 為了達到這個目的, 這里就引入了條件變量. (需要注意, 如果倉庫隊列用channel, 是不存在以上情況的, 因為channel被填滿后就阻塞了, 或者channel中沒有數據也會阻塞).
條件變量: 條件變量的作用並不保證在同一時刻僅有一個協程(線程)訪問某個共享的數據資源, 而是在對應的共享數據的狀態發生變化時, 通知阻塞在某個條件上的協程(線程). 條件變量不是鎖, 在並發中不能達到同步的目的, 因此條件變量總是與鎖一塊使用.
例如, 我們上面說的, 如果倉庫隊列滿了, 我們可以使用條件變量讓生產者對應的goroutine暫停(阻塞), 但是當消費者消費了某個產品后, 倉庫就不再滿了, 應該喚醒(發送通知給)阻塞的生產者goroutine繼續生產產品.
Go標准庫中的 sync.Cond 類型代表了條件變量. 條件變量要與鎖(互斥鎖或者讀寫鎖)一起使用. 成員變量L代表與條件變量搭配使用的鎖.
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
對應的有3個常用的方法, Wait, Signal, Broadcast
- func (c *Cond) Wait()
該函數的作用可歸納為如下三點:
- 阻塞等待條件變量滿足
- 釋放已掌握的互斥鎖相當於cond.L.Unlock()。注意: 兩步為一個原子操作(第一步與第二步操作不可再分).
- 當被喚醒時,
Wait()函數返回時, 解除阻塞並重新獲取互斥鎖. 相當於cond.L.Lock()
- func (c *Cond) Signal()
單發通知, 給一個正等待(阻塞)在該條件變量上的goroutine(線程)發送通知.
- func (c *Cond) Broadcast()
廣播通知, 給正在等待(阻塞)在該條件變量上的所有goroutine(線程)發送通知
下面, 我們就用條件變量來寫一個生產者消費者模型.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
var cond sync.Cond // 定義全局變量
func producer2(out chan<- int, idx int) {
for {
// 先加鎖
cond.L.Lock()
// 判斷緩沖區是否滿
for len(out) == 3 {
cond.Wait()
}
num := rand.Intn(800)
out <- num
fmt.Printf("第%d個生產者, 生產: %d\n", idx, num)
// 訪問公共區結束, 並且打印結束, 解鎖
cond.L.Unlock()
// 喚醒阻塞在條件變量上的 消費者
cond.Signal()
}
}
func consumer2(in <- chan int, idx int) {
for {
// 先加鎖
cond.L.Lock()
// 判斷緩沖區是否為 空
for len(in) == 0 {
cond.Wait()
}
num := <- in
fmt.Printf("---第%d個消費者, 消費: %d\n", idx, num)
// 訪問公共區結束后, 解鎖
cond.L.Unlock()
// 喚醒阻塞在條件變量上的生產者
cond.Signal()
}
}
func main() {
// 設置隨機種子數
rand.Seed(time.Now().UnixNano())
ch := make(chan int, 3)
cond.L = new(sync.Mutex)
for i := 0; i < 5; i++ {
go producer2(ch, i + 1)
}
for i := 0; i < 5; i++ {
go consumer2(ch, i + 1)
}
time.Sleep(time.Second * 1)
}
1)定義 ch 作為隊列, 生產者產生數據保存至隊列中, 最多存儲3個數據, 消費者從中取出數據模擬消費
2)條件變量要與鎖一起使用, 這里定義全局條件變量 cond, 它有一個屬性: L Locker, 是一個互斥鎖.
3)開啟5個消費者go程, 開啟5個生產者go程.
4)producer2 生產者, 在該方法中開啟互斥鎖, 保證數據完整性. 並且判斷隊列是否滿, 如果已滿, 調用 cond.Wait() 讓該goroutine阻塞. 當消費者取出數據后執行 cond.Signal(), 會喚醒該goroutine, 繼續產生數據.
5)consumer2 消費者, 同樣開啟互斥鎖, 保證數據完整性. 判斷隊列是否為空, 如果為空, 調用 cond.Wait() 使得當前goroutine阻塞. 當生產者產生數據並添加到隊列, 執行 cond.Signal() 喚醒該goroutine.
條件變量使用流程:
- 創建條件變量: var cond sync.Cond
- 指定條件變量用的鎖: cond.L = new(sync.Mutex)
- 給公共區加鎖(互斥鎖): cond.L.Lock()
- 判斷是否到達阻塞條件(緩沖區滿/空) --> for循環判斷
for len(ch) == cap(ch) { cond.Wait() } 或者 for len(ch) == 0 { cond.Wait() } 1) 阻塞 2)解鎖 3)加鎖 - 訪問公共區 --> 讀、寫數據、打印
- 解鎖條件變量用的鎖: cond.L.Unlock()
- 喚醒阻塞在條件變量上的對端: cond.Signal() cond.Broadcast()
李培冠博客
歡迎訪問我的個人網站:
李培冠博客:lpgit.com
