go 數據庫鏈接池


 

數據庫鏈接池的實現步驟

 

 

 

 

ConnPool

type ConnPool interface {
	Get() (*Conn, error) // 獲取資源
	Pulish(*Conn) error     // 釋放資源,返回池中
	Shutdown() error             // 關閉池
}


type Connpool struct {
	lock 		 sync.Mutex
	ConnList     []*Conn //鏈接
	capacity     int32  // 鏈接池最大鏈接限制
	numOpen      int32  // 當前池中空閑鏈接數
	running      int32  // 正在使用的鏈接數
	expiryDuration      time.Duration //掃描時間
	defaultExpiration time.Duration//鏈接的過期時間
	factory      newConn // 創建連接的方法
	j			 *janitor //監視器
	isClose      bool    //鏈接池是否關閉
}

這里主要介紹newConn,自定義函數類型,返回數據庫鏈接,如這里為redis鏈接:

type newConn func()(redis.Conn)

  

Conn

type Conn struct{
	Expiration int64
	Conn redis.Conn
}

func (C *Conn)close(){
	err := C.Conn.Close()
	if err != nil{
		log.Println("關閉鏈接失敗!",err)
	}
}

  

 

初始化Pool

func NewGenericPool(capacity int32,expiryDuration time.Duration,defaultExpiration time.Duration) (*Connpool, error) {
	if capacity <= 0 {
		return nil, errors.New("Pool capacity <0,not Create")
	}
	p := &Connpool{
		capacity:       int32(capacity),
		expiryDuration: expiryDuration,
		running:        0,
		numOpen:        0,
		defaultExpiration:defaultExpiration,
		factory:func()(redis.Conn){
			rs,err := redis.Dial("tcp", "127.0.0.1:6379")
			if err != nil{
				return nil
			}
			return rs
		},
		isClose:false,
	}
	// 啟動定期清理過期鏈接,獨立goroutine運行,
	// 進一步節省系統資源
	j := p.monitorAndClear()
	p.j = j
	return p, nil
}

 

 
        

監控器

監視器定期掃描池中空閑鏈接是否過期,主要使用了定時器,利用select監聽定時器的信道,每到掃描時間就會執行掃描操作,過期則刪除,三個字段:

     c        *Connpool //監控的鏈接池
	Interval time.Duration //定期掃描時間
	stop     chan bool  //通知鏈接池關閉,這里使用一個空struct{}更好,你可以自定義一個類型,type sig struct{}
func (c *Connpool)monitorAndClear()*janitor{
	return runJanitor(c,c.expiryDuration)
}

type janitor struct {
	c        *Connpool
	Interval time.Duration
	stop     chan bool
}

func (j *janitor) Run() {
	//創建定時器
	ticker := time.NewTicker(j.Interval)
	print("開啟定時器\n")
	for {
		select {
		case <-ticker.C://當定時器每次到達設置的時間時就會向管道發送消息,檢測鏈接隊列中鏈接是否過期
			print("開始掃描\n")
			j.c.DeleteExpired()
		case <-j.stop: //監視器退出信道,
			ticker.Stop()
			close(j.stop)
			return
		}
	}
}
func (j *janitor)stopJanitor(){
	j.stop <- true
}

func runJanitor(c *Connpool,ci time.Duration)*janitor{
	j := &janitor{
		c:c,
		Interval: ci,
		stop:     make(chan bool),
	}
	go j.Run()//運行監控器
	return j
} 

掃描&&刪除

////////////////////////////////////////
func (c *Connpool) DeleteExpired() {
	//現在時間戳
	now := time.Now().UnixNano()
	//加互斥鎖
	c.lock.Lock()
	for i,conn := range c.ConnList {
		// "Inlining" of expired
		if conn.Expiration > 0 && now > conn.Expiration {
			//超時則刪除
			o, ok := c.delete( c.ConnList,i)
			//類型斷言
			if ok {
				c.ConnList = o.([]*Conn)
			}
			conn.close()
		}
	}
	c.lock.Unlock()//解互斥鎖
}

func (c *Connpool) delete(slice interface{}, index int) (interface{}, bool) {
	//判斷是否是切片類型
	v := reflect.ValueOf(slice)
	if v.Kind() != reflect.Slice {
		return nil, false
	}
	//參數檢查
	if v.Len() == 0 || index < 0 || index > v.Len() - 1 {
		return nil, false
	}

	return reflect.AppendSlice(v.Slice(0, index), v.Slice(index+1, v.Len())).Interface(), true
}


////////////////////////////////////////////////////

  

 

 

獲取鏈接&&釋放資源&&關閉鏈接池

// 獲取資源
func (c *Connpool) Get() (*Conn){
	if c.isClose {
		return nil
	}
	var conn *Conn
	// 標志,表示當前運行的鏈接數量是否已達容量上限
	waiting := false
	// 涉及從鏈接隊列取可用鏈接,需要加鎖
	c.lock.Lock()
	ConnList := c.ConnList
	n := len(ConnList) - 1
	fmt.Println("空閑鏈接數量:",n+1)
	fmt.Println("鏈接池現在運行的鏈接數量:",c.running)
	// 當前鏈接隊列為空(無空閑鏈接)
	if n < 0 {
		//沒有空閑的鏈接有兩種可能:
		//1.運行的鏈接超出了pool容量
		//2.當前是空pool,從未往pool添加鏈接或者一段時間內沒有鏈接添加,被定期清除
		// 運行鏈接數目已達到該Pool的容量上限,置等待標志
		if c.running >= c.capacity {
			//print("超過上限")
			waiting = true
		} else {
			// 當前無空閑鏈接但是Pool還沒有滿,
			// 則可以直接新開一個鏈接執行任務
			c.running++
			conn = &Conn{
				time.Now().Add(c.defaultExpiration).UnixNano(),
				c.factory(),
			}
		}
		// 有空閑鏈接,從隊列尾部取出一個使用
	} else {

		conn = ConnList[n]
		ConnList[n] = nil
		c.ConnList = ConnList[:n]
		c.running++
	}
	// 解鎖
	c.lock.Unlock()
	if waiting {
		//當一個鏈接執行完以后會添加到池中,有了空閑的鏈接就可以繼續執行:
		// 阻塞等待直到有空閑鏈接
		for len(c.ConnList) == 0{
			continue
		}
		c.lock.Lock()
		ConnList = c.ConnList
		l := len(ConnList) - 1
		conn = ConnList[l]
		ConnList[l] = nil
		c.ConnList = ConnList[:l]
		c.running++
		c.lock.Unlock()
	}
	return conn
}

// 釋放資源,返回池中
func (c *Connpool) Pulish(conn *Conn) error {
	if c.isClose {
		return nil
	}
	conn.Expiration = time.Now().UnixNano()
	c.lock.Lock()
	c.running --
	c.ConnList = append(c.ConnList,conn)
	c.lock.Unlock()
	return nil
}

// 關閉池
func (c *Connpool) Shutdown() error {
	c.isClose = true
	for _,conn := range c.ConnList {
		conn.close()
	}
	c.j.stopJanitor()
	return nil
}

 

演示代碼

package main

import (
	"Pool/Conn_Pool"
	"fmt"
	"github.com/garyburd/redigo/redis"
	"time"
)



func main() {
	//容量、掃描時間、鍵值默認過期時間
	Pool,_ := Conn_Pool.NewGenericPool(10,10 * time.Second,5 *time.Second)
	c := Pool.Get()
	//通過Do函數,發送redis命令
	v, err := c.Conn.Do("SET", "name1", "小王")
	if err != nil {
		fmt.Println(err)
		return
	}
	v, err = redis.String(c.Conn.Do("GET", "name1"))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(v)
	Pool.Pulish(c)


	time.Sleep(time.Second)
	c = Pool.Get()
	//通過Do函數,發送redis命令
	v, err = c.Conn.Do("SET", "name2", "李四")
	if err != nil {
		fmt.Println(err)
		return
	}
	v, err = redis.String(c.Conn.Do("GET", "name2"))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(v)
	Pool.Pulish(c)


	time.Sleep(time.Second)
	c = Pool.Get()
	//通過Do函數,發送redis命令
	v, err = c.Conn.Do("SET", "name3", "sb")
	if err != nil {
		fmt.Println(err)
		return
	}
	v, err = redis.String(c.Conn.Do("GET", "name3"))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(v)
	Pool.Pulish(c)
	select {

	}
}

 

 

 

 

源碼

package Conn_Pool

import (
	"errors"
	"fmt"
	"log"
	"reflect"
	"sync"
	"time"
	"github.com/garyburd/redigo/redis"
)

type Conn struct{
	Expiration int64
	Conn redis.Conn
}

func (C *Conn)close(){
	err := C.Conn.Close()
	if err != nil{
		log.Println("關閉鏈接失敗!",err)
	}
}
type newConn func()(redis.Conn)

//type sig struct{}


type ConnPool interface {
	Get() (*Conn, error) // 獲取資源
	Pulish(*Conn) error     // 釋放資源,返回池中
	Shutdown() error             // 關閉池
}


type Connpool struct {
	lock 		 sync.Mutex
	ConnList     []*Conn //鏈接
	capacity     int32  // 鏈接池最大鏈接限制
	numOpen      int32  // 當前池中空閑鏈接數
	running      int32  // 正在使用的鏈接數
	expiryDuration      time.Duration //掃描時間
	defaultExpiration time.Duration//鏈接的過期時間
	factory      newConn // 創建連接的方法
	j			 *janitor //監視器
	isClose      bool    //鏈接池是否關閉
}

func NewGenericPool(capacity int32,expiryDuration time.Duration,defaultExpiration time.Duration) (*Connpool, error) {
	if capacity <= 0 {
		return nil, errors.New("Pool capacity <0,not Create")
	}
	p := &Connpool{
		capacity:       int32(capacity),
		expiryDuration: expiryDuration,
		running:        0,
		numOpen:        0,
		defaultExpiration:defaultExpiration,
		factory:func()(redis.Conn){
			rs,err := redis.Dial("tcp", "127.0.0.1:6379")
			if err != nil{
				return nil
			}
			return rs
		},
		isClose:false,
	}
	// 啟動定期清理過期鏈接,獨立goroutine運行,
	// 進一步節省系統資源
	j := p.monitorAndClear()
	p.j = j
	return p, nil
}


// 獲取資源
func (c *Connpool) Get() (*Conn){
	if c.isClose {
		return nil
	}
	var conn *Conn
	// 標志,表示當前運行的鏈接數量是否已達容量上限
	waiting := false
	// 涉及從鏈接隊列取可用鏈接,需要加鎖
	c.lock.Lock()
	ConnList := c.ConnList
	n := len(ConnList) - 1
	fmt.Println("空閑鏈接數量:",n+1)
	fmt.Println("鏈接池現在運行的鏈接數量:",c.running)
	// 當前worker隊列為空(無空閑worker)
	if n < 0 {
		//沒有空閑的鏈接有兩種可能:
		//1.運行的鏈接超出了pool容量
		//2.當前是空pool,從未往pool添加鏈接或者一段時間內沒有鏈接添加,被定期清除
		// 運行鏈接數目已達到該Pool的容量上限,置等待標志
		if c.running >= c.capacity {
			//print("超過上限")
			waiting = true
		} else {
			// 當前無空閑鏈接但是Pool還沒有滿,
			// 則可以直接新開一個鏈接執行任務
			c.running++
			conn = &Conn{
				time.Now().Add(c.defaultExpiration).UnixNano(),
				c.factory(),
			}
		}
		// 有空閑鏈接,從隊列尾部取出一個使用
	} else {

		conn = ConnList[n]
		ConnList[n] = nil
		c.ConnList = ConnList[:n]
		c.running++
	}
	// 判斷是否有鏈接可用結束,解鎖
	c.lock.Unlock()
	if waiting {
		//當一個鏈接執行完以后會添加到池中,有了空閑的鏈接就可以繼續執行:
		// 阻塞等待直到有空閑鏈接
		for len(c.ConnList) == 0{
			continue
		}
		c.lock.Lock()
		ConnList = c.ConnList
		l := len(ConnList) - 1
		conn = ConnList[l]
		ConnList[l] = nil
		c.ConnList = ConnList[:l]
		c.running++
		c.lock.Unlock()
	}
	return conn
}

// 釋放資源,返回池中
func (c *Connpool) Pulish(conn *Conn) error {
	if c.isClose {
		return nil
	}
	conn.Expiration = time.Now().UnixNano()//更新鏈接的過期時間
	c.lock.Lock()
	c.running --
	c.ConnList = append(c.ConnList,conn)
	c.lock.Unlock()
	return nil
}

// 關閉池
func (c *Connpool) Shutdown() error {
	c.isClose = true
	for _,conn := range c.ConnList {
		conn.close()
	}
	c.j.stopJanitor()
	return nil
}



////////////////////////////////////////
func (c *Connpool) DeleteExpired() {
	//現在時間戳
	now := time.Now().UnixNano()
	//map加互斥鎖
	c.lock.Lock()
	for i,conn := range c.ConnList {
		// "Inlining" of expired
		//檢測map
		if conn.Expiration > 0 && now > conn.Expiration {
			//超時則刪除
			o, ok := c.delete( c.ConnList,i)
			//類型斷言
			if ok {
				c.ConnList = o.([]*Conn)
			}
			conn.close()
		}
	}
	c.lock.Unlock()//解互斥鎖
}

func (c *Connpool) delete(slice interface{}, index int) (interface{}, bool) {
	//判斷是否是切片類型
	v := reflect.ValueOf(slice)
	if v.Kind() != reflect.Slice {
		return nil, false
	}
	//參數檢查
	if v.Len() == 0 || index < 0 || index > v.Len() - 1 {
		return nil, false
	}

	return reflect.AppendSlice(v.Slice(0, index), v.Slice(index+1, v.Len())).Interface(), true
}


////////////////////////////////////////////////////



//////////////////////////監視器/////////////////////
func (c *Connpool)monitorAndClear()*janitor{
	return runJanitor(c,c.expiryDuration)
}

type janitor struct {
	c        *Connpool
	Interval time.Duration
	stop     chan bool
}

func (j *janitor) Run() {
	//創建定時器
	ticker := time.NewTicker(j.Interval)
	print("開啟定時器\n")
	for {
		select {
		case <-ticker.C://當定時器每次到達設置的時間時就會向管道發送消息,此時檢查鏈接隊列中鏈接是否過期
			print("開始掃描\n")
			j.c.DeleteExpired()
		case <-j.stop: //監視器退出信道,
			ticker.Stop()
			close(j.stop)
			return
		}
	}
}
func (j *janitor)stopJanitor(){
	j.stop <- true
}

func runJanitor(c *Connpool,ci time.Duration)*janitor{
	j := &janitor{
		c:c,
		Interval: ci,
		stop:     make(chan bool),
	}
	go j.Run()
	return j
}
/////////////////////////////////////

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM