Golang 實現 Redis(6): 實現 pipeline 模式的 redis 客戶端


本文是使用 golang 實現 redis 系列的第六篇, 將介紹如何實現一個 Pipeline 模式的 Redis 客戶端。

本文的完整代碼在github.com/hdt3213/godis/redis/client

通常 TCP 客戶端的通信模式都是阻塞式的: 客戶端發送請求 -> 等待服務端響應 -> 發送下一個請求。因為需要等待網絡傳輸數據,完成一次請求循環需要等待較多時間。

我們能否不等待服務端響應直接發送下一條請求呢?答案是肯定的。

TCP 作為全雙工協議可以同時進行上行和下行通信,不必擔心客戶端和服務端同時發包會導致沖突。

p.s. 打電話的時候兩個人同時講話就會沖突聽不清,只能輪流講。這種通信方式稱為半雙工。廣播只能由電台發送到收音機不能反向傳輸,這種方式稱為單工。

我們為每一個 tcp 連接分配了一個 goroutine 可以保證先收到的請求先先回復。另一個方面,tcp 協議會保證數據流的有序性,同一個 tcp 連接上先發送的請求服務端先接收,先回復的響應客戶端先收到。因此我們不必擔心混淆響應所對應的請求。

這種在服務端未響應時客戶端繼續向服務端發送請求的模式稱為 Pipeline 模式。因為減少等待網絡傳輸的時間,Pipeline 模式可以極大的提高吞吐量,減少所需使用的 tcp 鏈接數。

pipeline 模式的 redis 客戶端需要有兩個后台協程程負責 tcp 通信,調用方通過 channel 向后台協程發送指令,並阻塞等待直到收到響應,這是一個典型的異步編程模式。

我們先來定義 client 的結構:

type Client struct {
    conn        net.Conn // 與服務端的 tcp 連接
    pendingReqs chan *Request // 等待發送的請求
    waitingReqs chan *Request // 等待服務器響應的請求
    ticker      *time.Ticker // 用於觸發心跳包的計時器
    addr        string

    ctx        context.Context
    cancelFunc context.CancelFunc
    writing    *sync.WaitGroup // 有請求正在處理不能立即停止,用於實現 graceful shutdown
}

type Request struct {
    id        uint64 // 請求id
    args      [][]byte // 上行參數
    reply     redis.Reply // 收到的返回值
    heartbeat bool // 標記是否是心跳請求
    waiting   *wait.Wait // 調用協程發送請求后通過 waitgroup 等待請求異步處理完成
    err       error
}

調用者將請求發送給后台協程,並通過 wait group 等待異步處理完成:

func (client *Client) Send(args [][]byte) redis.Reply {
	request := &request{
		args:      args,
		heartbeat: false,
		waiting:   &wait.Wait{},
	}
	request.waiting.Add(1)
	client.working.Add(1)
	defer client.working.Done()
	client.pendingReqs <- request // 請求入隊
	timeout := request.waiting.WaitWithTimeout(maxWait) // 等待響應或者超時
	if timeout {
		return reply.MakeErrReply("server time out")
	}
	if request.err != nil {
		return reply.MakeErrReply("request failed")
	}
	return request.reply
}

client 的核心部分是后台的讀寫協程。先從寫協程開始:

// 寫協程入口
func (client *Client) handleWrite() {
	for req := range client.pendingReqs {
		client.doRequest(req)
	}
}

// 發送請求
func (client *Client) doRequest(req *request) {
	if req == nil || len(req.args) == 0 {
		return
	}
    // 序列化請求
	re := reply.MakeMultiBulkReply(req.args)
	bytes := re.ToBytes()
	_, err := client.conn.Write(bytes)
	i := 0
    // 失敗重試
	for err != nil && i < 3 {
		err = client.handleConnectionError(err)
		if err == nil {
			_, err = client.conn.Write(bytes)
		}
		i++
	}
	if err == nil {
        // 發送成功等待服務器響應
		client.waitingReqs <- req
	} else {
		req.err = err
		req.waiting.Done()
	}
}

讀協程是我們熟悉的協議解析器模板, 不熟悉的朋友可以到實現 Redis 協議解析器了解更多。

// 收到服務端的響應
func (client *Client) finishRequest(reply redis.Reply) {
	defer func() {
		if err := recover(); err != nil {
			debug.PrintStack()
			logger.Error(err)
		}
	}()
	request := <-client.waitingReqs
	if request == nil {
		return
	}
	request.reply = reply
	if request.waiting != nil {
		request.waiting.Done()
	}
}

// 讀協程是個 RESP 協議解析器
func (client *Client) handleRead() error {
	ch := parser.ParseStream(client.conn)
	for payload := range ch {
		if payload.Err != nil {
			client.finishRequest(reply.MakeErrReply(payload.Err.Error()))
			continue
		}
		client.finishRequest(payload.Data)
	}
	return nil
}

最后編寫 client 的構造器和啟動異步協程的代碼:

func MakeClient(addr string) (*Client, error) {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return nil, err
    }
    ctx, cancel := context.WithCancel(context.Background())
    return &Client{
        addr:        addr,
        conn:        conn,
        sendingReqs: make(chan *Request, chanSize),
        waitingReqs: make(chan *Request, chanSize),
        ctx:         ctx,
        cancelFunc:  cancel,
        writing:     &sync.WaitGroup{},
    }, nil
}

func (client *Client) Start() {
    client.ticker = time.NewTicker(10 * time.Second)
    go client.handleWrite()
    go func() {
        err := client.handleRead()
        logger.Warn(err)
    }()
    go client.heartbeat()
}

關閉 client 的時候記得等待請求完成:

func (client *Client) Close() {
    // 先阻止新請求進入隊列
    close(client.sendingReqs)

    // 等待處理中的請求完成
    client.writing.Wait()

    // 釋放資源
    _ = client.conn.Close() // 關閉與服務端的連接,連接關閉后讀協程會退出
    client.cancelFunc() // 使用 context 關閉讀協程
    close(client.waitingReqs) // 關閉隊列
}

測試一下:

func TestClient(t *testing.T) {
    client, err := MakeClient("localhost:6379")
    if err != nil {
        t.Error(err)
    }
    client.Start()

    result = client.Send([][]byte{
        []byte("SET"),
        []byte("a"),
        []byte("a"),
    })
    if statusRet, ok := result.(*reply.StatusReply); ok {
        if statusRet.Status != "OK" {
            t.Error("`set` failed, result: " + statusRet.Status)
        }
    }

    result = client.Send([][]byte{
        []byte("GET"),
        []byte("a"),
    })
    if bulkRet, ok := result.(*reply.BulkReply); ok {
        if string(bulkRet.Arg) != "a" {
            t.Error("`get` failed, result: " + string(bulkRet.Arg))
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM