sync.map
前言
Go中的map不是並發安全的,在Go1.9之后,引入了sync.Map
,並發安全的map。
深入了解下
對於map,我們常用的做法就是加鎖。
對於sync.Map
這些操作則是不需要的,來看下sync.Map
的特點:
- 1、以空間換效率,通過read和dirty兩個map來提高讀取效率
- 2、優先從read map中讀取(無鎖),否則再從dirty map中讀取(加鎖)
- 3、動態調整,當misses次數過多時,將dirty map提升為read map
- 4、延遲刪除,刪除只是為value打一個標記,在dirty map提升時才執行真正的刪除
簡單的使用栗子:
func syncMapDemo() {
var smp sync.Map
// 數據寫入
smp.Store("name", "小紅")
smp.Store("age", 18)
// 數據讀取
name, _ := smp.Load("name")
fmt.Println(name)
age, _ := smp.Load("age")
fmt.Println(age)
// 遍歷
smp.Range(func(key, value interface{}) bool {
fmt.Println(key, value)
return true
})
// 刪除
smp.Delete("age")
age, ok := smp.Load("age")
fmt.Println("刪除后的查詢")
fmt.Println(age, ok)
// 讀取或寫入,存在就讀取,不存在就寫入
smp.LoadOrStore("age", 100)
age, _ = smp.Load("age")
fmt.Println("不存在")
fmt.Println(age)
smp.LoadOrStore("age", 99)
age, _ = smp.Load("age")
fmt.Println("存在")
fmt.Println(age)
}
查看下具體的實現
// sync/map.go
type Map struct {
// 當寫read map 或讀寫dirty map時 需要上鎖
mu Mutex
// read map的 k v(entry) 是不變的,刪除只是打標記,插入新key會加鎖寫到dirty中
// 因此對read map的讀取無需加鎖
read atomic.Value // 保存readOnly結構體
// dirty map 對dirty map的操作需要持有mu鎖
dirty map[interface{}]*entry
// 當Load操作在read map中未找到,嘗試從dirty中進行加載時(不管是否存在),misses+1
// 當misses達到diry map len時,dirty被提升為read 並且重新分配dirty
misses int
}
// read map數據結構
type readOnly struct {
m map[interface{}]*entry
// 為true時代表dirty map中含有m中沒有的元素
amended bool
}
type entry struct {
// 指向實際的interface{}
// p有三種狀態:
// p == nil: 鍵值已經被刪除,此時,m.dirty==nil 或 m.dirty[k]指向該entry
// p == expunged: 鍵值已經被刪除, 此時, m.dirty!=nil 且 m.dirty不存在該鍵值
// 其它情況代表實際interface{}地址 如果m.dirty!=nil 則 m.read[key] 和 m.dirty[key] 指向同一個entry
// 當刪除key時,並不實際刪除,先CAS entry.p為nil 等到每次dirty map創建時(dirty提升后的第一次新建Key),會將entry.p由nil CAS為expunged
p unsafe.Pointer // *interface{}
}
read map
和 dirty map
的存儲方式是不一致的。
前者使用 atomic.Value
,后者只是單純的使用 map。原因是 read map
使用 lock free
操作,必須保證 load/store
的原子性;而 dirty map
的 load+store
操作是由 lock(就是 mu)來保護的。
1、read和dirty通過entry包裝value,這樣使得value的變化和map的變化隔離,前者可以用atomic無鎖完成
2、Map的read字段結構體定義為readOnly,這只是針對map[interface{}]*entry而言的,entry內的內容以及amended字段都是可以變的
3、大部分情況下,對已有key的刪除(entry.p置為nil)和更新可以直接通過修改entry.p來完成
Load
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
// 首先在通過atomic的原子操作讀取內容
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 如果沒在 read 中找到,並且 amended 為 true,即 dirty 中存在 read 中沒有的 key
if !ok && read.amended {
// read調用了atomic的原子性,所以不用加鎖,但是dirty map[interface{}]*entry就需要了,用的互斥鎖
m.mu.Lock()
// double check,避免在加鎖的時候dirty map提升為read map
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
// 還是沒有找到
if !ok && read.amended {
// 從 dirty 中找
e, ok = m.dirty[key]
// 不管dirty中有沒有找到 都增加misses計數 該函數可能將dirty map提升為readmap
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
// 從entry中atomic load實際interface{}
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
梳理下處理的邏輯:
1、首先是 fast path,直接在 read 中找,如果找到了直接調用 entry 的 load 方法,取出其中的值。
2、如果 read 中沒有這個 key,且 amended 為 fase,說明 dirty 為空,那直接返回 空和 false。
3、如果 read 中沒有這個 key,且 amended 為 true,說明 dirty 中可能存在我們要找的 key。當然要先上鎖,再嘗試去 dirty 中查找。在這之前,仍然有一個 double check 的操作。若還是沒有在 read 中找到,那么就從 dirty 中找。不管 dirty 中有沒有找到,都要"記一筆",因為在 dirty 被提升為 read 之前,都會進入這條路徑
// 增加misses計數,並在必要的時候提升dirty map
// 如果 misses 值小於 m.dirty 的長度,就直接返回。否則,將 m.dirty 晉升為 read,並清空 dirty,清空 misses 計數值。這樣,之前
// 一段時間新加入的 key 都會進入到 read 中,從而能夠提升 read 的命中率。
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
// 提升過程很簡單,直接將m.dirty賦給m.read.m
// 提升完成之后 amended == false m.dirty == nil
// m.dirty並不立即創建被拷貝元素,而是延遲創建
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
對於missLocked
會直接將 misses 的值加 1,表示一次未命中,如果 misses 值小於 m.dirty 的長度,就直接返回。否則,將 m.dirty 晉升為 read,並清空 dirty,清空 misses 計數值。這樣,之前一段時間新加入的 key 都會進入到 read 中,從而能夠提升 read 的命中率。
Store
// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
// 如果read map中存在該key 則嘗試直接更改(由於修改的是entry內部的pointer,因此dirty map也可見)
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// 如果 read map 中存在該 key,但 p == expunged,則說明 m.dirty != nil 並且 m.dirty 中不存在該 key 值 此時:
// a. 將 p 的狀態由 expunged 更改為 nil
// b. dirty map 插入 key
m.dirty[key] = e
}
// 更新 entry.p = value (read map 和 dirty map 指向同一個 entry)
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 如果 read map 中不存在該 key,但 dirty map 中存在該 key,直接寫入更新 entry(read map 中仍然沒有這個 key)
e.storeLocked(&value)
} else {
// 如果read map和dirty map中都不存在該key,則:
// a. 如果dirty map為空,則需要創建dirty map,並從read map中拷貝未刪除的元素
// b. 更新amended字段,標識dirty map中存在read map中沒有的key
// c. 將k v寫入dirty map中,read.m不變
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
// 嘗試直接更新entry 如果p == expunged 返回false
func (e *entry) tryStore(i *interface{}) bool {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
for {
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
}
}
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
// 如果 dirty map為nil,則從read map中拷貝元素到dirty map
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
// a. 將所有為 nil的 p 置為 expunged
// b. 只拷貝不為expunged 的 p
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
梳理下流程:
1、首先還是去read map中查詢,存在並且p!=expunged,直接修改。(由於修改的是 entry 內部的 pointer,因此 dirty map 也可見)
2、如果read map中存在該key,但p == expunged。加鎖更新p的狀態,然后直接更新該entry (此時m.dirtynil或m.dirty[key]e)
3、如果read map中不存在該Key,但dirty map中存在該key,直接寫入更新entry(read map中仍然沒有)
4、如果read map和dirty map都不存在該key
- a. 如果dirty map為空,則需要創建dirty map,並從read map中拷貝未刪除的元素
- b. 更新amended字段,標識dirty map中存在read map中沒有的key
- c. 將k v寫入dirty map中,read.m不變
Delete
// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
// 從read map中尋找
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 沒找到
if !ok && read.amended { // read.amended為true代表dirty map中含有m中沒有的元素
m.mu.Lock()
// double check
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
// 第二次仍然沒找到,但dirty map中存在,則直接從dirty map刪除
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
// 如果read存在,將entry.p 置為 nil
if ok {
e.delete()
}
}
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}
梳理下流程:
1、先去read map中尋找,如果存在就直接刪除
2、如果沒找到,並且 read.amended為true代表dirty map中存在,依照傳統進行 double check。
3、read map找到就刪除,沒找到判斷dirty map是否存在,存在了就刪除
LoadOrStore
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
// 首先還是先去read map中查詢
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}
m.mu.Lock()
// double check
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// 如果 read map 中存在該 key,但 p == expunged,則說明 m.dirty != nil 並且 m.dirty 中不存在該 key 值 此時:
// a. 將 p 的狀態由 expunged 更改為 nil
// b. dirty map 插入 key
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
// 如果 read map 中不存在該 key,但 dirty map 中存在該 key
actual, loaded, _ = e.tryLoadOrStore(value)
// 不管dirty中有沒有找到 都增加misses計數 該函數可能將dirty map提升為readmap
m.missLocked()
} else {
if !read.amended {
// 如果read map和dirty map中都不存在該key,則:
// a. 如果dirty map為空,則需要創建dirty map,並從read map中拷貝未刪除的元素
// b. 更新amended字段,標識dirty map中存在read map中沒有的key
// c. 將k v寫入dirty map中,read.m不變
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()
return actual, loaded
}
// 如果entry is expunged則不處理,返回false
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
// Copy the interface after the first load to make this method more amenable
// to escape analysis: if we hit the "load" path or the entry is expunged, we
// shouldn't bother heap-allocating.
ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
}
}
這個函數結合了 Load 和 Store 的功能,如果 map 中存在這個 key,那么返回這個 key 對應的 value;否則,將 key-value 存入 map。
這在需要先執行 Load 查看某個 key 是否存在,之后再更新此 key 對應的 value 時很有效,因為 LoadOrStore 可以並發執行。
總結
除了Load/Store/Delete之外,sync.Map還提供了LoadOrStore/Range操作,但沒有提供Len()方法,這是因為要統計有效的鍵值對只能先提升dirty map(dirty map中可能有read map中沒有的鍵值對),再遍歷m.read(由於延遲刪除,不是所有的鍵值對都有效),這其實就是Range做的事情,因此在不添加新數據結構支持的情況下,sync.Map的長度獲取和Range操作是同一復雜度的。這部分只能看官方后續支持。
1、sync.map 是線程安全的,讀取,插入,刪除也都保持着常數級的時間復雜度。
2、通過讀寫分離,降低鎖時間來提高效率,適用於讀多寫少的場景。
3、Range 操作需要提供一個函數,參數是 k,v,返回值是一個布爾值:f func(key, value interface{}) bool。
4、調用 Load 或 LoadOrStore 函數時,如果在 read 中沒有找到 key,則會將 misses 值原子地增加 1,當 misses 增加到和 dirty 的長度相等時,會將 dirty 提升為 read。以期減少“讀 miss”。
5、新寫入的 key 會保存到 dirty 中,如果這時 dirty 為 nil,就會先新創建一個 dirty,並將 read 中未被刪除的元素拷貝到 dirty。
6、當 dirty 為 nil 的時候,read 就代表 map 所有的數據;當 dirty 不為 nil 的時候,dirty 才代表 map 所有的數據。
流程圖片
最后附上一張不錯的圖片
參考
【Go sync.Map 實現】https://wudaijun.com/2018/02/go-sync-map-implement/
【深度解密 Go 語言之 sync.map】https://www.cnblogs.com/qcrao-2018/p/12833787.html
【圖片】http://russellluo.com/2017/06/go-sync-map-diagram.html