go map底層實現


1、map原理

map是由key-value組成實現,主要的數據結構由:哈希查找表和搜索樹;

哈希查找表一般會存在“碰撞”的問題,就是對於不同的key會哈希到同一個單元中,解決這個問題有兩種實現方法:鏈表法和開放地址法。鏈表法是為每一個單元創建一個鏈表,去存儲不同的key;開放地址發,則是碰撞發生后通過某種方法,將key放到空的單元種

搜索樹一般都是平衡樹,平衡樹包括:ALV樹紅黑樹

2、map底層實現

//map結構體是hmap,是hashmap的縮寫
type hmap struct {
    count      int            //元素個數,調用len(map)時直接返回
    flags      uint8          //標志map當前狀態,正在刪除元素、添加元素.....
    B          uint8          //單元(buckets)的對數 B=5表示能容納32個元素
    noverflow  uint16         //單元(buckets)溢出數量,如果一個單元能存8個key,此時存儲了9個,溢出了,就需要再增加一個單元
    hash0      uint32         //哈希種子
    buckets    unsafe.Pointer //指向單元(buckets)數組,大小為2^B,可以為nil
    oldbuckets unsafe.Pointer //擴容的時候,buckets長度會是oldbuckets的兩倍
    nevacute   uintptr        //指示擴容進度,小於此buckets遷移完成
    extra      *mapextra      //與gc相關 可選字段
}

//a bucket for a Go map
type bmap struct {
    tophash [bucketCnt]uint8
}

//實際上編輯期間會動態生成一個新的結構體
type bmap struct {
    topbits  [8]uint8
    keys     [8]keytype
    values   [8]valuetype
    pad      uintptr
    overflow uintptr
}

bmp也就是bucket,由初始化的結構體可知,里面最多存8個key,每個key落在桶的位置有hash出來的結果的高8位決定。整體如下圖

由上圖可以看到,map中的key和value都不是指針,所以當size小於128字節時,會把bmap標記為不含指針,這樣能夠避免gc時掃描整個hmap;但是bmap中是存在一個overflow指針,用於指向下一個bmap,為了滿足條件,這時候會把overflow指針到extra字段中。

type mapextra struct {
    // overflow[0] contains overflow buckets for hmap.buckets.
    // overflow[1] contains overflow buckets for hmap.oldbuckets.
    overflow [2]*[]*bmap

    // nextOverflow 包含空閑的 overflow bucket,這是預分配的 bucket
    nextOverflow *bmap
}

bmp的內部組成如下:

 上面就是bmap的內存模型,HOB Hash指的就是tophash;這里可以看到,key value並不是以鍵值對的形式存放的,而是獨立放在一起的,源碼給出的解釋是,減少pad字段,節省內存空間。

比如:map[int64] int8,如果以key-value的形式存儲就必須在每個value后面添加padding7個字節,如果以上圖的形式只需要在最后一個value后面添加padding就可以了

3、創建map

func makemap(t *maptype, hint int, h *hmap) *hmap {
    mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
    if overflow || mem > maxAlloc {
        hint = 0
    }

    // initialize Hmap
    if h == nil {
        h = new(hmap)
    }
    h.hash0 = fastrand()

    //查找一個B,使得map的裝載因子在一個正常的范圍
    B := uint8(0)
    for overLoadFactor(hint, B) {
        B++
    }
    h.B = B

    // 初始化hash table
    // if B == 0, 那么buckets會在復制后再分配
    // 如果長度太大,復制會花費很長的時間
    if h.B != 0 {
        var nextOverflow *bmap
        h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
        if nextOverflow != nil {
            h.extra = new(mapextra)
            h.extra.nextOverflow = nextOverflow
        }
    }

    return h
}

// overLoadFactor reports whether count items placed in 1<<B buckets is over loadFactor.
func overLoadFactor(count int, B uint8) bool {
    return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}

4、key定位

key經過哈希值計算得到哈希值,共64位(64位機器),后面5位用於計算該key放在哪一個bucket中,前8位用於確定該key在bucket中的位置;比如一個key經過計算結果是:

10010111 | 000011110110110010001111001010100010010110010101010 │ 01010

01010值是10,也就是第10個bucket;10010111值是151,在6號bucket中查找tophash值為151的key(最開始bucket還沒有 key,新加入的 key 會找到第一個空位,放入)。

如果在bucket中沒有找到,此時如果overflow不為空,那么就沿着overflow繼續查找,如果還是沒有找到,那就從別的key槽位查找,直到遍歷所有bucket。key查找源碼如下(mapaccess1為例):

// mapaccess1返回一個指向h[鍵]的指針。決不返回nil,相反,如果鍵不在映射中,它將返回對elem類型的zero對象的引用。
// 注意:返回的指針可能會使整個映射保持活動狀態,所以不要長時間保持。
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    if raceenabled && h != nil {
        callerpc := getcallerpc()
        pc := funcPC(mapaccess1)
        racereadpc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
    if msanenabled && h != nil {
        msanread(key, t.key.size)
    }
    //如果h說明都沒有,返回零值
    if h == nil || h.count == 0 {
        if t.hashMightPanic() { //如果哈希函數出錯
            t.key.alg.hash(key, 0) // see issue 23734
        }
        return unsafe.Pointer(&zeroVal[0])
    }
    //寫和讀沖突
    if h.flags&hashWriting != 0 {
        throw("concurrent map read and map write")
    }
    //不同類型的key需要不同的hash算法需要在編譯期間確定
    alg := t.key.alg
    //利用hash0引入隨機性,計算哈希值
    hash := alg.hash(key, uintptr(h.hash0))
    //比如B=5那m就是31二進制是全1,
    //求bucket num時,將hash與m相與,
    //達到bucket num由hash的低8位決定的效果,
    //bucketMask函數掩蔽了移位量,省略了溢出檢查。
    m := bucketMask(h.B)
    //b即bucket的地址
    b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
    // oldbuckets 不為 nil,說明發生了擴容
    if c := h.oldbuckets; c != nil {
        if !h.sameSizeGrow() {
            //新的bucket是舊的bucket兩倍
            m >>= 1
        }
        //求出key在舊的bucket中的位置
        oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
        //如果舊的bucket還沒有搬遷到新的bucket中,那就在老的bucket中尋找
        if !evacuated(oldb) {
            b = oldb
        }
    }
    //計算tophash高8位
    top := tophash(hash)
bucketloop:
    //遍歷所有overflow里面的bucket
    for ; b != nil; b = b.overflow(t) {
        //遍歷8個bucket
        for i := uintptr(0); i < bucketCnt; i++ {
            //tophash不匹配,繼續
            if b.tophash[i] != top {
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
            }
            //tophash匹配,定位到key的位置
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            //若key為指針
            if t.indirectkey() {
                //解引用
                k = *((*unsafe.Pointer)(k))
            }
            //key相等
            if alg.equal(key, k) {
                //定位value的位置
                e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
                if t.indirectelem() {
                    //value解引用
                    e = *((*unsafe.Pointer)(e))
                }
                return e
            }
        }
    }
    //沒有找到,返回0值
    return unsafe.Pointer(&zeroVal[0])
}

這里說一下定位key和value的方法以及整個循環的寫法:

//key定位公式
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
//value定位公式
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))

b是bmap的地址,dataOffset是key相對於bmap起始地址的偏移:

dataOffset=unsafe.Offsetof(struct{
        b bmap
        v int64
    }{}.v)

 因此bucket里key的起始地址就是unsafe.Pointer(b)+dataOffset;第i個key的地址就要此基礎上加i個key大小;value的地址是在key之后,所以第i個value,要加上所有的key的偏移。

遍歷所有bucket如下:

 再來說一下minTopHash:

// 計算tophash值
func tophash(hash uintptr) uint8 {
    top := uint8(hash >> (sys.PtrSize*8 - 8))
        //增加一個minTopHash(默認最小值為5)       
    if top < minTopHash {
        top += minTopHash
    }
    return top
}

當一個cell的tophash值小於minTopHash時,標志該cell的遷移狀態。因為這個狀態值是放在tophash數組里,為了和正常的哈希值區分開,會給key計算出來的哈希值一個增量:minTopHash,這樣就能區分正常的tophash值和表示狀態的哈希值。

emptyRest      = 0 //這個單元格是空的,在更高的索引或溢出處不再有非空單元格
emptyOne       = 1 //單元是空的
evacuatedX     = 2 // key/elem有效.  實體已經被搬遷到新的buckt的前半部分
evacuatedY     = 3 //同上,實體已經被搬遷到新的buckt的后半部分
evacuatedEmpty = 4 // 單元為空,以搬遷完成
minTopHash     = 5 // 正常填充單元格的最小tophash

源碼中通過第一個tophash值來判斷bucket是否搬遷完成:

func evacuated(b *bmap) bool {
    h := b.tophash[0]
    return h > emptyOne && h < minTopHash
}

 

 

參考地址:https://github.com/qcrao/Go-Questions/blob/master/map/map%20%E7%9A%84%E5%BA%95%E5%B1%82%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86%E6%98%AF%E4%BB%80%E4%B9%88.md


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM