原文鏈接:Redis實現消息隊列的方案
Redis作為內存中的數據結構存儲,常用作數據庫、緩存和消息代理。它支持數據結構,如 字符串,散列,列表,集合,帶有范圍查詢的排序集(sorted sets),位圖(bitmaps),超級日志(hyperloglogs),具有半徑查詢和流的地理空間索引。Redis具有內置復制,Lua腳本,LRU驅逐,事務和不同級別的磁盤持久性,並通過Redis Sentinel和Redis Cluster自動分區。
為了實現其出色的性能,Redis使用內存數據集(in-memory dataset)。
MQ應用有很多,比如ActiveMQ,RabbitMQ,Kafka等,但是也可以基於redis來實現,可以降低系統的維護成本和實現復雜度,本篇介紹redis中實現消息隊列的幾種方案。
- 基於List的 LPUSH+BRPOP 的實現
- PUB/SUB,訂閱/發布模式
- 基於Sorted-Set的實現
- 基於Stream類型的實現
基於異步消息隊列List lpush-brpop(rpush-blpop)
使用rpush和lpush操作入隊列,lpop和rpop操作出隊列。
List支持多個生產者和消費者並發進出消息,每個消費者拿到都是不同的列表元素。
但是當隊列為空時,lpop和rpop會一直空輪訓,消耗資源;所以引入阻塞讀blpop和brpop(b代表blocking),阻塞讀在隊列沒有數據的時候進入休眠狀態,
一旦數據到來則立刻醒過來,消息延遲幾乎為零。
注意
你以為上面的方案很完美?還有個問題需要解決:空閑連接的問題。
如果線程一直阻塞在那里,Redis客戶端的連接就成了閑置連接,閑置過久,服務器一般會主動斷開連接,減少閑置資源占用,這個時候blpop和brpop或拋出異常,
所以在編寫客戶端消費者的時候要小心,如果捕獲到異常,還有重試。
缺點:
- 做消費者確認ACK麻煩,不能保證消費者消費消息后是否成功處理的問題(宕機或處理異常等),通常需要維護一個Pending列表,保證消息處理確認。
- 不能做廣播模式,如pub/sub,消息發布/訂閱模型
- 不能重復消費,一旦消費就會被刪除
- 不支持分組消費
如何實現:Redis應用-異步消息隊列與延時隊列
PUB/SUB,訂閱/發布模式
SUBSCRIBE,用於訂閱信道
PUBLISH,向信道發送消息
UNSUBSCRIBE,取消訂閱
此模式允許生產者只生產一次消息,由中間件負責將消息復制到多個消息隊列,每個消息隊列由對應的消費組消費。
優點
典型的廣播模式,一個消息可以發布到多個消費者
多信道訂閱,消費者可以同時訂閱多個信道,從而接收多類消息
消息即時發送,消息不用等待消費者讀取,消費者會自動接收到信道發布的消息
缺點
消息一旦發布,不能接收。換句話就是發布時若客戶端不在線,則消息丟失,不能尋回
不能保證每個消費者接收的時間是一致的
若消費者客戶端出現消息積壓,到一定程度,會被強制斷開,導致消息意外丟失。通常發生在消息的生產遠大於消費速度時
可見,Pub/Sub 模式不適合做消息存儲,消息積壓類的業務,而是擅長處理廣播,即時通訊,即時反饋的業務。
基於Sorted-Set的實現
Sortes Set(有序列表),類似於java的SortedSet和HashMap的結合體,一方面她是一個set,保證內部value的唯一性,另一方面它可以給每個value賦予一個score,代表這個value的
排序權重。內部實現是“跳躍表”。
有序集合的方案是在自己確定消息順ID時比較常用,使用集合成員的Score來作為消息ID,保證順序,還可以保證消息ID的單調遞增。通常可以使用時間戳+序號的方案。確保了消息ID的單調遞增,利用SortedSet的依據
Score排序的特征,就可以制作一個有序的消息隊列了。
優點
就是可以自定義消息ID,在消息ID有意義時,比較重要。
缺點
缺點也明顯,不允許重復消息(因為是集合),同時消息ID確定有錯誤會導致消息的順序出錯。
基於Stream類型的實現

Stream為redis 5.0后新增的數據結構。支持多播的可持久化消息隊列,實現借鑒了Kafka設計。
Redis Stream的結構如上圖所示,它有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的ID和對應的內容。消息是持久化的,Redis重啟后,內容還在。
每個Stream都有唯一的名稱,它就是Redis的key,在我們首次使用xadd指令追加消息時自動創建。
每個Stream都可以掛多個消費組,每個消費組會有個游標last_delivered_id在Stream數組之上往前移動,表示當前消費組已經消費到哪條消息了。每個消費組都有一個Stream內唯一的名稱,消費組不會自動創建,它需要單獨的指令xgroup create進行創建,需要指定從Stream的某個消息ID開始消費,這個ID用來初始化last_delivered_id變量。
每個消費組(Consumer Group)的狀態都是獨立的,相互不受影響。也就是說同一份Stream內部的消息會被每個消費組都消費到。
同一個消費組(Consumer Group)可以掛接多個消費者(Consumer),這些消費者之間是競爭關系,任意一個消費者讀取了消息都會使游標last_delivered_id往前移動。每個消費者者有一個組內唯一名稱。
消費者(Consumer)內部會有個狀態變量pending_ids,它記錄了當前已經被客戶端讀取的消息,但是還沒有ack。如果客戶端沒有ack,這個變量里面的消息ID會越來越多,一旦某個消息被ack,它就開始減少。這個pending_ids變量在Redis官方被稱之為PEL,也就是Pending Entries List,這是一個很核心的數據結構,它用來確保客戶端至少消費了消息一次,而不會在網絡傳輸的中途丟失了沒處理。
增刪改查
- xadd 追加消息
- xdel 刪除消息,這里的刪除僅僅是設置了標志位,不影響消息總長度
- xrange 獲取消息列表,會自動過濾已經刪除的消息
- xlen 消息長度
- del 刪除Stream
獨立消費
我們可以在不定義消費組的情況下進行Stream消息的獨立消費,當Stream沒有新消息時,甚至可以阻塞等待。Redis設計了一個單獨的消費指令xread,可以將Stream當成普通的消息隊列(list)來使用。使用xread時,我們可以完全忽略消費組(Consumer Group)的存在,就好比Stream就是一個普通的列表(list)。
創建消費組
Stream通過xgroup create指令創建消費組(Consumer Group),需要傳遞起始消息ID參數用來初始化last_delivered_id變量。
消費
Stream提供了xreadgroup指令可以進行消費組的組內消費,需要提供消費組名稱、消費者名稱和起始消息ID。它同xread一樣,也可以阻塞等待新消息。讀到新消息后,對應的消息ID就會進入消費者的PEL(正在處理的消息)結構里,客戶端處理完畢后使用xack指令通知服務器,本條消息已經處理完畢,該消息ID就會從PEL中移除。
Stream消息太多怎么辦

讀者很容易想到,要是消息積累太多,Stream的鏈表豈不是很長,內容會不會爆掉就是個問題了。xdel指令又不會刪除消息,它只是給消息做了個標志位。
Redis自然考慮到了這一點,所以它提供了一個定長Stream功能。在xadd的指令提供一個定長長度maxlen,就可以將老的消息干掉,確保最多不超過指定長度。
127.0.0.1:6379> xlen codehole
(integer) 5
127.0.0.1:6379> xadd codehole maxlen 3 * name xiaorui age 1
1527855160273-0
127.0.0.1:6379> xlen codehole
(integer) 3
我們看到Stream的長度被砍掉了。
消息如果忘記ACK會怎樣?
Stream在每個消費者結構中保存了正在處理中的消息ID列表PEL,如果消費者收到了消息處理完了但是沒有回復ack,就會導致PEL列表不斷增長,如果有很多消費組的話,那么這個PEL占用的內存就會放大。
PEL如何避免消息丟失?
在客戶端消費者讀取Stream消息時,Redis服務器將消息回復給客戶端的過程中,客戶端突然斷開了連接,消息就丟失了。但是PEL里已經保存了發出去的消息ID。待客戶端重新連上之后,可以再次收到PEL中的消息ID列表。不過此時xreadgroup的起始消息必須是任意有效的消息ID,一般將參數設為0-0,表示讀取所有的PEL消息以及自last_delivered_id之后的新消息。
分區Partition
Redis沒有原生支持分區的能力,想要使用分區,需要分配多個Stream,然后在客戶端使用一定的策略來講消息放入不同的stream。
結論
Stream的消費模型借鑒了kafka的消費分組的概念,它彌補了Redis Pub/Sub不能持久化消息的缺陷。但是它又不同於kafka,kafka的消息可以分partition,而Stream不行。如果非要分parition的話,得在客戶端做,提供不同的Stream名稱,對消息進行hash取模來選擇往哪個Stream里塞。如果讀者稍微研究過Redis作者的另一個開源項目Disque的話,這極可能是作者意識到Disque項目的活躍程度不夠,所以將Disque的內容移植到了Redis里面。這只是本人的猜測,未必是作者的初衷。如果讀者有什么不同的想法,可以在評論區一起參與討論。
參考文章: