Go語言之從0到1實現一個簡單的Redis連接池
前言
最近學習了一些Go語言開發相關內容,但是苦於手頭沒有可以練手的項目,學的時候理解不清楚,學過容易忘。
結合之前組內分享時學到的Redis相關知識,以及Redis Protocol文檔,就想着自己造個輪子練練手。
這次我把目標放在了Redis client implemented with Go,使用原生Go語言和TCP實現一個簡單的Redis連接池和協議解析,以此來讓自己入門Go語言,並加深理解和記憶。(這樣做直接導致的后果是,最近寫JS時if語句總是忘帶括號QAQ)。
本文只能算是學習Go語言時的一個隨筆,並不是真正要造一個線上環境可用的Go-Redis庫~(︿( ̄︶ ̄)︿攤手)
順便安利以下自己做的一個跨平台開源Redis管理軟件:AwesomeRedisManager官網AwesomeRedisManager源碼
Redis協議主要參考這篇文檔通信協議(protocol),閱讀后了解到,Redis Protocol並沒有什么復雜之處,主要是使用TCP來傳輸一些固定格式的字符串數據達到發送命令和解析Response數據的目的。
命令格式
根據文檔了解到,Redis命令格式為(CR LF即\r\n):
*<參數數量N> CR LF
$<參數 1 的字節數量> CR LF
<參數 1 的數據> CR LF
...
$<參數 N 的字節數量> CR LF
<參數 N 的數據> CR LF
命令的每一行都使用CRLF結尾,在命令結構的開頭就聲明了命令的參數數量,每一條參數都帶有長度標記,方便服務端解析。
例如,發送一個SET命令set name jeferwang
:
*3
$3
SET
$4
name
$9
jeferwang
響應格式
Redis的響應回復數據主要分為五種類型:
- 狀態回復:一行數據,使用
+
開頭(例如:OK、PONG等)
+OK\r\n
+PONG\r\n
- 錯誤回復:一行數據,使用
-
開頭(Redis執行命令時產生的錯誤)
-ERR unknown command 'demo'\r\n
- 整數回復:一行數據,使用
:
開頭(例如:llen返回的長度數值等)
:100\r\n
- 批量回復(可以理解為字符串):兩行數據,使用
$
開頭,第一行為內容長度,第二行為具體內容
$5\r\n
abcde\r\n
特殊情況:$-1\r\n即為返回空數據,可以轉化為nil
- 多條批量回復:使用
*
開頭,第一行標識本次回復包含多少條批量回復,后面每兩行為一個批量回復(lrange、hgetall等命令的返回數據)
*2\r\n
$5\r\n
ABCDE\r\n
$2\r\n
FG\r\n
更詳細的命令和回復格式可以從Redis Protocol文檔了解到,本位只介紹一些基本的開發中需要用到的內容
以下為部分代碼,完整代碼見GitHub:redis4go
實現流程
- 首先,我們根據官網文檔了解到了Redis傳輸協議,即Redis使用TCP傳輸命令的格式和接收數據的格式,據此,我們可以使用Go實現對Redis協議的解析
- 接下來,在可以建立Redis連接並進行數據傳輸的前提下,實現一個連接池。
- 實現拼接Redis命令的方法,通過TCP發送到RedisServer
- 讀取RedisResponse,實現解析數據的方法
模塊結構分析
簡單分析Redis連接池的結構,可以先簡單規划為5個部分:
- 結構體定義
entity.go
- Redis連接和調用
redis_conn.go
- Redis數據類型解析
data_type.go
- 連接池實現
pool.go
共划分為上述四個部分
對象結構定義
為了實現連接池及Redis數據庫連接,我們需要如下結構:
- Redis服務器配置
RedisConfig
:包含Host、Port等信息 - Redis連接池配置
PoolConfig
:繼承RedisConfig
,包含PoolSize等信息 - Redis連接池結構:包含連接隊列、連接池配置等信息
- 單個Redis連接:包含TCP連接Handler、是否處於空閑標記位、當前使用的數據庫等信息
package redis4go
import (
"net"
"sync"
)
type RedisConfig struct {
Host string // RedisServer主機地址
Port int // RedisServer主機端口
Password string // RedisServer需要的Auth驗證,不填則為空
}
// 連接池的配置數據
type PoolConfig struct {
RedisConfig
PoolSize int // 連接池的大小
}
// 連接池結構
type Pool struct {
Config PoolConfig // 建立連接池時的配置
Queue chan *RedisConn // 連接池
Store map[*RedisConn]bool // 所有的連接
mu sync.Mutex // 加鎖
}
// 單個Redis連接的結構
type RedisConn struct {
mu sync.Mutex // 加鎖
p *Pool // 所屬的連接池
IsRelease bool // 是否處於釋放狀態
IsClose bool // 是否已關閉
TcpConn *net.TCPConn // 建立起的到RedisServer的連接
DBIndex int // 當前連接正在使用第幾個Redis數據庫
}
type RedisResp struct {
rType byte // 回復類型(+-:$*)
rData [][]byte // 從TCP連接中讀取的數據統一使用二維數組返回
}
根據之前的規划,定義好基本的結構之后,我們可以先實現一個簡單的Pool對象池
Redis連接
建立連接
首先我們需要實現一個建立Redis連接的方法
// 創建一個RedisConn對象
func createRedisConn(config RedisConfig) (*RedisConn, error) {
tcpAddr := &net.TCPAddr{IP: net.ParseIP(config.Host), Port: config.Port}
tcpConn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
return nil, err
}
return &RedisConn{
IsRelease: true,
IsClose: false,
TcpConn: tcpConn,
DBIndex: 0,
}, nil
}
實現連接池
在Go語言中,我們可以使用一個chan
來很輕易地實現一個指定容量的隊列,來作為連接池使用,當池中沒有連接時,申請獲取連接時將會被阻塞,直到放入新的連接。
package redis4go
func CreatePool(config PoolConfig) (*Pool, error) {
pool := &Pool{
Config: config,
Queue: make(chan *RedisConn, config.PoolSize),
Store: make(map[*RedisConn]bool, config.PoolSize),
}
for i := 0; i < config.PoolSize; i++ {
redisConn, err := createRedisConn(config.RedisConfig)
if err != nil {
// todo 處理之前已經創建好的鏈接
return nil, err
}
redisConn.p = pool
pool.Queue <- redisConn
pool.Store[redisConn] = true
}
return pool, nil
}
// 獲取一個連接
func (pool *Pool) getConn() *RedisConn {
pool.mu.Lock()
// todo 超時機制
conn := <-pool.Queue
conn.IsRelease = false
pool.mu.Unlock()
return conn
}
// 關閉連接池
func (pool *Pool) Close() {
for conn := range pool.Store {
err := conn.Close()
if err != nil {
// todo 處理連接關閉的錯誤?
}
}
}
發送命令&解析回復數據
下面是向RedisServer發送命令,以及讀取回復數據的簡單實現
func (conn *RedisConn) Call(params ...interface{}) (*RedisResp, error) {
reqData, err := mergeParams(params...)
if err != nil {
return nil, err
}
conn.Lock()
defer conn.Unlock()
_, err = conn.TcpConn.Write(reqData)
if err != nil {
return nil, err
}
resp, err := conn.getReply()
if err != nil {
return nil, err
}
if resp.rType == '-' {
return resp, resp.ParseError()
}
return resp, nil
}
func (conn *RedisConn) getReply() (*RedisResp, error) {
b := make([]byte, 1)
_, err := conn.TcpConn.Read(b)
if err != nil {
return nil, err
}
resp := new(RedisResp)
resp.rType = b[0]
switch b[0] {
case '+':
// 狀態回復
fallthrough
case '-':
// 錯誤回復
fallthrough
case ':':
// 整數回復
singleResp := make([]byte, 1)
for {
_, err := conn.TcpConn.Read(b)
if err != nil {
return nil, err
}
if b[0] != '\r' && b[0] != '\n' {
singleResp = append(singleResp, b[0])
}
if b[0] == '\n' {
break
}
}
resp.rData = append(resp.rData, singleResp)
case '$':
buck, err := conn.readBuck()
if err != nil {
return nil, err
}
resp.rData = append(resp.rData, buck)
case '*':
// 條目數量
itemNum := 0
for {
_, err := conn.TcpConn.Read(b)
if err != nil {
return nil, err
}
if b[0] == '\r' {
continue
}
if b[0] == '\n' {
break
}
itemNum = itemNum*10 + int(b[0]-'0')
}
for i := 0; i < itemNum; i++ {
buck, err := conn.readBuck()
if err != nil {
return nil, err
}
resp.rData = append(resp.rData, buck)
}
default:
return nil, errors.New("錯誤的服務器回復")
}
return resp, nil
}
func (conn *RedisConn) readBuck() ([]byte, error) {
b := make([]byte, 1)
dataLen := 0
for {
_, err := conn.TcpConn.Read(b)
if err != nil {
return nil, err
}
if b[0] == '$' {
continue
}
if b[0] == '\r' {
break
}
dataLen = dataLen*10 + int(b[0]-'0')
}
bf := bytes.Buffer{}
for i := 0; i < dataLen+3; i++ {
_, err := conn.TcpConn.Read(b)
if err != nil {
return nil, err
}
bf.Write(b)
}
return bf.Bytes()[1 : bf.Len()-2], nil
}
func mergeParams(params ...interface{}) ([]byte, error) {
count := len(params) // 參數數量
bf := bytes.Buffer{}
// 參數數量
{
bf.WriteString("*")
bf.WriteString(strconv.Itoa(count))
bf.Write([]byte{'\r', '\n'})
}
for _, p := range params {
bf.Write([]byte{'$'})
switch p.(type) {
case string:
str := p.(string)
bf.WriteString(strconv.Itoa(len(str)))
bf.Write([]byte{'\r', '\n'})
bf.WriteString(str)
break
case int:
str := strconv.Itoa(p.(int))
bf.WriteString(strconv.Itoa(len(str)))
bf.Write([]byte{'\r', '\n'})
bf.WriteString(str)
break
case nil:
bf.WriteString("-1")
break
default:
// 不支持的參數類型
return nil, errors.New("參數只能是String或Int")
}
bf.Write([]byte{'\r', '\n'})
}
return bf.Bytes(), nil
}
實現幾個常用數據類型的解析
package redis4go
import (
"errors"
"strconv"
)
func (resp *RedisResp) ParseError() error {
if resp.rType != '-' {
return nil
}
return errors.New(string(resp.rData[0]))
}
func (resp *RedisResp) ParseInt() (int, error) {
switch resp.rType {
case '-':
return 0, resp.ParseError()
case '$':
fallthrough
case ':':
str, err := resp.ParseString()
if err != nil {
return 0, err
}
return strconv.Atoi(str)
default:
return 0, errors.New("錯誤的回復類型")
}
}
func (resp *RedisResp) ParseString() (string, error) {
switch resp.rType {
case '-':
return "", resp.ParseError()
case '+':
fallthrough
case ':':
fallthrough
case '$':
return string(resp.rData[0]), nil
default:
return "", errors.New("錯誤的回復類型")
}
}
func (resp *RedisResp) ParseList() ([]string, error) {
switch resp.rType {
case '-':
return nil, resp.ParseError()
case '*':
list := make([]string, 0, len(resp.rData))
for _, data := range resp.rData {
list = append(list, string(data))
}
return list, nil
default:
return nil, errors.New("錯誤的回復類型")
}
}
func (resp *RedisResp) ParseMap() (map[string]string, error) {
switch resp.rType {
case '-':
return nil, resp.ParseError()
case '*':
mp := make(map[string]string)
for i := 0; i < len(resp.rData); i += 2 {
mp[string(resp.rData[i])] = string(resp.rData[i+1])
}
return mp, nil
default:
return nil, errors.New("錯誤的回復類型")
}
}
在開發的過程中,隨手編寫了幾個零零散散的測試文件,經測試,一些簡單的Redis命令以及能跑通了。
package redis4go
import (
"testing"
)
func getConn() (*RedisConn, error) {
pool, err := CreatePool(PoolConfig{
RedisConfig: RedisConfig{
Host: "127.0.0.1",
Port: 6379,
},
PoolSize: 10,
})
if err != nil {
return nil, err
}
conn := pool.getConn()
return conn, nil
}
func TestRedisResp_ParseString(t *testing.T) {
demoStr := string([]byte{'A', '\n', '\r', '\n', 'b', '1'})
conn, _ := getConn()
_, _ = conn.Call("del", "name")
_, _ = conn.Call("set", "name", demoStr)
resp, err := conn.Call("get", "name")
if err != nil {
t.Fatal("Call Error:", err.Error())
}
str, err := resp.ParseString()
if err != nil {
t.Fatal("Parse Error:", err.Error())
}
if str != demoStr {
t.Fatal("結果錯誤")
}
}
func TestRedisResp_ParseList(t *testing.T) {
conn, _ := getConn()
_, _ = conn.Call("del", "testList")
_, _ = conn.Call("lpush", "testList", 1, 2, 3, 4, 5)
res, err := conn.Call("lrange", "testList", 0, -1)
if err != nil {
t.Fatal("Call Error:", err.Error())
}
ls, err := res.ParseList()
if err != nil {
t.Fatal("Parse Error:", err.Error())
}
if len(ls) != 5 {
t.Fatal("結果錯誤")
}
}
func TestRedisResp_ParseMap(t *testing.T) {
conn, _ := getConn()
_, _ = conn.Call("del", "testMap")
_, err := conn.Call("hmset", "testMap", 1, 2, 3, 4, 5, 6)
if err != nil {
t.Fatal("設置Value失敗")
}
res, err := conn.Call("hgetall", "testMap")
if err != nil {
t.Fatal("Call Error:", err.Error())
}
ls, err := res.ParseMap()
if err != nil {
t.Fatal("Parse Error:", err.Error())
}
if len(ls) != 3 || ls["1"] != "2" {
t.Fatal("結果錯誤")
}
}
至此,已經算是達到了學習Go語言和學習Redis Protocol的目的,不過代碼中也有很多地方需要優化和完善,性能方面考慮的也並不周全。輪子就不重復造了,畢竟有很多功能完善的庫,從頭造一個輪子需要消耗的精力太多啦並且沒必要~
下一次我將會學習官方推薦的gomodule/redigo
源碼,並分享我的心得。
--The End--