Golang SQL連接池梳理


公眾號首發、歡迎關注

一、如何理解數據庫連接

數據庫連接池是由客戶端維護的存放數據庫連接的池子,連接被維護在池子里面,誰用誰來取,目的是降低頻繁的創建和關閉連接的開銷。

關於如何理解數據庫連接,大家可以借助這個TCP編程的Demo來理解。

為了便於理解,可以MySQL-Server的連接池想象成就是這個簡單的Tcp-Server

func main() {
	// 1. 監聽端口 2.accept連接 3.開goroutine處理連接
	listen, err := net.Listen("tcp", "0.0.0.0:9090")
	if err != nil {
		fmt.Printf("error : %v", err)
		return
	}
	for{
		conn, err := listen.Accept()
		if err != nil {
			fmt.Printf("Fail listen.Accept : %v", err)
			continue
		}
		go ProcessConn(conn)
	}
}

// 處理網絡請求
func ProcessConn(conn net.Conn) {
	// defer conn.Close()
	for  {
		bt,err:= coder.Decode(conn)
		if err != nil {
			fmt.Printf("Fail to decode error [%v]", err)
			return
		}
		s := string(bt)
		fmt.Printf("Read from conn:[%v]\n",s)
	}
}

對於我們現在看的sql包下的連接池,可以簡化認為它就是如下的tcp-client

conn, err := net.Dial("tcp", ":9090")
	defer conn.Close()
	if err != nil {
		fmt.Printf("error : %v", err)
		return
	}

	// 將數據編碼並發送出去
	coder.Encode(conn,"hi server i am here");

	time.Sleep(time.Second*10

總體的思路可以認為,程序啟動的時候,根據我們的配置,sql包中的DB會為我們提前創建幾條這樣的conn,然后維護起來,不close()掉,我們想使用的時候問他拿即可。

至於為什么是這個tcp的demo呢?因為數據庫連接的建立底層依賴的是tcp連接。基於tcp連接的基礎上實現客戶端和服務端數據的傳輸,再往上封裝一層mysql的握手、鑒權、交互協議對數據包進行解析、反解析,進而跑通整個流程。

二、連接池的工作原理

  • 連接池的建立
    • 后台系統初始化時,連接池會根據系統的配置建立。
    • 但是在接受客戶端請求之前,並沒有真正的創建連接。
    • 在go語言中,先注冊驅動_ "github.com/go-sql-driver/mysql"
    • 初始化DB,調用Open函數,這時其實沒有真正的去獲取連接,而是去獲取DB操作的數據結構。
  • 連接池中連接的使用和管理
  • 連接池的關閉
    • 釋放連接
    • 關閉連接的請求隊列
    • connectionOpener(負責打開連接的協程)
    • connectionResetter(重制連接狀態的協程)
    • connectionCleaner(定期清理過期連接的協程)

三、database/sql包結構

image-20200719230058101

driver/driver.go :定義了實現數據庫驅動所需要的接口,這些接口由sql包和具體的驅動包來實現

driver/types.go:定義了數據類型別名和轉換

convert:rows的scan

sql.go: 關於SQL數據庫的一些通用的接口、類型。包括:連接池、數據類型、連接、事物、statement

import "github.com/go-sql-driver/mysql” // 具體的驅動包
import "database/sql"

// 初始化連接
func initDB() (err error) {
	db, err = sql.Open("mysql", "root:root@tcp(127.0.0.1:3306)/test")
	if err != nil {
		panic(err)
	}
	// todo 不要在這里關閉它, 函數一結束,defer就執行了
	// defer db.Close()
	err = db.Ping()
	if err != nil {
		return err
	}
	return nil
}

四、三個重要的結構體

4.1、DB

/**
	DB是代表零個或多個基礎連接池的數據庫句柄。 對於多個goroutine並發使用是安全的。
	sql包會自動創建並釋放連接。 它還維護空閑連接的空閑池。 
	如果數據庫具有每個連接狀態的概念,則可以在事務(Tx)或連接(Conn)中可靠地觀察到這種狀態。
  調用DB.Begin之后,返回的Tx將綁定到單個連接。 
  在事務上調用Commit或Rollback后,該事務的連接將返回到DB的空閑連接池。
  池大小可以通過SetMaxIdleConns控制。
*/
type DB struct {
	// Atomic access only. At top of struct to prevent mis-alignment
	// on 32-bit platforms. Of type time.Duration.
  // 統計使用:等待新的連接所需要的總時間
	waitDuration int64 // Total time waited for new connections.

  // 由具體的數據庫驅動實現的 connector
	connector driver.Connector
  
	// numClosed is an atomic counter which represents a total number of
	// closed connections. Stmt.openStmt checks it before cleaning closed
	// connections in Stmt.css.
  // 關閉的連接數
	numClosed uint64

	mu           sync.Mutex // protects following fields
  
  // 連接池,在go中,連接的封裝結構體是:driverConn
	freeConn     []*driverConn
  
  // 連接請求的map, key是自增的int64類型的數,用於唯一標示這個請求分配的
	connRequests map[uint64]chan connRequest
  
  // 類似於binlog中的next trx_ix ,下一個事物的id
	nextRequest  uint64 // Next key to use in connRequests.
  
  // 已經打開,或者等待打開的連接數
	numOpen      int    // number of opened and pending open connections
  
	// Used to signal the need for new connections
	// a goroutine running connectionOpener() reads on this chan and
	// maybeOpenNewConnections sends on the chan (one send per needed connection)
	// It is closed during db.Close(). The close tells the connectionOpener
	// goroutine to exit.
  // 他是個chan,用於通知connectionOpener()協程應該打開新的連接了。
	openerCh          chan struct{}
  
  // 他是個chan,用於通知connectionResetter協程:重制連接的狀態。
	resetterCh        chan *driverConn
  
	closed            bool
  
  // 依賴,key是連接、statement
	dep               map[finalCloser]depSet
	lastPut           map[*driverConn]string // stacktrace of last conn's put; debug only
  
  // 連接池的大小,0意味着使用默認的大小2, 小於0表示不使用連接池
	maxIdle           int    // zero means defaultMaxIdleConns; negative means 0
  // 最大打開的連接數,包含連接池中的連接和連接池之外的空閑連接, 0表示不做限制
	maxOpen           int    // <= 0 means unlimited
  
  // 連接被重用的時間,設置為0表示一直可以被重用。
	maxLifetime       time.Duration  // maximum amount of time a connection may be reused
  
  // 他是個chan,用於通知connectionCleaner協程去請求過期的連接
  // 當有設置最大存活時間時才會生效
	cleanerCh         chan struct{}
  
  // 等待的連接總數,當maxIdle為0時,waitCount也會一直為
  // 因為maxIdle為0,每一個請求過來都會打開一條新的連接。
	waitCount         int64 // Total number of connections waited for.
  
  // 釋放連接時,因為連接池已滿而關閉的連接總數
  // 如果maxLifeTime沒有被設置,maxIdleClosed為0
	maxIdleClosed     int64 // Total number of connections closed due to idle.
  
  // 因為超過了最大連接時間,而被關閉的連接總數
	maxLifetimeClosed int64 // Total number of connections closed due to max free limit.
  
  // 當DB被關閉時,關閉connection opener和session resetter這兩個協程
	stop func() // stop cancels the connection opener and the session resetter.
}

4.2、driverConn

連接的封裝結構體:driverConn

// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
/**
	driverConn使用互斥鎖包裝Conn包裝
*/
type driverConn struct {
  // 持有對整個數據庫的抽象結構體
	db        *DB   		
	createdAt time.Time 

	sync.Mutex  // guards following
  
  // 對應於具體的連接,eg.mysqlConn
	ci          driver.Conn
  
  // 標記當前連接的狀態:當前連接是否已經關閉
	closed      bool
  // 標記當前連接的狀態:當前連接是否最終關閉,包裝 ci.Close has been called
	finalClosed bool // ci.Close has been called
  
  // 在這些連接上打開的statement
	openStmt    map[*driverStmt]bool
  
  // connectionResetter返回的結果
	lastErr     error // lastError captures the result of the session resetter.

	// guarded by db.mu
  // 連接是否被占用了
	inUse      bool
  
  // 在歸還連接時需要運行的代碼。在noteUnusedDriverStatement中添加
	onPut      []func() // code (with db.mu held) run when conn is next returned
  
	dbmuClosed bool     // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}

4.3、Conn

具體的連接: driver包下的Conn如下,是個接口,需要被具體的實現。

// Conn is assumed to be stateful.
type Conn interface {
	// Prepare returns a prepared statement, bound to this connection.
	Prepare(query string) (Stmt, error)

	// Close invalidates and potentially stops any current
	// prepared statements and transactions, marking this
	// connection as no longer in use.
	//
	// Because the sql package maintains a free pool of
	// connections and only calls Close when there's a surplus of
	// idle connections, it shouldn't be necessary for drivers to
	// do their own connection caching.
	Close() error

	// Begin starts and returns a new transaction.
	//
	// Deprecated: Drivers should implement ConnBeginTx instead (or additionally).
	Begin() (Tx, error)
}

五、流程梳理

5.1、先獲取DB實例

在golang中,要想獲取連接,一般我們都得通過下面這段代碼獲取到DB的封裝結構體實例。

通過上面的三個結構體可以看出 DB 、driverConn、Conn的關系如下:

所以我們的代碼一般長成下面這樣,先獲取一個DB結構體的實例,DB結構體中有維護連接池、以及和創建連接,關閉連接協程通信的channel,已經各種配置參數。

上圖中淺藍色部分的 freeConn就是空閑連接池,里面的driver包下的Conn interface就是具體的連接。

/**
 * MySQL連接相關的邏輯
 */
type Conenctor struct {
	BaseInfo BaseInfo
	DB       *sql.DB
}

func (c *Conenctor) Open() {
	// 讀取配置
	c.loadConfig()
	dataSource := c.BaseInfo.RootUserName + ":" + c.BaseInfo.RootPassword + "@tcp(" + c.BaseInfo.Addr + ":" + c.BaseInfo.Port + ")/" + c.BaseInfo.DBName
	db, Err := sql.Open("mysql", dataSource)
	if Err != nil {
		common.Error("Fail to opendb dataSource:[%v] Err:[%v]", dataSource, Err.Error())
		return
	}
	db.SetMaxOpenConns(500)
	db.SetMaxIdleConns(200)
	c.DB = db
	Err = db.Ping()
	if Err != nil {
		fmt.Printf("Fail to Ping DB Err :[%v]", Err.Error())
		return
	}
}

5.2、流程梳理入口:

比如我們自己寫代碼時,可能會搞這樣一個方法做增刪改

// 插入、更新、刪除
func (c *Conenctor) Exec(ctx context.Context, 
                         sqlText string,
                         params ...interface{}) (qr *QueryResults) {
	qr = &QueryResults{}
	result, err := c.DB.ExecContext(ctx, sqlText, params...)
	defer HandleException()
	if err != nil {
		qr.EffectRow = 0
		qr.Err = err
		common.Error("Fail to exec qurey sqlText:[%v] params:[%v] err:[%v]", sqlText, params, err)
		return
	}
	qr.EffectRow, _ = result.RowsAffected()
	qr.LastInsertId, _ = result.LastInsertId()
	return
}

主要是使用DB.ExecContext()執行SQL,獲取返回值。

ctx是業務代碼傳入的上線文,通常是做超時限制使用。

其實這里並不是嚴格意義上的去執行sql,它其實是通過和MySQL-Server之間建立的連接將sql+params發往MySQL-Server去解析和執行。

進入DB.ExecContext()

主要邏輯如下:exec()方法的主要功能是:獲取連接,發送sql和參數。

  • 如果獲取一次失敗一次,當失敗的次數達到sql包預定義的常量maxBadConnRetries的情況下,將會創建新的連接使用
  • 未超過maxBadConnRetries,被打上cachedOrNewConn,優先從空閑池中獲取連接
func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
   var res Result
   var err error
   for i := 0; i < maxBadConnRetries; i++ {
      res, err = db.exec(ctx, query, args, cachedOrNewConn)
      if err != driver.ErrBadConn {
         break
      }
   }
   if err == driver.ErrBadConn {
      return db.exec(ctx, query, args, alwaysNewConn)
   }
   return res, err
}

跟進exec() --> db.conn(ctx, strategy)

func (db *DB) exec(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (Result, error) {
  // 這個strategy就是上一步我們告訴他是創建新連接,還是優先從緩存池中獲取連接。
	dc, err := db.conn(ctx, strategy)
  ..
}

5.3、獲取連接

跟進conn()方法

conn方法的返回值是driverConn,也就是我們上面說的數據庫連接,作用就是說,跟據傳遞進來的獲取策略,獲取數據庫連接,如果正常就返回獲取到的數據庫連接,異常就返回錯誤err

這張圖是conn獲取連接的流程圖,根據下面這段代碼畫出來的,注釋有寫在代碼上

// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
	db.mu.Lock()
  // 先監測db是否關閉了
	if db.closed {
		db.mu.Unlock()
    // DB都關閉了,直接返回DBClosed錯誤,沒必要再去獲取連接。
		return nil, errDBClosed
	}
  // 檢查用戶傳遞進來的Context是否過期了
	select {
	default:
  // 如果用戶那邊使用了ctx.Done(),毫無疑問,會進入這個case中,返回Ctx錯誤  
	case <-ctx.Done():
		db.mu.Unlock()
		return nil, ctx.Err()
	}
  // 連接被重用的時間,如果為0,表示 理論上這個連接永不過期,一直可以被使用
	lifetime := db.maxLifetime

  // 看一下空閑連接池(他是個slice)是否是還有空閑的連接
	numFree := len(db.freeConn)
  // 如果獲取策略是優先從連接池中獲取,並且連接池中確實存在空閑的連接,就從freeConn中取連接使用。
	if strategy == cachedOrNewConn && numFree > 0 {
    // 假設空閑池還剩下五條連接:【1,2,3,4,5】
    // 取出第一條 conn == 1
		conn := db.freeConn[0]
    // 切片的拷貝,實現remove掉第一個連接的目的。
		copy(db.freeConn, db.freeConn[1:])
    // 如果db.freeConn[1:]會導致freeConn變小,所以這里是 db.freeConn = db.freeConn[:numFree-1]
		db.freeConn = db.freeConn[:numFree-1]
    // 這里獲取的連接是driverConn,它其實是對真實連接,driver.Conn的封裝。
    // 在driver.Conn的基礎上多一層封裝可以實現在driver.Conn的基礎上,加持上狀態信息,如下
		conn.inUse = true
		db.mu.Unlock()
    // 檢查是否過期
		if conn.expired(lifetime) {
			conn.Close()
			return nil, driver.ErrBadConn
		}
		// Lock around reading lastErr to ensure the session resetter finished.
    // 加鎖處理,確保這個conn未曾被標記為 lastErr狀態。
    // 一旦被標記為這個狀態說明 ConnectionRestter協程在重置conn的狀態時發生了錯誤。也就是這個連接其實已經壞掉了,不可使用。
		conn.Lock()
		err := conn.lastErr
		conn.Unlock()
    // 如果檢測到這種錯誤,driver.ErrBadConn 表示連接不可用,關閉連接,返回錯誤。
		if err == driver.ErrBadConn {
			conn.Close()
			return nil, driver.ErrBadConn
		}
		return conn, nil
	}

	// Out of free connections or we were asked not to use one. If we're not
	// allowed to open any more connections, make a request and wait.
  // db.maxOpen > 0 表示當前DB實例允許打開連接
  // db.numOpen >= db.maxOpen表示當前DB能打開的連接數,已經大於它能打開的最大連接數,就構建一個request,然后等待獲取連接
	if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
		// Make the connRequest channel. It's buffered so that the
		// connectionOpener doesn't block while waiting for the req to be read.
	
    // 構建connRequest這個channel,緩存大小是1
    // 用於告訴connectionOpener協程,需要打開一個新的連接。
		req := make(chan connRequest, 1)
    
    /**
      nextRequestKeyLocked函數如下:
     
      func (db *DB) nextRequestKeyLocked() uint64 {
				next := db.nextRequest
				db.nextRequest++
				return next
			}
			
			主要作用就是將nextRequest+1,
			至於這個nextRequest的作用我們前面也說過了,它相當於binlog中的next_trx下一個事物的事物id。
			言外之意是這個nextRequest遞增的(因為這段代碼被加了lock)。
			看如下的代碼中,將這個自增后的nextRequest當返回值返回出去。
			然后緊接着將它作為map的key
			
			至於這個map嘛:
		  在本文一開始的位置,我們介紹了DB結構體有這樣一個屬性,連接請求的map, key是自增的int64類型的數,
      用於唯一標示這個請求分配的
      connRequests map[uint64]chan connRequest 
     */
		reqKey := db.nextRequestKeyLocked()
    // 將這個第n個請求對應channel緩存起來,開始等待有合適的機會分配給他連接
		db.connRequests[reqKey] = req
    // 等待數增加,解鎖
		db.waitCount++
		db.mu.Unlock()
    
		waitStart := time.Now()

		// Timeout the connection request with the context.
    // 進入下面的slice中
		select {
    // 如果客戶端傳入的上下文超時了,進入這個case
		case <-ctx.Done():
			// Remove the connection request and ensure no value has been sent
			// on it after removing.
      // 當上下文超時時,表示上層的客戶端代碼想斷開,意味着在這個方法收到這個信號后需要退出了
      // 這里將db的connRequests中的reqKey清除,防止還給他分配一個連接。
			db.mu.Lock()
			delete(db.connRequests, reqKey)
			db.mu.Unlock()

			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
			// 這里也會嘗試從req channel中獲取一下有沒有可用的連接
      // 如果有的話執行 db.putConn(ret.conn, ret.err, false) ,目的是釋放掉這個連接
			select {
			default:
			case ret, ok := <-req:
				if ok && ret.conn != nil {
          // 看到這里只需要知道他是用來釋放連接的就ok,繼續往下看,稍后再殺回來
					db.putConn(ret.conn, ret.err, false)
				}
			}
      //返回ctx異常。
			return nil, ctx.Err()
    // 嘗試從 reqchannel 中取出連接
		case ret, ok := <-req:
			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
			// 處理錯誤
			if !ok {
				return nil, errDBClosed
			}
      // 檢測連接是否過期了,前面也提到過,DB實例有維護一個參數,maxLifeTime,0表示永不過期
			if ret.err == nil && ret.conn.expired(lifetime) {
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
      // 健壯性檢查
			if ret.conn == nil {
				return nil, ret.err
			}
      
			// Lock around reading lastErr to ensure the session resetter finished.
      // 檢查連接是否可用
			ret.conn.Lock()
			err := ret.conn.lastErr
			ret.conn.Unlock()
			if err == driver.ErrBadConn {
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
			return ret.conn, ret.err
		}
	}
  // 代碼能運行到這里說明上面的if條件沒有被命中。
  // 換句話說,來到這里說明具備如下條件
  // 1:當前DB實例的空閑連接池中已經沒有空閑連接了,獲取明確指定,不從空閑池中獲取連接,就想新建連接。
  // 2: 當前DB實例允許打開連接
  // 3: DB實例目前打開的連接數還沒有到達它能打開的最大連接數的上限。
  
	// 記錄當前DB已經打開的連接數+1
	db.numOpen++ // optimistically
	db.mu.Unlock()
	ci, err := db.connector.Connect(ctx)
	if err != nil {
		db.mu.Lock()
		db.numOpen-- // correct for earlier optimism
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		return nil, err
	}
	db.mu.Lock()
  // 構建一個連接實例,並返回
	dc := &driverConn{
		db:        db,
		createdAt: nowFunc(),
		ci:        ci,
		inUse:     true,
	}
	db.addDepLocked(dc, dc)
	db.mu.Unlock()
	return dc, nil
}

5.4、釋放連接

連接被是過后是需要被釋放的

釋放連接的邏輯封裝在DB實例中

db.putConn(ret.conn, ret.err, false)

釋放連接的流程圖如下:

流程圖根據如下的代碼畫出。

方法詳細信息如下:

func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
  // 釋放連接的操作加鎖
	db.mu.Lock()
  // debug的信息
	if !dc.inUse {
		if debugGetPut {
			fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
		}
		panic("sql: connection returned that was never out")
	}
	if debugGetPut {
		db.lastPut[dc] = stack()
	}
  // 標記driverConn處理不可用的狀態
	dc.inUse = false

	for _, fn := range dc.onPut {
		fn()
	}
	dc.onPut = nil

  // 本方法的入參中有參數err
  // 當會話獲取出這個連接后,發現這個連接過期了、或者被標記上來lastErr時,再調用這個putConn方法時,同時會將這個錯誤傳遞進來,然后在這里判斷,當出現壞掉的連接時就不直接把這個連接放回空閑連接池了。
	if err == driver.ErrBadConn {
		// Don't reuse bad connections.
		// Since the conn is considered bad and is being discarded, treat it
		// as closed. Don't decrement the open count here, finalClose will
		// take care of that.
    // 這個方法的作用如下:
    // 他會去判斷當前DB維護的map的容量,也就是前面提到的那種情況:當DB允許打開連接,但是現在的連接數已經達到當前DB允許打開的最大連接數上限了,那么針對接下來想要獲取連接的請求的處理邏輯就是,構建一個req channel,放入connRequests這個map中,表示他們正在等待連接的建立。
    // 換句話說,這時系統時繁忙的,業務處於高峰,那么問題來了,現在竟然出現了一個壞掉的連接,那為了把對業務線的影響降到最低,是不是得主動新建一個新的連接放到空閑連接池中呢?
    // 	db.maybeOpenNewConnections() 函數主要干的就是這個事。
    // 	方法詳情如下
    /*
    	func (db *DB) maybeOpenNewConnections() {
					numRequests := len(db.connRequests)
					if db.maxOpen > 0 {
						numCanOpen := db.maxOpen - db.numOpen
					if numRequests > numCanOpen {
						numRequests = numCanOpen
					}
			}
			for numRequests > 0 {
						db.numOpen++ // optimistically
						numRequests--
						if db.closed {
							return
						}
				  // 它只是往這個	openerCh channel中寫入一個空的結構體,會有專門的協程負責創建連接
					db.openerCh <- struct{}{}
			}
		}
    */
		db.maybeOpenNewConnections()
    //  解鎖,關閉連接,返回
		db.mu.Unlock()
		dc.Close()
		return
	}
	if putConnHook != nil {
		putConnHook(db, dc)
	}
  // 如果DB已經關閉了,標記 resetSession為 false
	if db.closed {
		// Connections do not need to be reset if they will be closed.
		// Prevents writing to resetterCh after the DB has closed.
    // 當DB都已經關了,意味着DB里面的連接池都沒有了,那當然不需要關閉連接池中的連接了~
		resetSession = false
	}
  // 如果DB沒有關閉的話,進入if代碼塊
	if resetSession {
    // 將dricerConn中的Conn驗證轉換為driver.SessionResetter
		if _, resetSession = dc.ci.(driver.SessionResetter); resetSession {
      // 在此處鎖定driverConn,以便在連接重置之前不會釋放。
      // 必須在將連接放入池之前獲取鎖,以防止在重置之前將其取出
			dc.Lock()
		}
	}
  // 真正將連接放回空閑連接池中
  // 滿足connRequest或將driverConn放入空閑池並返回true或false
  /*
  	func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
  			// 檢測如果DB都關閉塊,直接返回flase
				if db.closed {
					return false
				}
				// 如果DB當前打開的連接數大於DB能打開的最大的連接數,返回false
				if db.maxOpen > 0 && db.numOpen > db.maxOpen {
					return false
				}
				//如果等待獲取連接的map中有存貨
		 if c := len(db.connRequests); c > 0 {
		 		
				var req chan connRequest
				var reqKey uint64
				// 取出map中的第一個key
				for reqKey, req = range db.connRequests {
					break
				}
				// 將這個key,value再map中刪除
				delete(db.connRequests, reqKey) // Remove from pending requests.
				// 重新標記這個連接是可用的狀態
				if err == nil {
					dc.inUse = true
				}
				// 將這個連接放入到 req channel中,給等待連接到會話使用
				req <- connRequest{
					conn: dc,
					err:  err,
				}
				return true
				
		// 來到這個if,說明此時沒有任何請求在等待獲取連接,並且沒有發生錯誤,DB也沒有關閉
		} else if err == nil && !db.closed {
				// 比較當前空閑連接池的大小(默認是2) 和 freeConn空閑連接數的數量
				// 意思是,如果空閑的連接超出了這個規定的閾值,空閑連接是需要被收回的。
				if db.maxIdleConnsLocked() > len(db.freeConn) {
				  // 收回
					db.freeConn = append(db.freeConn, dc)
					db.startCleanerLocked()
					return true
				}
				// 如果空閑連接還沒到閾值,保留這個連接當作空閑連接
				db.maxIdleClosed++
		}		
				// 收回空閑連接返回false
				return false
}
  */
  
  // 如果將連接成功放入了空閑連接池,或者將連接成功給了等待連接到會話使用,此處返回true
  // 收回空閑連接返回false
  // 代碼詳情就是在上面的這段注釋中
	added := db.putConnDBLocked(dc, nil)
	db.mu.Unlock()
	
  // 如果
	if !added {
    // 如果DB沒有關閉,進入if
		if resetSession {
			dc.Unlock()
		}
		dc.Close()
		return
	}
  // 重新校驗,如果連接關閉了,進入if
	if !resetSession {
		return
	}
  
  // 如果負責重置 conn狀態的線程阻塞住了,那么標記這個driverConn為lastErr
	select {
	default:
		// If the resetterCh is blocking then mark the connection
		// as bad and continue on.
		dc.lastErr = driver.ErrBadConn
		dc.Unlock()
	case db.resetterCh <- dc:
	}
}

5.5、connectionOpener

5.5.1、是什么?

這個connectionOpener是一個工作協程,它會去嘗試消費指定的channel,負責創建數據庫連接,其實在前面閱讀獲取連接的邏輯時,有這樣的兩種情況會阻塞等待connectionOpener來新創建連接:

第一種:當獲取連接的策略是優先從cache連接池中獲取出來,但是空閑連接池已經沒有空閑的連接了,首先這時DB允許打開連接,但是DB能打開的連接數已經達到了它能打開的連接數的上線,所以得等待有空閑連接出現,或者等有連接被釋放后,DB能當前打開的連接數小於了它能打開的連接數的最大值,這時它會被阻塞等待去嘗試創建連接。

第二種:獲取連接的策略不再是優先從空閑緩沖池中獲取連接,直接明了的想獲取最一條新連接,同樣的此時DB已經打開的連接數大於它能打開連接數的上線,它會被阻塞等待創建連接。

image-20200731221533203

5.5.2、什么時候開啟的?
func OpenDB(c driver.Connector) *DB {
	ctx, cancel := context.WithCancel(context.Background())
	db := &DB{
		connector:    c,
		openerCh:     make(chan struct{}, connectionRequestQueueSize),
		resetterCh:   make(chan *driverConn, 50),
		lastPut:      make(map[*driverConn]string),
		connRequests: make(map[uint64]chan connRequest),
		stop:         cancel,
	}
	// 可以看到他是在DB被實例化時開啟的。
	go db.connectionOpener(ctx)
	go db.connectionResetter(ctx)

	return db
}
5.5.3、代碼詳情

可以看到它一直嘗試從db的openerCh中獲取內容,而且只要獲取到了內容,就會調用方法打開連接。

// Runs in a separate goroutine, opens new connections when requested.
func (db *DB) connectionOpener(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
    // here  
		case <-db.openerCh:
			db.openNewConnection(ctx)
		}
	}
}
5.5.4、誰往openerCh中投放消息?

往channl中投放消息的邏輯在db的mayBeOpenNewConnections中

func (db *DB) maybeOpenNewConnections() {
  // 通過檢查這個map的長度來決定是否往opennerCh中投放消息
	numRequests := len(db.connRequests)
	if db.maxOpen > 0 {
		numCanOpen := db.maxOpen - db.numOpen
		if numRequests > numCanOpen {
			numRequests = numCanOpen
		}
	}
	for numRequests > 0 {
		db.numOpen++ // optimistically
		numRequests--
		if db.closed {
			return
		}
    // 一旦執行了這一步,connectionOpener 就會監聽到去創建連接。
		db.openerCh <- struct{}{}
	}
}
5.5.5、注意點:

在DB結構體中有這樣一個屬性

  // 連接池的大小,0意味着使用默認的大小2, 小於0表示不使用連接池
	maxIdle           int    // zero means defaultMaxIdleConns; negative means 0

表示空閑連接池默認的大小,如果它為0,表示都沒有緩存池,也就意味着會為所有想獲取連接的請求創建新的conn,這時也就不會有這個opnerCh,更不會有connectionOpener

5.6、connectionCleaner

5.6.1、是什么?有啥用?

它同樣以一條協程的形式存在,用於定時清理數據庫連接池中過期的連接

func (db *DB) startCleanerLocked() {
	if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {
		db.cleanerCh = make(chan struct{}, 1)
		go db.connectionCleaner(db.maxLifetime)
	}
}
5.6.2、注意點

同樣的,DB中存在一個參數:maxLifetime

它表示數據庫連接最大的生命時長,如果將它設置為0,表示這個連接永不過期,既然所有的連接永不過期,就不會存在connectionCleaner去定時根據maxLifetime 來定時清理連接。

它的調用時機是:需要將連接放回到連接池時調用。

5.7、connectionRestter

5.7.1、作用

我們使用獲取的連接的封裝結構體是driverConn,其實它是會driver包下的Conn連接的又一層封裝,目的是增強

driver包下的Conn的,多出來了一些狀態。當將使用完畢的連接放入連接池時,就得將這些狀態清除掉。

使用誰去清除呢?就是這個go 協程:connectionRestter

當connectionRestter碰到錯誤時,會將這個conn標記為lastErr,連接使用者在使用連接時會先校驗conn的諸多狀態,比如出現lastErr,會返回給客戶端 badConnErr

六、MySQL連接池所受的限制

數據庫連接池的大小到底設置為多少,得根據業務流量以及數據庫所在機器的性能綜合考慮。

mysql連接數到配置在 my.cnf中,具體的參數是max_connections。

當業務流量異常猛烈時,很可能會出現這個問題:to many connections

對於操縱系統內核來說,當他接受到一個tcp請求就會在本地創建一個由文件系統管理的socket文件。在linux中我們將它叫做文件句柄。

linux為防止單一進程將系統資源全部耗費掉,會限制進程最大能打開的連接數為1024,這意味着,哪怕通過改配置文件,將mysql能打開的連接池設置為9999,事實上它能打開的文件數最多不會超過1024。

這個問題也好解決:

命令:設置單個進程能打開的最大連接數為65535

ulimit -HSn 65535

通過命令: 查看進程被限制的使用各種資源的量

ulimit -a 

core file size: 進程崩潰是轉儲文件大小限制
man loaded memort 最大鎖定內存大小
open file 能打開的文件句柄數

這些變量定義在 /etc/security/limits.conf配置文件中。

七、關於失效的連接

情況1: 客戶端主動斷開

如果是客戶端主動將連接close(), 那往這些連接中寫數據時會得到ErrBadConn的錯誤,如果此時依然可以重試,將會獲取新的連接。

代碼如下:

func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
	var res Result
	var err error
	for i := 0; i < maxBadConnRetries; i++ {
		res, err = db.exec(ctx, query, args, cachedOrNewConn)
		if err != driver.ErrBadConn {
			break
		}
	}
	if err == driver.ErrBadConn {
		return db.exec(ctx, query, args, alwaysNewConn)
	}
	return res, err
}

情況2: 服務端掛啦

因為這種數據庫連接底層使用的是tcp實現。(tcp本身是支持全雙工的,客戶端和服務端支持同時往對方發送數據)依賴諸如:校驗和、確認應答和序列號機制、超時重傳、連接管理(3次握手,4次揮手)、以及滑動窗口、流量控制、擁塞避免,去實現整個數據交互的可靠性,協調整體不擁擠。

這時客戶端拿着一條自認為是正常的連接,往連接里面寫數據。然鵝,另一端端服務端已經掛了~,但是不幸的是,客戶端的tcp連接根本感知不到~~~。

但是當它去讀取服務端的返回數據時會遇到錯誤:unexceptBadConn EOF

八、連接的有效性

  • 思路1:

設置連接的屬性: maxLifeTime

上面也說過了,當設置了這個屬性后,DB會開啟一條協程connectionCleaner,專門負責清理過期的連接。

這在一定程度上避免了服務端將連接斷掉后,客戶端無感知的情況。

maxLifeTime的值到底設置多大?參考值,比數據庫的wait_timeout小一些就ok。

  • 思路2:

主動檢查連接的有效性。

比如在連接放回到空閑連接池前ping測試。在使用連接發送數據前進行連通性測試。

公眾號首發、歡迎關注


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM