1、map原理
map是由key-value組成實現,主要的數據結構由:哈希查找表和搜索樹;
哈希查找表一般會存在“碰撞”的問題,就是對於不同的key會哈希到同一個單元中,解決這個問題有兩種實現方法:鏈表法和開放地址法。鏈表法是為每一個單元創建一個鏈表,去存儲不同的key;開放地址發,則是碰撞發生后通過某種方法,將key放到空的單元種
2、map底層實現
//map結構體是hmap,是hashmap的縮寫 type hmap struct { count int //元素個數,調用len(map)時直接返回 flags uint8 //標志map當前狀態,正在刪除元素、添加元素..... B uint8 //單元(buckets)的對數 B=5表示能容納32個元素 noverflow uint16 //單元(buckets)溢出數量,如果一個單元能存8個key,此時存儲了9個,溢出了,就需要再增加一個單元 hash0 uint32 //哈希種子 buckets unsafe.Pointer //指向單元(buckets)數組,大小為2^B,可以為nil oldbuckets unsafe.Pointer //擴容的時候,buckets長度會是oldbuckets的兩倍 nevacute uintptr //指示擴容進度,小於此buckets遷移完成 extra *mapextra //與gc相關 可選字段 } //a bucket for a Go map type bmap struct { tophash [bucketCnt]uint8 } //實際上編輯期間會動態生成一個新的結構體 type bmap struct { topbits [8]uint8 keys [8]keytype values [8]valuetype pad uintptr overflow uintptr }
bmp也就是bucket,由初始化的結構體可知,里面最多存8個key,每個key落在桶的位置有hash出來的結果的高8位決定。整體如下圖
由上圖可以看到,map中的key和value都不是指針,所以當size小於128字節時,會把bmap標記為不含指針,這樣能夠避免gc時掃描整個hmap;但是bmap中是存在一個overflow指針,用於指向下一個bmap,為了滿足條件,這時候會把overflow指針到extra字段中。
type mapextra struct { // overflow[0] contains overflow buckets for hmap.buckets. // overflow[1] contains overflow buckets for hmap.oldbuckets. overflow [2]*[]*bmap // nextOverflow 包含空閑的 overflow bucket,這是預分配的 bucket nextOverflow *bmap }
bmp的內部組成如下:
上面就是bmap的內存模型,HOB Hash指的就是tophash;這里可以看到,key value並不是以鍵值對的形式存放的,而是獨立放在一起的,源碼給出的解釋是,減少pad字段,節省內存空間。
比如:map[int64] int8,如果以key-value的形式存儲就必須在每個value后面添加padding7個字節,如果以上圖的形式只需要在最后一個value后面添加padding就可以了
3、創建map
func makemap(t *maptype, hint int, h *hmap) *hmap { mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size) if overflow || mem > maxAlloc { hint = 0 } // initialize Hmap if h == nil { h = new(hmap) } h.hash0 = fastrand() //查找一個B,使得map的裝載因子在一個正常的范圍 B := uint8(0) for overLoadFactor(hint, B) { B++ } h.B = B // 初始化hash table // if B == 0, 那么buckets會在復制后再分配 // 如果長度太大,復制會花費很長的時間 if h.B != 0 { var nextOverflow *bmap h.buckets, nextOverflow = makeBucketArray(t, h.B, nil) if nextOverflow != nil { h.extra = new(mapextra) h.extra.nextOverflow = nextOverflow } } return h } // overLoadFactor reports whether count items placed in 1<<B buckets is over loadFactor. func overLoadFactor(count int, B uint8) bool { return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen) }
4、key定位
key經過哈希值計算得到哈希值,共64位(64位機器),后面5位用於計算該key放在哪一個bucket中,前8位用於確定該key在bucket中的位置;比如一個key經過計算結果是:
10010111 | 000011110110110010001111001010100010010110010101010 │ 01010
01010值是10,也就是第10個bucket;10010111值是151,在6號bucket中查找tophash值為151的key(最開始bucket還沒有 key,新加入的 key 會找到第一個空位,放入)。
如果在bucket中沒有找到,此時如果overflow不為空,那么就沿着overflow繼續查找,如果還是沒有找到,那就從別的key槽位查找,直到遍歷所有bucket。key查找源碼如下(mapaccess1為例):
// mapaccess1返回一個指向h[鍵]的指針。決不返回nil,相反,如果鍵不在映射中,它將返回對elem類型的zero對象的引用。 // 注意:返回的指針可能會使整個映射保持活動狀態,所以不要長時間保持。 func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer { if raceenabled && h != nil { callerpc := getcallerpc() pc := funcPC(mapaccess1) racereadpc(unsafe.Pointer(h), callerpc, pc) raceReadObjectPC(t.key, key, callerpc, pc) } if msanenabled && h != nil { msanread(key, t.key.size) } //如果h說明都沒有,返回零值 if h == nil || h.count == 0 { if t.hashMightPanic() { //如果哈希函數出錯 t.key.alg.hash(key, 0) // see issue 23734 } return unsafe.Pointer(&zeroVal[0]) } //寫和讀沖突 if h.flags&hashWriting != 0 { throw("concurrent map read and map write") } //不同類型的key需要不同的hash算法需要在編譯期間確定 alg := t.key.alg //利用hash0引入隨機性,計算哈希值 hash := alg.hash(key, uintptr(h.hash0)) //比如B=5那m就是31二進制是全1, //求bucket num時,將hash與m相與, //達到bucket num由hash的低8位決定的效果, //bucketMask函數掩蔽了移位量,省略了溢出檢查。 m := bucketMask(h.B) //b即bucket的地址 b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize))) // oldbuckets 不為 nil,說明發生了擴容 if c := h.oldbuckets; c != nil { if !h.sameSizeGrow() { //新的bucket是舊的bucket兩倍 m >>= 1 } //求出key在舊的bucket中的位置 oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize))) //如果舊的bucket還沒有搬遷到新的bucket中,那就在老的bucket中尋找 if !evacuated(oldb) { b = oldb } } //計算tophash高8位 top := tophash(hash) bucketloop: //遍歷所有overflow里面的bucket for ; b != nil; b = b.overflow(t) { //遍歷8個bucket for i := uintptr(0); i < bucketCnt; i++ { //tophash不匹配,繼續 if b.tophash[i] != top { if b.tophash[i] == emptyRest { break bucketloop } continue } //tophash匹配,定位到key的位置 k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) //若key為指針 if t.indirectkey() { //解引用 k = *((*unsafe.Pointer)(k)) } //key相等 if alg.equal(key, k) { //定位value的位置 e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize)) if t.indirectelem() { //value解引用 e = *((*unsafe.Pointer)(e)) } return e } } } //沒有找到,返回0值 return unsafe.Pointer(&zeroVal[0]) }
這里說一下定位key和value的方法以及整個循環的寫法:
//key定位公式 k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) //value定位公式 e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
b是bmap的地址,dataOffset是key相對於bmap起始地址的偏移:
dataOffset=unsafe.Offsetof(struct{ b bmap v int64 }{}.v)
因此bucket里key的起始地址就是unsafe.Pointer(b)+dataOffset;第i個key的地址就要此基礎上加i個key大小;value的地址是在key之后,所以第i個value,要加上所有的key的偏移。
遍歷所有bucket如下:
再來說一下minTopHash:
// 計算tophash值 func tophash(hash uintptr) uint8 { top := uint8(hash >> (sys.PtrSize*8 - 8)) //增加一個minTopHash(默認最小值為5) if top < minTopHash { top += minTopHash } return top }
當一個cell的tophash值小於minTopHash時,標志該cell的遷移狀態。因為這個狀態值是放在tophash數組里,為了和正常的哈希值區分開,會給key計算出來的哈希值一個增量:minTopHash,這樣就能區分正常的tophash值和表示狀態的哈希值。
emptyRest = 0 //這個單元格是空的,在更高的索引或溢出處不再有非空單元格 emptyOne = 1 //單元是空的 evacuatedX = 2 // key/elem有效. 實體已經被搬遷到新的buckt的前半部分 evacuatedY = 3 //同上,實體已經被搬遷到新的buckt的后半部分 evacuatedEmpty = 4 // 單元為空,以搬遷完成 minTopHash = 5 // 正常填充單元格的最小tophash
源碼中通過第一個tophash值來判斷bucket是否搬遷完成:
func evacuated(b *bmap) bool { h := b.tophash[0] return h > emptyOne && h < minTopHash }
參考地址:https://github.com/qcrao/Go-Questions/blob/master/map/map%20%E7%9A%84%E5%BA%95%E5%B1%82%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86%E6%98%AF%E4%BB%80%E4%B9%88.md