轉載請聲明出處哦~,本篇文章發布於luozhiyun的博客:https://www.luozhiyun.com
本文使用的go的源碼時14.4
Pool介紹
總所周知Go 是一個自動垃圾回收的編程語言,采用三色並發標記算法標記對象並回收。如果你想使用 Go 開發一個高性能的應用程序的話,就必須考慮垃圾回收給性能帶來的影響。因為Go 在垃圾回收的時候會有一個STW(stop-the-world,程序暫停)的時間,並且如果對象太多,做標記也需要時間。
所以如果采用對象池來創建對象,增加對象的重復利用率,使用的時候就不必在堆上重新創建對象可以節省開銷。
在Go中,sync.Pool提供了對象池的功能。它對外提供了三個方法:New、Get 和 Put。下面用一個簡短的例子來說明一下Pool使用:
var pool *sync.Pool
type Person struct {
Name string
}
func init() {
pool = &sync.Pool{
New: func() interface{}{
fmt.Println("creating a new person")
return new(Person)
},
}
}
func main() {
person := pool.Get().(*Person)
fmt.Println("Get Pool Object:", person)
person.Name = "first"
pool.Put(person)
fmt.Println("Get Pool Object:",pool.Get().(*Person))
fmt.Println("Get Pool Object:",pool.Get().(*Person))
}
結果:
creating a new person
Get Pool Object: &{}
Get Pool Object: &{first}
creating a new person
Get Pool Object: &{}
這里我用了init方法初始化了一個pool,然后get了三次,put了一次到pool中,如果pool中沒有對象,那么會調用New函數創建一個新的對象,否則會重put進去的對象中獲取。
源碼分析
type Pool struct {
noCopy noCopy
local unsafe.Pointer
localSize uintptr
victim unsafe.Pointer
victimSize uintptr
New func() interface{}
}
Pool結構體里面noCopy代表這個結構體是禁止拷貝的,它可以在我們使用 go vet
工具的時候生效;
local是一個poolLocal數組的指針,localSize代表這個數組的大小;同樣victim也是一個poolLocal數組的指針,每次垃圾回收的時候,Pool 會把 victim 中的對象移除,然后把 local 的數據給 victim;local和victim的邏輯我們下面會詳細介紹到。
New函數是在創建pool的時候設置的,當pool沒有緩存對象的時候,會調用New方法生成一個新的對象。
下面我們對照着pool的結構圖往下講,避免找不到北:
type poolLocal struct {
poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
local字段存儲的是一個poolLocal數組的指針,poolLocal數組大小是goroutine中P的數量,訪問時,P的id對應poolLocal數組下標索引,所以Pool的最大個數runtime.GOMAXPROCS(0)。
通過這樣的設計,每個P都有了自己的本地空間,多個 goroutine 使用同一個 Pool 時,減少了競爭,提升了性能。如果對goroutine的P、G、M有疑惑的同學不妨看看這篇文章:The Go scheduler。
poolLocal里面有一個pad數組用來占位用,防止在 cache line 上分配多個 poolLocalInternal從而造成false sharing,有關於false sharing可以看看這篇文章:
What’s false sharing and how to solve it ,文中對於false sharing的定義:
That’s what false sharing is: one core update a variable would force other cores to update cache either.
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
poolLocalInternal包含兩個字段private和shared。
private代表緩存的一個元素,只能由相應的一個 P 存取。因為一個 P 同時只能執行一個 goroutine,所以不會有並發的問題;
shared則可以由任意的 P 訪問,但是只有本地的 P 才能 pushHead/popHead,其它 P 可以 popTail。
type poolChain struct {
head *poolChainElt
tail *poolChainElt
}
type poolChainElt struct {
poolDequeue
next, prev *poolChainElt
}
type poolDequeue struct {
headTail uint64
vals []eface
}
poolChain是一個雙端隊列,里面的head和tail分別指向隊列頭尾;poolDequeue里面存放真正的數據,是一個單生產者、多消費者的固定大小的無鎖的環狀隊列,headTail是環狀隊列的首位位置的指針,可以通過位運算解析出首尾的位置,生產者可以從 head 插入、head 刪除,而消費者僅可從 tail 刪除。
這個雙端隊列的模型大概是這個樣子:
poolDequeue里面的環狀隊列大小是固定的,后面分析源碼我們會看到,當環狀隊列滿了的時候會創建一個size是原來兩倍大小的環狀隊列。大家這張圖好好體會一下,會反復用到。
Get方法
func (p *Pool) Get() interface{} {
...
//1.把當前goroutine綁定在當前的P上
l, pid := p.pin()
//2.優先從local的private中獲取
x := l.private
l.private = nil
if x == nil {
//3,private沒有,那么從shared的頭部獲取
x, _ = l.shared.popHead()
//4. 如果都沒有,那么去別的local上去偷一個
if x == nil {
x = p.getSlow(pid)
}
}
//解除搶占
runtime_procUnpin()
...
//5. 如果沒有獲取到,嘗試使用New函數生成一個新的
if x == nil && p.New != nil {
x = p.New()
}
return x
}
-
這一段代碼首先會將當前goroutine綁定在當前的P上返回對應的local,然后嘗試從local的private中獲取,然后需要把private字段置空,因為已經拿到了想要的對象;
-
private中獲取不到,那么就去shared的頭部獲取;
-
shared也沒有,那么嘗試遍歷所有的 local,嘗試從它們的 shared 彈出一個元素;
-
最后如果還是沒有,那么就直接調用預先設置好的 New 函數,創建一個出來。
pin
func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
pin方法里面首先會調用runtime_procPin方法會先獲取當前goroutine,然后綁定到對應的M上,然后返回M目前綁定的P的id,因為這個pid后面會用到,防止在使用途中P被搶占,具體的細節可以看這篇:https://zhuanlan.zhihu.com/p/99710992。
接下來會使用原子操作取出localSize,如果當前pid大於localSize,那么就表示Pool還沒創建對應的poolLocal,那么調用pinSlow進行創建工作,否則調用indexLocal取出pid對應的poolLocal返回。
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
indexLocal里面是使用了地址操作,傳入的i是數組的index值,所以需要獲取poolLocal{}的size做一下地址的位移操作,然后再轉成轉成poolLocal地址返回。
pinSlow
func (p *Pool) pinSlow() (*poolLocal, int) {
// 解除pin
runtime_procUnpin()
// 加上全局鎖
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
// pin住
pid := runtime_procPin()
s := p.localSize
l := p.local
// 重新對pid進行檢查
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 初始化local前會將pool放入到allPools數組中
if p.local == nil {
allPools = append(allPools, p)
}
// 當前P的數量
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
atomic.StoreUintptr(&p.localSize, uintptr(size))
return &local[pid], pid
}
因為allPoolsMu是一個全局Mutex鎖,因此上鎖會比較慢可能被阻塞,所以上鎖前調用runtime_procUnpin方法解除pin的操作;
在解除綁定后,pinSlow 可能被其他的線程調用過了,p.local 可能會發生變化。因此這時候需要再次對 pid 進行檢查。
最后初始化local,並使用原子操作對local和localSize設值,返回當前P對應的local。
到這里pin方法終於講完了。畫一個簡單的圖描述一下這整個流程:
下面我們再回到Get方法中往下走,代碼我再貼一遍,以便閱讀:
func (p *Pool) Get() interface{} {
...
//2.優先從local的private中獲取
x := l.private
l.private = nil
if x == nil {
//3,private沒有,那么從shared的頭部獲取
x, _ = l.shared.popHead()
//4. 如果都沒有,那么去別的local上去偷一個
if x == nil {
x = p.getSlow(pid)
}
}
...
return x
}
如果private中沒有值,那么會調用shared的popHead方法獲取值。
popHead
func (c *poolChain) popHead() (interface{}, bool) {
// 這里頭部是一個poolChainElt
d := c.head
// 遍歷poolChain鏈表
for d != nil {
// 從poolChainElt的環狀列表中獲取值
if val, ok := d.popHead(); ok {
return val, ok
}
// load poolChain下一個對象
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
popHead方法里面會獲取到poolChain的頭結點,不記得poolChain數據結構的同學建議往上面翻一下再回來。
接着有個for循環會挨個從poolChain的頭結點往下遍歷,直到獲取對象返回。
func (d *poolDequeue) popHead() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
// headTail的高32位為head,低32位為tail
head, tail := d.unpack(ptrs)
// 首尾相等,那么這個隊列就是空的
if tail == head {
return nil, false
}
// 這里需要head--之后再獲取slot
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
val := *(*interface{})(unsafe.Pointer(slot))
// 說明沒取到緩存的對象,返回 nil
if val == dequeueNil(nil) {
val = nil
}
// 重置slot
*slot = eface{}
return val, true
}
-
poolDequeue的popHead方法首先會獲取到headTail的值,然后調用unpack解包,headTail是一個64位的值,高32位表示head,低32位表示tail。
-
判斷head和tail是否相等,相等那么這個隊列就是空的;
-
如果隊列不是空的,那么將head減一之后再使用,因為head當前指的位置是空值,表示下一個新對象存放的位置;
-
CAS重新設值新的headTail,成功之后獲取slot,這里因為vals大小是2的n 次冪,因此
len(d.vals)-1)
之后低n位全是1,和head取與之后可以獲取到head的低n位的值; -
如果slot所對應的對象是dequeueNil,那么表示是空值,直接返回,否則將slot指針對應位置的值置空,返回val。
如果shared的popHead方法也沒獲取到值,那么就需要調用getSlow方法獲取了。
getSlow
func (p *Pool) getSlow(pid int) interface{} {
size := atomic.LoadUintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 遍歷locals列表,從其他的local的shared列表尾部獲取對象
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
// victim的private不為空則返回
if x := l.private; x != nil {
l.private = nil
return x
}
// 遍歷victim對應的locals列表,從其他的local的shared列表尾部獲取對象
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 獲取不到,將victimSize置為0
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
getSlow方法會遍歷locals列表,這里需要注意的是,遍歷是從索引為 pid+1 的 poolLocal 處開始,嘗試調用shared的popTail方法獲取對象;如果沒有拿到,則從 victim 里找。如果都沒找到,那么就將victimSize置為0,下次就不找victim了。
poolChain&popTail
func (c *poolChain) popTail() (interface{}, bool) {
d := loadPoolChainElt(&c.tail)
// 如果最后一個節點是空的,那么直接返回
if d == nil {
return nil, false
}
for {
// 這里獲取的是next節點,與一般的雙向鏈表是相反的
d2 := loadPoolChainElt(&d.next)
// 獲取尾部對象
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
return nil, false
}
// 因為d已經沒有數據了,所以重置tail為d2,並刪除d2的上一個節點
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
- 判斷poolChain,如果最后一個節點是空的,那么直接返回;
- 進入for循環,獲取tail的next節點,這里需要注意的是這個雙向鏈表與一般的鏈表是反向的,不清楚的可以再去看看第一張圖;
- 調用popTail獲取poolDequeue列表的對象,有對象直接返回;
- d2為空則表示已經遍歷完整個poolChain雙向列表了,都為空,那么直接返回;
- 通過CAS將tail重置為d2,因為d已經沒有數據了,並將d2的prev節點置為nil,然后將d置為d2,進入下一個循環;
poolDequeue&popTail
func (d *poolDequeue) popTail() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
// 和pophead一樣,將headTail解包
head, tail := d.unpack(ptrs)
// 首位相等,表示列表中沒有數據,返回
if tail == head {
return nil, false
}
ptrs2 := d.pack(head, tail+1)
// CAS重置tail位置
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// 獲取tail位置對象
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
val := *(*interface{})(unsafe.Pointer(slot))
// 判斷對象是不是為空
if val == dequeueNil(nil) {
val = nil
}
// 將slot置空
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
return val, true
}
如果看懂了popHead,那么這個popTail方法是和它非常的相近的。
popTail簡單來說也是從隊列尾部移除一個元素,如果隊列為空,返回 false。但是需要注意的是,這個popTail可能會被多個消費者調用,所以需要循環CAS獲取對象;在poolDequeue環狀列表中tail是有數據的,不必像popHead中head--
。
最后,需要將slot置空。
大家可以再對照一下圖回顧一下代碼:
Put方法
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
...
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
...
}
看完了Get方法,看Put方法就容易多了。同樣Put方法首先會去Pin住當前goroutine和P,然后嘗試將 x 賦值給 private 字段。如果private不為空,那么就調用pushHead將其放入到shared隊列中。
poolChain&pushHead
func (c *poolChain) pushHead(val interface{}) {
d := c.head
// 頭節點沒有初始化,那么設值一下
if d == nil {
const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
// 將對象加入到環狀隊列中
if d.pushHead(val) {
return
}
newSize := len(d.vals) * 2
// 這里做了限制,單個環狀隊列不能超過2的30次方大小
if newSize >= dequeueLimit {
newSize = dequeueLimit
}
// 初始化新的環狀列表,大小是d的兩倍
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
// push到新的隊列中
d2.pushHead(val)
}
如果頭節點為空,那么需要創建一個新的poolChainElt對象作為頭節點,大小為8;然后調用pushHead放入到環狀隊列中;
如果放置失敗,那么創建一個 poolChainElt 節點,並且雙端隊列的長度翻倍,當然長度也不能超過dequeueLimit,即2的30次方;
然后將新的節點d2和d互相綁定一下,並將d2設值為頭節點,將傳入的對象push到d2中;
poolDequeue&pushHead
func (d *poolDequeue) pushHead(val interface{}) bool {
ptrs := atomic.LoadUint64(&d.headTail)
// 解包headTail
head, tail := d.unpack(ptrs)
// 判斷隊列是否已滿
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
return false
}
// 找到head的槽位
slot := &d.vals[head&uint32(len(d.vals)-1)]
// 檢查slot是否和popTail有沖突
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
return false
}
if val == nil {
val = dequeueNil(nil)
}
// 將 val 賦值到 slot,並將 head 指針值加 1
*(*interface{})(unsafe.Pointer(slot)) = val
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
首先通過位運算判斷隊列是否已滿,也就是將尾部指針加上 len(d.vals)
,因為head指向的是將要被填充的位置,所以head和tail位置是相隔len(d.vals)
,然后再取低 31 位,看它是否和 head 相等。如果隊列滿了,直接返回 false;
然后找到找到head的槽位slot,並判斷typ是否為空,因為popTail 是先設置 val,再將 typ 設置為 nil,所以如果有沖突,那么直接返回;
最后設值slot,並將head加1返回;
GC
在pool.go文件的 init 函數里,注冊了 GC 發生時,如何清理 Pool 的函數:
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
func poolCleanup() {
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
oldPools, allPools = allPools, nil
}
poolCleanup
會在 STW 階段被調用。主要是將 local 和 victim 作交換,那么不至於GC 把所有的 Pool 都清空了,而是需要兩個 GC
周期才會被釋放。如果 sync.Pool
的獲取、釋放速度穩定,那么就不會有新的池對象進行分配。
總結
Pool這個概念在后台優化中是一個非常重要的手段,比如說在使用Http的時候會使用Http連接池,使用數據庫的時候,也會用到數據庫連接池。這些通過對象重用和預先分配可以減少服務器的壓力。
當我們在后期的項目開發中,如果發現GC耗時很高,有大量臨時對象時不妨可以考慮使用Pool。
例如發現現系統中的 goroutine 數量非常多,由於一個goroutine初始棧是2048字節,所以一個服務器上運行數十萬的goroutine 也是非常耗時的;這時候就可以考慮使用Worker Pool 來減少 goroutine 的使用。