package main import "fmt" import "sync" import "math/rand" import "time" var cond sync.Cond // 創建全局條件變量 // 生產者 func producer(out chan<- int, idx int) { for { cond.L.Lock() // 條件變量對應互斥鎖加鎖 for len(out) == 3 { // 產品區滿 等待消費者消費 cond.Wait() // 掛起當前協程, 等待條件變量滿足,被消費者喚醒 } num := rand.Intn(1000) // 產生一個隨機數 out <- num // 寫入到 channel 中 (生產) fmt.Printf("%dth 生產者,產生數據 %3d, 公共區剩余%d個數據\n", idx, num, len(out)) cond.L.Unlock() // 生產結束,解鎖互斥鎖 cond.Signal() // 喚醒 阻塞的 消費者 time.Sleep(time.Second) // 生產完休息一會,給其他協程執行機會 } } //消費者 func consumer(in <-chan int, idx int) { for { cond.L.Lock() // 條件變量對應互斥鎖加鎖(與生產者是同一個) for len(in) == 0 { // 產品區為空 等待生產者生產 cond.Wait() // 掛起當前協程, 等待條件變量滿足,被生產者喚醒 } num := <-in // 將 channel 中的數據讀走 (消費) fmt.Printf("---- %dth 消費者, 消費數據 %3d,公共區剩余%d個數據\n", idx, num, len(in)) cond.L.Unlock() // 消費結束,解鎖互斥鎖 cond.Signal() // 喚醒 阻塞的 生產者 time.Sleep(time.Millisecond * 500) //消費完 休息一會,給其他協程執行機會 } } func main() { rand.Seed(time.Now().UnixNano()) // 設置隨機數種子 quit := make(chan bool) // 創建用於結束通信的 channel product := make(chan int, 3) // 產品區(公共區)使用channel 模擬 cond.L = new(sync.Mutex) // 創建互斥鎖和條件變量 for i := 0; i < 5; i++ { // 5個消費者 go producer(product, i+1) } for i := 0; i < 3; i++ { // 3個生產者 go consumer(product, i+1) } <-quit // 主協程阻塞 不結束 }
我們希望當倉庫滿時,生產者停止生產,等待消費者消費;同理,如果倉庫空了,我們希望消費者停下來等待生產者生產。為了達到這個目的,這里引入條件變量。(需要注意:如果倉庫隊列用channel,是不存在以上情況的,因為channel被填滿后就阻塞了,或者channel中沒有數據也會阻塞)。
條件變量的作用並不保證在同一時刻僅有一個協程(線程)訪問某個共享的數據資源,而是在對應的共享數據的狀態發生變化時,通知阻塞在某個條件上的協程(線程)。條件變量不是鎖,在並發中不能達到同步的目的,因此條件變量總是與鎖一塊使用。
GO標准庫中的sys.Cond類型代表了條件變量。條件變量要與鎖(互斥鎖,或者讀寫鎖)一起使用。成員變量L代表與條件變量搭配使用的鎖。
對應的有3個常用方法,Wait,Signal,Broadcast。
1) func (c *Cond) Wait()
該函數的作用可歸納為如下三點:
a) 阻塞等待條件變量滿足
b) 釋放已掌握的互斥鎖相當於cond.L.Unlock()。 注意:兩步為一個原子操作。
c) 當被喚醒,Wait()函數返回時,解除阻塞並重新獲取互斥鎖。相當於cond.L.Lock()
2) func (c *Cond) Signal()
單發通知,給一個正等待(阻塞)在該條件變量上的goroutine(線程)發送通知。
3) func (c *Cond) Broadcast()
廣播通知,給正在等待(阻塞)在該條件變量上的所有goroutine(線程)發送通知。