原文鏈接:https://segmentfault.com/a/1190000018448064
作者:薛薛薛
分斷鎖
type SimpleCache struct { mu sync.RWMutex items map[interface{}]*simpleItem }
在日常開發中, 上述這種數據結構肯定不少見,因為golang的原生map是非並發安全的,所以為了保證map的並發安全,最簡單的方式就是給map加鎖。
之前使用過兩個本地內存緩存的開源庫, gcache, cache2go,其中存儲緩存對象的結構都是這樣,對於輕量級的緩存庫,為了設計簡潔(包含清理過期對象等 ) 再加上當需要緩存大量數據時有redis,memcache等明星項目解決。 但是如果拋開這些因素遇到真正數量巨大的數據量時,直接對一個map加鎖,當map中的值越來越多,訪問map的請求越來越多,大家都競爭這一把鎖顯得並發訪問控制變重。 在go1.9引入sync.Map 之前,比較流行的做法就是使用分段鎖,顧名思義就是將鎖分段,將鎖的粒度變小,將存儲的對象分散到各個分片中,每個分片由一把鎖控制,這樣使得當需要對在A分片上的數據進行讀寫時不會影響B分片的讀寫。
分段鎖的實現
// Map 分片 type ConcurrentMap []*ConcurrentMapShared // 每一個Map 是一個加鎖的並發安全Map type ConcurrentMapShared struct { items map[string]interface{} sync.RWMutex // 各個分片Map各自的鎖 }
主流的分段鎖,即通過hash取模的方式找到當前訪問的key處於哪一個分片之上,再對該分片進行加鎖之后再讀寫。分片定位時,常用有BKDR, FNV32等hash算法得到key的hash值。
func New() ConcurrentMap { // SHARD_COUNT 默認32個分片 m := make(ConcurrentMap, SHARD_COUNT) for i := 0; i < SHARD_COUNT; i++ { m[i] = &ConcurrentMapShared{ items: make(map[string]interface{}), } } return m }
在初始化好分片后, 對分片上的數據進行讀寫時就需要用hash取模進行分段定位來確認即將要讀寫的分片。
獲取段定位
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared { return m[uint(fnv32(key))%uint(SHARD_COUNT)] } // FNV hash func fnv32(key string) uint32 { hash := uint32(2166136261) const prime32 = uint32(16777619) for i := 0; i < len(key); i++ { hash *= prime32 hash ^= uint32(key[i]) } return hash }
之后對於map的GET SET 就簡單順利成章的完成
Set And Get
func (m ConcurrentMap) Set(key string, value interface{}) { shard := m.GetShard(key) // 段定位找到分片 shard.Lock() // 分片上鎖 shard.items[key] = value // 分片操作 shard.Unlock() // 分片解鎖 } func (m ConcurrentMap) Get(key string) (interface{}, bool) { shard := m.GetShard(key) shard.RLock() val, ok := shard.items[key] shard.RUnlock() return val, ok }
由此一個分段鎖Map就實現了, 但是比起普通的Map, 常用到的方法比如獲取所有key, 獲取所有Val 操作是要比原生Map復雜的,因為要遍歷每一個分片的每一個數據, 好在golang的並發特性使得解決這類問題變得非常簡單
Keys
// 統計當前分段map中item的個數 func (m ConcurrentMap) Count() int { count := 0 for i := 0; i < SHARD_COUNT; i++ { shard := m[i] shard.RLock() count += len(shard.items) shard.RUnlock() } return count } // 獲取所有的key func (m ConcurrentMap) Keys() []string { count := m.Count() ch := make(chan string, count) // 每一個分片啟動一個協程 遍歷key go func() { wg := sync.WaitGroup{} wg.Add(SHARD_COUNT) for _, shard := range m { go func(shard *ConcurrentMapShared) { defer wg.Done() shard.RLock() // 每個分片中的key遍歷后都寫入統計用的channel for key := range shard.items { ch <- key } shard.RUnlock() }(shard) } wg.Wait() close(ch) }() keys := make([]string, count) // 統計各個協程並發讀取Map分片的key for k := range ch { keys = append(keys, k) } return keys }
這里寫了一個benchMark來對該分段鎖Map和原生的Map加鎖方式進行壓測, 場景為將一萬個不重復的鍵值對同時以100萬次寫和100萬次讀,分別進行5次壓測, 如下壓測代碼
func BenchmarkMapShared(b *testing.B) { num := 10000 testCase := genNoRepetTestCase(num) // 10000個不重復的鍵值對 m := New() for _, v := range testCase { m.Set(v.Key, v.Val) } b.ResetTimer() for i := 0; i < 5; i++ { b.Run(strconv.Itoa(i), func(b *testing.B) { b.N = 1000000 wg := sync.WaitGroup{} wg.Add(b.N * 2) for i := 0; i < b.N; i++ { e := testCase[rand.Intn(num)] go func(key string, val interface{}) { m.Set(key, val) wg.Done() }(e.Key, e.Val) go func(key string) { _, _ = m.Get(key) wg.Done() }(e.Key) } wg.Wait() }) } }
原生Map加鎖壓測結果
分段鎖壓測結果
可以看出在將鎖的粒度細化后再面對大量需要控制並發安全的訪問時,分段鎖Map的耗時比原生Map加鎖要快3倍有余