玩轉redis-延時消息隊列


上一篇基於redis的list實現了一個簡單的消息隊列:玩轉redis-簡單消息隊列

源碼地址 使用demo

產品經理經常說的一句話,我們不光要有X功能,還要Y功能,這樣客戶才能更滿意。同樣的,只有簡單消息隊列是不夠的,還要有延時消息隊列才能算是一個完整的消息隊列。

看看redis的命令,放眼望去,的有序集合(sorted set)就是一個很好用的命令,完全可以用他做一個延時消息隊列

redis有序集合(sorted set)

redis有序集合,每個元素都會關聯一個double類型的分數。redis正是通過分數來為集合中的成員進行從小到大的排序。
有序集合的成員是唯一的,但分數(score)卻可以重復。

簡單操作

添加數據

127.0.0.1:6379> ZADD testSet1 5 a (integer) 1 127.0.0.1:6379> ZADD testSet1 1 b 8 c 7 d (integer) 3 

讀取

127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 3 1) "b" 127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 5 1) "b" 2) "a" 

也可以把score打出來

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf 5 WITHSCORES 1) "b" 2) "1" 3) "a" 4) "5" 

查出所有的數據

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf inf 1) "b" 2) "a" 3) "d" 4) "c" 

刪除數據

ZREMRANGEBYSCORE testSet1 0 2 

延時隊列的實現思路

總體的思路很簡單,就是每一個valuescore保存的是時間,也就是說,在添加一個元素時他的score是當前時間+延時的時間。輪循獲取數據時,查找小於或等於當前時間的數據項,就是具體的延時消息。

還有一個問題,就是ZRANGEBYSCORElistpop不同,pop是取出元素並且會把元素在list中刪除。ZRANGEBYSCORE只會取出數據不會把數據從sorted set中刪除。解決方法1,利用redis事務,先ZRANGEBYSCORE取出數據,然后再用ZREMRANGEBYSCORE 把數據刪除。

具體實現-code

添加延時消息,參數delay就是我們要延時多久:

func (p *Producer) PublishDelayMsg(topicName string, body []byte, delay time.Duration) error { if delay <= 0 { return errors.New("delay need great than zero") } tm := time.Now().Add(delay) msg := NewMessage("", body) msg.DelayTime = tm.Unix() sendData, _ := json.Marshal(msg) return p.redisCmd.ZAdd(topicName+zsetSuffix, redis.Z{Score: float64(tm.Unix()), Member: string(sendData)}).Err() } 

使用,比如我們想過1秒再處理

producer.PublishDelayMsg(topicName, body, time.Second) 

讀取消息並處理
這就比較簡單了,就是在一個ticker里循環讀取小於或等於當前時間的數據:

func (s *consumer) startGetDelayMessage() { go func() { ticker := time.NewTicker(s.options.RateLimitPeriod) defer func() { log.Println("stop get delay message.") ticker.Stop() }() topicName := s.topicName + zsetSuffix for { currentTime := time.Now().Unix() select { case <-s.ctx.Done(): log.Printf("context Done msg: %#v \n", s.ctx.Err()) return case <-ticker.C: var valuesCmd *redis.ZSliceCmd _, err := s.redisCmd.TxPipelined(func(pip redis.Pipeliner) error { valuesCmd = pip.ZRangeWithScores(topicName, 0, currentTime) pip.ZRemRangeByScore(topicName, "0", strconv.FormatInt(currentTime, 10)) return nil }) if err != nil { log.Printf("zset pip error: %#v \n", err) continue } rev := valuesCmd.Val() for _, revBody := range rev { msg := &Message{} json.Unmarshal([]byte(revBody.Member.(string)), msg) if s.handler != nil { s.handler.HandleMessage(msg) } } } } }() }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM