筆者最近在項目中基於 go-redis 實現 Redis 緩存優化性能。go-redis 是一個 Go 語言實現的 Redis 客戶端,既然是網絡服務的客戶端,為了高效利用有限資源,避免重復創建和銷毀網絡連接,就必需對其進行管理。而資源管理又是編程領域中的一個重點難點,抱着對是否能利用 Go 語言語法簡潔的特點來優雅實現連接池的好奇,筆者決定閱讀並分析 go-redis 連接池部分的源碼,一探究竟。以下是對源碼的分析,分為接口與結構體、連接池管理、建立與關閉連接、獲取與放回連接、監控統計等5大部分,源碼鏈接。
接口與結構體
連接結構體:
type Conn struct { netConn net.Conn // 基於 tcp 的網絡連接 rd *proto.Reader // 根據 Redis 通信協議實現的 Reader wr *proto.Writer // 根據 Redis 通信協議實現的 Writer Inited bool // 是否完成初始化 pooled bool // 是否放進連接池 createdAt time.Time // 創建時間 usedAt int64 // 使用時間,atomic }
連接池接口:
type Pooler interface { NewConn(context.Context) (*Conn, error) // 創建連接 CloseConn(*Conn) error // 關閉連接 Get(context.Context) (*Conn, error) // 獲取連接 Put(*Conn) // 放回連接 Remove(*Conn, error) // 移除連接 Len() int // 連接池長度 IdleLen() int // 空閑連接數量 Stats() *Stats // 連接池統計 Close() error // 關閉連接池 }
連接池結構體:
type ConnPool struct { opt *Options // 連接池配置 dialErrorsNum uint32 // 連接錯誤次數,atomic lastDialErrorMu sync.RWMutex // 上一次連接錯誤鎖,讀寫鎖 lastDialError error // 上一次連接錯誤 queue chan struct{} // 工作連接隊列 connsMu sync.Mutex // 連接隊列鎖 conns []*Conn // 連接隊列 idleConns []*Conn // 空閑連接隊列 poolSize int // 連接池大小 idleConnsLen int // 空閑連接隊列長度 stats Stats // 連接池統計 _closed uint32 // 連接池關閉標志,atomic closedCh chan struct{} // 通知連接池關閉通道 }
連接池管理
初始化
var _ Pooler = (*ConnPool)(nil) func NewConnPool(opt *Options) *ConnPool { p := &ConnPool{ opt: opt, queue: make(chan struct{}, opt.PoolSize), conns: make([]*Conn, 0, opt.PoolSize), idleConns: make([]*Conn, 0, opt.PoolSize), closedCh: make(chan struct{}), } p.checkMinIdleConns() if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { go p.reaper(opt.IdleCheckFrequency) } return p }
- 創建連接池,傳入連接池配置選項參數 opt,工廠函數根據 opt 創建連接池實例。連接池主要依靠以下四個數據結構實現管理和通信:
- queue: 存儲工作連接的緩沖通道
- conns:存儲所有連接的切片
- idleConns:存儲空閑連接的切片
- closed:用於通知所有協程連接池已經關閉的通道
- 檢查連接池的空閑連接數量是否滿足最小空閑連接數量要求,若不滿足,則創建足夠的空閑連接。
- 若連接池配置選項規定了空閑連接超時和檢查空閑連接頻率,則開啟一個清理空閑連接的協程。
關閉
func (p *ConnPool) Close() error { if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) { return ErrClosed } close(p.closedCh) var firstErr error p.connsMu.Lock() for _, cn := range p.conns { if err := p.closeConn(cn); err != nil && firstErr == nil { firstErr = err } } p.conns = nil p.poolSize = 0 p.idleConns = nil p.idleConnsLen = 0 p.connsMu.Unlock() return firstErr }
- 原子性檢查連接池是否已經關閉,若沒關閉,則將關閉標志置為1
- 關閉 closedCh 通道,連接池中的所有協程都可以通過判斷該通道是否關閉來確定連接池是否已經關閉。
- 連接隊列鎖上鎖,關閉隊列中的所有連接,並置空所有維護連接池狀態的數據結構,解鎖。
過濾
func (p *ConnPool) Filter(fn func(*Conn) bool) error { var firstErr error p.connsMu.Lock() for _, cn := range p.conns { if fn(cn) { if err := p.closeConn(cn); err != nil && firstErr == nil { firstErr = err } } } p.connsMu.Unlock() return firstErr }
實質上是遍歷連接池中的所有連接,並調用傳入的 fn 過濾函數作用在每個連接上,過濾出符合業務要求的連接。
清理
func (p *ConnPool) reaper(frequency time.Duration) { ticker := time.NewTicker(frequency) defer ticker.Stop() for { select { case <-ticker.C: // It is possible that ticker and closedCh arrive together, // and select pseudo-randomly pick ticker case, we double // check here to prevent being executed after closed. if p.closed() { return } _, err := p.ReapStaleConns() if err != nil { internal.Logger.Printf("ReapStaleConns failed: %s", err) continue } case <-p.closedCh: return } } } func (p *ConnPool) ReapStaleConns() (int, error) { var n int for { p.getTurn() p.connsMu.Lock() cn := p.reapStaleConn() p.connsMu.Unlock() p.freeTurn() if cn != nil { _ = p.closeConn(cn) n++ } else { break } } atomic.AddUint32(&p.stats.StaleConns, uint32(n)) return n, nil } func (p *ConnPool) reapStaleConn() *Conn { if len(p.idleConns) == 0 { return nil } cn := p.idleConns[0] if !p.isStaleConn(cn) { return nil } p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...) p.idleConnsLen-- p.removeConn(cn) return cn }
- 開啟一個用於檢查並清理過期連接的 goroutine 每隔 frequency 時間遍歷檢查連接池中是否存在過期連接,並清理。
- 創建一個時間間隔為 frequency 的計時器,在連接池關閉時關閉該計時器
- 循環判斷計時器是否到時和連接池是否關閉
- 移除空閑連接隊列中的過期連接
建立與關閉連接
建立連接
func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) { cn, err := p.dialConn(ctx, pooled) if err != nil { return nil, err } p.connsMu.Lock() p.conns = append(p.conns, cn) if pooled { // If pool is full remove the cn on next Put. if p.poolSize >= p.opt.PoolSize { cn.pooled = false } else { p.poolSize++ } } p.connsMu.Unlock() return cn, nil } func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) { if p.closed() { return nil, ErrClosed } if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) { return nil, p.getLastDialError() } netConn, err := p.opt.Dialer(ctx) if err != nil { p.setLastDialError(err) if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) { go p.tryDial() } return nil, err } cn := NewConn(netConn) cn.pooled = pooled return cn, nil } func (p *ConnPool) tryDial() { for { if p.closed() { return } conn, err := p.opt.Dialer(context.Background()) if err != nil { p.setLastDialError(err) time.Sleep(time.Second) continue } atomic.StoreUint32(&p.dialErrorsNum, 0) _ = conn.Close() return } }
創建連接流程圖:
newConn流程圖.png
DialConn流程圖.png
移除與關閉連接
func (p *ConnPool) Remove(cn *Conn, reason error) { p.removeConnWithLock(cn) p.freeTurn() _ = p.closeConn(cn) } func (p *ConnPool) CloseConn(cn *Conn) error { p.removeConnWithLock(cn) return p.closeConn(cn) } func (p *ConnPool) removeConnWithLock(cn *Conn) { p.connsMu.Lock() p.removeConn(cn) p.connsMu.Unlock() } func (p *ConnPool) removeConn(cn *Conn) { for i, c := range p.conns { if c == cn { p.conns = append(p.conns[:i], p.conns[i+1:]...) if cn.pooled { p.poolSize-- p.checkMinIdleConns() } return } } } func (p *ConnPool) closeConn(cn *Conn) error { if p.opt.OnClose != nil { _ = p.opt.OnClose(cn) } return cn.Close() }
連接池無論移除還是關閉連接,底層調用的都是 removeConnWithLock 函數。removeConnWithLock 函數的工作流程如下:
- 連接隊列上鎖
- 遍歷連接隊列找到要關閉的連接,並將其移除出連接隊列
- 更新連接池統計數據
- 檢查連接池最小空閑連接數量
- 連接隊列解鎖
- 關閉連接,先執行關閉連接時的回調函數(創建連接池時的配置選項傳入),再關閉連接
獲取與放回連接
獲取
// Get returns existed connection from the pool or creates a new one. func (p *ConnPool) Get(ctx context.Context) (*Conn, error) { if p.closed() { return nil, ErrClosed } err := p.waitTurn(ctx) if err != nil { return nil, err } for { p.connsMu.Lock() cn := p.popIdle() p.connsMu.Unlock() if cn == nil { break } if p.isStaleConn(cn) { _ = p.CloseConn(cn) continue } atomic.AddUint32(&p.stats.Hits, 1) return cn, nil } atomic.AddUint32(&p.stats.Misses, 1) newcn, err := p.newConn(ctx, true) if err != nil { p.freeTurn() return nil, err } return newcn, nil } func (p *ConnPool) getTurn() { p.queue <- struct{}{} } func (p *ConnPool) waitTurn(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() default: } select { case p.queue <- struct{}{}: return nil default: } timer := timers.Get().(*time.Timer) timer.Reset(p.opt.PoolTimeout) select { case <-ctx.Done(): if !timer.Stop() { <-timer.C } timers.Put(timer) return ctx.Err() case p.queue <- struct{}{}: if !timer.Stop() { <-timer.C } timers.Put