Go語言之從0到1實現一個簡單的Redis連接池


Go語言之從0到1實現一個簡單的Redis連接池

前言

最近學習了一些Go語言開發相關內容,但是苦於手頭沒有可以練手的項目,學的時候理解不清楚,學過容易忘。

結合之前組內分享時學到的Redis相關知識,以及Redis Protocol文檔,就想着自己造個輪子練練手。

這次我把目標放在了Redis client implemented with Go,使用原生Go語言和TCP實現一個簡單的Redis連接池和協議解析,以此來讓自己入門Go語言,並加深理解和記憶。(這樣做直接導致的后果是,最近寫JS時if語句總是忘帶括號QAQ)。

本文只能算是學習Go語言時的一個隨筆,並不是真正要造一個線上環境可用的Go-Redis庫~(︿( ̄︶ ̄)︿攤手)

順便安利以下自己做的一個跨平台開源Redis管理軟件:AwesomeRedisManager官網AwesomeRedisManager源碼

Redis協議主要參考這篇文檔通信協議(protocol),閱讀后了解到,Redis Protocol並沒有什么復雜之處,主要是使用TCP來傳輸一些固定格式的字符串數據達到發送命令和解析Response數據的目的。

命令格式

根據文檔了解到,Redis命令格式為(CR LF即\r\n):

*<參數數量N> CR LF
$<參數 1 的字節數量> CR LF
<參數 1 的數據> CR LF
...
$<參數 N 的字節數量> CR LF
<參數 N 的數據> CR LF

命令的每一行都使用CRLF結尾,在命令結構的開頭就聲明了命令的參數數量,每一條參數都帶有長度標記,方便服務端解析。

例如,發送一個SET命令set name jeferwang

*3
$3
SET
$4
name
$9
jeferwang

響應格式

Redis的響應回復數據主要分為五種類型:

  • 狀態回復:一行數據,使用+開頭(例如:OK、PONG等)
+OK\r\n
+PONG\r\n
  • 錯誤回復:一行數據,使用-開頭(Redis執行命令時產生的錯誤)
-ERR unknown command 'demo'\r\n
  • 整數回復:一行數據,使用:開頭(例如:llen返回的長度數值等)
:100\r\n
  • 批量回復(可以理解為字符串):兩行數據,使用$開頭,第一行為內容長度,第二行為具體內容
$5\r\n
abcde\r\n

特殊情況:$-1\r\n即為返回空數據,可以轉化為nil
  • 多條批量回復:使用*開頭,第一行標識本次回復包含多少條批量回復,后面每兩行為一個批量回復(lrange、hgetall等命令的返回數據)
*2\r\n
$5\r\n
ABCDE\r\n
$2\r\n
FG\r\n

更詳細的命令和回復格式可以從Redis Protocol文檔了解到,本位只介紹一些基本的開發中需要用到的內容

以下為部分代碼,完整代碼見GitHub:redis4go

實現流程

  1. 首先,我們根據官網文檔了解到了Redis傳輸協議,即Redis使用TCP傳輸命令的格式和接收數據的格式,據此,我們可以使用Go實現對Redis協議的解析
  2. 接下來,在可以建立Redis連接並進行數據傳輸的前提下,實現一個連接池。
  3. 實現拼接Redis命令的方法,通過TCP發送到RedisServer
  4. 讀取RedisResponse,實現解析數據的方法

模塊結構分析

簡單分析Redis連接池的結構,可以先簡單規划為5個部分:

  • 結構體定義entity.go
  • Redis連接和調用redis_conn.go
  • Redis數據類型解析data_type.go
  • 連接池實現pool.go

共划分為上述四個部分

對象結構定義

為了實現連接池及Redis數據庫連接,我們需要如下結構:

  • Redis服務器配置RedisConfig:包含Host、Port等信息
  • Redis連接池配置PoolConfig:繼承RedisConfig,包含PoolSize等信息
  • Redis連接池結構:包含連接隊列、連接池配置等信息
  • 單個Redis連接:包含TCP連接Handler、是否處於空閑標記位、當前使用的數據庫等信息
package redis4go

import (
	"net"
	"sync"
)

type RedisConfig struct {
	Host     string // RedisServer主機地址
	Port     int    // RedisServer主機端口
	Password string // RedisServer需要的Auth驗證,不填則為空
}

// 連接池的配置數據
type PoolConfig struct {
	RedisConfig
	PoolSize int // 連接池的大小
}

// 連接池結構
type Pool struct {
	Config PoolConfig          // 建立連接池時的配置
	Queue  chan *RedisConn     // 連接池
	Store  map[*RedisConn]bool // 所有的連接
	mu     sync.Mutex          // 加鎖
}

// 單個Redis連接的結構
type RedisConn struct {
	mu        sync.Mutex   // 加鎖
	p         *Pool        // 所屬的連接池
	IsRelease bool         // 是否處於釋放狀態
	IsClose   bool         // 是否已關閉
	TcpConn   *net.TCPConn // 建立起的到RedisServer的連接
	DBIndex   int          // 當前連接正在使用第幾個Redis數據庫
}

type RedisResp struct {
	rType byte     // 回復類型(+-:$*)
	rData [][]byte // 從TCP連接中讀取的數據統一使用二維數組返回
}

根據之前的規划,定義好基本的結構之后,我們可以先實現一個簡單的Pool對象池

Redis連接

建立連接

首先我們需要實現一個建立Redis連接的方法

// 創建一個RedisConn對象
func createRedisConn(config RedisConfig) (*RedisConn, error) {
	tcpAddr := &net.TCPAddr{IP: net.ParseIP(config.Host), Port: config.Port}
	tcpConn, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		return nil, err
	}
	return &RedisConn{
		IsRelease: true,
		IsClose:   false,
		TcpConn:   tcpConn,
		DBIndex:   0,
	}, nil
}

實現連接池

在Go語言中,我們可以使用一個chan來很輕易地實現一個指定容量的隊列,來作為連接池使用,當池中沒有連接時,申請獲取連接時將會被阻塞,直到放入新的連接。

package redis4go

func CreatePool(config PoolConfig) (*Pool, error) {
	pool := &Pool{
		Config: config,
		Queue:  make(chan *RedisConn, config.PoolSize),
		Store:  make(map[*RedisConn]bool, config.PoolSize),
	}
	for i := 0; i < config.PoolSize; i++ {
		redisConn, err := createRedisConn(config.RedisConfig)
		if err != nil {
			// todo 處理之前已經創建好的鏈接
			return nil, err
		}
		redisConn.p = pool
		pool.Queue <- redisConn
		pool.Store[redisConn] = true
	}
	return pool, nil
}

// 獲取一個連接
func (pool *Pool) getConn() *RedisConn {
	pool.mu.Lock()
	// todo 超時機制
	conn := <-pool.Queue
	conn.IsRelease = false
	pool.mu.Unlock()
	return conn
}

// 關閉連接池
func (pool *Pool) Close() {
	for conn := range pool.Store {
		err := conn.Close()
		if err != nil {
			// todo 處理連接關閉的錯誤?
		}
	}
}

發送命令&解析回復數據

下面是向RedisServer發送命令,以及讀取回復數據的簡單實現

func (conn *RedisConn) Call(params ...interface{}) (*RedisResp, error) {
	reqData, err := mergeParams(params...)
	if err != nil {
		return nil, err
	}
	conn.Lock()
	defer conn.Unlock()
	_, err = conn.TcpConn.Write(reqData)
	if err != nil {
		return nil, err
	}
	resp, err := conn.getReply()
	if err != nil {
		return nil, err
	}
	if resp.rType == '-' {
		return resp, resp.ParseError()
	}
	return resp, nil
}

func (conn *RedisConn) getReply() (*RedisResp, error) {
	b := make([]byte, 1)
	_, err := conn.TcpConn.Read(b)
	if err != nil {
		return nil, err
	}
	resp := new(RedisResp)
	resp.rType = b[0]
	switch b[0] {
	case '+':
		// 狀態回復
		fallthrough
	case '-':
		// 錯誤回復
		fallthrough
	case ':':
		// 整數回復
		singleResp := make([]byte, 1)
		for {
			_, err := conn.TcpConn.Read(b)
			if err != nil {
				return nil, err
			}
			if b[0] != '\r' && b[0] != '\n' {
				singleResp = append(singleResp, b[0])
			}
			if b[0] == '\n' {
				break
			}
		}
		resp.rData = append(resp.rData, singleResp)
	case '$':
		buck, err := conn.readBuck()
		if err != nil {
			return nil, err
		}
		resp.rData = append(resp.rData, buck)
	case '*':
		// 條目數量
		itemNum := 0
		for {
			_, err := conn.TcpConn.Read(b)
			if err != nil {
				return nil, err
			}
			if b[0] == '\r' {
				continue
			}
			if b[0] == '\n' {
				break
			}
			itemNum = itemNum*10 + int(b[0]-'0')
		}
		for i := 0; i < itemNum; i++ {
			buck, err := conn.readBuck()
			if err != nil {
				return nil, err
			}
			resp.rData = append(resp.rData, buck)
		}
	default:
		return nil, errors.New("錯誤的服務器回復")
	}
	return resp, nil
}

func (conn *RedisConn) readBuck() ([]byte, error) {
	b := make([]byte, 1)
	dataLen := 0
	for {
		_, err := conn.TcpConn.Read(b)
		if err != nil {
			return nil, err
		}
		if b[0] == '$' {
			continue
		}
		if b[0] == '\r' {
			break
		}
		dataLen = dataLen*10 + int(b[0]-'0')
	}
	bf := bytes.Buffer{}
	for i := 0; i < dataLen+3; i++ {
		_, err := conn.TcpConn.Read(b)
		if err != nil {
			return nil, err
		}
		bf.Write(b)
	}
	return bf.Bytes()[1 : bf.Len()-2], nil
}

func mergeParams(params ...interface{}) ([]byte, error) {
	count := len(params) // 參數數量
	bf := bytes.Buffer{}
	// 參數數量
	{
		bf.WriteString("*")
		bf.WriteString(strconv.Itoa(count))
		bf.Write([]byte{'\r', '\n'})
	}
	for _, p := range params {
		bf.Write([]byte{'$'})
		switch p.(type) {
		case string:
			str := p.(string)
			bf.WriteString(strconv.Itoa(len(str)))
			bf.Write([]byte{'\r', '\n'})
			bf.WriteString(str)
			break
		case int:
			str := strconv.Itoa(p.(int))
			bf.WriteString(strconv.Itoa(len(str)))
			bf.Write([]byte{'\r', '\n'})
			bf.WriteString(str)
			break
		case nil:
			bf.WriteString("-1")
			break
		default:
			// 不支持的參數類型
			return nil, errors.New("參數只能是String或Int")
		}
		bf.Write([]byte{'\r', '\n'})
	}
	return bf.Bytes(), nil
}

實現幾個常用數據類型的解析

package redis4go

import (
	"errors"
	"strconv"
)

func (resp *RedisResp) ParseError() error {
	if resp.rType != '-' {
		return nil
	}
	return errors.New(string(resp.rData[0]))
}

func (resp *RedisResp) ParseInt() (int, error) {
	switch resp.rType {
	case '-':
		return 0, resp.ParseError()
	case '$':
		fallthrough
	case ':':
		str, err := resp.ParseString()
		if err != nil {
			return 0, err
		}
		return strconv.Atoi(str)
	default:
		return 0, errors.New("錯誤的回復類型")
	}
}

func (resp *RedisResp) ParseString() (string, error) {
	switch resp.rType {
	case '-':
		return "", resp.ParseError()
	case '+':
		fallthrough
	case ':':
		fallthrough
	case '$':
		return string(resp.rData[0]), nil
	default:
		return "", errors.New("錯誤的回復類型")
	}
}
func (resp *RedisResp) ParseList() ([]string, error) {
	switch resp.rType {
	case '-':
		return nil, resp.ParseError()
	case '*':
		list := make([]string, 0, len(resp.rData))
		for _, data := range resp.rData {
			list = append(list, string(data))
		}
		return list, nil
	default:
		return nil, errors.New("錯誤的回復類型")
	}
}
func (resp *RedisResp) ParseMap() (map[string]string, error) {
	switch resp.rType {
	case '-':
		return nil, resp.ParseError()
	case '*':
		mp := make(map[string]string)
		for i := 0; i < len(resp.rData); i += 2 {
			mp[string(resp.rData[i])] = string(resp.rData[i+1])
		}
		return mp, nil
	default:
		return nil, errors.New("錯誤的回復類型")
	}
}

在開發的過程中,隨手編寫了幾個零零散散的測試文件,經測試,一些簡單的Redis命令以及能跑通了。

package redis4go

import (
	"testing"
)

func getConn() (*RedisConn, error) {
	pool, err := CreatePool(PoolConfig{
		RedisConfig: RedisConfig{
			Host: "127.0.0.1",
			Port: 6379,
		},
		PoolSize: 10,
	})
	if err != nil {
		return nil, err
	}
	conn := pool.getConn()
	return conn, nil
}

func TestRedisResp_ParseString(t *testing.T) {
	demoStr := string([]byte{'A', '\n', '\r', '\n', 'b', '1'})
	conn, _ := getConn()
	_, _ = conn.Call("del", "name")
	_, _ = conn.Call("set", "name", demoStr)
	resp, err := conn.Call("get", "name")
	if err != nil {
		t.Fatal("Call Error:", err.Error())
	}
	str, err := resp.ParseString()
	if err != nil {
		t.Fatal("Parse Error:", err.Error())
	}
	if str != demoStr {
		t.Fatal("結果錯誤")
	}
}

func TestRedisResp_ParseList(t *testing.T) {
	conn, _ := getConn()
	_, _ = conn.Call("del", "testList")
	_, _ = conn.Call("lpush", "testList", 1, 2, 3, 4, 5)
	res, err := conn.Call("lrange", "testList", 0, -1)
	if err != nil {
		t.Fatal("Call Error:", err.Error())
	}
	ls, err := res.ParseList()
	if err != nil {
		t.Fatal("Parse Error:", err.Error())
	}
	if len(ls) != 5 {
		t.Fatal("結果錯誤")
	}
}

func TestRedisResp_ParseMap(t *testing.T) {
	conn, _ := getConn()
	_, _ = conn.Call("del", "testMap")
	_, err := conn.Call("hmset", "testMap", 1, 2, 3, 4, 5, 6)
	if err != nil {
		t.Fatal("設置Value失敗")
	}
	res, err := conn.Call("hgetall", "testMap")
	if err != nil {
		t.Fatal("Call Error:", err.Error())
	}
	ls, err := res.ParseMap()
	if err != nil {
		t.Fatal("Parse Error:", err.Error())
	}
	if len(ls) != 3 || ls["1"] != "2" {
		t.Fatal("結果錯誤")
	}
}

至此,已經算是達到了學習Go語言和學習Redis Protocol的目的,不過代碼中也有很多地方需要優化和完善,性能方面考慮的也並不周全。輪子就不重復造了,畢竟有很多功能完善的庫,從頭造一個輪子需要消耗的精力太多啦並且沒必要~

下一次我將會學習官方推薦的gomodule/redigo源碼,並分享我的心得。

--The End--


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM