數據庫鏈接池的實現步驟
ConnPool
type ConnPool interface { Get() (*Conn, error) // 獲取資源 Pulish(*Conn) error // 釋放資源,返回池中 Shutdown() error // 關閉池 } type Connpool struct { lock sync.Mutex ConnList []*Conn //鏈接 capacity int32 // 鏈接池最大鏈接限制 numOpen int32 // 當前池中空閑鏈接數 running int32 // 正在使用的鏈接數 expiryDuration time.Duration //掃描時間 defaultExpiration time.Duration//鏈接的過期時間 factory newConn // 創建連接的方法 j *janitor //監視器 isClose bool //鏈接池是否關閉 }
這里主要介紹newConn,自定義函數類型,返回數據庫鏈接,如這里為redis鏈接:
type newConn func()(redis.Conn)
Conn
type Conn struct{ Expiration int64 Conn redis.Conn } func (C *Conn)close(){ err := C.Conn.Close() if err != nil{ log.Println("關閉鏈接失敗!",err) } }
初始化Pool
func NewGenericPool(capacity int32,expiryDuration time.Duration,defaultExpiration time.Duration) (*Connpool, error) { if capacity <= 0 { return nil, errors.New("Pool capacity <0,not Create") } p := &Connpool{ capacity: int32(capacity), expiryDuration: expiryDuration, running: 0, numOpen: 0, defaultExpiration:defaultExpiration, factory:func()(redis.Conn){ rs,err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil{ return nil } return rs }, isClose:false, } // 啟動定期清理過期鏈接,獨立goroutine運行, // 進一步節省系統資源 j := p.monitorAndClear() p.j = j return p, nil }
監控器
監視器定期掃描池中空閑鏈接是否過期,主要使用了定時器,利用select監聽定時器的信道,每到掃描時間就會執行掃描操作,過期則刪除,三個字段:
c *Connpool //監控的鏈接池 Interval time.Duration //定期掃描時間 stop chan bool //通知鏈接池關閉,這里使用一個空struct{}更好,你可以自定義一個類型,type sig struct{}
func (c *Connpool)monitorAndClear()*janitor{ return runJanitor(c,c.expiryDuration) } type janitor struct { c *Connpool Interval time.Duration stop chan bool } func (j *janitor) Run() { //創建定時器 ticker := time.NewTicker(j.Interval) print("開啟定時器\n") for { select { case <-ticker.C://當定時器每次到達設置的時間時就會向管道發送消息,檢測鏈接隊列中鏈接是否過期 print("開始掃描\n") j.c.DeleteExpired() case <-j.stop: //監視器退出信道, ticker.Stop() close(j.stop) return } } } func (j *janitor)stopJanitor(){ j.stop <- true } func runJanitor(c *Connpool,ci time.Duration)*janitor{ j := &janitor{ c:c, Interval: ci, stop: make(chan bool), } go j.Run()//運行監控器 return j }
掃描&&刪除
//////////////////////////////////////// func (c *Connpool) DeleteExpired() { //現在時間戳 now := time.Now().UnixNano() //加互斥鎖 c.lock.Lock() for i,conn := range c.ConnList { // "Inlining" of expired if conn.Expiration > 0 && now > conn.Expiration { //超時則刪除 o, ok := c.delete( c.ConnList,i) //類型斷言 if ok { c.ConnList = o.([]*Conn) } conn.close() } } c.lock.Unlock()//解互斥鎖 } func (c *Connpool) delete(slice interface{}, index int) (interface{}, bool) { //判斷是否是切片類型 v := reflect.ValueOf(slice) if v.Kind() != reflect.Slice { return nil, false } //參數檢查 if v.Len() == 0 || index < 0 || index > v.Len() - 1 { return nil, false } return reflect.AppendSlice(v.Slice(0, index), v.Slice(index+1, v.Len())).Interface(), true } ////////////////////////////////////////////////////
獲取鏈接&&釋放資源&&關閉鏈接池
// 獲取資源 func (c *Connpool) Get() (*Conn){ if c.isClose { return nil } var conn *Conn // 標志,表示當前運行的鏈接數量是否已達容量上限 waiting := false // 涉及從鏈接隊列取可用鏈接,需要加鎖 c.lock.Lock() ConnList := c.ConnList n := len(ConnList) - 1 fmt.Println("空閑鏈接數量:",n+1) fmt.Println("鏈接池現在運行的鏈接數量:",c.running) // 當前鏈接隊列為空(無空閑鏈接) if n < 0 { //沒有空閑的鏈接有兩種可能: //1.運行的鏈接超出了pool容量 //2.當前是空pool,從未往pool添加鏈接或者一段時間內沒有鏈接添加,被定期清除 // 運行鏈接數目已達到該Pool的容量上限,置等待標志 if c.running >= c.capacity { //print("超過上限") waiting = true } else { // 當前無空閑鏈接但是Pool還沒有滿, // 則可以直接新開一個鏈接執行任務 c.running++ conn = &Conn{ time.Now().Add(c.defaultExpiration).UnixNano(), c.factory(), } } // 有空閑鏈接,從隊列尾部取出一個使用 } else { conn = ConnList[n] ConnList[n] = nil c.ConnList = ConnList[:n] c.running++ } // 解鎖 c.lock.Unlock() if waiting { //當一個鏈接執行完以后會添加到池中,有了空閑的鏈接就可以繼續執行: // 阻塞等待直到有空閑鏈接 for len(c.ConnList) == 0{ continue } c.lock.Lock() ConnList = c.ConnList l := len(ConnList) - 1 conn = ConnList[l] ConnList[l] = nil c.ConnList = ConnList[:l] c.running++ c.lock.Unlock() } return conn } // 釋放資源,返回池中 func (c *Connpool) Pulish(conn *Conn) error { if c.isClose { return nil } conn.Expiration = time.Now().UnixNano() c.lock.Lock() c.running -- c.ConnList = append(c.ConnList,conn) c.lock.Unlock() return nil } // 關閉池 func (c *Connpool) Shutdown() error { c.isClose = true for _,conn := range c.ConnList { conn.close() } c.j.stopJanitor() return nil }
演示代碼
package main import ( "Pool/Conn_Pool" "fmt" "github.com/garyburd/redigo/redis" "time" ) func main() { //容量、掃描時間、鍵值默認過期時間 Pool,_ := Conn_Pool.NewGenericPool(10,10 * time.Second,5 *time.Second) c := Pool.Get() //通過Do函數,發送redis命令 v, err := c.Conn.Do("SET", "name1", "小王") if err != nil { fmt.Println(err) return } v, err = redis.String(c.Conn.Do("GET", "name1")) if err != nil { fmt.Println(err) return } fmt.Println(v) Pool.Pulish(c) time.Sleep(time.Second) c = Pool.Get() //通過Do函數,發送redis命令 v, err = c.Conn.Do("SET", "name2", "李四") if err != nil { fmt.Println(err) return } v, err = redis.String(c.Conn.Do("GET", "name2")) if err != nil { fmt.Println(err) return } fmt.Println(v) Pool.Pulish(c) time.Sleep(time.Second) c = Pool.Get() //通過Do函數,發送redis命令 v, err = c.Conn.Do("SET", "name3", "sb") if err != nil { fmt.Println(err) return } v, err = redis.String(c.Conn.Do("GET", "name3")) if err != nil { fmt.Println(err) return } fmt.Println(v) Pool.Pulish(c) select { } }
源碼
package Conn_Pool import ( "errors" "fmt" "log" "reflect" "sync" "time" "github.com/garyburd/redigo/redis" ) type Conn struct{ Expiration int64 Conn redis.Conn } func (C *Conn)close(){ err := C.Conn.Close() if err != nil{ log.Println("關閉鏈接失敗!",err) } } type newConn func()(redis.Conn) //type sig struct{} type ConnPool interface { Get() (*Conn, error) // 獲取資源 Pulish(*Conn) error // 釋放資源,返回池中 Shutdown() error // 關閉池 } type Connpool struct { lock sync.Mutex ConnList []*Conn //鏈接 capacity int32 // 鏈接池最大鏈接限制 numOpen int32 // 當前池中空閑鏈接數 running int32 // 正在使用的鏈接數 expiryDuration time.Duration //掃描時間 defaultExpiration time.Duration//鏈接的過期時間 factory newConn // 創建連接的方法 j *janitor //監視器 isClose bool //鏈接池是否關閉 } func NewGenericPool(capacity int32,expiryDuration time.Duration,defaultExpiration time.Duration) (*Connpool, error) { if capacity <= 0 { return nil, errors.New("Pool capacity <0,not Create") } p := &Connpool{ capacity: int32(capacity), expiryDuration: expiryDuration, running: 0, numOpen: 0, defaultExpiration:defaultExpiration, factory:func()(redis.Conn){ rs,err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil{ return nil } return rs }, isClose:false, } // 啟動定期清理過期鏈接,獨立goroutine運行, // 進一步節省系統資源 j := p.monitorAndClear() p.j = j return p, nil } // 獲取資源 func (c *Connpool) Get() (*Conn){ if c.isClose { return nil } var conn *Conn // 標志,表示當前運行的鏈接數量是否已達容量上限 waiting := false // 涉及從鏈接隊列取可用鏈接,需要加鎖 c.lock.Lock() ConnList := c.ConnList n := len(ConnList) - 1 fmt.Println("空閑鏈接數量:",n+1) fmt.Println("鏈接池現在運行的鏈接數量:",c.running) // 當前worker隊列為空(無空閑worker) if n < 0 { //沒有空閑的鏈接有兩種可能: //1.運行的鏈接超出了pool容量 //2.當前是空pool,從未往pool添加鏈接或者一段時間內沒有鏈接添加,被定期清除 // 運行鏈接數目已達到該Pool的容量上限,置等待標志 if c.running >= c.capacity { //print("超過上限") waiting = true } else { // 當前無空閑鏈接但是Pool還沒有滿, // 則可以直接新開一個鏈接執行任務 c.running++ conn = &Conn{ time.Now().Add(c.defaultExpiration).UnixNano(), c.factory(), } } // 有空閑鏈接,從隊列尾部取出一個使用 } else { conn = ConnList[n] ConnList[n] = nil c.ConnList = ConnList[:n] c.running++ } // 判斷是否有鏈接可用結束,解鎖 c.lock.Unlock() if waiting { //當一個鏈接執行完以后會添加到池中,有了空閑的鏈接就可以繼續執行: // 阻塞等待直到有空閑鏈接 for len(c.ConnList) == 0{ continue } c.lock.Lock() ConnList = c.ConnList l := len(ConnList) - 1 conn = ConnList[l] ConnList[l] = nil c.ConnList = ConnList[:l] c.running++ c.lock.Unlock() } return conn } // 釋放資源,返回池中 func (c *Connpool) Pulish(conn *Conn) error { if c.isClose { return nil } conn.Expiration = time.Now().UnixNano()//更新鏈接的過期時間 c.lock.Lock() c.running -- c.ConnList = append(c.ConnList,conn) c.lock.Unlock() return nil } // 關閉池 func (c *Connpool) Shutdown() error { c.isClose = true for _,conn := range c.ConnList { conn.close() } c.j.stopJanitor() return nil } //////////////////////////////////////// func (c *Connpool) DeleteExpired() { //現在時間戳 now := time.Now().UnixNano() //map加互斥鎖 c.lock.Lock() for i,conn := range c.ConnList { // "Inlining" of expired //檢測map if conn.Expiration > 0 && now > conn.Expiration { //超時則刪除 o, ok := c.delete( c.ConnList,i) //類型斷言 if ok { c.ConnList = o.([]*Conn) } conn.close() } } c.lock.Unlock()//解互斥鎖 } func (c *Connpool) delete(slice interface{}, index int) (interface{}, bool) { //判斷是否是切片類型 v := reflect.ValueOf(slice) if v.Kind() != reflect.Slice { return nil, false } //參數檢查 if v.Len() == 0 || index < 0 || index > v.Len() - 1 { return nil, false } return reflect.AppendSlice(v.Slice(0, index), v.Slice(index+1, v.Len())).Interface(), true } //////////////////////////////////////////////////// //////////////////////////監視器///////////////////// func (c *Connpool)monitorAndClear()*janitor{ return runJanitor(c,c.expiryDuration) } type janitor struct { c *Connpool Interval time.Duration stop chan bool } func (j *janitor) Run() { //創建定時器 ticker := time.NewTicker(j.Interval) print("開啟定時器\n") for { select { case <-ticker.C://當定時器每次到達設置的時間時就會向管道發送消息,此時檢查鏈接隊列中鏈接是否過期 print("開始掃描\n") j.c.DeleteExpired() case <-j.stop: //監視器退出信道, ticker.Stop() close(j.stop) return } } } func (j *janitor)stopJanitor(){ j.stop <- true } func runJanitor(c *Connpool,ci time.Duration)*janitor{ j := &janitor{ c:c, Interval: ci, stop: make(chan bool), } go j.Run() return j } /////////////////////////////////////