Golang 實現 Redis(3): 實現內存數據庫


本文是 golang 實現 redis 系列的第三篇, 主要介紹如何實現內存KV數據庫。本文完整源代碼在作者Github: HDT3213/godis

Concurrent Hash Map

KV 內存數據庫的核心是並發安全的哈希表,常見的設計有幾種:

  • sync.map: golang 官方提供的並發哈希表, 適合讀多寫少的場景。但是在 m.dirty 剛被提升后會將 m.read 復制到新的 m.dirty 中,在數據量較大的情況下復制操作會阻塞所有協程,存在較大的隱患。關於 sync.map 的詳細討論推薦閱讀鳥窩:sync.Map揭秘

  • juc.ConcurrentHashMap: java 的並發哈希表采用分段鎖實現。在進行擴容時訪問哈希表線程都將協助進行 rehash 操作,在 rehash 結束前所有的讀寫操作都會阻塞。因為緩存數據庫中鍵值對數量巨大且對讀寫操作響應時間要求較高,使用juc的策略是不合適的。

  • memcached hashtable: 在后台線程進行 rehash 操作時,主線程會判斷要訪問的哈希槽是否已被 rehash 從而決定操作 old_hashtable 還是操作 primary_hashtable。

memcached hashtable 的漸進式 Rehash 策略使主線程和rehash線程之間的 data race 限制在哈希槽內,最小化rehash操作對讀寫操作的影響,這是最理想的實現方式。但由於作者才疏學淺無法使用 golang 實現該策略故忍痛放棄(主要原因在於 golang 沒有 volatile 關鍵字, 保證可見性的操作非常復雜),歡迎各位讀者討論。

本文采用 golang 社區廣泛使用的分段鎖策略。我們將 key 分散到固定數量的 shard 中避免 rehash 操作。shard 是有鎖保護的 map, 當 shard 進行 rehash 時會阻塞shard內的讀寫,但不會對其他 shard 造成影響。

這種策略簡單可靠易於實現,但由於需要兩次 hash 性能略差。這個 dict 完整源碼在github.com/hdt3213/godis/datastruct/dict/concurrent.go 可以獨立使用(雖然也沒有什么用。。。)。

定義數據結構:

type ConcurrentDict struct {
    table []*Shard
    count int32
}

type Shard struct {
    m     map[string]interface{}
    mutex sync.RWMutex
}

在構造時初始化 shard,這個操作相對比較耗時:

func computeCapacity(param int) (size int) {
	if param <= 16 {
		return 16
	}
	n := param - 1
	n |= n >> 1
	n |= n >> 2
	n |= n >> 4
	n |= n >> 8
	n |= n >> 16
	if n < 0 {
		return math.MaxInt32
	} else {
		return int(n + 1)
	}
}

func MakeConcurrent(shardCount int) *ConcurrentDict {
    shardCount = computeCapacity(shardCount)
    table := make([]*Shard, shardCount)
    for i := 0; i < shardCount; i++ {
        table[i] = &Shard{
            m: make(map[string]interface{}),
        }
    }
    d := &ConcurrentDict{
        count: 0,
        table: table,
    }
    return d
}

哈希算法選擇FNV算法:

const prime32 = uint32(16777619)

func fnv32(key string) uint32 {
    hash := uint32(2166136261)
    for i := 0; i < len(key); i++ {
        hash *= prime32
        hash ^= uint32(key[i])
    }
    return hash
}

定位shard, 當n為2的整數冪時 h % n == (n - 1) & h

func (dict *ConcurrentDict) spread(hashCode uint32) uint32 {
	if dict == nil {
		panic("dict is nil")
	}
	tableSize := uint32(len(dict.table))
	return (tableSize - 1) & uint32(hashCode)
}

func (dict *ConcurrentDict) getShard(index uint32) *Shard {
	if dict == nil {
		panic("dict is nil")
	}
	return dict.table[index]
}

Get 和 Put 方法實現:

func (dict *ConcurrentDict) Get(key string) (val interface{}, exists bool) {
	if dict == nil {
		panic("dict is nil")
	}
	hashCode := fnv32(key)
	index := dict.spread(hashCode)
	shard := dict.getShard(index)
	shard.mutex.RLock()
	defer shard.mutex.RUnlock()
	val, exists = shard.m[key]
	return
}

func (dict *ConcurrentDict) Len() int {
	if dict == nil {
		panic("dict is nil")
	}
	return int(atomic.LoadInt32(&dict.count))
}

// return the number of new inserted key-value
func (dict *ConcurrentDict) Put(key string, val interface{}) (result int) {
	if dict == nil {
		panic("dict is nil")
	}
	hashCode := fnv32(key)
	index := dict.spread(hashCode)
	shard := dict.getShard(index)
	shard.mutex.Lock()
	defer shard.mutex.Unlock()

	if _, ok := shard.m[key]; ok {
		shard.m[key] = val
		return 0
	} else {
		shard.m[key] = val
		dict.addCount()
		return 1
	}
}

LockMap

上一節實現的ConcurrentMap 可以保證對單個 key 操作的並發安全性,但是仍然無法滿足需求:

  1. Incr 命令需要完成: 讀取 -> 做加法 -> 寫入 三步操作
  2. MSETNX 命令當且僅當所有給定鍵都不存在時所有給定鍵設置值, 因此我們需要鎖定所有給定的鍵直到完成所有鍵的檢查和設置

因此我們需要實現 db.Locker 用於鎖定一個或一組 key 並在我們需要的時候釋放鎖。

實現 db.Locker 最直接的想法是使用一個 map[string]*sync.RWMutex, 加鎖過程分為兩步: 初始化對應的鎖 -> 加鎖, 解鎖過程也分為兩步: 解鎖 -> 釋放對應的鎖。那么存在一個無法解決的並發問題:

時間 協程A 協程B
1 locker["a"].Unlock()
2 locker["a"] = &sync.RWMutex{}
3 delete(locker["a"])
4 locker["a"].Lock()

由於 t3 時協程B釋放了鎖,t4 時協程A試圖加鎖會失敗。

若我們在解鎖時不釋放鎖就可以避免該異常的發生,但是每個曾經使用過的鎖都無法釋放從而造成嚴重的內存泄露。

我們注意到哈希表的長度遠少於可能的鍵的數量,反過來說多個鍵可以共用一個哈希槽。若我們不為單個鍵加鎖而是為它所在的哈希槽加鎖,因為哈希槽的數量非常少即使不釋放鎖也不會占用太多內存。

作者根據這種思想實現了 LockMap 來解決並發控制問題。

type Locks struct {
    table []*sync.RWMutex
}

func Make(tableSize int) *Locks {
    table := make([]*sync.RWMutex, tableSize)
    for i := 0; i < tableSize; i++ {
        table[i] = &sync.RWMutex{}
    }
    return &Locks{
        table: table,
    }
}

func (locks *Locks)Lock(key string) {
    index := locks.spread(fnv32(key))
    mu := locks.table[index]
    mu.Lock()
}

func (locks *Locks)UnLock(key string) {
    index := locks.spread(fnv32(key))
    mu := locks.table[index]
    mu.Unlock()
}

哈希算法已經在 Dict 一節介紹過不再贅述。

在鎖定多個key時需要注意,若協程A持有鍵a的鎖試圖獲得鍵b的鎖,此時協程B持有鍵b的鎖試圖獲得鍵a的鎖則會形成死鎖。

解決方法是所有協程都按照相同順序加鎖,若兩個協程都想獲得鍵a和鍵b的鎖,那么必須先獲取鍵a的鎖后獲取鍵b的鎖,這樣就可以避免循環等待。

func (locks *Locks) toLockIndices(keys []string, reverse bool) []uint32 {
    indexMap := make(map[uint32]bool)
    for _, key := range keys {
        index := locks.spread(fnv32(key))
        indexMap[index] = true
    }
    indices := make([]uint32, 0, len(indexMap))
    for index := range indexMap {
        indices = append(indices, index)
    }
    sort.Slice(indices, func(i, j int) bool {
        if !reverse {
            return indices[i] < indices[j]
        } else {
            return indices[i] > indices[j]
        }
    })
    return indices
}

// 允許 writeKeys 和 readKeys 中存在重復的 key
func (locks *Locks) RWLocks(writeKeys []string, readKeys []string) {
	keys := append(writeKeys, readKeys...)
	indices := locks.toLockIndices(keys, false)
	writeIndices := locks.toLockIndices(writeKeys, false)
	writeIndexSet := make(map[uint32]struct{})
	for _, idx := range writeIndices {
		writeIndexSet[idx] = struct{}{}
	}
	for _, index := range indices {
		_, w := writeIndexSet[index]
		mu := locks.table[index]
		if w {
			mu.Lock()
		} else {
			mu.RLock()
		}
	}
}

func (locks *Locks) RWUnLocks(writeKeys []string, readKeys []string) {
	keys := append(writeKeys, readKeys...)
	indices := locks.toLockIndices(keys, true)
	writeIndices := locks.toLockIndices(writeKeys, true)
	writeIndexSet := make(map[uint32]struct{})
	for _, idx := range writeIndices {
		writeIndexSet[idx] = struct{}{}
	}
	for _, index := range indices {
		_, w := writeIndexSet[index]
		mu := locks.table[index]
		if w {
			mu.Unlock()
		} else {
			mu.RUnlock()
		}
	}
}

TTL

Time To Live (TTL) 功能可以為 key 設置失效時間。它的核心是存儲 key -> expireTime 的 map 以及自動刪除過期的 key 的時間輪。

完整代碼在github.com/hdt3213/godis/db.go

func genExpireTask(key string) string {
	return "expire:" + key
}

// Expire sets ttlCmd of key
func (db *DB) Expire(key string, expireTime time.Time) {
	db.stopWorld.Wait()
	db.ttlMap.Put(key, expireTime) // 記錄過期時間
	taskKey := genExpireTask(key)
    // 通過時間輪設置定時任務,在 key 過期后自動刪除
	timewheel.At(expireTime, taskKey, func() {
        // 采用 check-lock-check 防止在等待鎖期間,其它協程修改了 ttl 設置
		db.Lock(key)
		defer db.UnLock(key)
		logger.Info("expire " + key)
		rawExpireTime, ok := db.ttlMap.Get(key)
		if !ok {
			return
		}
		expireTime, _ := rawExpireTime.(time.Time)
		expired := time.Now().After(expireTime)
		if expired {
			db.Remove(key)
		}
	})
}

// Persist cancel ttlCmd of key
func (db *DB) Persist(key string) {
	db.stopWorld.Wait()
	db.ttlMap.Remove(key)
	taskKey := genExpireTask(key)
	timewheel.Cancel(taskKey)
}

// IsExpired check whether a key is expired
func (db *DB) IsExpired(key string) bool {
    db.stopWorld.Wait()
    db.Lock(key)
	defer db.UnLock(key)
	rawExpireTime, ok := db.ttlMap.Get(key)
	if !ok {
		return false
	}
	expireTime, _ := rawExpireTime.(time.Time)
	expired := time.Now().After(expireTime)
	if expired {
		db.Remove(key)
	}
	return expired
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM