SingleFlight將並發請求合並成一個請求,可用於減少下游壓力;CyclicBarrier可重用柵欄並發原語,控制一組請求同時執行;
SingleFlight
在Go中SingleFlight並不是原生提供的,而是開發組提供的擴展並發原語。它可實現多個goroutine調用通過一函數時,只讓一個goroutine調用該函數,等到該goroutine調用函數返回結果時再將結果返回給其他同時調用的goroutine,從而做到了減少並發調用的次數;
在秒殺、緩存等場景下SingleFlight作用很明顯,能夠大規模的減少並發數量避免緩存穿透、系統崩潰等。將多個並發請求 合並成一,瞬間將下游系統壓力從N減少到1;
func flightDemo() {
key := "flight"
for i := 0; i < 5; i++ {
log.Printf("ID: %d 請求獲取緩存", i)
go func(id int) {
value, _ := getCache(key, id)
log.Printf("ID :%d 獲取到緩存 , key: %v,value: %v", id, key, value)
}(i)
}
time.Sleep(20 * time.Second)
}
func getCache(key string, id int) (string, error) {
var ret, _, _ = group.Do(key, func() (ret interface{}, err error) {
time.Sleep(2 * time.Second)//模擬獲取緩存
log.Printf("ID: %v 執行獲取緩存", id)
return id, nil
})
return strconv.Itoa(ret.(int)), nil
}
執行結果:
2020/12/14 14:35:13 ID: 0 請求獲取緩存
2020/12/14 14:35:13 ID: 1 請求獲取緩存
2020/12/14 14:35:13 ID: 2 請求獲取緩存
2020/12/14 14:35:13 ID: 3 請求獲取緩存
2020/12/14 14:35:13 ID: 4 請求獲取緩存
2020/12/14 14:35:15 ID: 0 執行獲取緩存
2020/12/14 14:35:15 ID :0 獲取到緩存 , key: flight,value: 0
2020/12/14 14:35:15 ID :2 獲取到緩存 , key: flight,value: 0
2020/12/14 14:35:15 ID :4 獲取到緩存 , key: flight,value: 0
2020/12/14 14:35:15 ID :3 獲取到緩存 , key: flight,value: 0
2020/12/14 14:35:15 ID :1 獲取到緩存 , key: flight,value: 0
這個Demo中有五個goroutine同時發起獲取key為flight的緩存,由於使用了SingleFlight對象,ID為0的請求率先發起了獲取緩存,其他4個goroutine並不會去執行獲取緩存請求邏輯,而是等到ID為0的請求取得到結果后直接使用該結果;
SingleFlight內部使用了互斥鎖Mutex與Map實現,Mutex用於提供並發時的讀寫保護,Map用於保存同一個key的處理請求;SingleFlight提供了如下三個方法:
Do: 執行一個函數,返回函數的執行結果;
DoChan: 與Do方法類似,返回的是一個chan,函數fn執行完成產生結果后,可從chan中接受到函數的執行結果;
Forget: 丟棄某個key,之后這個key請求會繼續執行函數fn,不在等待前一個請求fn函數的執行結果;
SingleFlight的實現部分代碼如下,其中call為具體的的請求、Group代表Singleflight、map[string]*call用於存儲相對應的key所發起的請求;
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
//是否已存在該key的請求
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait() //等待該key第一個請求完成
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true //返回該key第一個請求的結果
}
c := new(call) //第一個請求
c.wg.Add(1)
g.m[key] = c //將請求加入到map中
g.mu.Unlock()
g.doCall(c, key, fn) //調用函數fn
return c.val, c.err, c.dups > 0
}
CyclicBarrier
在Go的標准庫中、開發組擴展庫中其實也並沒有CyclicBarrier的實現,有個第三方的CyclicBarrier實現:https://github.com/marusama/cyclicbarrier, 它的邏輯為:一組goroutine彼此等待直到所有的goroutine都達到某個執行點,再往下執行。就如柵欄一樣等指定數量的人到齊了,開始抬起柵欄放行;它的執行邏輯與Java的cyclicbarrier類似;
在Go標准庫中有個對象有類似的功能:WaitGroup,但該對象並沒有CyclicBarrier那么簡單易用;
func cyclicBarrierDemo(){
for i := 0; i < 3; i++ {
go func(id int) {
log.Printf("start: %v", id)
barrier.Await(context.Background())
log.Printf("finish: %v", id)
}(i)
}
time.Sleep(5 * time.Second)
log.Printf("完成")
}
執行結果:
2020/12/14 15:11:57 start: 2
2020/12/14 15:11:57 start: 0
2020/12/14 15:11:57 start: 1
2020/12/14 15:11:57 finish: 1
2020/12/14 15:11:57 finish: 2
2020/12/14 15:11:57 finish: 0
2020/12/14 15:12:02 完成
通過上面Demo可以看到ID為2、0的goroutine輸出start后並沒有繼續往下執行,而是等到ID為0的goroutine執行到start后三個goroutine一起往下執行;
如沒有使用柵欄,則這個Demo的執行結果如下:
2020/12/14 15:09:02 start: 0
2020/12/14 15:09:02 finish: 0
2020/12/14 15:09:02 start: 1
2020/12/14 15:09:02 finish: 2
2020/12/14 15:09:02 start: 2
2020/12/14 15:09:02 finish: 2
2020/12/14 15:09:07 完成