Pool介紹#
總所周知Go 是一個自動垃圾回收的編程語言,采用三色並發標記算法標記對象並回收。如果你想使用 Go 開發一個高性能的應用程序的話,就必須考慮垃圾回收給性能帶來的影響。因為Go 在垃圾回收的時候會有一個STW(stop-the-world,程序暫停)的時間,並且如果對象太多,做標記也需要時間。所以如果采用對象池來創建對象,增加對象的重復利用率,使用的時候就不必在堆上重新創建對象可以節省開銷。在Go中,golang提供了對象重用的機制,也就是sync.Pool對象池。 sync.Pool是可伸縮的,並發安全的。其大小僅受限於內存的大小,可以被看作是一個存放可重用對象的值的容器。 設計的目的是存放已經分配的但是暫時不用的對象,在需要用到的時候直接從pool中取。
任何存放區其中的值可以在任何時候被刪除而不通知,在高負載下可以動態的擴容,在不活躍時對象池會收縮。它對外提供了三個方法:New、Get 和 Put。下面用一個簡短的例子來說明一下Pool使用:
package main import ( "fmt" "sync" ) var pool *sync.Pool type Person struct { Name string } func init() { pool = &sync.Pool{ New: func() interface{} { fmt.Println("creating a new person") return new(Person) }, } } func main() { person := pool.Get().(*Person) fmt.Println("Get Pool Object1:", person) person.Name = "first" pool.Put(person) fmt.Println("Get Pool Object2:", pool.Get().(*Person)) fmt.Println("Get Pool Object3:", pool.Get().(*Person)) }
結果:
creating a new person Get Pool Object1: &{} Get Pool Object2: &{first} creating a new person Get Pool Object3: &{}
這里我用了init方法初始化了一個pool,然后get了三次,put了一次到pool中,如果pool中沒有對象,那么會調用New函數創建一個新的對象,否則會從put進去的對象中獲取。
存儲在池中的任何項目都可以隨時自動刪除,並且不會被通知。Pool可以安全地同時使用多個goroutine。池的目的是緩存已分配但未使用的對象以供以后重用,從而減輕對gc的壓力。也就是說,它可以輕松構建高效,線程安全的free列表。但是,它不適用於所有free列表。池的適當使用是管理一組默認共享的臨時項,並且可能由包的並發獨立客戶端重用。池提供了一種在許多客戶端上分攤分配開銷的方法。很好地使用池的一個例子是fmt包,它維護一個動態大小的臨時輸出緩沖區存儲。底層存儲隊列在負載下(當許多goroutine正在積極打印時)進行縮放,並在靜止時收縮。另一方面,作為短期對象的一部分維護的空閑列表不適合用於池, 因為在該場景中開銷不能很好地攤銷。 使這些對象實現自己的空閑列表更有效。首次使用后不得復制池。
pool 的兩個特點
1、在本地私有池和本地共享池均獲取 obj 失敗時,則會從其他p偷一個 obj 返回給調用方。
2、obj在池中的生命周期取決於垃圾回收任務的下一次執行時間,並且從池中獲取到的值可能是 put 進去的其中一個值,也可能是 newfun處 新生成的一個值,在應用時很容易入坑。
在多個goroutine之間使用同一個pool做到高效,是因為sync.pool為每個P都分配了一個子池,
當執行一個pool的get或者put操作的時候都會先把當前的goroutine固定到某個P的子池上面,
然后再對該子池進行操作。每個子池里面有一個私有對象和共享列表對象,
私有對象是只有對應的P能夠訪問,因為一個P同一時間只能執行一個goroutine,
【因此對私有對象存取操作是不需要加鎖的】。
源碼分析
type Pool struct { // 不允許復制,一個結構體,有一個Lock()方法,嵌入別的結構體中,表示不允許復制 // noCopy對象,擁有一個Lock方法,使得Cond對象在進行go vet掃描的時候,能夠被檢測到是否被復制 noCopy noCopy //local 和 localSize 維護一個動態 poolLocal 數組 // 每個固定大小的池, 真實類型是 [P]poolLocal // 其實就是一個[P]poolLocal 的指針地址 local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal localSize uintptr // size of the local array victim unsafe.Pointer // local from previous cycle victimSize uintptr // size of victims array // New optionally specifies a function to generate // a value when Get would otherwise return nil. // It may not be changed concurrently with calls to Get. // New 是一個回調函數指針,當Get 獲取到目標對象為 nil 時,需要調用此處的回調函數用於生成 新的對象 New func() interface{} }
-
Pool結構體里面noCopy代表這個結構體是禁止拷貝的,它可以在我們使用
go vet工具的時候生效;
local是一個poolLocal數組的指針,localSize代表這個數組的大小;同樣victim也是一個poolLocal數組的指針,每次垃圾回收的時候,Pool 會把 victim 中的對象移除,然后把 local 的數據給 victim;local和victim的邏輯我們下面會詳細介紹到。
New函數是在創建pool的時候設置的,當pool沒有緩存對象的時候,會調用New方法生成一個新的對象。
下面我們對照着pool的結構圖往下講,避免找不到北:

// Local per-P Pool appendix. /* 因為poolLocal中的對象可能會被其他P偷走, private域保證這個P不會被偷光,至少保留一個對象供自己用。 否則,如果這個P只剩一個對象,被偷走了, 那么當它本身需要對象時又要從別的P偷回來,造成了不必要的開銷 */ type poolLocalInternal struct { private interface{} // Can be used only by the respective P. shared poolChain // Local P can pushHead/popHead; any P can popTail. } type poolLocal struct { poolLocalInternal // Prevents false sharing on widespread platforms with // 128 mod (cache line size) = 0 . /** cache使用中常見的一個問題是false sharing。 當不同的線程同時讀寫同一cache line上不同數據時就可能發生false sharing。 false sharing會導致多核處理器上嚴重的系統性能下降。 字節對齊,避免 false sharing (偽共享) */ pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte }
local字段存儲的是一個poolLocal數組的指針,poolLocal數組大小是goroutine中P的數量,訪問時,P的id對應poolLocal數組下標索引,所以Pool的最大個數runtime.GOMAXPROCS(0)。
通過這樣的設計,每個P都有了自己的本地空間,多個 goroutine 使用同一個 Pool 時,減少了競爭,提升了性能。如果對goroutine的P、G、M有疑惑的同學不妨看看這篇文章:The Go scheduler。
poolLocal里面有一個pad數組用來占位用,防止在 cache line 上分配多個 poolLocalInternal從而造成false sharing,cache使用中常見的一個問題是false sharing。當不同的線程同時讀寫同一cache line上不同數據時就可能發生false sharing。false sharing會導致多核處理器上嚴重的系統性能下降。具體的可以參考偽共享(False Sharing)。
poolLocalInternal包含兩個字段private和shared。
private代表緩存的一個元素,只能由相應的一個 P 存取。因為一個 P 同時只能執行一個 goroutine,所以不會有並發的問題;所以無需加鎖
shared則可以由任意的 P 訪問,但是只有本地的 P 才能 pushHead/popHead,其它 P 可以 popTail。因為可能有多個goroutine同時操作,所以需要加鎖。
type poolChain struct { // head is the poolDequeue to push to. This is only accessed // by the producer, so doesn't need to be synchronized. head *poolChainElt // tail is the poolDequeue to popTail from. This is accessed // by consumers, so reads and writes must be atomic. tail *poolChainElt } type poolChainElt struct { poolDequeue // next and prev link to the adjacent poolChainElts in this // poolChain. // // next is written atomically by the producer and read // atomically by the consumer. It only transitions from nil to // non-nil. // // prev is written atomically by the consumer and read // atomically by the producer. It only transitions from // non-nil to nil. next, prev *poolChainElt } type poolDequeue struct { // headTail packs together a 32-bit head index and a 32-bit // tail index. Both are indexes into vals modulo len(vals)-1. // // tail = index of oldest data in queue // head = index of next slot to fill // // Slots in the range [tail, head) are owned by consumers. // A consumer continues to own a slot outside this range until // it nils the slot, at which point ownership passes to the // producer. // // The head index is stored in the most-significant bits so // that we can atomically add to it and the overflow is // harmless. headTail uint64 // vals is a ring buffer of interface{} values stored in this // dequeue. The size of this must be a power of 2. // // vals[i].typ is nil if the slot is empty and non-nil // otherwise. A slot is still in use until *both* the tail // index has moved beyond it and typ has been set to nil. This // is set to nil atomically by the consumer and read // atomically by the producer. vals []eface } type eface struct { typ, val unsafe.Pointer }
poolChain是一個雙端隊列,里面的head和tail分別指向隊列頭尾;poolDequeue里面存放真正的數據,是一個單生產者、多消費者的固定大小的無鎖的環狀隊列,headTail是環狀隊列的首位位置的指針,可以通過位運算解析出首尾的位置,生產者可以從 head 插入、head 刪除,而消費者僅可從 tail 刪除。
這個雙端隊列的模型大概是這個樣子:

poolDequeue里面的環狀隊列大小是固定的,后面分析源碼我們會看到,當環狀隊列滿了的時候會創建一個size是原來兩倍大小的環狀隊列。大家這張圖好好體會一下,會反復用到。
Get方法#
// Get selects an arbitrary item from the Pool, removes it from the // Pool, and returns it to the caller. // Get may choose to ignore the pool and treat it as empty. // Callers should not assume any relation between values passed to Put and // the values returned by Get. // // If Get would otherwise return nil and p.New is non-nil, Get returns // the result of calling p.New. func (p *Pool) Get() interface{} { if race.Enabled { race.Disable() } l, pid := p.pin() //1.把當前goroutine綁定在當前的P上 x := l.private //2.優先從local的private中獲取 l.private = nil if x == nil { // Try to pop the head of the local shard. We prefer // the head over the tail for temporal locality of // reuse. x, _ = l.shared.popHead() //3,private沒有,那么從shared的頭部獲取 if x == nil { x = p.getSlow(pid) //4. 如果都沒有,那么去別的local上去偷一個 } } runtime_procUnpin() //解除搶占 if race.Enabled { race.Enable() if x != nil { race.Acquire(poolRaceAddr(x)) } } //5. 如果沒有獲取到,嘗試使用New函數生成一個新的 if x == nil && p.New != nil { x = p.New() } return x }
-
這一段代碼首先會將當前goroutine綁定在當前的P上返回對應的local,然后嘗試從local的private中獲取,然后需要把private字段置空,因為已經拿到了想要的對象;
-
private中獲取不到,那么就去shared的頭部獲取;
-
shared也沒有,那么嘗試遍歷所有的 local,嘗試從它們的 shared 彈出一個元素;
-
最后如果還是沒有,那么就直接調用預先設置好的 New 函數,創建一個出來。
pin#
// pin 會將當前 goroutine 訂到 P 上, 禁止搶占(preemption) 並從 poolLocal 池中返回 P 對應的 poolLocal // 調用方必須在完成取值后調用 runtime_procUnpin() 來取消禁止搶占。 // pin pins the current goroutine to P, disables preemption and // returns poolLocal pool for the P and the P's id. // Caller must call runtime_procUnpin() when done with the pool. func (p *Pool) pin() (*poolLocal, int) { pid := runtime_procPin() // In pinSlow we store to local and then to localSize, here we load in opposite order. // Since we've disabled preemption, GC cannot happen in between. // Thus here we must observe local at least as large localSize. // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness). // 因為可能存在動態的 P(運行時調整 P 的個數)procresize/GOMAXPROCS // 如果 P.id 沒有越界,則直接返回 PID /** 具體的邏輯就是首先拿到當前的pid, 然后以pid作為index找到local中的poolLocal, 但是如果pid大於了localsize, 說明當前線程的poollocal不存在,就會新創建一個poolLocal */ s := atomic.LoadUintptr(&p.localSize) // load-acquire l := p.local // load-consume if uintptr(pid) < s { return indexLocal(l, pid), pid } // 沒有結果時,涉及全局加鎖 // 例如重新分配數組內存,添加到全局列表 return p.pinSlow() } func indexLocal(l unsafe.Pointer, i int) *poolLocal { lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{})) return (*poolLocal)(lp) }
pin方法里面首先會調用runtime_procPin方法會先獲取當前goroutine,然后綁定到對應的M上,然后返回M目前綁定的P的id,因為這個pid后面會用到,防止在使用途中P被搶占,具體的細節可以看這篇:https://zhuanlan.zhihu.com/p/99710992。接下來會使用原子操作取出localSize,如果當前pid大於localSize,那么就表示Pool還沒創建對應的poolLocal,那么調用pinSlow進行創建工作,否則調用indexLocal取出pid對應的poolLocal返回。
indexLocal里面是使用了地址操作,傳入的i是數組的index值,所以需要獲取poolLocal{}的size做一下地址的位移操作,然后再轉成轉成poolLocal地址返回。
pinSlow#
func (p *Pool) pinSlow() (*poolLocal, int) { // Retry under the mutex. // Can not lock the mutex while pinned. //因為需要對全局進行加鎖,pinSlow() 會首先取消 P 的不可搶占,然后使用 allPoolsMu 進行加鎖 runtime_procUnpin() // 解除pin allPoolsMu.Lock() // 加上全局鎖 defer allPoolsMu.Unlock() pid := runtime_procPin() // pin住 // poolCleanup won't be called while we are pinned. s := p.localSize l := p.local // 重新對pid進行檢查 再次檢查是否符合條件,因為可能中途已被其他線程調用 // 當再次固定 P 時 poolCleanup 不會被調用 if uintptr(pid) < s { return indexLocal(l, pid), pid } // 初始化local前會將pool放入到allPools數組中 if p.local == nil { allPools = append(allPools, p) } // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one. size := runtime.GOMAXPROCS(0) // 當前P的數量 local := make([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release return &local[pid], pid }
因為allPoolsMu是一個全局Mutex鎖,因此上鎖會比較慢可能被阻塞,所以上鎖前調用runtime_procUnpin方法解除pin的操作;
在解除綁定后,pinSlow 可能被其他的線程調用過了,p.local 可能會發生變化。因此這時候需要再次對 pid 進行檢查。
最后初始化local,並使用原子操作對local和localSize設值,返回當前P對應的local。
到這里pin方法終於講完了。畫一個簡單的圖描述一下這整個流程:

下面我們再回到Get方法中,如果private中沒有值,那么會調用shared的popHead方法獲取值。
popHead#
func (c *poolChain) popHead() (interface{}, bool) { d := c.head // 這里頭部是一個poolChainElt // 遍歷poolChain鏈表 for d != nil { // 從poolChainElt的環狀列表中獲取值 if val, ok := d.popHead(); ok { return val, ok } // There may still be unconsumed elements in the // previous dequeue, so try backing up. // load poolChain下一個對象 d = loadPoolChainElt(&d.prev) } return nil, false } // popHead removes and returns the element at the head of the queue. // It returns false if the queue is empty. It must only be called by a // single producer. func (d *poolDequeue) popHead() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // headTail的高32位為head,低32位為tail if tail == head { // Queue is empty. // 首尾相等,那么這個隊列就是空的 return nil, false } // Confirm tail and decrement head. We do this before // reading the value to take back ownership of this // slot. head-- // 這里需要head--之后再獲取slot ptrs2 := d.pack(head, tail) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { // We successfully took back slot. slot = &d.vals[head&uint32(len(d.vals)-1)] break } } val := *(*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { // 說明沒取到緩存的對象,返回 nil val = nil } // Zero the slot. Unlike popTail, this isn't racing with // pushHead, so we don't need to be careful here. *slot = eface{} // 重置slot return val, true }
poolChain的popHead方法里面會獲取到poolChain的頭結點,不記得poolChain數據結構的同學建議往上面翻一下再回來。接着有個for循環會挨個從poolChain的頭結點往下遍歷,直到獲取對象返回。
-
poolDequeue的popHead方法首先會獲取到headTail的值,然后調用unpack解包,headTail是一個64位的值,高32位表示head,低32位表示tail。
-
判斷head和tail是否相等,相等那么這個隊列就是空的;
-
如果隊列不是空的,那么將head減一之后再使用,因為head當前指的位置是空值,表示下一個新對象存放的位置;
-
CAS重新設值新的headTail,成功之后獲取slot,這里因為vals大小是2的n 次冪,因此
len(d.vals)-1)之后低n位全是1,和head取與之后可以獲取到head的低n位的值; -
如果slot所對應的對象是dequeueNil,那么表示是空值,直接返回,否則將slot指針對應位置的值置空,返回val。
如果shared的popHead方法也沒獲取到值,那么就需要調用getSlow方法獲取了。
getSlow#
// 從其他P的共享緩沖區偷取 obj func (p *Pool) getSlow(pid int) interface{} { // See the comment in pin regarding ordering of the loads. size := atomic.LoadUintptr(&p.localSize) // load-acquire 獲取當前 poolLocal 的大小 locals := p.local // load-consume 獲取當前 poolLocal // Try to steal one element from other procs. // 遍歷locals列表,從其他的local的shared列表尾部獲取對象 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i+1)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // Try the victim cache. We do this after attempting to steal // from all primary caches because we want objects in the // victim cache to age out if at all possible. size = atomic.LoadUintptr(&p.victimSize) if uintptr(pid) >= size { return nil } locals = p.victim l := indexLocal(locals, pid) // victim的private不為空則返回 if x := l.private; x != nil { l.private = nil return x } // 遍歷victim對應的locals列表,從其他的local的shared列表尾部獲取對象 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // Mark the victim cache as empty for future gets don't bother // with it. // 獲取不到,將victimSize置為0 atomic.StoreUintptr(&p.victimSize, 0) return nil }
getSlow方法會遍歷locals列表,這里需要注意的是,遍歷是從索引為 pid+1 的 poolLocal 處開始,嘗試調用shared的popTail方法獲取對象;如果沒有拿到,則從 victim 里找。如果都沒找到,那么就將victimSize置為0,下次就不找victim了。
poolChain&popTail#
func (c *poolChain) popTail() (interface{}, bool) { d := loadPoolChainElt(&c.tail) if d == nil { return nil, false // 如果最后一個節點是空的,那么直接返回 } for { // It's important that we load the next pointer // *before* popping the tail. In general, d may be // transiently empty, but if next is non-nil before // the pop and the pop fails, then d is permanently // empty, which is the only condition under which it's // safe to drop d from the chain. // 這里獲取的是next節點,與一般的雙向鏈表是相反的 d2 := loadPoolChainElt(&d.next) if val, ok := d.popTail(); ok { return val, ok } if d2 == nil { // This is the only dequeue. It's empty right // now, but could be pushed to in the future. return nil, false } // The tail of the chain has been drained, so move on // to the next dequeue. Try to drop it from the chain // so the next pop doesn't have to look at the empty // dequeue again. // 因為d已經沒有數據了,所以重置tail為d2,並刪除d2的上一個節點 if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { // We won the race. Clear the prev pointer so // the garbage collector can collect the empty // dequeue and so popHead doesn't back up // further than necessary. storePoolChainElt(&d2.prev, nil) } d = d2 } }
- 判斷poolChain,如果最后一個節點是空的,那么直接返回;
- 進入for循環,獲取tail的next節點,這里需要注意的是這個雙向鏈表與一般的鏈表是反向的,不清楚的可以再去看看第一張圖;
- 調用popTail獲取poolDequeue列表的對象,有對象直接返回;
- d2為空則表示已經遍歷完整個poolChain雙向列表了,都為空,那么直接返回;
- 通過CAS將tail重置為d2,因為d已經沒有數據了,並將d2的prev節點置為nil,然后將d置為d2,進入下一個循環;
poolDequeue&popTail#
// popTail removes and returns the element at the tail of the queue. // It returns false if the queue is empty. It may be called by any // number of consumers. func (d *poolDequeue) popTail() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // 和pophead一樣,將headTail解包 if tail == head { // Queue is empty. // 首位相等,表示列表中沒有數據,返回 return nil, false } // Confirm head and tail (for our speculative check // above) and increment tail. If this succeeds, then // we own the slot at tail. ptrs2 := d.pack(head, tail+1) // CAS重置tail位置 if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { // Success. // 獲取tail位置對象 slot = &d.vals[tail&uint32(len(d.vals)-1)] break } } // We now own slot. val := *(*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } // Tell pushHead that we're done with this slot. Zeroing the // slot is also important so we don't leave behind references // that could keep this object live longer than necessary. // // We write to val first and then publish that we're done with // this slot by atomically writing to typ. slot.val = nil atomic.StorePointer(&slot.typ, nil) // At this point pushHead owns the slot. return val, true }
如果看懂了popHead,那么這個popTail方法是和它非常的相近的。
popTail簡單來說也是從隊列尾部移除一個元素,如果隊列為空,返回 false。但是需要注意的是,這個popTail可能會被多個消費者調用,所以需要循環CAS獲取對象;在poolDequeue環狀列表中tail是有數據的,不必像popHead中head--。
最后,需要將slot置空。
大家可以再對照一下圖回顧一下代碼:

Put方法#
// Put adds x to the pool. func (p *Pool) Put(x interface{}) { if x == nil { return } if race.Enabled { if fastrand()%4 == 0 { // Randomly drop x on floor. return } race.ReleaseMerge(poolRaceAddr(x)) race.Disable() } l, _ := p.pin() // 先獲得當前P綁定的 localPool if l.private == nil { l.private = x x = nil } if x != nil { l.shared.pushHead(x) } // 調用方必須在完成取值后調用 runtime_procUnpin() 來取消禁用搶占 runtime_procUnpin() if race.Enabled { race.Enable() } }
看完了Get方法,看Put方法就容易多了。同樣Put方法首先會去Pin住當前goroutine和P,然后嘗試將 x 賦值給 private 字段。如果private不為空,那么就調用pushHead將其放入到shared隊列中。
poolChain&pushHead#
func (c *poolChain) pushHead(val interface{}) { d := c.head // 頭節點沒有初始化,那么設值一下 if d == nil { // Initialize the chain. const initSize = 8 // Must be a power of 2 d = new(poolChainElt) d.vals = make([]eface, initSize) c.head = d storePoolChainElt(&c.tail, d) } // 將對象加入到環狀隊列中 if d.pushHead(val) { return } // The current dequeue is full. Allocate a new one of twice // the size. newSize := len(d.vals) * 2 // 這里做了限制,單個環狀隊列不能超過2的30次方大小 if newSize >= dequeueLimit { // Can't make it any bigger. newSize = dequeueLimit } // 初始化新的環狀列表,大小是d的兩倍 d2 := &poolChainElt{prev: d} d2.vals = make([]eface, newSize) c.head = d2 storePoolChainElt(&d.next, d2) // push到新的隊列中 d2.pushHead(val) }
如果頭節點為空,那么需要創建一個新的poolChainElt對象作為頭節點,大小為8;然后調用pushHead放入到環狀隊列中;
如果放置失敗,那么創建一個 poolChainElt 節點,並且雙端隊列的長度翻倍,當然長度也不能超過dequeueLimit,即2的30次方;
然后將新的節點d2和d互相綁定一下,並將d2設值為頭節點,將傳入的對象push到d2中;
poolDequeue&pushHead#
// pushHead adds val at the head of the queue. It returns false if the // queue is full. It must only be called by a single producer. func (d *poolDequeue) pushHead(val interface{}) bool { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // 解包headTail // 判斷隊列是否已滿 if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { // Queue is full. return false } slot := &d.vals[head&uint32(len(d.vals)-1)] // 找到head的槽位 // Check if the head slot has been released by popTail. typ := atomic.LoadPointer(&slot.typ) // 檢查slot是否和popTail有沖突 if typ != nil { // Another goroutine is still cleaning up the tail, so // the queue is actually still full. return false } // The head slot is free, so we own it. if val == nil { val = dequeueNil(nil) } // 將 val 賦值到 slot,並將 head 指針值加 1 *(*interface{})(unsafe.Pointer(slot)) = val // Increment head. This passes ownership of slot to popTail // and acts as a store barrier for writing the slot. atomic.AddUint64(&d.headTail, 1<<dequeueBits) return true }
首先通過位運算判斷隊列是否已滿,也就是將尾部指針加上 len(d.vals) ,因為head指向的是將要被填充的位置,所以head和tail位置是相隔len(d.vals),然后再取低 31 位,看它是否和 head 相等。如果隊列滿了,直接返回 false;
然后找到找到head的槽位slot,並判斷typ是否為空,因為popTail 是先設置 val,再將 typ 設置為 nil,所以如果有沖突,那么直接返回;
最后設值slot,並將head加1返回;
GC#
在pool.go文件的 init 函數里,注冊了 GC 發生時,如何清理 Pool 的函數:
func init() { runtime_registerPoolCleanup(poolCleanup) } // 當 stop the world (STW) 來臨,在 GC 之前會調用該函數 func poolCleanup() { // This function is called with the world stopped, at the beginning of a garbage collection. // It must not allocate and probably should not call any runtime functions. // Because the world is stopped, no pool user can be in a // pinned section (in effect, this has all Ps pinned). // Drop victim caches from all pools. for _, p := range oldPools { p.victim = nil p.victimSize = 0 } // Move primary cache to victim cache. for _, p := range allPools { p.victim = p.local p.victimSize = p.localSize p.local = nil p.localSize = 0 } // The pools with non-empty primary caches now have non-empty // victim caches and no pools have primary caches. oldPools, allPools = allPools, nil }
poolCleanup 會在 STW 階段被調用。主要是將 local 和 victim 作交換,那么不至於GC 把所有的 Pool 都清空了,而是需要兩個 GC 周期才會被釋放。如果 sync.Pool 的獲取、釋放速度穩定,那么就不會有新的池對象進行分配。存在Pool中的對象會在沒有任何通知的情況下被自動移除掉。實際上,這個清理過程是在每次垃圾回收之前做的。垃圾回收是固定兩分鍾觸發一次。而且每次清理會將Pool中的所有對象都清理掉!
總結#

整個設計充分利用了go.runtime的調度器優勢:一個P下goroutine競爭的無鎖化;
一個goroutine固定在一個局部調度器P上,從當前 P 對應的 poolLocal 取值, 若取不到,則從對應的 shared 數組上取,若還是取不到;則嘗試從其他 P 的 shared 中偷。 若偷不到,則調用 New 創建一個新的對象。池中所有臨時對象在一次 GC 后會被全部清空。
通過以上的解讀,我們可以看到,Get方法並不會對獲取到的對象值做任何的保證,因為放入本地池中的值有可能會在任何時候被刪除,但是不通知調用者。放入共享池中的值有可能被其他的goroutine偷走。 所以對象池比較適合用來存儲一些臨時且狀態無關的數據,但是不適合用來存儲數據庫連接的實例,因為存入對象池重的值有可能會在垃圾回收時被刪除掉,這違反了數據庫連接池建立的初衷。
根據上面的說法,Golang的對象池嚴格意義上來說是一個臨時的對象池,適用於儲存一些會在goroutine間分享的臨時對象。主要作用是減少GC,提高性能。在Golang中最常見的使用場景是fmt包中的輸出緩沖區。
在Golang中如果要實現連接池的效果,可以用container/list來實現,開源界也有一些現成的實現,比如go-commons-pool,具體的讀者可以去自行了解。
Go語言的goroutine雖然可以創建很多,但是真正能物理上並發運行的goroutine數量是有限的,是由runtime.GOMAXPROCS(0)設置的。所以這個Pool高效的設計的地方就在於將數據分散在了各個真正並發的線程中,每個線程優先從自己的poolLocal中獲取數據,很大程度上降低了鎖競爭。
Reference#
https://www.cnblogs.com/luozhiyun/p/14194872.html
https://blog.csdn.net/u010853261/article/details/90647884
