go學習筆記 Go的sync.Pool源碼


Pool介紹#

總所周知Go 是一個自動垃圾回收的編程語言,采用三色並發標記算法標記對象並回收。如果你想使用 Go 開發一個高性能的應用程序的話,就必須考慮垃圾回收給性能帶來的影響。因為Go 在垃圾回收的時候會有一個STW(stop-the-world,程序暫停)的時間,並且如果對象太多,做標記也需要時間。所以如果采用對象池來創建對象,增加對象的重復利用率,使用的時候就不必在堆上重新創建對象可以節省開銷。在Go中,golang提供了對象重用的機制,也就是sync.Pool對象池。 sync.Pool是可伸縮的,並發安全的。其大小僅受限於內存的大小,可以被看作是一個存放可重用對象的值的容器。 設計的目的是存放已經分配的但是暫時不用的對象,在需要用到的時候直接從pool中取。

任何存放區其中的值可以在任何時候被刪除而不通知,在高負載下可以動態的擴容,在不活躍時對象池會收縮。它對外提供了三個方法:New、Get 和 Put。下面用一個簡短的例子來說明一下Pool使用:

package main
 
import (
    "fmt"
    "sync"
)
 
var pool *sync.Pool
 
type Person struct {
    Name string
}
 
func init() {
    pool = &sync.Pool{
        New: func() interface{} {
            fmt.Println("creating a new person")
            return new(Person)
        },
    }
}
 
func main() {
 
    person := pool.Get().(*Person)
    fmt.Println("Get Pool Object1:", person)
 
    person.Name = "first"
    pool.Put(person)
 
    fmt.Println("Get Pool Object2:", pool.Get().(*Person))
    fmt.Println("Get Pool Object3:", pool.Get().(*Person))
 
}

結果:

creating a new person
Get Pool Object1: &{}
Get Pool Object2: &{first}
creating a new person
Get Pool Object3: &{}

這里我用了init方法初始化了一個pool,然后get了三次,put了一次到pool中,如果pool中沒有對象,那么會調用New函數創建一個新的對象,否則會從put進去的對象中獲取。

存儲在池中的任何項目都可以隨時自動刪除,並且不會被通知。Pool可以安全地同時使用多個goroutine。池的目的是緩存已分配但未使用的對象以供以后重用,從而減輕對gc的壓力。也就是說,它可以輕松構建高效,線程安全的free列表。但是,它不適用於所有free列表。池的適當使用是管理一組默認共享的臨時項,並且可能由包的並發獨立客戶端重用。池提供了一種在許多客戶端上分攤分配開銷的方法。很好地使用池的一個例子是fmt包,它維護一個動態大小的臨時輸出緩沖區存儲。底層存儲隊列在負載下(當許多goroutine正在積極打印時)進行縮放,並在靜止時收縮。另一方面,作為短期對象的一部分維護的空閑列表不適合用於池, 因為在該場景中開銷不能很好地攤銷。 使這些對象實現自己的空閑列表更有效。首次使用后不得復制池。

pool 的兩個特點
1、在本地私有池和本地共享池均獲取 obj 失敗時,則會從其他p偷一個 obj 返回給調用方。
2、obj在池中的生命周期取決於垃圾回收任務的下一次執行時間,並且從池中獲取到的值可能是 put 進去的其中一個值,也可能是 newfun處 新生成的一個值,在應用時很容易入坑。

在多個goroutine之間使用同一個pool做到高效,是因為sync.pool為每個P都分配了一個子池,
當執行一個pool的get或者put操作的時候都會先把當前的goroutine固定到某個P的子池上面,
然后再對該子池進行操作。每個子池里面有一個私有對象和共享列表對象,
私有對象是只有對應的P能夠訪問,因為一個P同一時間只能執行一個goroutine,
【因此對私有對象存取操作是不需要加鎖的】。

源碼分析

type Pool struct {
    // 不允許復制,一個結構體,有一個Lock()方法,嵌入別的結構體中,表示不允許復制
    // noCopy對象,擁有一個Lock方法,使得Cond對象在進行go vet掃描的時候,能夠被檢測到是否被復制
    noCopy noCopy
 
    //local 和 localSize 維護一個動態 poolLocal 數組
    // 每個固定大小的池, 真實類型是 [P]poolLocal
    // 其實就是一個[P]poolLocal 的指針地址
    local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
    localSize uintptr        // size of the local array
 
    victim     unsafe.Pointer // local from previous cycle
    victimSize uintptr        // size of victims array
 
    // New optionally specifies a function to generate
    // a value when Get would otherwise return nil.
    // It may not be changed concurrently with calls to Get.
    // New 是一個回調函數指針,當Get 獲取到目標對象為 nil 時,需要調用此處的回調函數用於生成 新的對象
    New func() interface{}
}
  1. Pool結構體里面noCopy代表這個結構體是禁止拷貝的,它可以在我們使用  go vet 工具的時候生效;

local是一個poolLocal數組的指針,localSize代表這個數組的大小;同樣victim也是一個poolLocal數組的指針,每次垃圾回收的時候,Pool 會把 victim 中的對象移除,然后把 local 的數據給 victim;local和victim的邏輯我們下面會詳細介紹到。

New函數是在創建pool的時候設置的,當pool沒有緩存對象的時候,會調用New方法生成一個新的對象。

下面我們對照着pool的結構圖往下講,避免找不到北:

// Local per-P Pool appendix.
/*
因為poolLocal中的對象可能會被其他P偷走,
private域保證這個P不會被偷光,至少保留一個對象供自己用。
否則,如果這個P只剩一個對象,被偷走了,
那么當它本身需要對象時又要從別的P偷回來,造成了不必要的開銷
*/
type poolLocalInternal struct {
    private interface{} // Can be used only by the respective P.
    shared  poolChain   // Local P can pushHead/popHead; any P can popTail.
}
 
type poolLocal struct {
    poolLocalInternal
 
    // Prevents false sharing on widespread platforms with
    // 128 mod (cache line size) = 0 .
    /**
    cache使用中常見的一個問題是false sharing。
    當不同的線程同時讀寫同一cache line上不同數據時就可能發生false sharing。
    false sharing會導致多核處理器上嚴重的系統性能下降。
    字節對齊,避免 false sharing (偽共享)
    */
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

local字段存儲的是一個poolLocal數組的指針,poolLocal數組大小是goroutine中P的數量,訪問時,P的id對應poolLocal數組下標索引,所以Pool的最大個數runtime.GOMAXPROCS(0)。

通過這樣的設計,每個P都有了自己的本地空間,多個 goroutine 使用同一個 Pool 時,減少了競爭,提升了性能。如果對goroutine的P、G、M有疑惑的同學不妨看看這篇文章:The Go scheduler

poolLocal里面有一個pad數組用來占位用,防止在 cache line 上分配多個 poolLocalInternal從而造成false sharing,cache使用中常見的一個問題是false sharing。當不同的線程同時讀寫同一cache line上不同數據時就可能發生false sharing。false sharing會導致多核處理器上嚴重的系統性能下降。具體的可以參考偽共享(False Sharing)

poolLocalInternal包含兩個字段private和shared。

private代表緩存的一個元素,只能由相應的一個 P 存取。因為一個 P 同時只能執行一個 goroutine,所以不會有並發的問題;所以無需加鎖

shared則可以由任意的 P 訪問,但是只有本地的 P 才能 pushHead/popHead,其它 P 可以 popTail。因為可能有多個goroutine同時操作,所以需要加鎖。

type poolChain struct {
    // head is the poolDequeue to push to. This is only accessed
    // by the producer, so doesn't need to be synchronized.
    head *poolChainElt
 
    // tail is the poolDequeue to popTail from. This is accessed
    // by consumers, so reads and writes must be atomic.
    tail *poolChainElt
}
 
type poolChainElt struct {
    poolDequeue
 
    // next and prev link to the adjacent poolChainElts in this
    // poolChain.
    //
    // next is written atomically by the producer and read
    // atomically by the consumer. It only transitions from nil to
    // non-nil.
    //
    // prev is written atomically by the consumer and read
    // atomically by the producer. It only transitions from
    // non-nil to nil.
    next, prev *poolChainElt
}
type poolDequeue struct {
    // headTail packs together a 32-bit head index and a 32-bit
    // tail index. Both are indexes into vals modulo len(vals)-1.
    //
    // tail = index of oldest data in queue
    // head = index of next slot to fill
    //
    // Slots in the range [tail, head) are owned by consumers.
    // A consumer continues to own a slot outside this range until
    // it nils the slot, at which point ownership passes to the
    // producer.
    //
    // The head index is stored in the most-significant bits so
    // that we can atomically add to it and the overflow is
    // harmless.
    headTail uint64
 
    // vals is a ring buffer of interface{} values stored in this
    // dequeue. The size of this must be a power of 2.
    //
    // vals[i].typ is nil if the slot is empty and non-nil
    // otherwise. A slot is still in use until *both* the tail
    // index has moved beyond it and typ has been set to nil. This
    // is set to nil atomically by the consumer and read
    // atomically by the producer.
    vals []eface
}
 
type eface struct {
    typ, val unsafe.Pointer
}

poolChain是一個雙端隊列,里面的head和tail分別指向隊列頭尾;poolDequeue里面存放真正的數據,是一個單生產者、多消費者的固定大小的無鎖的環狀隊列,headTail是環狀隊列的首位位置的指針,可以通過位運算解析出首尾的位置,生產者可以從 head 插入、head 刪除,而消費者僅可從 tail 刪除。

這個雙端隊列的模型大概是這個樣子:

poolDequeue里面的環狀隊列大小是固定的,后面分析源碼我們會看到,當環狀隊列滿了的時候會創建一個size是原來兩倍大小的環狀隊列。大家這張圖好好體會一下,會反復用到。

Get方法#

// Get selects an arbitrary item from the Pool, removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() interface{} {
    if race.Enabled {
        race.Disable()
    }
    l, pid := p.pin() //1.把當前goroutine綁定在當前的P上
    x := l.private    //2.優先從local的private中獲取
    l.private = nil
    if x == nil {
        // Try to pop the head of the local shard. We prefer
        // the head over the tail for temporal locality of
        // reuse.
        x, _ = l.shared.popHead()  //3,private沒有,那么從shared的頭部獲取
        if x == nil {
            x = p.getSlow(pid)  //4. 如果都沒有,那么去別的local上去偷一個
        }
    }
    runtime_procUnpin()  //解除搶占
    if race.Enabled {
        race.Enable()
        if x != nil {
            race.Acquire(poolRaceAddr(x))
        }
    }
    //5. 如果沒有獲取到,嘗試使用New函數生成一個新的
    if x == nil && p.New != nil {
        x = p.New()
    }
    return x
}
  • 這一段代碼首先會將當前goroutine綁定在當前的P上返回對應的local,然后嘗試從local的private中獲取,然后需要把private字段置空,因為已經拿到了想要的對象;

  • private中獲取不到,那么就去shared的頭部獲取;

  • shared也沒有,那么嘗試遍歷所有的 local,嘗試從它們的 shared 彈出一個元素;

  • 最后如果還是沒有,那么就直接調用預先設置好的 New 函數,創建一個出來。

pin#

// pin 會將當前 goroutine 訂到 P 上, 禁止搶占(preemption) 並從 poolLocal 池中返回 P 對應的 poolLocal
// 調用方必須在完成取值后調用 runtime_procUnpin() 來取消禁止搶占。
// pin pins the current goroutine to P, disables preemption and
// returns poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() (*poolLocal, int) {
    pid := runtime_procPin()
    // In pinSlow we store to local and then to localSize, here we load in opposite order.
    // Since we've disabled preemption, GC cannot happen in between.
    // Thus here we must observe local at least as large localSize.
    // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
    // 因為可能存在動態的 P(運行時調整 P 的個數)procresize/GOMAXPROCS
    // 如果 P.id 沒有越界,則直接返回   PID
    /**
    具體的邏輯就是首先拿到當前的pid,
    然后以pid作為index找到local中的poolLocal,
    但是如果pid大於了localsize,
    說明當前線程的poollocal不存在,就會新創建一個poolLocal
    */
    s := atomic.LoadUintptr(&p.localSize) // load-acquire
    l := p.local                          // load-consume
    if uintptr(pid) < s {
        return indexLocal(l, pid), pid
    }
    // 沒有結果時,涉及全局加鎖
    // 例如重新分配數組內存,添加到全局列表
    return p.pinSlow()
}
 
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
    lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
    return (*poolLocal)(lp)
}

pin方法里面首先會調用runtime_procPin方法會先獲取當前goroutine,然后綁定到對應的M上,然后返回M目前綁定的P的id,因為這個pid后面會用到,防止在使用途中P被搶占,具體的細節可以看這篇:https://zhuanlan.zhihu.com/p/99710992。接下來會使用原子操作取出localSize,如果當前pid大於localSize,那么就表示Pool還沒創建對應的poolLocal,那么調用pinSlow進行創建工作,否則調用indexLocal取出pid對應的poolLocal返回。

indexLocal里面是使用了地址操作,傳入的i是數組的index值,所以需要獲取poolLocal{}的size做一下地址的位移操作,然后再轉成轉成poolLocal地址返回。

pinSlow#

func (p *Pool) pinSlow() (*poolLocal, int) {
    // Retry under the mutex.
    // Can not lock the mutex while pinned.
    //因為需要對全局進行加鎖,pinSlow() 會首先取消 P 的不可搶占,然后使用 allPoolsMu 進行加鎖
    runtime_procUnpin() // 解除pin
    allPoolsMu.Lock() // 加上全局鎖
    defer allPoolsMu.Unlock()
    pid := runtime_procPin() // pin住
    // poolCleanup won't be called while we are pinned.
    s := p.localSize
    l := p.local
    // 重新對pid進行檢查 再次檢查是否符合條件,因為可能中途已被其他線程調用
    // 當再次固定 P 時 poolCleanup 不會被調用
 
    if uintptr(pid) < s {
        return indexLocal(l, pid), pid
    }
    // 初始化local前會將pool放入到allPools數組中
    if p.local == nil {
        allPools = append(allPools, p)
    }
    // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
    size := runtime.GOMAXPROCS(0) // 當前P的數量
    local := make([]poolLocal, size)
    atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
    atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
    return &local[pid], pid
}

因為allPoolsMu是一個全局Mutex鎖,因此上鎖會比較慢可能被阻塞,所以上鎖前調用runtime_procUnpin方法解除pin的操作;

在解除綁定后,pinSlow 可能被其他的線程調用過了,p.local 可能會發生變化。因此這時候需要再次對 pid 進行檢查。

最后初始化local,並使用原子操作對local和localSize設值,返回當前P對應的local。

到這里pin方法終於講完了。畫一個簡單的圖描述一下這整個流程:

下面我們再回到Get方法中,如果private中沒有值,那么會調用shared的popHead方法獲取值。

popHead#

func (c *poolChain) popHead() (interface{}, bool) {
    d := c.head // 這里頭部是一個poolChainElt
    // 遍歷poolChain鏈表
    for d != nil {  
        // 從poolChainElt的環狀列表中獲取值
        if val, ok := d.popHead(); ok {
            return val, ok
        }
        // There may still be unconsumed elements in the
        // previous dequeue, so try backing up.
        // load poolChain下一個對象
        d = loadPoolChainElt(&d.prev)
    }
    return nil, false
}
 
// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
func (d *poolDequeue) popHead() (interface{}, bool) {
    var slot *eface
    for {
        ptrs := atomic.LoadUint64(&d.headTail)
        head, tail := d.unpack(ptrs) // headTail的高32位為head,低32位為tail
        if tail == head {
            // Queue is empty. // 首尾相等,那么這個隊列就是空的
            return nil, false
        }
 
        // Confirm tail and decrement head. We do this before
        // reading the value to take back ownership of this
        // slot.
        head--   // 這里需要head--之后再獲取slot
        ptrs2 := d.pack(head, tail)
        if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
            // We successfully took back slot.
            slot = &d.vals[head&uint32(len(d.vals)-1)]
            break
        }
    }
 
    val := *(*interface{})(unsafe.Pointer(slot))
    if val == dequeueNil(nil) {   // 說明沒取到緩存的對象,返回 nil
        val = nil
    }
    // Zero the slot. Unlike popTail, this isn't racing with
    // pushHead, so we don't need to be careful here.
    *slot = eface{}  // 重置slot 
    return val, true
}

 

poolChain的popHead方法里面會獲取到poolChain的頭結點,不記得poolChain數據結構的同學建議往上面翻一下再回來。接着有個for循環會挨個從poolChain的頭結點往下遍歷,直到獲取對象返回。

  • poolDequeue的popHead方法首先會獲取到headTail的值,然后調用unpack解包,headTail是一個64位的值,高32位表示head,低32位表示tail。

  • 判斷head和tail是否相等,相等那么這個隊列就是空的;

  • 如果隊列不是空的,那么將head減一之后再使用,因為head當前指的位置是空值,表示下一個新對象存放的位置;

  • CAS重新設值新的headTail,成功之后獲取slot,這里因為vals大小是2的n 次冪,因此len(d.vals)-1)之后低n位全是1,和head取與之后可以獲取到head的低n位的值;

  • 如果slot所對應的對象是dequeueNil,那么表示是空值,直接返回,否則將slot指針對應位置的值置空,返回val。

如果shared的popHead方法也沒獲取到值,那么就需要調用getSlow方法獲取了。

getSlow#

 // 從其他P的共享緩沖區偷取 obj
func (p *Pool) getSlow(pid int) interface{} {
    // See the comment in pin regarding ordering of the loads.
    size := atomic.LoadUintptr(&p.localSize) // load-acquire 獲取當前 poolLocal 的大小
    locals := p.local                        // load-consume 獲取當前 poolLocal
    // Try to steal one element from other procs.
    // 遍歷locals列表,從其他的local的shared列表尾部獲取對象
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i+1)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }
 
    // Try the victim cache. We do this after attempting to steal
    // from all primary caches because we want objects in the
    // victim cache to age out if at all possible.
    size = atomic.LoadUintptr(&p.victimSize)
    if uintptr(pid) >= size {
        return nil
    }
    locals = p.victim
    l := indexLocal(locals, pid)
    // victim的private不為空則返回
    if x := l.private; x != nil {
        l.private = nil
        return x
    }
    //  遍歷victim對應的locals列表,從其他的local的shared列表尾部獲取對象
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }
 
    // Mark the victim cache as empty for future gets don't bother
    // with it.
    // 獲取不到,將victimSize置為0
    atomic.StoreUintptr(&p.victimSize, 0)
 
    return nil
}

 

getSlow方法會遍歷locals列表,這里需要注意的是,遍歷是從索引為 pid+1 的 poolLocal 處開始,嘗試調用shared的popTail方法獲取對象;如果沒有拿到,則從 victim 里找。如果都沒找到,那么就將victimSize置為0,下次就不找victim了。

poolChain&popTail#

func (c *poolChain) popTail() (interface{}, bool) {
    d := loadPoolChainElt(&c.tail)
    if d == nil {
        return nil, false  // 如果最后一個節點是空的,那么直接返回
    }
 
    for {
        // It's important that we load the next pointer
        // *before* popping the tail. In general, d may be
        // transiently empty, but if next is non-nil before
        // the pop and the pop fails, then d is permanently
        // empty, which is the only condition under which it's
        // safe to drop d from the chain.
        // 這里獲取的是next節點,與一般的雙向鏈表是相反的
        d2 := loadPoolChainElt(&d.next)
 
        if val, ok := d.popTail(); ok {
            return val, ok
        }
 
        if d2 == nil {
            // This is the only dequeue. It's empty right
            // now, but could be pushed to in the future.
            return nil, false
        }
 
        // The tail of the chain has been drained, so move on
        // to the next dequeue. Try to drop it from the chain
        // so the next pop doesn't have to look at the empty
        // dequeue again.
        // 因為d已經沒有數據了,所以重置tail為d2,並刪除d2的上一個節點
        if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
            // We won the race. Clear the prev pointer so
            // the garbage collector can collect the empty
            // dequeue and so popHead doesn't back up
            // further than necessary.
            storePoolChainElt(&d2.prev, nil)
        }
        d = d2
    }
}
  • 判斷poolChain,如果最后一個節點是空的,那么直接返回;
  • 進入for循環,獲取tail的next節點,這里需要注意的是這個雙向鏈表與一般的鏈表是反向的,不清楚的可以再去看看第一張圖;
  • 調用popTail獲取poolDequeue列表的對象,有對象直接返回;
  • d2為空則表示已經遍歷完整個poolChain雙向列表了,都為空,那么直接返回;
  • 通過CAS將tail重置為d2,因為d已經沒有數據了,並將d2的prev節點置為nil,然后將d置為d2,進入下一個循環;

poolDequeue&popTail#

// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
func (d *poolDequeue) popTail() (interface{}, bool) {
    var slot *eface
    for {
        ptrs := atomic.LoadUint64(&d.headTail)
        head, tail := d.unpack(ptrs) // 和pophead一樣,將headTail解包
        if tail == head {
            // Queue is empty. // 首位相等,表示列表中沒有數據,返回
            return nil, false
        }
 
        // Confirm head and tail (for our speculative check
        // above) and increment tail. If this succeeds, then
        // we own the slot at tail.
        ptrs2 := d.pack(head, tail+1)
        // CAS重置tail位置
        if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
            // Success.
            // 獲取tail位置對象
            slot = &d.vals[tail&uint32(len(d.vals)-1)]
            break
        }
    }
 
    // We now own slot.
    val := *(*interface{})(unsafe.Pointer(slot))
    if val == dequeueNil(nil) {
        val = nil 
    }
 
    // Tell pushHead that we're done with this slot. Zeroing the
    // slot is also important so we don't leave behind references
    // that could keep this object live longer than necessary.
    //
    // We write to val first and then publish that we're done with
    // this slot by atomically writing to typ.
    slot.val = nil
    atomic.StorePointer(&slot.typ, nil)
    // At this point pushHead owns the slot.
 
    return val, true
}

 如果看懂了popHead,那么這個popTail方法是和它非常的相近的。

popTail簡單來說也是從隊列尾部移除一個元素,如果隊列為空,返回 false。但是需要注意的是,這個popTail可能會被多個消費者調用,所以需要循環CAS獲取對象;在poolDequeue環狀列表中tail是有數據的,不必像popHead中head--

最后,需要將slot置空。

大家可以再對照一下圖回顧一下代碼:

Put方法#

// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
    if x == nil {
        return
    }
    if race.Enabled {
        if fastrand()%4 == 0 {
            // Randomly drop x on floor.
            return
        }
        race.ReleaseMerge(poolRaceAddr(x))
        race.Disable()
    }
    l, _ := p.pin() // 先獲得當前P綁定的 localPool
    if l.private == nil {
        l.private = x
        x = nil
    }
    if x != nil {
        l.shared.pushHead(x)
    }
    // 調用方必須在完成取值后調用 runtime_procUnpin() 來取消禁用搶占
    runtime_procUnpin() 
    if race.Enabled {
        race.Enable()
    }
}

 

看完了Get方法,看Put方法就容易多了。同樣Put方法首先會去Pin住當前goroutine和P,然后嘗試將 x 賦值給 private 字段。如果private不為空,那么就調用pushHead將其放入到shared隊列中。

poolChain&pushHead#

func (c *poolChain) pushHead(val interface{}) {
    d := c.head
   // 頭節點沒有初始化,那么設值一下
    if d == nil {
        // Initialize the chain.
        const initSize = 8 // Must be a power of 2
        d = new(poolChainElt)
        d.vals = make([]eface, initSize)
        c.head = d
        storePoolChainElt(&c.tail, d)
    }
    // 將對象加入到環狀隊列中
    if d.pushHead(val) {
        return
    }
 
    // The current dequeue is full. Allocate a new one of twice
    // the size.
    newSize := len(d.vals) * 2
    // 這里做了限制,單個環狀隊列不能超過2的30次方大小
    if newSize >= dequeueLimit {
        // Can't make it any bigger.
        newSize = dequeueLimit
    }
    // 初始化新的環狀列表,大小是d的兩倍
    d2 := &poolChainElt{prev: d}
    d2.vals = make([]eface, newSize)
    c.head = d2
    storePoolChainElt(&d.next, d2)
    // push到新的隊列中
    d2.pushHead(val)
}

 

如果頭節點為空,那么需要創建一個新的poolChainElt對象作為頭節點,大小為8;然后調用pushHead放入到環狀隊列中;

如果放置失敗,那么創建一個 poolChainElt 節點,並且雙端隊列的長度翻倍,當然長度也不能超過dequeueLimit,即2的30次方;

然后將新的節點d2和d互相綁定一下,並將d2設值為頭節點,將傳入的對象push到d2中;

poolDequeue&pushHead#

// pushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
func (d *poolDequeue) pushHead(val interface{}) bool {
    ptrs := atomic.LoadUint64(&d.headTail)
    head, tail := d.unpack(ptrs) // 解包headTail
    // 判斷隊列是否已滿
    if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
        // Queue is full.
        return false
    }
    slot := &d.vals[head&uint32(len(d.vals)-1)] // 找到head的槽位
 
    // Check if the head slot has been released by popTail.
    typ := atomic.LoadPointer(&slot.typ) // 檢查slot是否和popTail有沖突
    if typ != nil {
        // Another goroutine is still cleaning up the tail, so
        // the queue is actually still full.
        return false
    }
 
    // The head slot is free, so we own it.
    if val == nil {
        val = dequeueNil(nil)
    }
   // 將 val 賦值到 slot,並將 head 指針值加 1
    *(*interface{})(unsafe.Pointer(slot)) = val
 
    // Increment head. This passes ownership of slot to popTail
    // and acts as a store barrier for writing the slot.
    atomic.AddUint64(&d.headTail, 1<<dequeueBits)
    return true
}

首先通過位運算判斷隊列是否已滿,也就是將尾部指針加上 len(d.vals) ,因為head指向的是將要被填充的位置,所以head和tail位置是相隔len(d.vals),然后再取低 31 位,看它是否和 head 相等。如果隊列滿了,直接返回 false;

然后找到找到head的槽位slot,並判斷typ是否為空,因為popTail 是先設置 val,再將 typ 設置為 nil,所以如果有沖突,那么直接返回;

最后設值slot,並將head加1返回;

GC#

在pool.go文件的 init 函數里,注冊了 GC 發生時,如何清理 Pool 的函數:

 
func init() {
    runtime_registerPoolCleanup(poolCleanup)
}
// 當 stop the world  (STW) 來臨,在 GC 之前會調用該函數
func poolCleanup() {
    // This function is called with the world stopped, at the beginning of a garbage collection.
    // It must not allocate and probably should not call any runtime functions.
 
    // Because the world is stopped, no pool user can be in a
    // pinned section (in effect, this has all Ps pinned).
 
    // Drop victim caches from all pools.
    for _, p := range oldPools {
        p.victim = nil
        p.victimSize = 0
    }
 
    // Move primary cache to victim cache.
    for _, p := range allPools {
        p.victim = p.local
        p.victimSize = p.localSize
        p.local = nil
        p.localSize = 0
    }
 
    // The pools with non-empty primary caches now have non-empty
    // victim caches and no pools have primary caches.
    oldPools, allPools = allPools, nil
}

 poolCleanup 會在 STW 階段被調用。主要是將 local 和 victim 作交換,那么不至於GC 把所有的 Pool 都清空了,而是需要兩個 GC 周期才會被釋放。如果 sync.Pool 的獲取、釋放速度穩定,那么就不會有新的池對象進行分配。存在Pool中的對象會在沒有任何通知的情況下被自動移除掉。實際上,這個清理過程是在每次垃圾回收之前做的。垃圾回收是固定兩分鍾觸發一次。而且每次清理會將Pool中的所有對象都清理掉!

總結#

整個設計充分利用了go.runtime的調度器優勢:一個P下goroutine競爭的無鎖化;

一個goroutine固定在一個局部調度器P上,從當前 P 對應的 poolLocal 取值, 若取不到,則從對應的 shared 數組上取,若還是取不到;則嘗試從其他 P 的 shared 中偷。 若偷不到,則調用 New 創建一個新的對象。池中所有臨時對象在一次 GC 后會被全部清空。

通過以上的解讀,我們可以看到,Get方法並不會對獲取到的對象值做任何的保證,因為放入本地池中的值有可能會在任何時候被刪除,但是不通知調用者。放入共享池中的值有可能被其他的goroutine偷走。 所以對象池比較適合用來存儲一些臨時且狀態無關的數據,但是不適合用來存儲數據庫連接的實例,因為存入對象池重的值有可能會在垃圾回收時被刪除掉,這違反了數據庫連接池建立的初衷。

根據上面的說法,Golang的對象池嚴格意義上來說是一個臨時的對象池,適用於儲存一些會在goroutine間分享的臨時對象。主要作用是減少GC,提高性能。在Golang中最常見的使用場景是fmt包中的輸出緩沖區。

在Golang中如果要實現連接池的效果,可以用container/list來實現,開源界也有一些現成的實現,比如go-commons-pool,具體的讀者可以去自行了解。

Go語言的goroutine雖然可以創建很多,但是真正能物理上並發運行的goroutine數量是有限的,是由runtime.GOMAXPROCS(0)設置的。所以這個Pool高效的設計的地方就在於將數據分散在了各個真正並發的線程中,每個線程優先從自己的poolLocal中獲取數據,很大程度上降低了鎖競爭。 

Reference#

https://www.cnblogs.com/luozhiyun/p/14194872.html

https://blog.csdn.net/u010853261/article/details/90647884

https://www.cnblogs.com/wsw-seu/p/12402267.html

https://blog.csdn.net/qq_25870633/article/details/83448234


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM