最近讀groupcache的源碼,有個一次執行的模塊。
保證同一個key的函數只執行一次。
原理是利用sync.waitGroup的wait可以同步阻塞。然后等待所有的wait完成
寫了個測試的demo程序,其實還是需要分析下標准庫源碼。
wait是個for循環,檢測當前的狀態
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// Increment waiters count.
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
如下是我的demo
1 package main 2 3 import ( 4 "fmt" 5 "sync" 6 "time" 7 ) 8 9 10 /* 11 測試多個wait的情況 12 13 */ 14 func main() { 15 fmt.Println("start") 16 17 var group sync.WaitGroup 18 19 group.Add(1) 20 for i:=0; i< 10; i++{ 21 go func(i int){ 22 fmt.Printf("process %d waiting\n", i) 23 group.Wait() 24 fmt.Printf("process %d waiting done\n", i) 25 }(i) 26 } 27 time.Sleep(time.Second * 1) 28 group.Done() 29 30 time.Sleep(time.Second * 1) 31 32 fmt.Println("end") 33 }
