package main import "fmt" import "sync" import "math/rand" import "time" var cond sync.Cond // 创建全局条件变量 // 生产者 func producer(out chan<- int, idx int) { for { cond.L.Lock() // 条件变量对应互斥锁加锁 for len(out) == 3 { // 产品区满 等待消费者消费 cond.Wait() // 挂起当前协程, 等待条件变量满足,被消费者唤醒 } num := rand.Intn(1000) // 产生一个随机数 out <- num // 写入到 channel 中 (生产) fmt.Printf("%dth 生产者,产生数据 %3d, 公共区剩余%d个数据\n", idx, num, len(out)) cond.L.Unlock() // 生产结束,解锁互斥锁 cond.Signal() // 唤醒 阻塞的 消费者 time.Sleep(time.Second) // 生产完休息一会,给其他协程执行机会 } } //消费者 func consumer(in <-chan int, idx int) { for { cond.L.Lock() // 条件变量对应互斥锁加锁(与生产者是同一个) for len(in) == 0 { // 产品区为空 等待生产者生产 cond.Wait() // 挂起当前协程, 等待条件变量满足,被生产者唤醒 } num := <-in // 将 channel 中的数据读走 (消费) fmt.Printf("---- %dth 消费者, 消费数据 %3d,公共区剩余%d个数据\n", idx, num, len(in)) cond.L.Unlock() // 消费结束,解锁互斥锁 cond.Signal() // 唤醒 阻塞的 生产者 time.Sleep(time.Millisecond * 500) //消费完 休息一会,给其他协程执行机会 } } func main() { rand.Seed(time.Now().UnixNano()) // 设置随机数种子 quit := make(chan bool) // 创建用于结束通信的 channel product := make(chan int, 3) // 产品区(公共区)使用channel 模拟 cond.L = new(sync.Mutex) // 创建互斥锁和条件变量 for i := 0; i < 5; i++ { // 5个消费者 go producer(product, i+1) } for i := 0; i < 3; i++ { // 3个生产者 go consumer(product, i+1) } <-quit // 主协程阻塞 不结束 }
我们希望当仓库满时,生产者停止生产,等待消费者消费;同理,如果仓库空了,我们希望消费者停下来等待生产者生产。为了达到这个目的,这里引入条件变量。(需要注意:如果仓库队列用channel,是不存在以上情况的,因为channel被填满后就阻塞了,或者channel中没有数据也会阻塞)。
条件变量的作用并不保证在同一时刻仅有一个协程(线程)访问某个共享的数据资源,而是在对应的共享数据的状态发生变化时,通知阻塞在某个条件上的协程(线程)。条件变量不是锁,在并发中不能达到同步的目的,因此条件变量总是与锁一块使用。
GO标准库中的sys.Cond类型代表了条件变量。条件变量要与锁(互斥锁,或者读写锁)一起使用。成员变量L代表与条件变量搭配使用的锁。
对应的有3个常用方法,Wait,Signal,Broadcast。
1) func (c *Cond) Wait()
该函数的作用可归纳为如下三点:
a) 阻塞等待条件变量满足
b) 释放已掌握的互斥锁相当于cond.L.Unlock()。 注意:两步为一个原子操作。
c) 当被唤醒,Wait()函数返回时,解除阻塞并重新获取互斥锁。相当于cond.L.Lock()
2) func (c *Cond) Signal()
单发通知,给一个正等待(阻塞)在该条件变量上的goroutine(线程)发送通知。
3) func (c *Cond) Broadcast()
广播通知,给正在等待(阻塞)在该条件变量上的所有goroutine(线程)发送通知。