數據庫鏈接池的實現步驟

ConnPool
type ConnPool interface {
Get() (*Conn, error) // 獲取資源
Pulish(*Conn) error // 釋放資源,返回池中
Shutdown() error // 關閉池
}
type Connpool struct {
lock sync.Mutex
ConnList []*Conn //鏈接
capacity int32 // 鏈接池最大鏈接限制
numOpen int32 // 當前池中空閑鏈接數
running int32 // 正在使用的鏈接數
expiryDuration time.Duration //掃描時間
defaultExpiration time.Duration//鏈接的過期時間
factory newConn // 創建連接的方法
j *janitor //監視器
isClose bool //鏈接池是否關閉
}
這里主要介紹newConn,自定義函數類型,返回數據庫鏈接,如這里為redis鏈接:
type newConn func()(redis.Conn)
Conn
type Conn struct{
Expiration int64
Conn redis.Conn
}
func (C *Conn)close(){
err := C.Conn.Close()
if err != nil{
log.Println("關閉鏈接失敗!",err)
}
}
初始化Pool
func NewGenericPool(capacity int32,expiryDuration time.Duration,defaultExpiration time.Duration) (*Connpool, error) {
if capacity <= 0 {
return nil, errors.New("Pool capacity <0,not Create")
}
p := &Connpool{
capacity: int32(capacity),
expiryDuration: expiryDuration,
running: 0,
numOpen: 0,
defaultExpiration:defaultExpiration,
factory:func()(redis.Conn){
rs,err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil{
return nil
}
return rs
},
isClose:false,
}
// 啟動定期清理過期鏈接,獨立goroutine運行,
// 進一步節省系統資源
j := p.monitorAndClear()
p.j = j
return p, nil
}
監控器
監視器定期掃描池中空閑鏈接是否過期,主要使用了定時器,利用select監聽定時器的信道,每到掃描時間就會執行掃描操作,過期則刪除,三個字段:
c *Connpool //監控的鏈接池
Interval time.Duration //定期掃描時間
stop chan bool //通知鏈接池關閉,這里使用一個空struct{}更好,你可以自定義一個類型,type sig struct{}
func (c *Connpool)monitorAndClear()*janitor{
return runJanitor(c,c.expiryDuration)
}
type janitor struct {
c *Connpool
Interval time.Duration
stop chan bool
}
func (j *janitor) Run() {
//創建定時器
ticker := time.NewTicker(j.Interval)
print("開啟定時器\n")
for {
select {
case <-ticker.C://當定時器每次到達設置的時間時就會向管道發送消息,檢測鏈接隊列中鏈接是否過期
print("開始掃描\n")
j.c.DeleteExpired()
case <-j.stop: //監視器退出信道,
ticker.Stop()
close(j.stop)
return
}
}
}
func (j *janitor)stopJanitor(){
j.stop <- true
}
func runJanitor(c *Connpool,ci time.Duration)*janitor{
j := &janitor{
c:c,
Interval: ci,
stop: make(chan bool),
}
go j.Run()//運行監控器
return j
}
掃描&&刪除
////////////////////////////////////////
func (c *Connpool) DeleteExpired() {
//現在時間戳
now := time.Now().UnixNano()
//加互斥鎖
c.lock.Lock()
for i,conn := range c.ConnList {
// "Inlining" of expired
if conn.Expiration > 0 && now > conn.Expiration {
//超時則刪除
o, ok := c.delete( c.ConnList,i)
//類型斷言
if ok {
c.ConnList = o.([]*Conn)
}
conn.close()
}
}
c.lock.Unlock()//解互斥鎖
}
func (c *Connpool) delete(slice interface{}, index int) (interface{}, bool) {
//判斷是否是切片類型
v := reflect.ValueOf(slice)
if v.Kind() != reflect.Slice {
return nil, false
}
//參數檢查
if v.Len() == 0 || index < 0 || index > v.Len() - 1 {
return nil, false
}
return reflect.AppendSlice(v.Slice(0, index), v.Slice(index+1, v.Len())).Interface(), true
}
////////////////////////////////////////////////////
獲取鏈接&&釋放資源&&關閉鏈接池
// 獲取資源
func (c *Connpool) Get() (*Conn){
if c.isClose {
return nil
}
var conn *Conn
// 標志,表示當前運行的鏈接數量是否已達容量上限
waiting := false
// 涉及從鏈接隊列取可用鏈接,需要加鎖
c.lock.Lock()
ConnList := c.ConnList
n := len(ConnList) - 1
fmt.Println("空閑鏈接數量:",n+1)
fmt.Println("鏈接池現在運行的鏈接數量:",c.running)
// 當前鏈接隊列為空(無空閑鏈接)
if n < 0 {
//沒有空閑的鏈接有兩種可能:
//1.運行的鏈接超出了pool容量
//2.當前是空pool,從未往pool添加鏈接或者一段時間內沒有鏈接添加,被定期清除
// 運行鏈接數目已達到該Pool的容量上限,置等待標志
if c.running >= c.capacity {
//print("超過上限")
waiting = true
} else {
// 當前無空閑鏈接但是Pool還沒有滿,
// 則可以直接新開一個鏈接執行任務
c.running++
conn = &Conn{
time.Now().Add(c.defaultExpiration).UnixNano(),
c.factory(),
}
}
// 有空閑鏈接,從隊列尾部取出一個使用
} else {
conn = ConnList[n]
ConnList[n] = nil
c.ConnList = ConnList[:n]
c.running++
}
// 解鎖
c.lock.Unlock()
if waiting {
//當一個鏈接執行完以后會添加到池中,有了空閑的鏈接就可以繼續執行:
// 阻塞等待直到有空閑鏈接
for len(c.ConnList) == 0{
continue
}
c.lock.Lock()
ConnList = c.ConnList
l := len(ConnList) - 1
conn = ConnList[l]
ConnList[l] = nil
c.ConnList = ConnList[:l]
c.running++
c.lock.Unlock()
}
return conn
}
// 釋放資源,返回池中
func (c *Connpool) Pulish(conn *Conn) error {
if c.isClose {
return nil
}
conn.Expiration = time.Now().UnixNano()
c.lock.Lock()
c.running --
c.ConnList = append(c.ConnList,conn)
c.lock.Unlock()
return nil
}
// 關閉池
func (c *Connpool) Shutdown() error {
c.isClose = true
for _,conn := range c.ConnList {
conn.close()
}
c.j.stopJanitor()
return nil
}
演示代碼
package main
import (
"Pool/Conn_Pool"
"fmt"
"github.com/garyburd/redigo/redis"
"time"
)
func main() {
//容量、掃描時間、鍵值默認過期時間
Pool,_ := Conn_Pool.NewGenericPool(10,10 * time.Second,5 *time.Second)
c := Pool.Get()
//通過Do函數,發送redis命令
v, err := c.Conn.Do("SET", "name1", "小王")
if err != nil {
fmt.Println(err)
return
}
v, err = redis.String(c.Conn.Do("GET", "name1"))
if err != nil {
fmt.Println(err)
return
}
fmt.Println(v)
Pool.Pulish(c)
time.Sleep(time.Second)
c = Pool.Get()
//通過Do函數,發送redis命令
v, err = c.Conn.Do("SET", "name2", "李四")
if err != nil {
fmt.Println(err)
return
}
v, err = redis.String(c.Conn.Do("GET", "name2"))
if err != nil {
fmt.Println(err)
return
}
fmt.Println(v)
Pool.Pulish(c)
time.Sleep(time.Second)
c = Pool.Get()
//通過Do函數,發送redis命令
v, err = c.Conn.Do("SET", "name3", "sb")
if err != nil {
fmt.Println(err)
return
}
v, err = redis.String(c.Conn.Do("GET", "name3"))
if err != nil {
fmt.Println(err)
return
}
fmt.Println(v)
Pool.Pulish(c)
select {
}
}

源碼
package Conn_Pool
import (
"errors"
"fmt"
"log"
"reflect"
"sync"
"time"
"github.com/garyburd/redigo/redis"
)
type Conn struct{
Expiration int64
Conn redis.Conn
}
func (C *Conn)close(){
err := C.Conn.Close()
if err != nil{
log.Println("關閉鏈接失敗!",err)
}
}
type newConn func()(redis.Conn)
//type sig struct{}
type ConnPool interface {
Get() (*Conn, error) // 獲取資源
Pulish(*Conn) error // 釋放資源,返回池中
Shutdown() error // 關閉池
}
type Connpool struct {
lock sync.Mutex
ConnList []*Conn //鏈接
capacity int32 // 鏈接池最大鏈接限制
numOpen int32 // 當前池中空閑鏈接數
running int32 // 正在使用的鏈接數
expiryDuration time.Duration //掃描時間
defaultExpiration time.Duration//鏈接的過期時間
factory newConn // 創建連接的方法
j *janitor //監視器
isClose bool //鏈接池是否關閉
}
func NewGenericPool(capacity int32,expiryDuration time.Duration,defaultExpiration time.Duration) (*Connpool, error) {
if capacity <= 0 {
return nil, errors.New("Pool capacity <0,not Create")
}
p := &Connpool{
capacity: int32(capacity),
expiryDuration: expiryDuration,
running: 0,
numOpen: 0,
defaultExpiration:defaultExpiration,
factory:func()(redis.Conn){
rs,err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil{
return nil
}
return rs
},
isClose:false,
}
// 啟動定期清理過期鏈接,獨立goroutine運行,
// 進一步節省系統資源
j := p.monitorAndClear()
p.j = j
return p, nil
}
// 獲取資源
func (c *Connpool) Get() (*Conn){
if c.isClose {
return nil
}
var conn *Conn
// 標志,表示當前運行的鏈接數量是否已達容量上限
waiting := false
// 涉及從鏈接隊列取可用鏈接,需要加鎖
c.lock.Lock()
ConnList := c.ConnList
n := len(ConnList) - 1
fmt.Println("空閑鏈接數量:",n+1)
fmt.Println("鏈接池現在運行的鏈接數量:",c.running)
// 當前worker隊列為空(無空閑worker)
if n < 0 {
//沒有空閑的鏈接有兩種可能:
//1.運行的鏈接超出了pool容量
//2.當前是空pool,從未往pool添加鏈接或者一段時間內沒有鏈接添加,被定期清除
// 運行鏈接數目已達到該Pool的容量上限,置等待標志
if c.running >= c.capacity {
//print("超過上限")
waiting = true
} else {
// 當前無空閑鏈接但是Pool還沒有滿,
// 則可以直接新開一個鏈接執行任務
c.running++
conn = &Conn{
time.Now().Add(c.defaultExpiration).UnixNano(),
c.factory(),
}
}
// 有空閑鏈接,從隊列尾部取出一個使用
} else {
conn = ConnList[n]
ConnList[n] = nil
c.ConnList = ConnList[:n]
c.running++
}
// 判斷是否有鏈接可用結束,解鎖
c.lock.Unlock()
if waiting {
//當一個鏈接執行完以后會添加到池中,有了空閑的鏈接就可以繼續執行:
// 阻塞等待直到有空閑鏈接
for len(c.ConnList) == 0{
continue
}
c.lock.Lock()
ConnList = c.ConnList
l := len(ConnList) - 1
conn = ConnList[l]
ConnList[l] = nil
c.ConnList = ConnList[:l]
c.running++
c.lock.Unlock()
}
return conn
}
// 釋放資源,返回池中
func (c *Connpool) Pulish(conn *Conn) error {
if c.isClose {
return nil
}
conn.Expiration = time.Now().UnixNano()//更新鏈接的過期時間
c.lock.Lock()
c.running --
c.ConnList = append(c.ConnList,conn)
c.lock.Unlock()
return nil
}
// 關閉池
func (c *Connpool) Shutdown() error {
c.isClose = true
for _,conn := range c.ConnList {
conn.close()
}
c.j.stopJanitor()
return nil
}
////////////////////////////////////////
func (c *Connpool) DeleteExpired() {
//現在時間戳
now := time.Now().UnixNano()
//map加互斥鎖
c.lock.Lock()
for i,conn := range c.ConnList {
// "Inlining" of expired
//檢測map
if conn.Expiration > 0 && now > conn.Expiration {
//超時則刪除
o, ok := c.delete( c.ConnList,i)
//類型斷言
if ok {
c.ConnList = o.([]*Conn)
}
conn.close()
}
}
c.lock.Unlock()//解互斥鎖
}
func (c *Connpool) delete(slice interface{}, index int) (interface{}, bool) {
//判斷是否是切片類型
v := reflect.ValueOf(slice)
if v.Kind() != reflect.Slice {
return nil, false
}
//參數檢查
if v.Len() == 0 || index < 0 || index > v.Len() - 1 {
return nil, false
}
return reflect.AppendSlice(v.Slice(0, index), v.Slice(index+1, v.Len())).Interface(), true
}
////////////////////////////////////////////////////
//////////////////////////監視器/////////////////////
func (c *Connpool)monitorAndClear()*janitor{
return runJanitor(c,c.expiryDuration)
}
type janitor struct {
c *Connpool
Interval time.Duration
stop chan bool
}
func (j *janitor) Run() {
//創建定時器
ticker := time.NewTicker(j.Interval)
print("開啟定時器\n")
for {
select {
case <-ticker.C://當定時器每次到達設置的時間時就會向管道發送消息,此時檢查鏈接隊列中鏈接是否過期
print("開始掃描\n")
j.c.DeleteExpired()
case <-j.stop: //監視器退出信道,
ticker.Stop()
close(j.stop)
return
}
}
}
func (j *janitor)stopJanitor(){
j.stop <- true
}
func runJanitor(c *Connpool,ci time.Duration)*janitor{
j := &janitor{
c:c,
Interval: ci,
stop: make(chan bool),
}
go j.Run()
return j
}
/////////////////////////////////////
