go學習筆記 sync/Cond源碼


ond需要指定一個Locker,通常是一個*Mutex或*RWMutex。

func (c *Cond) Broadcast() 和 func (c *Cond) Signal() 喚醒因wait condition而掛起goroutine,區別是Signal只喚醒一個,而Broadcast喚醒所有。允許調用者獲取基礎鎖Locker之后再調用喚醒,但非必需。

func (c *Cond) Wait()方法在調用時會釋放底層鎖Locker,並且將當前goroutine掛起,直到另一個goroutine執行Signal或者Broadcase,該goroutine才有機會重新喚醒,並嘗試獲取Locker,完成后續邏輯。

使用Wait 方法之前,我們必須先獲取外部鎖,原因是:先當前協程占有着鎖,並掛起當前協程等待,其他協程的 通知喚醒,好走后續的業務邏輯,(占有着鎖,是不想別人拿到鎖,而自己走不到Wait這一步,而Wait是掛起了當前協程,等待別人通知,這樣做,就知道只要通知一來,肯定是當前協程可以繼續往下走了),這里自己通過對比 Wait的使用及Wait的源碼自己就明白了,使用示例:

package main
 
import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)
 
var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)
 
var capacity = 10
var consumerNum = 3
var producerNum = 5
 
func producer(out chan<- int) {
    for i := 0; i < producerNum; i++ {
        go func(nu int) {
            for {
                cond.L.Lock()
                for len(out) == capacity {
                    fmt.Println("Capacity Full, stop Produce")
                    cond.Wait()
                }
                num := rand.Intn(100)
                out <- num
                fmt.Printf("Produce %d produce: num %d\n", nu, num)
                cond.L.Unlock()
                cond.Signal()
 
                time.Sleep(time.Second)
            }
        }(i)
    }
}
 
func consumer(in <-chan int) {
    for i := 0; i < consumerNum; i++ {
        go func(nu int) {
 
            for {
                cond.L.Lock()
                for len(in) == 0 {
                    fmt.Println("Capacity Empty, stop Consume")
                    cond.Wait()
                }
                num := <-in
                fmt.Printf("Goroutine %d: consume num %d\n", nu, num)
                cond.L.Unlock()
                time.Sleep(time.Millisecond * 500)
                cond.Signal()
            }
        }(i)
    }
}
 
func main() {
 
    rand.Seed(time.Now().UnixNano())
 
    quit := make(chan bool)
    product := make(chan int, capacity)
 
    producer(product)
    consumer(product)
 
    <-quit
}

sync/Cond.go源碼

package sync
 
import (
    "sync/atomic"
    "unsafe"
)
 
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
// Cond實現了一個條件變量,一個等待或宣布事件發生的goroutines的集合點。
// 每個Cond都有一個相關的Locker L(通常是* Mutex或* RWMutex)
type Cond struct {
    // 不允許復制,一個結構體,有一個Lock()方法,嵌入別的結構體中,表示不允許復制
    // noCopy對象,擁有一個Lock方法,使得Cond對象在進行go vet掃描的時候,能夠被檢測到是否被復制
    noCopy noCopy
 
    // L is held while observing or changing the condition
    // 鎖的具體實現,通常為 mutex 或者rwmutex
    L Locker
 
    // 通知列表,調用Wait()方法的goroutine會被放入list中,每次喚醒,從這里取出
    // notifyList對象,維護等待喚醒的goroutine隊列,使用鏈表實現
    // 在 sync 包中被實現, src/sync/runtime.go
    notify  notifyList
 
    // 復制檢查,檢查cond實例是否被復制
    // copyChecker對象,實際上是uintptr對象,保存自身對象地址
    checker copyChecker
}
 
// NewCond returns a new Cond with Locker l.
// NewCond方法傳入一個實現了Locker接口的對象,返回一個新的Cond對象指針,
// 保證在多goroutine使用cond的時候,持有的是同一個實例
func NewCond(l Locker) *Cond {
    return &Cond{L: l}
}
 
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
// 等待原子解鎖c.L並暫停執行調用goroutine。
// 稍后恢復執行后,Wait會在返回之前鎖定c.L.
// 與其他系統不同,除非被廣播或信號喚醒,否則等待無法返回。
// 因為等待第一次恢復時c.L沒有被鎖定,
// 所以當Wait返回時,調用者通常不能認為條件為真。
// 相反,調用者應該循環等待:
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
//調用此方法會將此routine加入通知列表,並等待獲取通知,調用此方法必須先Lock,不然方法里會調用Unlock(),報錯
func (c *Cond) Wait() {
    // 檢查是否被復制; 如果是就panic
    // check檢查,保證cond在第一次使用后沒有被復制
    c.checker.check()
    // 將當前goroutine加入等待隊列, 該方法在 runtime 包的 notifyListAdd 函數中實現 src/runtime/sema.go
    t := runtime_notifyListAdd(&c.notify)
    // 釋放鎖, 因此在調用Wait方法前,必須保證獲取到了cond的鎖,否則會報錯
    c.L.Unlock()
 
    // 等待隊列中的所有的goroutine執行等待喚醒操作
    // 將當前goroutine掛起,等待喚醒信號
    // 該方法在 runtime 包的 notifyListWait 函數中實現 src/runtime/sema.go
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}
 
// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
// 喚醒單個 等待的 goroutine
func (c *Cond) Signal() {
    c.checker.check()
    // 通知等待列表中的一個, 順序喚醒一個等待的gorountine
    // 在runtime 包的 notifyListNotifyOne 函數中被實現 src/runtime/sema.go
    runtime_notifyListNotifyOne(&c.notify)
}
 
// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
// 喚醒等待隊列中的所有goroutine。
func (c *Cond) Broadcast() {
    c.checker.check()
    // 喚醒等待隊列中所有的goroutine
    // 有runtime 包的 notifyListNotifyAll 函數實現 src\runtime\sema.go
    runtime_notifyListNotifyAll(&c.notify)
}
 
// copyChecker holds back pointer to itself to detect object copying.
// copyChecker保持指向自身的指針以檢測對象復制。
type copyChecker uintptr
// 檢查c是否被復制,如果是則panic
//check方法在第一次調用的時候,會將checker對象地址賦值給checker,也就是將自身內存地址賦值給自身
func (c *copyChecker) check() {
    /**
    因為 copyChecker的底層類型為 uintptr
    那么 這里的 *c其實就是 copyChecker類型本身,然后強轉成uintptr
    和拿着 c 也就是copyChecker的指針去求 uintptr,理論上要想等
    即:內存地址為一樣,則表示沒有被復制
     */
     // 下述做法是:
     // 其實 copyChecker中存儲的對象地址就是 copyChecker 對象自身的地址
     // 先把 copyChecker 處存儲的對象地址和自己通過 unsafe.Pointer求出來的對象地址作比較,
     // 如果發現不相等,那么就嘗試的替換,由於使用的 old是0,
     // 則表示c還沒有開辟內存空間,也就是說,只有是首次開辟地址才會替換成功
     // 如果替換不成功,則表示 copyChecker出所存儲的地址和 unsafe計算出來的不一致
     // 則表示對象是被復制了
    if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
        !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
        uintptr(*c) != uintptr(unsafe.Pointer(c)) {
        panic("sync.Cond is copied")
    }
}
 
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
// noCopy可以嵌入到結構中,在第一次使用后不得復制。
type noCopy struct{}
 
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}
 
 
type notifyList struct {
    wait   uint32
    notify uint32
    lock   uintptr // key field of the mutex
    head   unsafe.Pointer
    tail   unsafe.Pointer
}

我們可以看出,其中

  • Cond不能被復制:Cond在內部持有一個等待隊列,這個隊列維護所有等待在這個Cond的goroutine。因此若這個Cond允許值傳遞,則這個隊列在值傳遞的過程中會進行復制,導致在喚醒goroutine的時候出現錯誤。

  • 順序喚醒: notifyList對象持有兩個無限自增的字段wait和notify,wait字段在有新的goroutine等待的時候加1,notify字段在有新的喚醒信號的時候加1。在有新的goroutine加入隊列的時候,會將當前wait賦值給goroutine的ticket,喚醒的時候會喚醒ticket等於notify的gourine。另外,當wait==notify時表示沒有goroutine需要被喚醒,wait>notify時表示有goroutine需要被喚醒,waity恆大於等於notify

Wait:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM