本文是 golang 實現 redis 系列的第三篇, 主要介紹如何實現內存KV數據庫。本文完整源代碼在作者Github: HDT3213/godis
Concurrent Hash Map
KV 內存數據庫的核心是並發安全的哈希表,常見的設計有幾種:
-
sync.map: golang 官方提供的並發哈希表, 適合讀多寫少的場景。但是在 m.dirty 剛被提升后會將 m.read 復制到新的 m.dirty 中,在數據量較大的情況下復制操作會阻塞所有協程,存在較大的隱患。關於 sync.map 的詳細討論推薦閱讀鳥窩:sync.Map揭秘
-
juc.ConcurrentHashMap: java 的並發哈希表采用分段鎖實現。在進行擴容時訪問哈希表線程都將協助進行 rehash 操作,在 rehash 結束前所有的讀寫操作都會阻塞。因為緩存數據庫中鍵值對數量巨大且對讀寫操作響應時間要求較高,使用juc的策略是不合適的。
-
memcached hashtable: 在后台線程進行 rehash 操作時,主線程會判斷要訪問的哈希槽是否已被 rehash 從而決定操作 old_hashtable 還是操作 primary_hashtable。
memcached hashtable 的漸進式 Rehash 策略使主線程和rehash線程之間的 data race 限制在哈希槽內,最小化rehash操作對讀寫操作的影響,這是最理想的實現方式。但由於作者才疏學淺無法使用 golang 實現該策略故忍痛放棄(主要原因在於 golang 沒有 volatile 關鍵字, 保證可見性的操作非常復雜),歡迎各位讀者討論。
本文采用 golang 社區廣泛使用的分段鎖策略。我們將 key 分散到固定數量的 shard 中避免 rehash 操作。shard 是有鎖保護的 map, 當 shard 進行 rehash 時會阻塞shard內的讀寫,但不會對其他 shard 造成影響。
這種策略簡單可靠易於實現,但由於需要兩次 hash 性能略差。這個 dict 完整源碼在github.com/hdt3213/godis/datastruct/dict/concurrent.go 可以獨立使用(雖然也沒有什么用。。。)。
定義數據結構:
type ConcurrentDict struct {
table []*Shard
count int32
}
type Shard struct {
m map[string]interface{}
mutex sync.RWMutex
}
在構造時初始化 shard,這個操作相對比較耗時:
func computeCapacity(param int) (size int) {
if param <= 16 {
return 16
}
n := param - 1
n |= n >> 1
n |= n >> 2
n |= n >> 4
n |= n >> 8
n |= n >> 16
if n < 0 {
return math.MaxInt32
} else {
return int(n + 1)
}
}
func MakeConcurrent(shardCount int) *ConcurrentDict {
shardCount = computeCapacity(shardCount)
table := make([]*Shard, shardCount)
for i := 0; i < shardCount; i++ {
table[i] = &Shard{
m: make(map[string]interface{}),
}
}
d := &ConcurrentDict{
count: 0,
table: table,
}
return d
}
哈希算法選擇FNV算法:
const prime32 = uint32(16777619)
func fnv32(key string) uint32 {
hash := uint32(2166136261)
for i := 0; i < len(key); i++ {
hash *= prime32
hash ^= uint32(key[i])
}
return hash
}
定位shard, 當n為2的整數冪時 h % n == (n - 1) & h
func (dict *ConcurrentDict) spread(hashCode uint32) uint32 {
if dict == nil {
panic("dict is nil")
}
tableSize := uint32(len(dict.table))
return (tableSize - 1) & uint32(hashCode)
}
func (dict *ConcurrentDict) getShard(index uint32) *Shard {
if dict == nil {
panic("dict is nil")
}
return dict.table[index]
}
Get 和 Put 方法實現:
func (dict *ConcurrentDict) Get(key string) (val interface{}, exists bool) {
if dict == nil {
panic("dict is nil")
}
hashCode := fnv32(key)
index := dict.spread(hashCode)
shard := dict.getShard(index)
shard.mutex.RLock()
defer shard.mutex.RUnlock()
val, exists = shard.m[key]
return
}
func (dict *ConcurrentDict) Len() int {
if dict == nil {
panic("dict is nil")
}
return int(atomic.LoadInt32(&dict.count))
}
// return the number of new inserted key-value
func (dict *ConcurrentDict) Put(key string, val interface{}) (result int) {
if dict == nil {
panic("dict is nil")
}
hashCode := fnv32(key)
index := dict.spread(hashCode)
shard := dict.getShard(index)
shard.mutex.Lock()
defer shard.mutex.Unlock()
if _, ok := shard.m[key]; ok {
shard.m[key] = val
return 0
} else {
shard.m[key] = val
dict.addCount()
return 1
}
}
LockMap
上一節實現的ConcurrentMap 可以保證對單個 key 操作的並發安全性,但是仍然無法滿足需求:
- Incr 命令需要完成: 讀取 -> 做加法 -> 寫入 三步操作
- MSETNX 命令當且僅當所有給定鍵都不存在時所有給定鍵設置值, 因此我們需要鎖定所有給定的鍵直到完成所有鍵的檢查和設置
因此我們需要實現 db.Locker 用於鎖定一個或一組 key 並在我們需要的時候釋放鎖。
實現 db.Locker 最直接的想法是使用一個 map[string]*sync.RWMutex, 加鎖過程分為兩步: 初始化對應的鎖 -> 加鎖, 解鎖過程也分為兩步: 解鎖 -> 釋放對應的鎖。那么存在一個無法解決的並發問題:
| 時間 | 協程A | 協程B |
|---|---|---|
| 1 | locker["a"].Unlock() | |
| 2 | locker["a"] = &sync.RWMutex{} | |
| 3 | delete(locker["a"]) | |
| 4 | locker["a"].Lock() |
由於 t3 時協程B釋放了鎖,t4 時協程A試圖加鎖會失敗。
若我們在解鎖時不釋放鎖就可以避免該異常的發生,但是每個曾經使用過的鎖都無法釋放從而造成嚴重的內存泄露。
我們注意到哈希表的長度遠少於可能的鍵的數量,反過來說多個鍵可以共用一個哈希槽。若我們不為單個鍵加鎖而是為它所在的哈希槽加鎖,因為哈希槽的數量非常少即使不釋放鎖也不會占用太多內存。
作者根據這種思想實現了 LockMap 來解決並發控制問題。
type Locks struct {
table []*sync.RWMutex
}
func Make(tableSize int) *Locks {
table := make([]*sync.RWMutex, tableSize)
for i := 0; i < tableSize; i++ {
table[i] = &sync.RWMutex{}
}
return &Locks{
table: table,
}
}
func (locks *Locks)Lock(key string) {
index := locks.spread(fnv32(key))
mu := locks.table[index]
mu.Lock()
}
func (locks *Locks)UnLock(key string) {
index := locks.spread(fnv32(key))
mu := locks.table[index]
mu.Unlock()
}
哈希算法已經在 Dict 一節介紹過不再贅述。
在鎖定多個key時需要注意,若協程A持有鍵a的鎖試圖獲得鍵b的鎖,此時協程B持有鍵b的鎖試圖獲得鍵a的鎖則會形成死鎖。
解決方法是所有協程都按照相同順序加鎖,若兩個協程都想獲得鍵a和鍵b的鎖,那么必須先獲取鍵a的鎖后獲取鍵b的鎖,這樣就可以避免循環等待。
func (locks *Locks) toLockIndices(keys []string, reverse bool) []uint32 {
indexMap := make(map[uint32]bool)
for _, key := range keys {
index := locks.spread(fnv32(key))
indexMap[index] = true
}
indices := make([]uint32, 0, len(indexMap))
for index := range indexMap {
indices = append(indices, index)
}
sort.Slice(indices, func(i, j int) bool {
if !reverse {
return indices[i] < indices[j]
} else {
return indices[i] > indices[j]
}
})
return indices
}
// 允許 writeKeys 和 readKeys 中存在重復的 key
func (locks *Locks) RWLocks(writeKeys []string, readKeys []string) {
keys := append(writeKeys, readKeys...)
indices := locks.toLockIndices(keys, false)
writeIndices := locks.toLockIndices(writeKeys, false)
writeIndexSet := make(map[uint32]struct{})
for _, idx := range writeIndices {
writeIndexSet[idx] = struct{}{}
}
for _, index := range indices {
_, w := writeIndexSet[index]
mu := locks.table[index]
if w {
mu.Lock()
} else {
mu.RLock()
}
}
}
func (locks *Locks) RWUnLocks(writeKeys []string, readKeys []string) {
keys := append(writeKeys, readKeys...)
indices := locks.toLockIndices(keys, true)
writeIndices := locks.toLockIndices(writeKeys, true)
writeIndexSet := make(map[uint32]struct{})
for _, idx := range writeIndices {
writeIndexSet[idx] = struct{}{}
}
for _, index := range indices {
_, w := writeIndexSet[index]
mu := locks.table[index]
if w {
mu.Unlock()
} else {
mu.RUnlock()
}
}
}
TTL
Time To Live (TTL) 功能可以為 key 設置失效時間。它的核心是存儲 key -> expireTime 的 map 以及自動刪除過期的 key 的時間輪。
完整代碼在github.com/hdt3213/godis/db.go
func genExpireTask(key string) string {
return "expire:" + key
}
// Expire sets ttlCmd of key
func (db *DB) Expire(key string, expireTime time.Time) {
db.stopWorld.Wait()
db.ttlMap.Put(key, expireTime) // 記錄過期時間
taskKey := genExpireTask(key)
// 通過時間輪設置定時任務,在 key 過期后自動刪除
timewheel.At(expireTime, taskKey, func() {
// 采用 check-lock-check 防止在等待鎖期間,其它協程修改了 ttl 設置
db.Lock(key)
defer db.UnLock(key)
logger.Info("expire " + key)
rawExpireTime, ok := db.ttlMap.Get(key)
if !ok {
return
}
expireTime, _ := rawExpireTime.(time.Time)
expired := time.Now().After(expireTime)
if expired {
db.Remove(key)
}
})
}
// Persist cancel ttlCmd of key
func (db *DB) Persist(key string) {
db.stopWorld.Wait()
db.ttlMap.Remove(key)
taskKey := genExpireTask(key)
timewheel.Cancel(taskKey)
}
// IsExpired check whether a key is expired
func (db *DB) IsExpired(key string) bool {
db.stopWorld.Wait()
db.Lock(key)
defer db.UnLock(key)
rawExpireTime, ok := db.ttlMap.Get(key)
if !ok {
return false
}
expireTime, _ := rawExpireTime.(time.Time)
expired := time.Now().After(expireTime)
if expired {
db.Remove(key)
}
return expired
}
