ond需要指定一個Locker,通常是一個*Mutex或*RWMutex。
func (c *Cond) Broadcast() 和 func (c *Cond) Signal() 喚醒因wait condition而掛起goroutine,區別是Signal只喚醒一個,而Broadcast喚醒所有。允許調用者獲取基礎鎖Locker之后再調用喚醒,但非必需。
func (c *Cond) Wait()方法在調用時會釋放底層鎖Locker,並且將當前goroutine掛起,直到另一個goroutine執行Signal或者Broadcase,該goroutine才有機會重新喚醒,並嘗試獲取Locker,完成后續邏輯。
使用Wait 方法之前,我們必須先獲取外部鎖,原因是:先當前協程占有着鎖,並掛起當前協程等待,其他協程的 通知喚醒,好走后續的業務邏輯,(占有着鎖,是不想別人拿到鎖,而自己走不到Wait這一步,而Wait是掛起了當前協程,等待別人通知,這樣做,就知道只要通知一來,肯定是當前協程可以繼續往下走了),這里自己通過對比 Wait的使用及Wait的源碼自己就明白了,使用示例:
package main import ( "fmt" "math/rand" "sync" "time" ) var locker = new(sync.Mutex) var cond = sync.NewCond(locker) var capacity = 10 var consumerNum = 3 var producerNum = 5 func producer(out chan<- int) { for i := 0; i < producerNum; i++ { go func(nu int) { for { cond.L.Lock() for len(out) == capacity { fmt.Println("Capacity Full, stop Produce") cond.Wait() } num := rand.Intn(100) out <- num fmt.Printf("Produce %d produce: num %d\n", nu, num) cond.L.Unlock() cond.Signal() time.Sleep(time.Second) } }(i) } } func consumer(in <-chan int) { for i := 0; i < consumerNum; i++ { go func(nu int) { for { cond.L.Lock() for len(in) == 0 { fmt.Println("Capacity Empty, stop Consume") cond.Wait() } num := <-in fmt.Printf("Goroutine %d: consume num %d\n", nu, num) cond.L.Unlock() time.Sleep(time.Millisecond * 500) cond.Signal() } }(i) } } func main() { rand.Seed(time.Now().UnixNano()) quit := make(chan bool) product := make(chan int, capacity) producer(product) consumer(product) <-quit }
sync/Cond.go源碼
package sync import ( "sync/atomic" "unsafe" ) // Cond implements a condition variable, a rendezvous point // for goroutines waiting for or announcing the occurrence // of an event. // // Each Cond has an associated Locker L (often a *Mutex or *RWMutex), // which must be held when changing the condition and // when calling the Wait method. // // A Cond must not be copied after first use. // Cond實現了一個條件變量,一個等待或宣布事件發生的goroutines的集合點。 // 每個Cond都有一個相關的Locker L(通常是* Mutex或* RWMutex) type Cond struct { // 不允許復制,一個結構體,有一個Lock()方法,嵌入別的結構體中,表示不允許復制 // noCopy對象,擁有一個Lock方法,使得Cond對象在進行go vet掃描的時候,能夠被檢測到是否被復制 noCopy noCopy // L is held while observing or changing the condition // 鎖的具體實現,通常為 mutex 或者rwmutex L Locker // 通知列表,調用Wait()方法的goroutine會被放入list中,每次喚醒,從這里取出 // notifyList對象,維護等待喚醒的goroutine隊列,使用鏈表實現 // 在 sync 包中被實現, src/sync/runtime.go notify notifyList // 復制檢查,檢查cond實例是否被復制 // copyChecker對象,實際上是uintptr對象,保存自身對象地址 checker copyChecker } // NewCond returns a new Cond with Locker l. // NewCond方法傳入一個實現了Locker接口的對象,返回一個新的Cond對象指針, // 保證在多goroutine使用cond的時候,持有的是同一個實例 func NewCond(l Locker) *Cond { return &Cond{L: l} } // Wait atomically unlocks c.L and suspends execution // of the calling goroutine. After later resuming execution, // Wait locks c.L before returning. Unlike in other systems, // Wait cannot return unless awoken by Broadcast or Signal. // // Because c.L is not locked when Wait first resumes, the caller // typically cannot assume that the condition is true when // Wait returns. Instead, the caller should Wait in a loop: // 等待原子解鎖c.L並暫停執行調用goroutine。 // 稍后恢復執行后,Wait會在返回之前鎖定c.L. // 與其他系統不同,除非被廣播或信號喚醒,否則等待無法返回。 // 因為等待第一次恢復時c.L沒有被鎖定, // 所以當Wait返回時,調用者通常不能認為條件為真。 // 相反,調用者應該循環等待: // // c.L.Lock() // for !condition() { // c.Wait() // } // ... make use of condition ... // c.L.Unlock() // //調用此方法會將此routine加入通知列表,並等待獲取通知,調用此方法必須先Lock,不然方法里會調用Unlock(),報錯 func (c *Cond) Wait() { // 檢查是否被復制; 如果是就panic // check檢查,保證cond在第一次使用后沒有被復制 c.checker.check() // 將當前goroutine加入等待隊列, 該方法在 runtime 包的 notifyListAdd 函數中實現 src/runtime/sema.go t := runtime_notifyListAdd(&c.notify) // 釋放鎖, 因此在調用Wait方法前,必須保證獲取到了cond的鎖,否則會報錯 c.L.Unlock() // 等待隊列中的所有的goroutine執行等待喚醒操作 // 將當前goroutine掛起,等待喚醒信號 // 該方法在 runtime 包的 notifyListWait 函數中實現 src/runtime/sema.go runtime_notifyListWait(&c.notify, t) c.L.Lock() } // Signal wakes one goroutine waiting on c, if there is any. // // It is allowed but not required for the caller to hold c.L // during the call. // 喚醒單個 等待的 goroutine func (c *Cond) Signal() { c.checker.check() // 通知等待列表中的一個, 順序喚醒一個等待的gorountine // 在runtime 包的 notifyListNotifyOne 函數中被實現 src/runtime/sema.go runtime_notifyListNotifyOne(&c.notify) } // Broadcast wakes all goroutines waiting on c. // // It is allowed but not required for the caller to hold c.L // during the call. // 喚醒等待隊列中的所有goroutine。 func (c *Cond) Broadcast() { c.checker.check() // 喚醒等待隊列中所有的goroutine // 有runtime 包的 notifyListNotifyAll 函數實現 src\runtime\sema.go runtime_notifyListNotifyAll(&c.notify) } // copyChecker holds back pointer to itself to detect object copying. // copyChecker保持指向自身的指針以檢測對象復制。 type copyChecker uintptr // 檢查c是否被復制,如果是則panic //check方法在第一次調用的時候,會將checker對象地址賦值給checker,也就是將自身內存地址賦值給自身 func (c *copyChecker) check() { /** 因為 copyChecker的底層類型為 uintptr 那么 這里的 *c其實就是 copyChecker類型本身,然后強轉成uintptr 和拿着 c 也就是copyChecker的指針去求 uintptr,理論上要想等 即:內存地址為一樣,則表示沒有被復制 */ // 下述做法是: // 其實 copyChecker中存儲的對象地址就是 copyChecker 對象自身的地址 // 先把 copyChecker 處存儲的對象地址和自己通過 unsafe.Pointer求出來的對象地址作比較, // 如果發現不相等,那么就嘗試的替換,由於使用的 old是0, // 則表示c還沒有開辟內存空間,也就是說,只有是首次開辟地址才會替換成功 // 如果替換不成功,則表示 copyChecker出所存儲的地址和 unsafe計算出來的不一致 // 則表示對象是被復制了 if uintptr(*c) != uintptr(unsafe.Pointer(c)) && !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) && uintptr(*c) != uintptr(unsafe.Pointer(c)) { panic("sync.Cond is copied") } } // noCopy may be embedded into structs which must not be copied // after the first use. // // See https://golang.org/issues/8005#issuecomment-190753527 // for details. // noCopy可以嵌入到結構中,在第一次使用后不得復制。 type noCopy struct{} // Lock is a no-op used by -copylocks checker from `go vet`. func (*noCopy) Lock() {} func (*noCopy) Unlock() {} type notifyList struct { wait uint32 notify uint32 lock uintptr // key field of the mutex head unsafe.Pointer tail unsafe.Pointer }
我們可以看出,其中
-
Cond不能被復制:Cond在內部持有一個等待隊列,這個隊列維護所有等待在這個Cond的goroutine。因此若這個Cond允許值傳遞,則這個隊列在值傳遞的過程中會進行復制,導致在喚醒goroutine的時候出現錯誤。
-
順序喚醒: notifyList對象持有兩個無限自增的字段wait和notify,wait字段在有新的goroutine等待的時候加1,notify字段在有新的喚醒信號的時候加1。在有新的goroutine加入隊列的時候,會將當前wait賦值給goroutine的ticket,喚醒的時候會喚醒ticket等於notify的gourine。另外,當wait==notify時表示沒有goroutine需要被喚醒,wait>notify時表示有goroutine需要被喚醒,waity恆大於等於notify
Wait: