Go中sync.map使用小結


sync.map

前言

Go中的map不是並發安全的,在Go1.9之后,引入了sync.Map,並發安全的map。

深入了解下

對於map,我們常用的做法就是加鎖。

對於sync.Map這些操作則是不需要的,來看下sync.Map的特點:

  • 1、以空間換效率,通過read和dirty兩個map來提高讀取效率
  • 2、優先從read map中讀取(無鎖),否則再從dirty map中讀取(加鎖)
  • 3、動態調整,當misses次數過多時,將dirty map提升為read map
  • 4、延遲刪除,刪除只是為value打一個標記,在dirty map提升時才執行真正的刪除

簡單的使用栗子:

func syncMapDemo() {

	var smp sync.Map

	// 數據寫入
	smp.Store("name", "小紅")
	smp.Store("age", 18)

	// 數據讀取
	name, _ := smp.Load("name")
	fmt.Println(name)

	age, _ := smp.Load("age")
	fmt.Println(age)

	// 遍歷
	smp.Range(func(key, value interface{}) bool {
		fmt.Println(key, value)
		return true
	})

	// 刪除
	smp.Delete("age")
	age, ok := smp.Load("age")
	fmt.Println("刪除后的查詢")
	fmt.Println(age, ok)

	// 讀取或寫入,存在就讀取,不存在就寫入
	smp.LoadOrStore("age", 100)
	age, _ = smp.Load("age")
	fmt.Println("不存在")
	fmt.Println(age)

	smp.LoadOrStore("age", 99)
	age, _ = smp.Load("age")
	fmt.Println("存在")
	fmt.Println(age)
}

查看下具體的實現

// sync/map.go
type Map struct {
   // 當寫read map 或讀寫dirty map時 需要上鎖
   mu Mutex

   // read map的 k v(entry) 是不變的,刪除只是打標記,插入新key會加鎖寫到dirty中
   // 因此對read map的讀取無需加鎖
   read atomic.Value // 保存readOnly結構體

   // dirty map 對dirty map的操作需要持有mu鎖
   dirty map[interface{}]*entry

   // 當Load操作在read map中未找到,嘗試從dirty中進行加載時(不管是否存在),misses+1
   // 當misses達到diry map len時,dirty被提升為read 並且重新分配dirty
   misses int
}

// read map數據結構
type readOnly struct {
   m       map[interface{}]*entry
   // 為true時代表dirty map中含有m中沒有的元素
   amended bool
}

type entry struct {
   // 指向實際的interface{}
   // p有三種狀態:
   // p == nil: 鍵值已經被刪除,此時,m.dirty==nil 或 m.dirty[k]指向該entry
   // p == expunged: 鍵值已經被刪除, 此時, m.dirty!=nil 且 m.dirty不存在該鍵值
   // 其它情況代表實際interface{}地址 如果m.dirty!=nil 則 m.read[key] 和 m.dirty[key] 指向同一個entry
   // 當刪除key時,並不實際刪除,先CAS entry.p為nil 等到每次dirty map創建時(dirty提升后的第一次新建Key),會將entry.p由nil CAS為expunged
   p unsafe.Pointer // *interface{}
}

read mapdirty map 的存儲方式是不一致的。

前者使用 atomic.Value,后者只是單純的使用 map。原因是 read map 使用 lock free 操作,必須保證 load/store 的原子性;而 dirty mapload+store 操作是由 lock(就是 mu)來保護的。

1、read和dirty通過entry包裝value,這樣使得value的變化和map的變化隔離,前者可以用atomic無鎖完成
2、Map的read字段結構體定義為readOnly,這只是針對map[interface{}]*entry而言的,entry內的內容以及amended字段都是可以變的
3、大部分情況下,對已有key的刪除(entry.p置為nil)和更新可以直接通過修改entry.p來完成

Load

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    // 首先在通過atomic的原子操作讀取內容
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
    // 如果沒在 read 中找到,並且 amended 為 true,即 dirty 中存在 read 中沒有的 key
	if !ok && read.amended {
       // read調用了atomic的原子性,所以不用加鎖,但是dirty map[interface{}]*entry就需要了,用的互斥鎖
		m.mu.Lock()
        // double check,避免在加鎖的時候dirty map提升為read map
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
        // 還是沒有找到
		if !ok && read.amended {
            // 從 dirty 中找
			e, ok = m.dirty[key]
            // 不管dirty中有沒有找到 都增加misses計數 該函數可能將dirty map提升為readmap
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

// 從entry中atomic load實際interface{}
func (e *entry) load() (value interface{}, ok bool) {
  p := atomic.LoadPointer(&e.p)
  if p == nil || p == expunged {
    return nil, false
  }
  return *(*interface{})(p), true
}

梳理下處理的邏輯:

1、首先是 fast path,直接在 read 中找,如果找到了直接調用 entry 的 load 方法,取出其中的值。
2、如果 read 中沒有這個 key,且 amended 為 fase,說明 dirty 為空,那直接返回 空和 false。
3、如果 read 中沒有這個 key,且 amended 為 true,說明 dirty 中可能存在我們要找的 key。當然要先上鎖,再嘗試去 dirty 中查找。在這之前,仍然有一個 double check 的操作。若還是沒有在 read 中找到,那么就從 dirty 中找。不管 dirty 中有沒有找到,都要"記一筆",因為在 dirty 被提升為 read 之前,都會進入這條路徑

// 增加misses計數,並在必要的時候提升dirty map
// 如果 misses 值小於 m.dirty 的長度,就直接返回。否則,將 m.dirty 晉升為 read,並清空 dirty,清空 misses 計數值。這樣,之前
// 一段時間新加入的 key 都會進入到 read 中,從而能夠提升 read 的命中率。
func (m *Map) missLocked() {
  m.misses++
  if m.misses < len(m.dirty) {
    return
  }
  // 提升過程很簡單,直接將m.dirty賦給m.read.m
  // 提升完成之后 amended == false m.dirty == nil
  // m.dirty並不立即創建被拷貝元素,而是延遲創建
  m.read.Store(readOnly{m: m.dirty})
  m.dirty = nil
  m.misses = 0
}

對於missLocked會直接將 misses 的值加 1,表示一次未命中,如果 misses 值小於 m.dirty 的長度,就直接返回。否則,將 m.dirty 晉升為 read,並清空 dirty,清空 misses 計數值。這樣,之前一段時間新加入的 key 都會進入到 read 中,從而能夠提升 read 的命中率。

Store

// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
    // 如果read map中存在該key  則嘗試直接更改(由於修改的是entry內部的pointer,因此dirty map也可見)
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok && e.tryStore(&value) {
		return
	}

	m.mu.Lock()
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
			// 如果 read map 中存在該 key,但 p == expunged,則說明 m.dirty != nil 並且 m.dirty 中不存在該 key 值 此時:
			//    a. 將 p 的狀態由 expunged  更改為 nil
			//    b. dirty map 插入 key
			m.dirty[key] = e
		}
        // 更新 entry.p = value (read map 和 dirty map 指向同一個 entry)
		e.storeLocked(&value)
	} else if e, ok := m.dirty[key]; ok {
        // 如果 read map 中不存在該 key,但 dirty map 中存在該 key,直接寫入更新 entry(read map 中仍然沒有這個 key)
		e.storeLocked(&value)
	} else {
		// 如果read map和dirty map中都不存在該key,則:
		//    a. 如果dirty map為空,則需要創建dirty map,並從read map中拷貝未刪除的元素
		//    b. 更新amended字段,標識dirty map中存在read map中沒有的key
		//    c. 將k v寫入dirty map中,read.m不變
		if !read.amended {

			m.dirtyLocked()
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
	}
	m.mu.Unlock()
}

// 嘗試直接更新entry 如果p == expunged 返回false
func (e *entry) tryStore(i *interface{}) bool {
  p := atomic.LoadPointer(&e.p)
  if p == expunged {
    return false
  }
  for {
    if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
      return true
    }
    p = atomic.LoadPointer(&e.p)
    if p == expunged {
      return false
    }
  }
}

func (e *entry) unexpungeLocked() (wasExpunged bool) {
  return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

// 如果 dirty map為nil,則從read map中拷貝元素到dirty map
func (m *Map) dirtyLocked() {
  if m.dirty != nil {
    return
  }

  read, _ := m.read.Load().(readOnly)
  m.dirty = make(map[interface{}]*entry, len(read.m))
  for k, e := range read.m {
    // a. 將所有為 nil的 p 置為 expunged
    // b. 只拷貝不為expunged 的 p
    if !e.tryExpungeLocked() {
      m.dirty[k] = e
    }
  }
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
  p := atomic.LoadPointer(&e.p)
  for p == nil {
    if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
      return true
    }
    p = atomic.LoadPointer(&e.p)
  }
  return p == expunged
}

梳理下流程:

1、首先還是去read map中查詢,存在並且p!=expunged,直接修改。(由於修改的是 entry 內部的 pointer,因此 dirty map 也可見)
2、如果read map中存在該key,但p == expunged。加鎖更新p的狀態,然后直接更新該entry (此時m.dirtynil或m.dirty[key]e)
3、如果read map中不存在該Key,但dirty map中存在該key,直接寫入更新entry(read map中仍然沒有)
4、如果read map和dirty map都不存在該key

  • a. 如果dirty map為空,則需要創建dirty map,並從read map中拷貝未刪除的元素
  • b. 更新amended字段,標識dirty map中存在read map中沒有的key
  • c. 將k v寫入dirty map中,read.m不變

Delete

// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
    // 從read map中尋找
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
    // 沒找到
	if !ok && read.amended { // read.amended為true代表dirty map中含有m中沒有的元素
		m.mu.Lock()
        // double check
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
        // 第二次仍然沒找到,但dirty map中存在,則直接從dirty map刪除
		if !ok && read.amended {
			delete(m.dirty, key)
		}
		m.mu.Unlock()
	}
    // 如果read存在,將entry.p 置為 nil
	if ok {
		e.delete()
	}
}

func (e *entry) delete() (hadValue bool) {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == nil || p == expunged {
			return false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, nil) {
			return true
		}
	}
}

梳理下流程:
1、先去read map中尋找,如果存在就直接刪除
2、如果沒找到,並且 read.amended為true代表dirty map中存在,依照傳統進行 double check。
3、read map找到就刪除,沒找到判斷dirty map是否存在,存在了就刪除

LoadOrStore

func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
	// 首先還是先去read map中查詢
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		actual, loaded, ok := e.tryLoadOrStore(value)
		if ok {
			return actual, loaded
		}
	}

	m.mu.Lock()
    // double check
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
		    // 如果 read map 中存在該 key,但 p == expunged,則說明 m.dirty != nil 並且 m.dirty 中不存在該 key 值 此時:
		    //    a. 將 p 的狀態由 expunged  更改為 nil
		    //    b. dirty map 插入 key
			m.dirty[key] = e
		}
		actual, loaded, _ = e.tryLoadOrStore(value)
	} else if e, ok := m.dirty[key]; ok {
        // 如果 read map 中不存在該 key,但 dirty map 中存在該 key
		actual, loaded, _ = e.tryLoadOrStore(value)
        // 不管dirty中有沒有找到 都增加misses計數 該函數可能將dirty map提升為readmap
		m.missLocked()
	} else {
		if !read.amended {
		    // 如果read map和dirty map中都不存在該key,則:
		    //    a. 如果dirty map為空,則需要創建dirty map,並從read map中拷貝未刪除的元素
		    //    b. 更新amended字段,標識dirty map中存在read map中沒有的key
		    //    c. 將k v寫入dirty map中,read.m不變
			m.dirtyLocked()
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
		actual, loaded = value, false
	}
	m.mu.Unlock()

	return actual, loaded
}

// 如果entry is expunged則不處理,返回false
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
	p := atomic.LoadPointer(&e.p)
	if p == expunged {
		return nil, false, false
	}
	if p != nil {
		return *(*interface{})(p), true, true
	}

	// Copy the interface after the first load to make this method more amenable
	// to escape analysis: if we hit the "load" path or the entry is expunged, we
	// shouldn't bother heap-allocating.
	ic := i
	for {
		if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
			return i, false, true
		}
		p = atomic.LoadPointer(&e.p)
		if p == expunged {
			return nil, false, false
		}
		if p != nil {
			return *(*interface{})(p), true, true
		}
	}
}

這個函數結合了 Load 和 Store 的功能,如果 map 中存在這個 key,那么返回這個 key 對應的 value;否則,將 key-value 存入 map。
這在需要先執行 Load 查看某個 key 是否存在,之后再更新此 key 對應的 value 時很有效,因為 LoadOrStore 可以並發執行。

總結

除了Load/Store/Delete之外,sync.Map還提供了LoadOrStore/Range操作,但沒有提供Len()方法,這是因為要統計有效的鍵值對只能先提升dirty map(dirty map中可能有read map中沒有的鍵值對),再遍歷m.read(由於延遲刪除,不是所有的鍵值對都有效),這其實就是Range做的事情,因此在不添加新數據結構支持的情況下,sync.Map的長度獲取和Range操作是同一復雜度的。這部分只能看官方后續支持。

1、sync.map 是線程安全的,讀取,插入,刪除也都保持着常數級的時間復雜度。

2、通過讀寫分離,降低鎖時間來提高效率,適用於讀多寫少的場景。

3、Range 操作需要提供一個函數,參數是 k,v,返回值是一個布爾值:f func(key, value interface{}) bool。

4、調用 Load 或 LoadOrStore 函數時,如果在 read 中沒有找到 key,則會將 misses 值原子地增加 1,當 misses 增加到和 dirty 的長度相等時,會將 dirty 提升為 read。以期減少“讀 miss”。

5、新寫入的 key 會保存到 dirty 中,如果這時 dirty 為 nil,就會先新創建一個 dirty,並將 read 中未被刪除的元素拷貝到 dirty。

6、當 dirty 為 nil 的時候,read 就代表 map 所有的數據;當 dirty 不為 nil 的時候,dirty 才代表 map 所有的數據。

流程圖片

最后附上一張不錯的圖片

參考

【Go sync.Map 實現】https://wudaijun.com/2018/02/go-sync-map-implement/
【深度解密 Go 語言之 sync.map】https://www.cnblogs.com/qcrao-2018/p/12833787.html
【圖片】http://russellluo.com/2017/06/go-sync-map-diagram.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM