Golang中的SingleFlight與CyclicBarrier


  SingleFlight將並發請求合並成一個請求,可用於減少下游壓力;CyclicBarrier可重用柵欄並發原語,控制一組請求同時執行;

SingleFlight

  在Go中SingleFlight並不是原生提供的,而是開發組提供的擴展並發原語。它可實現多個goroutine調用通過一函數時,只讓一個goroutine調用該函數,等到該goroutine調用函數返回結果時再將結果返回給其他同時調用的goroutine,從而做到了減少並發調用的次數;
  在秒殺緩存等場景下SingleFlight作用很明顯,能夠大規模的減少並發數量避免緩存穿透系統崩潰等。將多個並發請求 合並成一,瞬間將下游系統壓力從N減少到1

func flightDemo() {
    key := "flight"
	for i := 0; i < 5; i++ {
            log.Printf("ID: %d 請求獲取緩存", i)
            go func(id int) {
		value, _ := getCache(key, id)
		log.Printf("ID :%d 獲取到緩存 , key: %v,value: %v", id, key, value)
	    }(i)
	}
        time.Sleep(20 * time.Second)
}

func getCache(key string, id int) (string, error) {
	var ret, _, _ = group.Do(key, func() (ret interface{}, err error) {
		time.Sleep(2 * time.Second)//模擬獲取緩存
		log.Printf("ID: %v 執行獲取緩存", id)
		return id, nil
	})
	return strconv.Itoa(ret.(int)), nil
}

執行結果:

2020/12/14 14:35:13 ID: 0 請求獲取緩存
2020/12/14 14:35:13 ID: 1 請求獲取緩存
2020/12/14 14:35:13 ID: 2 請求獲取緩存
2020/12/14 14:35:13 ID: 3 請求獲取緩存
2020/12/14 14:35:13 ID: 4 請求獲取緩存
2020/12/14 14:35:15 ID: 0 執行獲取緩存
2020/12/14 14:35:15 ID :0 獲取到緩存 , key: flight,value: 0
2020/12/14 14:35:15 ID :2 獲取到緩存 , key: flight,value: 0
2020/12/14 14:35:15 ID :4 獲取到緩存 , key: flight,value: 0
2020/12/14 14:35:15 ID :3 獲取到緩存 , key: flight,value: 0
2020/12/14 14:35:15 ID :1 獲取到緩存 , key: flight,value: 0

  這個Demo中有五個goroutine同時發起獲取key為flight的緩存,由於使用了SingleFlight對象,ID為0的請求率先發起了獲取緩存,其他4個goroutine並不會去執行獲取緩存請求邏輯,而是等到ID為0的請求取得到結果后直接使用該結果

  SingleFlight內部使用了互斥鎖Mutex與Map實現,Mutex用於提供並發時的讀寫保護,Map用於保存同一個key的處理請求;SingleFlight提供了如下三個方法:
  Do: 執行一個函數,返回函數的執行結果;
  DoChan: 與Do方法類似,返回的是一個chan,函數fn執行完成產生結果后,可從chan中接受到函數的執行結果;
  Forget: 丟棄某個key,之后這個key請求會繼續執行函數fn,不在等待前一個請求fn函數的執行結果;

  SingleFlight的實現部分代碼如下,其中call為具體的的請求、Group代表Singleflight、map[string]*call用於存儲相對應的key所發起的請求;

type call struct {
   wg  sync.WaitGroup
   val interface{}
   err error
}

type Group struct {
   mu sync.Mutex       // protects m
   m  map[string]*call // lazily initialized
}

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	//是否已存在該key的請求
	if c, ok := g.m[key]; ok {
 		c.dups++
		g.mu.Unlock()
		c.wg.Wait()    //等待該key第一個請求完成
		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		return c.val, c.err, true   //返回該key第一個請求的結果
	}
	c := new(call)   //第一個請求
	c.wg.Add(1)
	g.m[key] = c     //將請求加入到map中
	g.mu.Unlock()
	g.doCall(c, key, fn) //調用函數fn
	return c.val, c.err, c.dups > 0
}

CyclicBarrier

  在Go的標准庫中、開發組擴展庫中其實也並沒有CyclicBarrier的實現,有個第三方的CyclicBarrier實現:https://github.com/marusama/cyclicbarrier, 它的邏輯為:一組goroutine彼此等待直到所有的goroutine都達到某個執行點,再往下執行。就如柵欄一樣等指定數量的人到齊了,開始抬起柵欄放行;它的執行邏輯與Java的cyclicbarrier類似;
  在Go標准庫中有個對象有類似的功能:WaitGroup,但該對象並沒有CyclicBarrier那么簡單易用;

func cyclicBarrierDemo(){
	for i := 0; i < 3; i++ {
		go func(id int) {
            log.Printf("start: %v", id)
		barrier.Await(context.Background())
			log.Printf("finish: %v", id)		
                }(i)
        }

	time.Sleep(5 * time.Second)
	log.Printf("完成")
}

執行結果:

2020/12/14 15:11:57 start: 2
2020/12/14 15:11:57 start: 0
2020/12/14 15:11:57 start: 1
2020/12/14 15:11:57 finish: 1
2020/12/14 15:11:57 finish: 2
2020/12/14 15:11:57 finish: 0
2020/12/14 15:12:02 完成

  通過上面Demo可以看到ID為2、0的goroutine輸出start后並沒有繼續往下執行,而是等到ID為0的goroutine執行到start后三個goroutine一起往下執行;

  如沒有使用柵欄,則這個Demo的執行結果如下:

2020/12/14 15:09:02 start: 0
2020/12/14 15:09:02 finish: 0
2020/12/14 15:09:02 start: 1
2020/12/14 15:09:02 finish: 2
2020/12/14 15:09:02 start: 2
2020/12/14 15:09:02 finish: 2
2020/12/14 15:09:07 完成


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM