兼容go redis cluster的pipeline批量


原文鏈接:兼容go redis cluster的pipeline批量

前言:

     redis cluster集群機制是不錯,但因為是smart client設計,沒有proxy中間層,導致很多redis批量命令在不同slot時不能適配,比如 mset、mget、pipeline等。 該篇文章講述了redis cluster multi key批量操作的一些解決方案,尤其是golang的場景下。  

    該文章后續仍在不斷的更新修改中, 請移步到原文地址 http://xiaorui.cc/?p=5557

 

老生常談的redis cluster概念

    Redis Cluster在設計中沒有使用一致性哈希(Consistency Hashing),而是使用數據分片(Sharding)引入哈希槽(hash slot)來實現。一個 Redis Cluster包含16384(0~16383)個哈希槽,存儲在Redis Cluster中的所有鍵都會被映射到這些slot中,集群中的每個鍵都屬於這16384個哈希槽中的一個,集群使用公式slot=CRC16 key/16384來計算key屬於哪個槽。

    集群中的每個主節點(Master)都負責處理16384個哈希槽中的一部分,當集群處於穩定狀態時,每個哈希槽都只由一個主節點進行處理,每個主節點可以有一個到N個從節點。當主節點出現宕機或網絡斷線等不可用時,從節點能自動提升為主節點進行處理。

問題

不同slot區間的key可能在不同的節點上,如果在一個節點上執行不屬於他slot區間的key會發生什么? 下面是可以會出現的error信息, redis cluster返回的錯誤很多,但核心異常信息不屬於一個slot。

// xiaorui.cc

all keys must map to the same key slot
ERR CROSSSLOT Keys in request don't hash to the same slot

bad lua script for redis cluster, all the keys that the script uses should be passed using the KEYS array
eval/evalsha command keys must in same slot

 

redis cluster是個smart client設計模式,客戶端會存有redis cluster的分片和節點關系的緩存。當你使用multi key的redis命令時 (mset、mget ),client通常會選擇參數的第一個key的slot的主機進行發送。如果參數的多個key在不同的slot,在不同的主機上,那么必然會出錯。 

 

 

解決方法

第一種:
最簡單的方法是在所有的執行key前面加入hashtag,讓他的key都在一個slot上。這樣你的程序和架構簡單了,帶來的問題是數據傾斜,后面可能內存受限、qps受限。
第二種:

使用proxy方案做適配redis cluster集群, 像codis。滴滴、12306都在大量使用該代理。

第三種:
在客戶端上實現命令的slot分組,然后分別並發處理。
 

第一種和第二種方法沒什么說的,具體說下客戶端怎么搞?先前有說過,客戶端會緩存slot及node主機的關系,我們在客戶端上根據參數keys做slot分離不就行了。像python的redis-py庫包已幫你做了redis cluster的適配,對於上層代碼無感知。golang其實也有這樣的庫,這里可以參考下 github.com/chasex/redis-go-cluster 代碼, 這個是基於gomodule/redigo封裝的redis集群客戶端庫。

 

redis-go-cluster處理入口, 劫持了MSET、MSETNX、MGET批量命令,根據slot分到不同的slice里,然后針對多個slice進行並發請求。

// xiaorui.cc

func (cluster *Cluster) Do(cmd string, args ...interface{}) (interface{}, error) {
    if len(args) < 1 {
    return nil, fmt.Errorf("Do: no key found in args")
    }

    if cmd == "MSET" || cmd == "MSETNX" {
    return cluster.multiSet(cmd, args...)
    }

    if cmd == "MGET" {
    return cluster.multiGet(cmd, args...)
    }
    ...
}


func (cluster *Cluster) multiSet(cmd string, args ...interface{}) (interface{}, error) {
    ...
    // 命令分組
    tasks := make([]*multiTask, 0)

    cluster.rwLock.RLock()
    for i := 0; i < len(args); i += 2 {
        key, err := key(args[i])
        if err != nil {
            cluster.rwLock.RUnlock()
            return nil, fmt.Errorf("multiSet: invalid key %v", args[i])
        }

        slot := hash(key)

        var j int
        for j = 0; j < len(tasks); j++ {
            // 相同的slot的key放在一個multiTask的slice里。
            if tasks[j].slot == slot {
                tasks[j].args = append(tasks[j].args, args[i])   // key
                tasks[j].args = append(tasks[j].args, args[i+1]) // value

                break
            }
        }

        if j == len(tasks) {
            node := cluster.slots[slot]
            if node == nil {
                cluster.rwLock.RUnlock()
                return nil, fmt.Errorf("multiSet: %s[%d] no node found", key, slot)
            }

            task := &multiTask{
                node: node,
                slot: slot,
                cmd:  cmd,
                args: []interface{}{args[i], args[i+1]},
                done: make(chan int),
            }
            tasks = append(tasks, task)
        }
    }
    cluster.rwLock.RUnlock()

    // 每個slot隊列為為一個並發
    for i := range tasks {
        go handleSetTask(tasks[i])
    }

    // 確定都執行完
    for i := range tasks {
        <-tasks[i].done
    }

    for i := range tasks {
        _, err := String(tasks[i].reply, tasks[i].err)
        if err != nil {
            return nil, err
        }
    }

    return "OK", nil
}

type multiTask struct {
    node *redisNode
    slot uint16

    cmd  string
    args []interface{}

    reply   interface{}
    replies []interface{}
    err     error

    done chan int
}

下面是redis cluster pipeline批量實現, 跟mset mget的實現差不都,都是通過算出key的slot放在不同的slice里面,繼而並發的使用pipeline。

// xiaorui.cc

// NewBatch create a new batch to pack mutiple commands.
func (cluster *Cluster) NewBatch() *Batch {
    return &Batch{
        cluster: cluster,
        batches: make([]nodeBatch, 0),
        index:   make([]int, 0),
    }
}

// Put add a redis command to batch, DO NOT put MGET/MSET/MSETNX.
func (batch *Batch) Put(cmd string, args ...interface{}) error {
    if len(args) < 1 {
        return fmt.Errorf("Put: no key found in args")
    }

    if cmd == "MGET" || cmd == "MSET" || cmd == "MSETNX" {
        return fmt.Errorf("Put: %s not supported", cmd)
    }

    node, err := batch.cluster.getNodeByKey(args[0])
    if err != nil {
        return fmt.Errorf("Put: %v", err)
    }

    var i int
    for i = 0; i < len(batch.batches); i++ {
        if batch.batches[i].node == node {
            batch.batches[i].cmds = append(batch.batches[i].cmds,
                nodeCommand{cmd: cmd, args: args})

            batch.index = append(batch.index, i)
            break
        }
    }
    ...
}

func (cluster *Cluster) RunBatch(bat *Batch) ([]interface{}, error) {
    for i := range bat.batches {
        go doBatch(&bat.batches[i])
    }

    for i := range bat.batches {
        <-bat.batches[i].done
    }

    var replies []interface{}
    for _, i := range bat.index {
        if bat.batches[i].err != nil {
            return nil, bat.batches[i].err
        }

        replies = append(replies, bat.batches[i].cmds[0].reply)
        bat.batches[i].cmds = bat.batches[i].cmds[1:]
    }

    return replies, nil
}

func doBatch(batch *nodeBatch) {
    conn, err := batch.node.getConn()
    ...

    for i := range batch.cmds {
        conn.send(batch.cmds[i].cmd, batch.cmds[i].args...)
    }

    err = conn.flush()
    ...

    for i := range batch.cmds {
        reply, err := conn.receive()
        if err != nil {
            batch.err = err
            conn.shutdown()
            batch.done <- 1
            return
        }

        batch.cmds[i].reply, batch.cmds[i].err = reply, err
    }

    ...
}

go-redis這個庫也實現了批量key的匹配,下面是go-redis pipieline的源碼。 https://github.com/go-redis

 

// xiaorui.cc

func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
        ...
	cmdsMap := c.mapCmdsBySlot(cmds)
	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
		if err != nil {
			setCmdsErr(cmds, err)
			continue
		}
		cmdsMap := map[*clusterNode][]Cmder{node: cmds}

		for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			if attempt > 0 {
				time.Sleep(c.retryBackoff(attempt))
			}

			failedCmds := newCmdsMap()
			var wg sync.WaitGroup

			for node, cmds := range cmdsMap {
				wg.Add(1)
				go func(node *clusterNode, cmds []Cmder) {
					defer wg.Done()

					cn, err := node.Client.getConn()
					if err != nil {
						if err == pool.ErrClosed {
							c.mapCmdsByNode(cmds, failedCmds)
						} else {
							setCmdsErr(cmds, err)
						}
						return
					}

					err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
					node.Client.releaseConnStrict(cn, err)
				}(node, cmds)
			}

			wg.Wait()
			if len(failedCmds.m) == 0 {
				break
			}
			cmdsMap = failedCmds.m
		}
	}

	return cmdsFirstErr(cmds)
}

總結:

     推薦大家在使用redis cluster時也使用批量模式,這樣對於減少網絡IO延遲很有效果。pipeline還可以減少syscall消耗,畢竟數據合並了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM