先說結論。使用連接池的情況下,每一條Redis命令都將從連接池中獲得一個連接,執行完后隨即回收。因此在做切庫操作時,使用Pipline來必須保證前后幾條命令在同一個庫執行。
一,現象
某個微服務中,我們的Redis key 集中在11庫,因此連接池的默認庫為11。由於歷史原因,當需要獲取設備信息時,需要切換到1庫。
最初代碼如下:
單獨請求這個路由時,完全沒有任何問題。一切按照預期的執行。
但在並發的時候,一些原來在默認庫的操作未能取得正確結果,通過查看日志發現,Redis庫被切到1了,因此導致錯誤。
二,推演過程
既然被切庫了,一定是在某一時刻將切庫的后連接放回了資源池。
最初誤認為一個請求周期使用的是同一個連接池。
通過調試發現,Redis的每一個命令都會重新取得一個連接,執行后立即回收,而且回收到資源池的順序類似於堆。
問題重現:執行切庫到1,回收到資源池。當另一個使用默認庫的請求剛好拿到這個切換到1庫的連接,繼續執行11庫的操作,發生錯誤。
三,解決方案
定位到問題以后,我們要做的就是怎樣保持切庫前后的操作都使用同一個連接,Redis提供的Pipline剛好可以完成這樣的操作。
改造后的代碼如下:
rdb := models.RedisCon
pipe := rdb.Pipeline()
k := "device:" + udid
pipe.Do("select", 1)
_, _ = pipe.Get(k).Result()
pipe.Do("select", 11)
cmders, err := pipe.Exec()
如果需要獲取執行后的結果,還需要解析
strMap := redis.GetCmdResult(cmders)
did, _ = strMap[1].(string)
func GetCmdResult(cmders []redis.Cmder) map[int]interface{}
func GetCmdResult(cmders []redis.Cmder) map[int]interface{} {
strMap := make(map[int]interface{}, len(cmders))
for idx, cmder := range cmders {
//*ClusterSlotsCmd 未實現
switch reflect.TypeOf(cmder).String() {
case "*redis.Cmd":
cmd := cmder.(*redis.Cmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.StringCmd":
cmd := cmder.(*redis.StringCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.SliceCmd":
cmd := cmder.(*redis.SliceCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.StringSliceCmd":
cmd := cmder.(*redis.StringSliceCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.StringStringMapCmd":
cmd := cmder.(*redis.StringStringMapCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.StringIntMapCmd":
cmd := cmder.(*redis.StringIntMapCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.BoolCmd":
cmd := cmder.(*redis.BoolCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.BoolSliceCmd":
cmd := cmder.(*redis.BoolSliceCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.IntCmd":
cmd := cmder.(*redis.IntCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.FloatCmd":
cmd := cmder.(*redis.FloatCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.StatusCmd":
cmd := cmder.(*redis.StatusCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.TimeCmd":
cmd := cmder.(*redis.TimeCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.DurationCmd":
cmd := cmder.(*redis.DurationCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.StringStructMapCmd":
cmd := cmder.(*redis.StringStructMapCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.XMessageSliceCmd":
cmd := cmder.(*redis.XMessageSliceCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.XStreamSliceCmd":
cmd := cmder.(*redis.XStreamSliceCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.XPendingCmd":
cmd := cmder.(*redis.XPendingCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.XPendingExtCmd":
cmd := cmder.(*redis.XPendingExtCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.ZSliceCmd":
cmd := cmder.(*redis.ZSliceCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.ZWithKeyCmd":
cmd := cmder.(*redis.ZWithKeyCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.CommandsInfoCmd":
cmd := cmder.(*redis.CommandsInfoCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.GeoLocationCmd":
cmd := cmder.(*redis.GeoLocationCmd)
strMap[idx], _ = cmd.Result()
break
case "*redis.GeoPosCmd":
cmd := cmder.(*redis.GeoPosCmd)
strMap[idx], _ = cmd.Result()
break
}
}
return strMap
}
更新,用這種新的取值方式更方便