groupcache: Usage and Source Code Analysis


groupcache is a caching system that was first deployed on Google's download site dl.google.com and later also used by Google Blogger and Google Code, services whose data changes infrequently.

groupcache has no update or delete commands, only set. It stores entries with an LRU policy: when the cache is full, the least recently used entries are evicted. This makes it a good fit for applications whose data rarely changes.

A groupcache cluster distributes keys across its nodes with consistent hashing, so the failure of a single node has only a limited impact on the overall system.

 

I. Using groupcache

Below we set up a cluster of three groupcache nodes:

"http://127.0.0.1:8001", "http://127.0.0.1:8002", "http://127.0.0.1:8003"

 

1. Create the local HTTPPool and register the peer nodes. The HTTPPool is the cluster's node selector: it holds the information for every node, and when a cached value has to be fetched from a peer, it picks a node by comparing the key's consistent-hash value with the nodes' hash values.

local_addr := "http://127.0.0.1:8001"
peers := groupcache.NewHTTPPool(local_addr)

peers_addrs := []string{"http://127.0.0.1:8001", "http://127.0.0.1:8002", "http://127.0.0.1:8003"}
peers.Set(peers_addrs...)

 

2. Create a group (a group is a storage module, similar to a namespace; multiple groups can be created), here named "image" and held in the variable image_cache. NewGroup's parameters are the group name, the group's size limit in bytes, and a getter function: when the cache has no entry for a key, this function decides how to fetch the corresponding data and writes it to dest, and the group then caches that data under the key.

var image_cache = groupcache.NewGroup("image", 8<<30, groupcache.GetterFunc(
    func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
        // Custom data-loading logic: read the requested file from the local file system.
        result, err := ioutil.ReadFile(key)
        if err != nil {
            fmt.Printf("read file error %s.\n", err.Error())
            return err
        }
        fmt.Printf("asking for %s from local file system\n", key)
        dest.SetBytes(result)
        return nil
    }))

 

3. Ask the group for the value cached under a key. The destination slice has to be wrapped in a Sink (a small data-wrapping structure):

var data []byte
image_cache.Get(nil, key, groupcache.AllocatingByteSliceSink(&data))

 

On the other two nodes only local_addr (and the listen address) needs to be changed; with that, the three-node groupcache cluster is up and running.
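Putting the three steps together, here is a minimal sketch of a single node, assuming every node runs the same program with its own local_addr and listen address, and assuming the groupcache version described here, where groupcache.Context is an empty interface. The /get front-end handler and its key parameter are illustrative additions, not part of groupcache itself:

package main

import (
    "io/ioutil"
    "log"
    "net/http"

    "github.com/golang/groupcache"
)

func main() {
    // This instance is the 8001 node; the other two nodes use their own address here.
    local_addr := "http://127.0.0.1:8001"

    peers := groupcache.NewHTTPPool(local_addr)
    peers.Set("http://127.0.0.1:8001", "http://127.0.0.1:8002", "http://127.0.0.1:8003")

    image_cache := groupcache.NewGroup("image", 8<<30, groupcache.GetterFunc(
        func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
            result, err := ioutil.ReadFile(key)
            if err != nil {
                return err
            }
            return dest.SetBytes(result)
        }))

    // Illustrative front end: /get?key=<file path> returns the cached bytes.
    http.HandleFunc("/get", func(w http.ResponseWriter, r *http.Request) {
        var data []byte
        if err := image_cache.Get(nil, r.FormValue("key"), groupcache.AllocatingByteSliceSink(&data)); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        w.Write(data)
    })

    // NewHTTPPool registers the peer API on http.DefaultServeMux under /_groupcache/,
    // so serving the default mux exposes both the peer API and the /get front end.
    log.Fatal(http.ListenAndServe("127.0.0.1:8001", nil))
}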

 

II. groupcache source code analysis

groupcache has two main parts: HTTPPool, which manages the cluster, and Group, which manages the cache.

 

1. HTTPPool manages all of the nodes and lets them communicate with one another over HTTP, so that a node can fetch cache entries stored on its peers.

type HTTPPool struct {
    // Context optionally specifies a context for the server to use when it
    // receives a request.
    // If nil, the server uses a nil Context.
    Context func(*http.Request) Context

    // Transport optionally specifies an http.RoundTripper for the client
    // to use when it makes a request.
    // If nil, the client uses http.DefaultTransport.
    Transport func(Context) http.RoundTripper

    // this peer's base URL, e.g. "https://example.net:8000"
    self string // the local node's URL

    // opts specifies the options.
    opts HTTPPoolOptions

    mu          sync.Mutex             // guards peers and httpGetters
    peers       *consistenthash.Map    // consistent-hash map plus hash function; holds the cluster nodes (represented by their URLs, as below) and their hash values
    httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008"; maps each node URL to its HTTP fetcher
}

type HTTPPoolOptions struct {
    // BasePath specifies the HTTP path that will serve groupcache requests.
    // If blank, it defaults to "/_groupcache/".
    BasePath string // URL path used for requests between peers

    // Replicas specifies the number of key replicas on the consistent hash.
    // If blank, it defaults to 50.
    Replicas int // number of virtual nodes each peer gets on the consistent-hash ring

    // HashFn specifies the hash function of the consistent hash.
    // If blank, it defaults to crc32.ChecksumIEEE.
    HashFn consistenthash.Hash // consistent-hash function
}

type Map struct { // consistenthash.Map
    hash     Hash           // consistent-hash function
    replicas int            // number of virtual nodes per peer
    keys     []int          // Sorted; hash values of all virtual nodes
    hashMap  map[int]string // maps a hash value back to its node
}

// Node-selection algorithm of HTTPPool.
func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if p.peers.IsEmpty() {
        return nil, false
    }
    if peer := p.peers.Get(key); peer != p.self { // is the selected node the local node itself?
        return p.httpGetters[peer], true // return the httpGetter for that peer
    }
    return nil, false
}

// p.peers.Get(key)
func (m *Map) Get(key string) string {
    if m.IsEmpty() {
        return ""
    }
    hash := int(m.hash([]byte(key))) // hash of the cache key, computed with the consistent-hash function

    // Binary search for appropriate replica.
    idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash }) // smallest node hash value >= the key's hash

    // Means we have cycled back to the first replica.
    if idx == len(m.keys) {
        idx = 0
    }
    return m.hashMap[m.keys[idx]] // return the node (its URL)
}
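The consistent hashing itself lives in the consistenthash package shipped with groupcache; HTTPPool.Set builds such a Map from the peer URLs. A small standalone sketch (the node URLs and keys are just examples):

package main

import (
    "fmt"

    "github.com/golang/groupcache/consistenthash"
)

func main() {
    // 50 virtual nodes per peer; a nil hash function falls back to
    // crc32.ChecksumIEEE, matching HTTPPool's defaults.
    ring := consistenthash.New(50, nil)
    ring.Add("http://127.0.0.1:8001", "http://127.0.0.1:8002", "http://127.0.0.1:8003")

    // Every key maps deterministically to one of the three node URLs;
    // adding a node later only remaps a fraction of the keys.
    for _, key := range []string{"a.jpg", "b.jpg", "c.jpg"} {
        fmt.Println(key, "->", ring.Get(key))
    }
}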
// HTTPPool implements http.Handler, so it can serve cache lookups to its peers over HTTP.
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // Parse request.
    if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
        panic("HTTPPool serving unexpected path: " + r.URL.Path)
    }
    parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
    if len(parts) != 2 {
        http.Error(w, "bad request", http.StatusBadRequest)
        return
    }
    groupName := parts[0]
    key := parts[1]

    // Fetch the value for this group/key.
    group := GetGroup(groupName) // look the group up by name, since there can be several groups
    if group == nil {
        http.Error(w, "no such group: "+groupName, http.StatusNotFound)
        return
    }
    var ctx Context
    if p.Context != nil {
        ctx = p.Context(r)
    }

    group.Stats.ServerRequests.Add(1) // update statistics
    var value []byte
    err := group.Get(ctx, key, AllocatingByteSliceSink(&value)) // the same call as in the usage section above
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    // Write the value to the response body as a proto message.
    body, err := proto.Marshal(&pb.GetResponse{Value: value}) // encode the data with protobuf
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "application/x-protobuf")
    w.Write(body)
}

// Issue an HTTP request to another node to fetch cached data.
func (h *httpGetter) Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error {
    u := fmt.Sprintf(
        "%v%v/%v",
        h.baseURL,
        url.QueryEscape(in.GetGroup()),
        url.QueryEscape(in.GetKey()),
    )
    req, err := http.NewRequest("GET", u, nil)
    if err != nil {
        return err
    }
    tr := http.DefaultTransport
    if h.transport != nil {
        tr = h.transport(context)
    }
    res, err := tr.RoundTrip(req)
    if err != nil {
        return err
    }
    defer res.Body.Close()
    if res.StatusCode != http.StatusOK {
        return fmt.Errorf("server returned: %v", res.Status)
    }
    b := bufferPool.Get().(*bytes.Buffer)
    b.Reset()
    defer bufferPool.Put(b)
    _, err = io.Copy(b, res.Body)
    if err != nil {
        return fmt.Errorf("reading response body: %v", err)
    }
    err = proto.Unmarshal(b.Bytes(), out)
    if err != nil {
        return fmt.Errorf("decoding response body: %v", err)
    }
    return nil
}
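Concretely, a peer request is just a GET on <peer URL> + BasePath + <group>/<key>, answered with a protobuf-encoded GetResponse. A sketch of issuing such a request by hand (the node, group and key are examples; the pb import path assumes the groupcachepb package bundled with groupcache):

package main

import (
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "net/url"

    pb "github.com/golang/groupcache/groupcachepb"
    "github.com/golang/protobuf/proto"
)

func main() {
    // Equivalent to the URL httpGetter.Get builds, with the default BasePath "/_groupcache/".
    u := fmt.Sprintf("http://127.0.0.1:8002/_groupcache/%v/%v",
        url.QueryEscape("image"), url.QueryEscape("test.jpg"))

    res, err := http.Get(u)
    if err != nil {
        log.Fatal(err)
    }
    defer res.Body.Close()

    body, err := ioutil.ReadAll(res.Body)
    if err != nil {
        log.Fatal(err)
    }

    // The body is a protobuf-encoded GetResponse; Value holds the cached bytes.
    var out pb.GetResponse
    if err := proto.Unmarshal(body, &out); err != nil {
        log.Fatal(err)
    }
    fmt.Printf("got %d bytes for test.jpg\n", len(out.GetValue()))
}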

 

2. Group manages the cache itself and uses the HTTPPool to pick peers and fetch cache entries held by other nodes.

type Group struct {
    name   string // group name
    getter Getter // called when the cache has no data for a key; it fetches the data, which is then cached

    peersOnce sync.Once
    peers     PeerPicker // HTTPPool implements this interface; its PickPeer(key string) (ProtoGetter, bool) method selects a node

    cacheBytes int64 // limit for sum of mainCache and hotCache size, in bytes

    // mainCache is a cache of the keys for which this process
    // (amongst its peers) is authoritative. That is, this cache
    // contains keys which consistent hash on to this process's
    // peer number.
    mainCache cache // LRU-managed cache for keys whose hash maps to the local node

    // hotCache contains keys/values for which this peer is not
    // authoritative (otherwise they would be in mainCache), but
    // are popular enough to warrant mirroring in this process to
    // avoid going over the network to fetch from a peer. Having
    // a hotCache avoids network hotspotting, where a peer's
    // network card could become the bottleneck on a popular key.
    // This cache is used sparingly to maximize the total number
    // of key/value pairs that can be stored globally.
    hotCache cache // LRU-managed cache for keys not owned by this node but hot enough to mirror locally, balancing load

    // loadGroup ensures that each key is only fetched once
    // (either locally or remotely), regardless of the number of
    // concurrent callers.
    loadGroup flightGroup // when a key is missing, only one goroutine calls the getter; concurrent callers wait and then all return the same result

    // Stats are statistics on the group.
    Stats Stats // statistics
}
// Creating a group.
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
    return newGroup(name, cacheBytes, getter, nil)
}

func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
    if getter == nil {
        panic("nil Getter")
    }
    mu.Lock()
    defer mu.Unlock()
    initPeerServerOnce.Do(callInitPeerServer)
    if _, dup := groups[name]; dup {
        panic("duplicate registration of group " + name)
    }
    g := &Group{
        // mainCache, hotCache and the peer picker are initialized later, during subsequent calls.
        name:       name,
        getter:     getter,
        peers:      peers,
        cacheBytes: cacheBytes,
        loadGroup:  &singleflight.Group{},
    }
    if fn := newGroupHook; fn != nil {
        fn(g) // no hook is registered here by default
    }
    groups[name] = g // remember the newly created group
    return g
}
// Group lookup.
func (g *Group) Get(ctx Context, key string, dest Sink) error {
    g.peersOnce.Do(g.initPeers) // install the HTTPPool as the group's PeerPicker
    g.Stats.Gets.Add(1)         // statistics
    if dest == nil {
        return errors.New("groupcache: nil dest Sink")
    }
    value, cacheHit := g.lookupCache(key) // look in mainCache and hotCache

    if cacheHit {
        g.Stats.CacheHits.Add(1)
        return setSinkView(dest, value)
    }

    // Optimization to avoid double unmarshalling or copying: keep
    // track of whether the dest was already populated. One caller
    // (if local) will set this; the losers will not. The common
    // case will likely be one caller.
    destPopulated := false
    value, destPopulated, err := g.load(ctx, key, dest) // fetch from a peer or via the user-supplied getter
    if err != nil {
        return err
    }
    if destPopulated {
        return nil
    }
    return setSinkView(dest, value) // hand the data to the sink
}
// Look in mainCache and hotCache; cache is backed by a linked list managed with an LRU policy.
func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
    if g.cacheBytes <= 0 {
        return
    }
    value, ok = g.mainCache.get(key)
    if ok {
        return
    }
    value, ok = g.hotCache.get(key)
    return
}
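The cache type here wraps the lru package that ships with groupcache (github.com/golang/groupcache/lru), adding byte-size accounting on top. A minimal standalone sketch of the underlying eviction behavior (keys and capacity are just examples):

package main

import (
    "fmt"

    "github.com/golang/groupcache/lru"
)

func main() {
    // A cache holding at most 2 entries; adding a third evicts the least
    // recently used one. groupcache's cache limits total bytes rather than
    // entry count, but relies on this same LRU ordering.
    c := lru.New(2)
    c.Add("a", 1)
    c.Add("b", 2)

    c.Get("a")    // touching "a" makes it the most recently used entry
    c.Add("c", 3) // evicts "b", the least recently used entry

    _, okA := c.Get("a")
    _, okB := c.Get("b")
    fmt.Println(okA, okB) // true false
}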
// Fetch the data from a peer or via the user-supplied getter.
func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
    g.Stats.Loads.Add(1)
    viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
        // loadGroup (the flightGroup) guarantees that only one goroutine calls the getter for a given key.
        //
        // Check the cache again because singleflight can only dedup calls
        // that overlap concurrently. It's possible for 2 concurrent
        // requests to miss the cache, resulting in 2 load() calls. An
        // unfortunate goroutine scheduling would result in this callback
        // being run twice, serially. If we don't check the cache again,
        // cache.nbytes would be incremented below even though there will
        // be only one entry for this key.
        //
        // Consider the following serialized event ordering for two
        // goroutines in which this callback gets called twice for the
        // same key:
        // 1: Get("key")
        // 2: Get("key")
        // 1: lookupCache("key")
        // 2: lookupCache("key")
        // 1: load("key")
        // 2: load("key")
        // 1: loadGroup.Do("key", fn)
        // 1: fn()
        // 2: loadGroup.Do("key", fn)
        // 2: fn()
        //
        // In other words, two or more goroutines may get past lookupCache and
        // into load at the same time, which would make the same key be fetched
        // and accounted for twice; hence the extra g.lookupCache(key) below.
        if value, cacheHit := g.lookupCache(key); cacheHit {
            g.Stats.CacheHits.Add(1)
            return value, nil
        }
        g.Stats.LoadsDeduped.Add(1)
        var value ByteView
        var err error
        if peer, ok := g.peers.PickPeer(key); ok { // pick a peer via consistent hashing; this is HTTPPool.PickPeer from above
            value, err = g.getFromPeer(ctx, peer, key) // build a protobuf request, fetch over HTTP from the peer, and store the result in hotCache
            if err == nil {
                g.Stats.PeerLoads.Add(1)
                return value, nil
            }
            g.Stats.PeerErrors.Add(1)
            // TODO(bradfitz): log the peer's error? keep
            // log of the past few for /groupcachez? It's
            // probably boring (normal task movement), so not
            // worth logging I imagine.
        }
        value, err = g.getLocally(ctx, key, dest) // call the getter and store the result in mainCache
        if err != nil {
            g.Stats.LocalLoadErrs.Add(1)
            return nil, err
        }
        g.Stats.LocalLoads.Add(1)
        destPopulated = true // only one caller of load gets this return value
        g.populateCache(key, value, &g.mainCache)
        return value, nil
    })
    if err == nil {
        value = viewi.(ByteView)
    }
    return
}
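loadGroup is a singleflight.Group from the singleflight package that ships with groupcache. A short standalone sketch of the dedup guarantee it provides (the key name, goroutine count and sleep are just illustrative):

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "github.com/golang/groupcache/singleflight"
)

func main() {
    var g singleflight.Group
    var calls int32 // how many times the expensive function actually runs

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // All ten goroutines ask for the same key concurrently; Do runs
            // the function once and hands every overlapping caller the result.
            v, _ := g.Do("some-key", func() (interface{}, error) {
                atomic.AddInt32(&calls, 1)
                time.Sleep(50 * time.Millisecond) // simulate a slow getter or peer fetch
                return "value", nil
            })
            _ = v
        }()
    }
    wg.Wait()
    fmt.Println("function executed", atomic.LoadInt32(&calls), "time(s)") // typically 1
}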

 

 

References

groupcache: https://github.com/golang/groupcache

Consistent hashing: http://blog.codinglabs.org/articles/consistent-hashing.html

protobuf: https://developers.google.com/protocol-buffers/docs/overviewgo

protobuf example: https://godoc.org/github.com/golang/protobuf/proto

