使用Golang的singleflight防止緩存擊穿


背景

在使用緩存時,容易發生緩存擊穿。

緩存擊穿:一個存在的key,在緩存過期的瞬間,同時有大量的請求過來,造成所有請求都去讀dB,這些請求都會擊穿到DB,造成瞬時DB請求量大、壓力驟增。

singleflight

介紹

import "golang.org/x/sync/singleflight"
singleflight類的使用方法就新建一個singleflight.Group,使用其方法Do或者DoChan來包裝方法,被包裝的方法在對於同一個key,只會有一個協程執行,其他協程等待那個協程執行結束后,拿到同樣的結果。

  • Group結構體
    代表一類工作,同一個group中,同樣的key同時只能被執行一次。
  • Do方法
    func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
    key:同一個key,同時只有一個協程執行。
    fn:被包裝的函數。
    v:返回值,即執行的結果。其他等待的協程都會拿到。
    shared:表示是否有其他協程得到了這個結果v。
  • DoChan方法
    func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
    與Do方法一樣,只是返回的是一個channel,執行結果會發送到channel中,其他等待的協程都可以從channel中拿到結果。

ref:https://godoc.org/golang.org/x/sync/singleflight

示例

  • 使用Do方法來模擬,解決緩存擊穿的問題
func main() {
   var singleSetCache singleflight.Group

   getAndSetCache:=func (requestID int,cacheKey string) (string, error) {
      log.Printf("request %v start to get and set cache...",requestID)
      value,_, _ :=singleSetCache.Do(cacheKey, func() (ret interface{}, err error) {//do的入參key,可以直接使用緩存的key,這樣同一個緩存,只有一個協程會去讀DB
         log.Printf("request %v is setting cache...",requestID)
         time.Sleep(3*time.Second)
         log.Printf("request %v set cache success!",requestID)
         return "VALUE",nil
      })
      return value.(string),nil
   }

   cacheKey:="cacheKey"
   for i:=1;i<10;i++{//模擬多個協程同時請求
      go func(requestID int) {
         value,_:=getAndSetCache(requestID,cacheKey)
         log.Printf("request %v get value: %v",requestID,value)
      }(i)
   }
   time.Sleep(20*time.Second)
}

輸出:

2020/04/12 18:18:40 request 4 start to get and set cache...
2020/04/12 18:18:40 request 4 is setting cache...
2020/04/12 18:18:40 request 2 start to get and set cache...
2020/04/12 18:18:40 request 7 start to get and set cache...
2020/04/12 18:18:40 request 5 start to get and set cache...
2020/04/12 18:18:40 request 1 start to get and set cache...
2020/04/12 18:18:40 request 6 start to get and set cache...
2020/04/12 18:18:40 request 3 start to get and set cache...
2020/04/12 18:18:40 request 8 start to get and set cache...
2020/04/12 18:18:40 request 9 start to get and set cache...
2020/04/12 18:18:43 request 4 set cache success!
2020/04/12 18:18:43 request 4 get value: VALUE
2020/04/12 18:18:43 request 9 get value: VALUE
2020/04/12 18:18:43 request 6 get value: VALUE
2020/04/12 18:18:43 request 3 get value: VALUE
2020/04/12 18:18:43 request 8 get value: VALUE
2020/04/12 18:18:43 request 1 get value: VALUE
2020/04/12 18:18:43 request 5 get value: VALUE
2020/04/12 18:18:43 request 2 get value: VALUE
2020/04/12 18:18:43 request 7 get value: VALUE

可以看到確實只有一個協程執行了被包裝的函數,並且其他協程都拿到了結果。

源碼分析

看一下這個Do方法是怎么實現的。
首先看一下Group的結構:

type Group struct {
   mu sync.Mutex      
   m  map[string]*call //保存key對應的函數執行過程和結果的變量。
}

Group的結構非常簡單,一個鎖來保證並發安全,另一個map用來保存key對應的函數執行過程和結果的變量。
看下call的結構:

type call struct {
   wg sync.WaitGroup //用WaitGroup實現只有一個協程執行函數
   val interface{} //函數執行結果
   err error
   forgotten bool
   dups  int //含義是duplications,即同時執行同一個key的協程數量
   chans []chan<- Result
}

看下Do方法

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
   g.mu.Lock()//寫Group的m字段時,加鎖保證寫安全。
   if g.m == nil {
      g.m = make(map[string]*call)
   }
   if c, ok := g.m[key]; ok {//如果key已經存在,說明已經有協程在執行,則dups++,並等待其執行完畢后,返回其執行結果,執行結果保存在對應的call的val字段里
      c.dups++
      g.mu.Unlock()
      c.wg.Wait()
      return c.val, c.err, true
   }
   //如果key不存在,則新建一個call,並使用WaitGroup來阻塞其他協程,同時在m字段里寫入key和對應的call
   c := new(call)
   c.wg.Add(1)
   g.m[key] = c
   g.mu.Unlock()

   g.doCall(c, key, fn)//第一個進來的協程來執行這個函數
   return c.val, c.err, c.dups > 0
}

繼續看下g.doCall里具體干了什么

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
   c.val, c.err = fn()//執行被包裝的函數
   c.wg.Done()//執行完畢后,就可以通知其他協程可以拿結果了

   g.mu.Lock()
   if !c.forgotten {//其實這里是為了保證執行完畢之后,對應的key被刪除,Group有一個方法Forget(key string),可以用來主動刪除key,這里是判斷那個方法是否被調用過,被調用過則字段forgotten會置為true,如果沒有被調用過,則在這里把key刪除。
      delete(g.m, key)
   }
   for _, ch := range c.chans {//將執行結果發送到channel里,這里是給DoChan方法使用的
      ch <- Result{c.val, c.err, c.dups > 0}
   }
   g.mu.Unlock()
}

由此看來,其實現是非常簡單的。不得不贊嘆一百來行代碼就實現了功能。

其他

順便附上DoChan方法的使用示例:

func main() {
   var singleSetCache singleflight.Group

   getAndSetCache:=func (requestID int,cacheKey string) (string, error) {
      log.Printf("request %v start to get and set cache...",requestID)
      retChan:=singleSetCache.DoChan(cacheKey, func() (ret interface{}, err error) {
         log.Printf("request %v is setting cache...",requestID)
         time.Sleep(3*time.Second)
         log.Printf("request %v set cache success!",requestID)
         return "VALUE",nil
      })

      var ret singleflight.Result

      timeout := time.After(5 * time.Second)

      select {//加入了超時機制
      case <-timeout:
         log.Printf("time out!")
         return "",errors.New("time out")
      case ret =<- retChan://從chan中取出結果
         return ret.Val.(string),ret.Err
      }
      return "",nil
   }

   cacheKey:="cacheKey"
   for i:=1;i<10;i++{
      go func(requestID int) {
         value,_:=getAndSetCache(requestID,cacheKey)
         log.Printf("request %v get value: %v",requestID,value)
      }(i)
   }
   time.Sleep(20*time.Second)
}

看下DoChan的源碼

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
   ch := make(chan Result, 1)
   g.mu.Lock()
   if g.m == nil {
      g.m = make(map[string]*call)
   }
   if c, ok := g.m[key]; ok {
      c.dups++
      c.chans = append(c.chans, ch)//可以看到,每個等待的協程,都有一個結果channel。從之前的g.doCall里也可以看到,每個channel都給塞了結果。為什么不所有協程共用一個channel?因為那樣就得在channel里塞至少與協程數量一樣的結果數量,但是你卻無法保證用戶一個協程只讀取一次。
      g.mu.Unlock()
      return ch
   }
   c := &call{chans: []chan<- Result{ch}}
   c.wg.Add(1)
   g.m[key] = c
   g.mu.Unlock()

   go g.doCall(c, key, fn)

   return ch
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM