一、從最簡單的開始:List 隊列(LPUSH+BRPOP實現)
首先,我們先從最簡單的場景開始講起。
如果你的業務需求足夠簡單,想把 Redis 當作隊列來使用,肯定最先想到的就是使用 List 這個數據類型。
因為 List 底層的實現就是一個「鏈表」,在頭部和尾部操作元素,時間復雜度都是 O(1),這意味着它非常符合消息隊列的模型。
如果把 List 當作隊列,你可以這么來用。
生產者使用 LPUSH 發布消息:
127.0.0.1:6379> LPUSH queue msg1 (integer) 1 127.0.0.1:6379> LPUSH queue msg2 (integer) 2
消費者這一側,使用 RPOP 拉取消息:
127.0.0.1:6379> RPOP queue "msg1" 127.0.0.1:6379> RPOP queue "msg2"
這個模型非常簡單,也很容易理解。
但這里有個小問題,當隊列中已經沒有消息了,消費者在執行 RPOP 時,會返回 NULL。
127.0.0.1:6379> RPOP queue (nil) // 沒消息了
而我們在編寫消費者邏輯時,一般是一個「死循環」,這個邏輯需要不斷地從隊列中拉取消息進行處理,偽代碼一般會這么寫:
while true: msg = redis.rpop("queue") // 沒有消息,繼續循環 if msg == null: continue // 處理消息 handle(msg)
如果此時隊列為空,那消費者依舊會頻繁拉取消息,這會造成「CPU 空轉」,不僅浪費 CPU 資源,還會對 Redis 造成壓力。
怎么解決這個問題呢?
也很簡單,當隊列為空時,我們可以「休眠」一會,再去嘗試拉取消息。代碼可以修改成這樣:
while true: msg = redis.rpop("queue") // 沒有消息,休眠2s if msg == null: sleep(2) continue // 處理消息 handle(msg)
這就解決了 CPU 空轉問題。
這個問題雖然解決了,但又帶來另外一個問題:當消費者在休眠等待時,有新消息來了,那消費者處理新消息就會存在「延遲」。
假設設置的休眠時間是 2s,那新消息最多存在 2s 的延遲。
要想縮短這個延遲,只能減小休眠的時間。但休眠時間越小,又有可能引發 CPU 空轉問題。
魚和熊掌不可兼得。
那如何做,既能及時處理新消息,還能避免 CPU 空轉呢?
Redis 是否存在這樣一種機制:如果隊列為空,消費者在拉取消息時就「阻塞等待」,一旦有新消息過來,就通知我的消費者立即處理新消息呢?
幸運的是,Redis 確實提供了「阻塞式」拉取消息的命令:BRPOP / BLPOP,這里的 B 指的是阻塞(Block)。
現在,你可以這樣來拉取消息了:
while true: // 沒消息阻塞等待,0表示不設置超時時間 msg = redis.brpop("queue", 0) if msg == null: continue // 處理消息 handle(msg)
使用 BRPOP 這種阻塞式方式拉取消息時,還支持傳入一個「超時時間」,如果設置為 0,則表示不設置超時,直到有新消息才返回,否則會在指定的超時時間后返回 NULL。
這個方案不錯,既兼顧了效率,還避免了 CPU 空轉問題,一舉兩得。
注意:如果設置的超時時間太長,這個連接太久沒有活躍過,可能會被 Redis Server 判定為無效連接,之后 Redis Server 會強制把這個客戶端踢下線。所以,采用這種方案,客戶端要有重連機制。
解決了消息處理不及時的問題,你可以再思考一下,這種隊列模型,有什么缺點?
我們一起來分析一下:
- 不支持重復消費:消費者拉取消息后,這條消息就從 List 中刪除了,無法被其它消費者再次消費,即不支持多個消費者消費同一批數據
- 消息丟失:消費者拉取到消息后,如果發生異常宕機,那這條消息就丟失了
第一個問題是功能上的,使用 List 做消息隊列,它僅僅支持最簡單的,一組生產者對應一組消費者,不能滿足多組生產者和消費者的業務場景。
第二個問題就比較棘手了,因為從 List 中 POP 一條消息出來后,這條消息就會立即從鏈表中刪除了。也就是說,無論消費者是否處理成功,這條消息都沒辦法再次消費了。
這也意味着,如果消費者在處理消息時異常宕機,那這條消息就相當於丟失了。
針對這 2 個問題怎么解決呢?我們一個個來看。
二、發布/訂閱模型:Pub/Sub
從名字就能看出來,這個模塊是 Redis 專門是針對「發布/訂閱」這種隊列模型設計的。
它正好可以解決前面提到的第一個問題:重復消費。
即多組生產者、消費者的場景,我們來看它是如何做的。
Redis 提供了 PUBLISH / SUBSCRIBE 命令,來完成發布、訂閱的操作。
假設你想開啟 2 個消費者,同時消費同一批數據,就可以按照以下方式來實現。
首先,使用 SUBSCRIBE 命令,啟動 2 個消費者,並「訂閱」同一個隊列。
// 2個消費者 都訂閱一個隊列 127.0.0.1:6379> SUBSCRIBE queue Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "queue" 3) (integer) 1
此時,2 個消費者都會被阻塞住,等待新消息的到來。
之后,再啟動一個生產者,發布一條消息。
127.0.0.1:6379> PUBLISH queue msg1
(integer) 1
這時,2 個消費者就會解除阻塞,收到生產者發來的新消息。
127.0.0.1:6379> SUBSCRIBE queue // 收到新消息 1) "message" 2) "queue" 3) "msg1"
看到了么,使用 Pub/Sub 這種方案,既支持阻塞式拉取消息,還很好地滿足了多組消費者,消費同一批數據的業務需求。
除此之外,Pub/Sub 還提供了「匹配訂閱」模式,允許消費者根據一定規則,訂閱「多個」自己感興趣的隊列。
// 訂閱符合規則的隊列 127.0.0.1:6379> PSUBSCRIBE queue.* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "queue.*" 3) (integer) 1
這里的消費者,訂閱了 queue.* 相關的隊列消息。
之后,生產者分別向 queue.p1 和 queue.p2 發布消息。
127.0.0.1:6379> PUBLISH queue.p1 msg1 (integer) 1 127.0.0.1:6379> PUBLISH queue.p2 msg2 (integer) 1
這時再看消費者,它就可以接收到這 2 個生產者的消息了。
127.0.0.1:6379> PSUBSCRIBE queue.* Reading messages... (press Ctrl-C to quit) ... // 來自queue.p1的消息 1) "pmessage" 2) "queue.*" 3) "queue.p1" 4) "msg1" // 來自queue.p2的消息 1) "pmessage" 2) "queue.*" 3) "queue.p2" 4) "msg2"
我們可以看到,Pub/Sub 最大的優勢就是,支持多組生產者、消費者處理消息。
講完了它的優點,那它有什么缺點呢?
其實,Pub/Sub 最大問題是:丟數據。
如果發生以下場景,就有可能導致數據丟失:
- 消費者下線
- Redis 宕機
- 消息堆積
究竟是怎么回事?
這其實與 Pub/Sub 的實現方式有很大關系。
Pub/Sub 在實現時非常簡單,它沒有基於任何數據類型,也沒有做任何的數據存儲,它只是單純地為生產者、消費者建立「數據轉發通道」,把符合規則的數據,從一端轉發到另一端。
一個完整的發布、訂閱消息處理流程是這樣的:
- 消費者訂閱指定隊列,Redis 就會記錄一個映射關系:隊列->消費者
- 生產者向這個隊列發布消息,那 Redis 就從映射關系中找出對應的消費者,把消息轉發給它
看到了么,整個過程中,沒有任何的數據存儲,一切都是實時轉發的。
這種設計方案,就導致了上面提到的那些問題。
例如,如果一個消費者異常掛掉了,它再重新上線后,只能接收新的消息,在下線期間生產者發布的消息,因為找不到消費者,都會被丟棄掉。
如果所有消費者都下線了,那生產者發布的消息,因為找不到任何一個消費者,也會全部「丟棄」。
所以,當你在使用 Pub/Sub 時,一定要注意:消費者必須先訂閱隊列,生產者才能發布消息,否則消息會丟失。
這也是前面講例子時,我們讓消費者先訂閱隊列,之后才讓生產者發布消息的原因。
另外,因為 Pub/Sub 沒有基於任何數據類型實現,所以它也不具備「數據持久化」的能力。
也就是說,Pub/Sub 的相關操作,不會寫入到 RDB 和 AOF 中,當 Redis 宕機重啟,Pub/Sub 的數據也會全部丟失。
最后,我們來看 Pub/Sub 在處理「消息積壓」時,為什么也會丟數據?
當消費者的速度,跟不上生產者時,就會導致數據積壓的情況發生。
如果采用 List 當作隊列,消息積壓時,會導致這個鏈表很長,最直接的影響就是,Redis 內存會持續增長,直到消費者把所有數據都從鏈表中取出。
但 Pub/Sub 的處理方式卻不一樣,當消息積壓時,有可能會導致消費失敗和消息丟失!
這是怎么回事?
還是回到 Pub/Sub 的實現細節上來說。
每個消費者訂閱一個隊列時,Redis 都會在 Server 上給這個消費者在分配一個「緩沖區」,這個緩沖區其實就是一塊內存。
當生產者發布消息時,Redis 先把消息寫到對應消費者的緩沖區中。
之后,消費者不斷地從緩沖區讀取消息,處理消息。
但是,問題就出在這個緩沖區上。
因為這個緩沖區其實是有「上限」的(可配置),如果消費者拉取消息很慢,就會造成生產者發布到緩沖區的消息開始積壓,緩沖區內存持續增長。
如果超過了緩沖區配置的上限,此時,Redis 就會「強制」把這個消費者踢下線。
這時消費者就會消費失敗,也會丟失數據。
如果你有看過 Redis 的配置文件,可以看到這個緩沖區的默認配置:client-output-buffer-limit pubsub 32mb 8mb 60。
它的參數含義如下:
- 32mb:緩沖區一旦超過 32MB,Redis 直接強制把消費者踢下線
- 8mb + 60:緩沖區超過 8MB,並且持續 60 秒,Redis 也會把消費者踢下線
Pub/Sub 的這一點特點,是與 List 作隊列差異比較大的。
從這里你應該可以看出,List 其實是屬於「拉」模型,而 Pub/Sub 其實屬於「推」模型。
List 中的數據可以一直積壓在內存中,消費者什么時候來「拉」都可以。
但 Pub/Sub 是把消息先「推」到消費者在 Redis Server 上的緩沖區中,然后等消費者再來取。
當生產、消費速度不匹配時,就會導致緩沖區的內存開始膨脹,Redis 為了控制緩沖區的上限,所以就有了上面講到的,強制把消費者踢下線的機制。
好了,現在我們總結一下 Pub/Sub 的優缺點:
- 支持發布 / 訂閱,支持多組生產者、消費者處理消息
- 消費者下線,數據會丟失
- 不支持數據持久化,Redis 宕機,數據也會丟失
- 消息堆積,緩沖區溢出,消費者會被強制踢下線,數據也會丟失
有沒有發現,除了第一個是優點之外,剩下的都是缺點。
所以,很多人看到 Pub/Sub 的特點后,覺得這個功能很「雞肋」。
也正是以上原因,Pub/Sub 在實際的應用場景中用得並不多。
目前只有哨兵集群和 Redis 實例通信時,采用了 Pub/Sub 的方案,因為哨兵正好符合即時通訊的業務場景。
我們再來看一下,Pub/Sub 有沒有解決,消息處理時異常宕機,無法再次消費的問題呢?
其實也不行,Pub/Sub 從緩沖區取走數據之后,數據就從 Redis 緩沖區刪除了,消費者發生異常,自然也無法再次重新消費。
好,現在我們重新梳理一下,我們在使用消息隊列時的需求。
當我們在使用一個消息隊列時,希望它的功能如下:
- 支持阻塞等待拉取消息
- 支持發布 / 訂閱模式
- 消費失敗,可重新消費,消息不丟失
- 實例宕機,消息不丟失,數據可持久化
- 消息可堆積
Redis 除了 List 和 Pub/Sub 之外,還有符合這些要求的數據類型嗎?
其實,Redis 的作者也看到了以上這些問題,也一直在朝着這些方向努力着。
Redis 作者在開發 Redis 期間,還另外開發了一個開源項目 disque。
這個項目的定位,就是一個基於內存的分布式消息隊列中間件。
但由於種種原因,這個項目一直不溫不火。
終於,在 Redis 5.0 版本,作者把 disque 功能移植到了 Redis 中,並給它定義了一個新的數據類型:Stream。
下面我們就來看看,它能符合上面提到的這些要求嗎?
三、基於Sorted-Set的實現
3.1、SortSet類型使用說明
zset 可能是 Redis 提供的最為特色的數據結構,它也是在面試中面試官最愛問的數據結構。
- 一方面它是set,保證 value 的唯一性,
- 一方面它可以給每個 value 一個 score,代表排序權重。
它的內部實現用的是一種叫做「跳躍列表」的數據結構。
3.2、SortSet常用命令
zset 中最后一個 value 被移除后,數據結構自動刪除,內存被回收。
下面是zadd(添加元素)、zrange(順序)、zrevrange(逆序)、zscore(獲取score)、zrank(排名)、zrangebyscore(根據分值區間遍歷)、zrem(刪除)的使用示例,更多的命令說明見官方。
> zadd books 9.0 "think in java" > zadd books 8.9 "java concurrency" > zadd books 8.6 "java cookbook" > zrange books 0 -1 # 按 score 排序列出,參數區間為排名范圍 1) "java cookbook" 2) "java concurrency" 3) "think in java" > zrevrange books 0 -1 # 按 score 逆序列出,參數區間為排名范圍 1) "think in java" 2) "java concurrency" 3) "java cookbook" > zcard books # 相當於 count() (integer) 3 > zscore books "java concurrency" # 獲取指定 value 的 score "8.9000000000000004" # 內部 score 使用 double 類型進行存儲,所以存在小數點精度問題 > zrank books "java concurrency" # 排名 (integer) 1 > zrangebyscore books 0 8.91 # 根據分值區間遍歷 zset 1) "java cookbook" 2) "java concurrency" > zrangebyscore books -inf 8.91 withscores # 根據分值區間 (-∞, 8.91] 遍歷 zset,同時返回分值。inf 代表 infinite,無窮大的意思。 1) "java cookbook" 2) "8.5999999999999996" 3) "java concurrency" 4) "8.9000000000000004" > zrem books "java concurrency" # 刪除 value (integer) 1 > zrange books 0 -1 1) "java cookbook" 2) "think in java"
3.3、使用場景
- 粉絲列表,value 值是粉絲的用戶 ID,score 是關注時間
- 視頻網站需要對用戶上傳的視頻做排行榜,榜單維護可能是多方面:按照時間、按照播放量、按照獲得的贊數等。
-------------------------------------------------------------------------------
場景1-熱門文章排序
像博客、論壇等內容網站讓優質內容得到足夠曝光,是提高網站吸引力的重要方法。 今天我們就經常占據網站C位的 熱門文章列表
這一場景來詳細分析。
核心思路
我們引出一個 熱度
的概念,它其實是個代表文章的受歡迎程度的分數
。 我們把 發布時間、點贊、評論、瀏覽量... 通過公式轉化為熱度,再根據它來排序即可
實踐
文章按時間倒序排序,我們可以理解了一個隨時間衰減
的評分,這里可以使用Unix時間。而點贊、評論、瀏覽量...則乘以自己的權重(常量
),加上發布時間就等於文章評分
時間衰減評分
首先准備文章數據
:id:1文章最早發布,id:5文章最晚
[ 'id' => 1, 'title' => 'article 1', 'link' => 'http://article 1', 'user_id' => 1, 'votes' => 0, 'publish_time' => 1571190843 ],
[ 'id' => 2, 'title' => 'article 2', 'link' => 'http://article 2', 'user_id' => 1, 'votes' => 0, 'publish_time' => 1571190903 ],
[ 'id' => 3, 'title' => 'article 3', 'link' => 'http://article 3', 'user_id' => 1, 'votes' => 0, 'publish_time' => 1571190963 ],
[ 'id' => 4, 'title' => 'article 4', 'link' => 'http://article 4', 'user_id' => 1, 'votes' => 0, 'publish_time' => 1571191023 ],
[ 'id' => 5, 'title' => 'article 5', 'link' => 'http://article 5', 'user_id' => 1, 'votes' => 0, 'publish_time' => 1571191083 ]
同時在Redis建立起 articles_hit_rate
文章熱度 的有序列表(zset)
article 是文章id, score 為 熱度評分
測試zrevrangebyscore articles_hit_rate +inf -inf
根據分數倒序取值。結果;
local_redis:0>zrevrangebyscore articles_hit_rate +inf -inf
1) "5"
2) "4"
3) "3"
4) "2"
5) "1"
場景2-點贊👍
點贊、評論、瀏覽量的權重(常量),有個很好的計算方式: 發布一天內,你認為獲得多少點贊👍的文章是優質文章/列表首頁展示多少條數據。( 比如 100 ) 權重常量:86400 / 100 = 864 ( 一天有86400秒 )
/** * 點贊文章 * @param int $articleId 文章id * @param int $userId 用戶id * @param int $voteNum 點贊數 * @return bool */ public function voteArticle( int $articleId, int $userId, int $voteNum ) { if ( $this->isVoted( $articleId, $userId ) ) return false; // 已投票,返回 $this->userVoted( $articleId, $userId ); $this->RedisUtil->hIncrBy( self::LIST_ARTICLE_PREFIXX . $articleId, 'votes', $voteNum ); // 更新文章點贊數 $this->RedisUtil->zinCrBy( self::LIST_ARTICLE_HIT_RATE, ( $voteNum * self::LIKE_HIT_RATE ), $articleId ); // 更新文章熱度 return true; }
至於其他維度,也可以按照點贊方法以此類推... 當然為了防止用戶對同一片文章進行多次投票,還需要用Redis中無序列表set為每篇文章建立已投票用戶。這個在代碼里面有提現,這里就不過多討論...
比如:訂單超時未支付,取消訂單,恢復庫存.
對消息隊列有嚴格要求(不能丟)的建議還是使用kafka,專業的MQ。這些專業的消息中間件提供了很多功能特性,當然他的部署使用維護都是比較麻煩的。如果你對消息隊列沒那么高要求,想要輕量級的,使用Redis就沒錯啦。
Redis的數據結構Zset,同樣可以實現延遲隊列的效果,主要利用它的score屬性,Redis通過score來為集合中的成員進行從小到大的排序。

通過zadd命令向隊列delayqueue中添加元素,並設置score值表示元素過期的時間;向delayqueue添加三個order1、order2、order3,分別是10秒、20秒、30秒后過期。
zadd delayqueue 3 order3
消費端輪詢隊列delayqueue,將元素排序后取最小時間與當前時間比對,如小於當前時間代表已經過期移除key。
/** * 消費消息 */ public void pollOrderQueue() { while (true) { Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0); String value = ((Tuple) set.toArray()[0]).getElement(); int score = (int) ((Tuple) set.toArray()[0]).getScore(); Calendar cal = Calendar.getInstance(); int nowSecond = (int) (cal.getTimeInMillis() / 1000); if (nowSecond >= score) { jedis.zrem(DELAY_QUEUE, value); System.out.println(sdf.format(new Date()) + " removed key:" + value); } if (jedis.zcard(DELAY_QUEUE) <= 0) { System.out.println(sdf.format(new Date()) + " zset empty "); return; } Thread.sleep(1000); } }
我們看到執行結果符合預期:
2020-05-07 13:24:09 add finished. 2020-05-07 13:24:19 removed key:order1 2020-05-07 13:24:29 removed key:order2 2020-05-07 13:24:39 removed key:order3 2020-05-07 13:24:39 zset empty
Redis過期回調
Redis的key過期回調事件,也能達到延遲隊列的效果,簡單來說我們開啟監聽key是否過期的事件,一旦key過期會觸發一個callback事件。
修改redis.conf文件開啟notify-keyspace-events Ex。
notify-keyspace-events Ex
Redis監聽配置,注入Bean RedisMessageListenerContainer。
@Configuration public class RedisListenerConfig { @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); return container; } }
編寫Redis過期回調監聽方法,必須繼承KeyExpirationEventMessageListener ,有點類似於MQ的消息監聽。
@Component public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } @Override public void onMessage(Message message, byte[] pattern) { String expiredKey = message.toString(); System.out.println("監聽到key:" + expiredKey + "已過期"); } }
到這代碼就編寫完成,非常的簡單,接下來測試一下效果,在redis-cli客戶端添加一個key並給定3s的過期時間。
set xiaofu 123 ex 3
在控制台成功監聽到了這個過期的key。
監聽到過期的key為:xiaofu
四、趨於成熟的隊列:Stream(Redis5.0的Stream類型)
Redis5.0最近被作者突然放出來了,增加了很多新的特色功能。而Redis5.0最大的新特性就是多出了一個數據結構Stream,它是一個新的強大的支持多播的可持久化的消息隊列,作者坦言Redis Stream狠狠地借鑒了Kafka的設計。
Redis Stream的結構如上圖所示,它有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的ID和對應的內容。消息是持久化的,Redis重啟后,內容還在。
Redis5.0中發布的Stream類型,也用來實現典型的消息隊列。該Stream類型的出現,幾乎滿足了消息隊列具備的全部內容,包括但不限於:
- 消息ID的序列化生成
- 消息遍歷
- 消息的阻塞和非阻塞讀取
- 消息的分組消費
- 未完成消息的處理
- 消息隊列監控
我們依舊從簡單到復雜,依次來看 Stream 在做消息隊列時,是如何處理的?
4.1、發布消息與消費消息
首先,Stream 通過 XADD 和 XREAD 完成最簡單的生產、消費模型:
- XADD:發布消息
- XREAD:讀取消息
XADD,命令用於在某個stream(流數據)中追加消息:
其中語法格式為:
XADD key ID field string [field string ...]
需要提供key,消息ID方案,消息內容,其中消息內容為key-value型數據。 ID,最常使用*,表示由Redis生成消息ID,這也是強烈建議的方案。 field string [field string], 就是當前消息內容,由1個或多個key-value構成。
XREAD,從Stream中讀取消息:
XREAD支持很多參數,語法格式為:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
其中:
- [COUNT count],用於限定獲取的消息數量
- [BLOCK milliseconds],用於設置XREAD為阻塞模式,默認為非阻塞模式
- ID,用於設置由哪個消息ID開始讀取。使用0表示從第一條消息開始。(本例中就是使用0)此處需要注意,消息隊列ID是單調遞增的,所以通過設置起點,可以向后讀取。在阻塞模式中,可以使用$,表示最新的消息ID。(在非阻塞模式下$無意義)。
XRED讀消息時分為阻塞和非阻塞模式,使用BLOCK選項可以表示阻塞模式,需要設置阻塞時長。非阻塞模式下,讀取完畢(即使沒有任何消息)立即返回,而在阻塞模式下,若讀取不到內容,則阻塞等待。
一個典型的阻塞模式用法為:
127.0.0.1:6379> XREAD block 1000 streams memberMessage $
(nil)
(1.07s)
我們使用Block模式,配合$作為ID,表示讀取最新的消息,若沒有消息,命令阻塞!等待過程中,其他客戶端向隊列追加消息,則會立即讀取到。
因此,典型的隊列就是 XADD 配合 XREAD Block 完成。XADD負責生成消息,XREAD負責消費消息。
4.2、消息ID說明
XADD生成的1553439850328-0
,就是Redis生成的消息ID,由兩部分組成:時間戳-序號。時間戳是毫秒級單位,是生成消息的Redis服務器時間,它是個64位整型(int64)。序號是在這個毫秒時間點內的消息序號,它也是個64位整型。較真來說,序號可能會溢出,but真可能嗎?
可以通過multi批處理,來驗證序號的遞增:
127.0.0.1:6379> MULTI OK 127.0.0.1:6379> XADD memberMessage * msg one QUEUED 127.0.0.1:6379> XADD memberMessage * msg two QUEUED 127.0.0.1:6379> XADD memberMessage * msg three QUEUED 127.0.0.1:6379> XADD memberMessage * msg four QUEUED 127.0.0.1:6379> XADD memberMessage * msg five QUEUED 127.0.0.1:6379> EXEC 1) "1553441006884-0" 2) "1553441006884-1" 3) "1553441006884-2" 4) "1553441006884-3" 5) "1553441006884-4"
由於一個redis命令的執行很快,所以可以看到在同一時間戳內,是通過序號遞增來表示消息的。
為了保證消息是有序的,因此Redis生成的ID是單調遞增有序的。由於ID中包含時間戳部分,為了避免服務器時間錯誤而帶來的問題(例如服務器時間延后了),Redis的每個Stream類型數據都維護一個latest_generated_id屬性,用於記錄最后一個消息的ID。若發現當前時間戳退后(小於latest_generated_id所記錄的),則采用時間戳不變而序號遞增的方案來作為新消息ID(這也是序號為什么使用int64的原因,保證有足夠多的的序號),從而保證ID的單調遞增性質。
強烈建議使用Redis的方案生成消息ID,因為這種時間戳+序號的單調遞增的ID方案,幾乎可以滿足你全部的需求。但同時,記住ID是支持自定義的,別忘了!
4.3、消費者組模式,consumer group
當多個消費者(consumer)同時消費一個消息隊列時,可以重復的消費相同的消息,就是消息隊列中有10條消息,三個消費者都可以消費到這10條消息。
但有時,我們需要多個消費者配合協作來消費同一個消息隊列,就是消息隊列中有10條消息,三個消費者分別消費其中的某些消息,比如消費者A消費消息1、2、5、8,消費者B消費消息4、9、10,而消費者C消費消息3、6、7。也就是三個消費者配合完成消息的消費,可以在消費能力不足,也就是消息處理程序效率不高時,使用該模式。該模式就是消費者組模式。如下圖所示:
消費者組模式的支持主要由兩個命令實現:
- XGROUP,用於管理消費者組,提供創建組,銷毀組,更新組起始消息ID等操作
- XREADGROUP,分組消費消息操作
進行演示,演示時使用5個消息,思路是:創建一個Stream消息隊列,生產者生成5條消息。在消息隊列上創建一個消費組,組內三個消費者進行消息消費:
# 生產者生成10條消息 127.0.0.1:6379> MULTI 127.0.0.1:6379> XADD mq * msg 1 # 生成一個消息:msg 1 127.0.0.1:6379> XADD mq * msg 2 127.0.0.1:6379> XADD mq * msg 3 127.0.0.1:6379> XADD mq * msg 4 127.0.0.1:6379> XADD mq * msg 5 127.0.0.1:6379> EXEC 1) "1553585533795-0" 2) "1553585533795-1" 3) "1553585533795-2" 4) "1553585533795-3" 5) "1553585533795-4" # 創建消費組 mqGroup 127.0.0.1:6379> XGROUP CREATE mq mqGroup 0 # 為消息隊列 mq 創建消費組 mgGroup OK # 消費者A,消費第1條 127.0.0.1:6379> XREADGROUP group mqGroup consumerA count 1 streams mq > #消費組內消費者A,從消息隊列mq中讀取一個消息 1) 1) "mq" 2) 1) 1) "1553585533795-0" 2) 1) "msg" 2) "1" # 消費者A,消費第2條 127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq > 1) 1) "mq" 2) 1) 1) "1553585533795-1" 2) 1) "msg" 2) "2" # 消費者B,消費第3條 127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerB COUNT 1 STREAMS mq > 1) 1) "mq" 2) 1) 1) "1553585533795-2" 2) 1) "msg" 2) "3" # 消費者A,消費第4條 127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA count 1 STREAMS mq > 1) 1) "mq" 2) 1) 1) "1553585533795-3" 2) 1) "msg" 2) "4" # 消費者C,消費第5條 127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerC COUNT 1 STREAMS mq > 1) 1) "mq" 2) 1) 1) "1553585533795-4" 2) 1) "msg" 2) "5"
上面的例子中,三個在同一組 mpGroup 消費者A、B、C在消費消息時(消費者在消費時指定即可,不用預先創建),有着互斥原則,消費方案為,A->1, A->2, B->3, A->4, C->5。語法說明為:
XGROUP CREATE mq mqGroup 0
,用於在消息隊列mq上創建消費組 mpGroup,最后一個參數0,表示該組從第一條消息開始消費。(意義與XREAD的0一致)。除了支持CREATE
外,還支持SETID
設置起始ID,DESTROY
銷毀組,DELCONSUMER
刪除組內消費者等操作。
XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >
,用於組mqGroup
內消費者consumerA
在隊列mq
中消費,參數>
表示未被組內消費的起始消息,參數count 1
表示獲取一條。語法與XREAD
基本一致,不過是增加了組的概念。
可以進行組內消費的基本原理是,STREAM類型會為每個組記錄一個最后處理(交付)的消息ID(last_delivered_id),這樣在組內消費時,就可以從這個值后面開始讀取,保證不重復消費。
以上就是消費組的基礎操作。除此之外,消費組消費時,還有一個必須要考慮的問題,就是若某個消費者,消費了某條消息,但是並沒有處理成功時(例如消費者進程宕機),這條消息可能會丟失,因為組內其他消費者不能再次消費到該消息了。下面繼續討論解決方案。
4.4、Pending 等待列表
為了解決組內消息讀取但處理期間消費者崩潰帶來的消息丟失問題,STREAM
設計了 Pending
列表,用於記錄讀取但並未處理完畢的消息。命令XPENDIING
用來獲消費組或消費內消費者的未處理完畢的消息。演示如下:
127.0.0.1:6379> XPENDING mq mqGroup # mpGroup的Pending情況 1) (integer) 5 # 5個已讀取但未處理的消息 2) "1553585533795-0" # 起始ID 3) "1553585533795-4" # 結束ID 4) 1) 1) "consumerA" # 消費者A有3個 2) "3" 2) 1) "consumerB" # 消費者B有1個 2) "1" 3) 1) "consumerC" # 消費者C有1個 2) "1" 127.0.0.1:6379> XPENDING mq mqGroup - + 10 # 使用 start end count 選項可以獲取詳細信息 1) 1) "1553585533795-0" # 消息ID 2) "consumerA" # 消費者 3) (integer) 1654355 # 從讀取到現在經歷了1654355ms,IDLE 4) (integer) 5 # 消息被讀取了5次,delivery counter 2) 1) "1553585533795-1" 2) "consumerA" 3) (integer) 1654355 4) (integer) 4 # 共5個,余下3個省略 ... 127.0.0.1:6379> XPENDING mq mqGroup - + 10 consumerA # 在加上消費者參數,獲取具體某個消費者的Pending列表 1) 1) "1553585533795-0" 2) "consumerA" 3) (integer) 1641083 4) (integer) 5 # 共3個,余下2個省略 ...
每個Pending的消息有4個屬性:
- 消息ID
- 所屬消費者
- IDLE,已讀取時長
- delivery counter,消息被讀取次數
上面的結果我們可以看到,我們之前讀取的消息,都被記錄在Pending列表中,說明全部讀到的消息都沒有處理,僅僅是讀取了。那如何表示消費者處理完畢了消息呢?使用命令 XACK
完成告知消息處理完成,演示如下:
127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # 通知消息處理結束,用消息ID標識 (integer) 1 127.0.0.1:6379> XPENDING mq mqGroup # 再次查看Pending列表 1) (integer) 4 # 已讀取但未處理的消息已經變為4個 2) "1553585533795-1" 3) "1553585533795-4" 4) 1) 1) "consumerA" # 消費者A,還有2個消息處理 2) "2" 2) 1) "consumerB" 2) "1" 3) 1) "consumerC" 2) "1" 127.0.0.1:6379>
有了這樣一個Pending機制,就意味着在某個消費者讀取消息但未處理后,消息是不會丟失的。等待消費者再次上線后,可以讀取該Pending列表,就可以繼續處理該消息了,保證消息的有序和不丟失。
此時還有一個問題,就是若某個消費者宕機之后,沒有辦法再上線了,那么就需要將該消費者Pending的消息,轉義給其他的消費者處理,就是消息轉移。請繼續。
4.5、 消息轉移
消息轉移的操作時將某個消息轉移到自己的Pending列表中。使用語法XCLAIM
來實現,需要設置組、轉移的目標消費者和消息ID,同時需要提供IDLE(已被讀取時長),只有超過這個時長,才能被轉移。演示如下:
# 當前屬於消費者A的消息1553585533795-1,已經15907,787ms未處理了
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
2) "consumerA"
3) (integer) 15907787
4) (integer) 4
# 轉移超過3600s的消息1553585533795-1到消費者B的Pending列表
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
1) 1) "1553585533795-1"
2) 1) "msg"
2) "2"
# 消息1553585533795-1已經轉移到消費者B的Pending中。
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
2) "consumerB"
3) (integer) 84404 # 注意IDLE,被重置了
4) (integer) 5 # 注意,讀取次數也累加了1次
以上代碼,完成了一次消息轉移。轉移除了要指定ID外,還需要指定IDLE,保證是長時間未處理的才被轉移。被轉移的消息的IDLE會被重置,用以保證不會被重復轉移,以為可能會出現將過期的消息同時轉移給多個消費者的並發操作,設置了IDLE,則可以避免后面的轉移不會成功,因為IDLE不滿足條件。例如下面的連續兩條轉移,第二條不會成功。
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1
這就是消息轉移。至此我們使用了一個Pending消息的ID,所屬消費者和IDLE的屬性,還有一個屬性就是消息被讀取次數,delivery counter,該屬性的作用由於統計消息被讀取的次數,包括被轉移也算。這個屬性主要用在判定是否為錯誤數據上。請繼續看:
4.6 壞消息問題,Dead Letter,死信問題
正如上面所說,如果某個消息,不能被消費者處理,也就是不能被XACK,這是要長時間處於Pending列表中,即使被反復的轉移給各個消費者也是如此。此時該消息的delivery counter就會累加(上一節的例子可以看到),當累加到某個我們預設的臨界值時,我們就認為是壞消息(也叫死信,DeadLetter,無法投遞的消息),由於有了判定條件,我們將壞消息處理掉即可,刪除即可。刪除一個消息,使用XDEL
語法,演示如下:
# 刪除隊列中的消息
127.0.0.1:6379> XDEL mq 1553585533795-1
(integer) 1
# 查看隊列中再無此消息
127.0.0.1:6379> XRANGE mq - +
1) 1) "1553585533795-0"
2) 1) "msg"
2) "1"
2) 1) "1553585533795-2"
2) 1) "msg"
2) "3"
注意本例中,並沒有刪除Pending中的消息因此你查看Pending,消息還會在。可以執行XACK
標識其處理完畢!
4.7 信息監控,XINFO
Stream提供了XINFO來實現對服務器信息的監控,可以查詢:
查看隊列信息
127.0.0.1:6379> Xinfo stream mq
1) "length"
2) (integer) 7
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "groups"
8) (integer) 1
9) "last-generated-id"
10) "1553585533795-9"
11) "first-entry"
12) 1) "1553585533795-3"
2) 1) "msg"
2) "4"
13) "last-entry"
14) 1) "1553585533795-9"
2) 1) "msg"
2) "10"
消費組信息
127.0.0.1:6379> Xinfo groups mq
1) 1) "name"
2) "mqGroup"
3) "consumers"
4) (integer) 3
5) "pending"
6) (integer) 3
7) "last-delivered-id"
8) "1553585533795-4"
消費者組成員信息
127.0.0.1:6379> XINFO CONSUMERS mq mqGroup
1) 1) "name"
2) "consumerA"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 18949894
2) 1) "name"
2) "consumerB"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 3092719
3) 1) "name"
2) "consumerC"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 23683256
至此,消息隊列的操作說明大體結束!
4.8 命令一覽
|命令|說明| |:---|:---| |XACK|結束Pending| |XADD|生成消息| |XCLAIM|消息轉移| |XDEL|刪除消息| |XGROUP|消費組管理| |XINFO|得到消費組信息| |XLEN|消息隊列長度| |XPENDING|Pending列表| |XRANGE|獲取消息隊列中消息| |XREAD|消費消息| |XREADGROUP|分組消費消息| |XREVRANGE|逆序獲取消息隊列中消息| |XTRIM|消息隊列容量|
4.9 Stream數據結構,RadixTree,基數樹
Stream
是基於 RadixTree
數據結構實現的。另立話題討論。基數樹,http://www.hellokang.net/algorithm/radix-tree.html
五、stream的隊列示例
生產者發布 2 條消息:
// *表示讓Redis自動生成消息ID 127.0.0.1:6379> XADD queue * name zhangsan "1618469123380-0" 127.0.0.1:6379> XADD queue * name lisi "1618469127777-0"
使用 XADD 命令發布消息,其中的「*」表示讓 Redis 自動生成唯一的消息 ID。
這個消息 ID 的格式是「時間戳-自增序號」。
消費者拉取消息:
// 從開頭讀取5條消息,0-0表示從開頭讀取 127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0 1) 1) "queue" 2) 1) 1) "1618469123380-0" 2) 1) "name" 2) "zhangsan" 2) 1) "1618469127777-0" 2) 1) "name" 2) "lisi"
如果想繼續拉取消息,需要傳入上一條消息的 ID:
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0
(nil)
沒有消息,Redis 會返回 NULL。
以上就是 Stream 最簡單的生產、消費。
這里不再重點介紹 Stream 命令的各種參數,我在例子中演示時,凡是大寫的單詞都是「固定」參數,凡是小寫的單詞,都是可以自己定義的,例如隊列名、消息長度等等,下面的例子規則也是一樣,為了方便你理解,這里有必要提醒一下。
下面我們來看,針對前面提到的消息隊列要求,Stream 都是如何解決的?
1) Stream 是否支持「阻塞式」拉取消息?
可以的,在讀取消息時,只需要增加 BLOCK 參數即可。
// BLOCK 0 表示阻塞等待,不設置超時時間
127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0
這時,消費者就會阻塞等待,直到生產者發布新的消息才會返回。
2) Stream 是否支持發布 / 訂閱模式?
也沒問題,Stream 通過以下命令完成發布訂閱:
- XGROUP:創建消費者組
- XREADGROUP:在指定消費組下,開啟消費者拉取消息
下面我們來看具體如何做?
首先,生產者依舊發布 2 條消息:
127.0.0.1:6379> XADD queue * name zhangsan
"1618470740565-0"
127.0.0.1:6379> XADD queue * name lisi
"1618470743793-0"
之后,我們想要開啟 2 組消費者處理同一批數據,就需要創建 2 個消費者組:
// 創建消費者組1,0-0表示從頭拉取消息
127.0.0.1:6379> XGROUP CREATE queue group1 0-0
OK
// 創建消費者組2,0-0表示從頭拉取消息
127.0.0.1:6379> XGROUP CREATE queue group2 0-0
OK
消費者組創建好之后,我們可以給每個「消費者組」下面掛一個「消費者」,讓它們分別處理同一批數據。
第一個消費組開始消費:
// group1的consumer開始消費,>表示拉取最新數據
127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
2) 1) 1) "1618470740565-0"
2) 1) "name"
2) "zhangsan"
2) 1) "1618470743793-0"
2) 1) "name"
2) "lisi"
同樣地,第二個消費組開始消費:
// group2的consumer開始消費,>表示拉取最新數據
127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
2) 1) 1) "1618470740565-0"
2) 1) "name"
2) "zhangsan"
2) 1) "1618470743793-0"
2) 1) "name"
2) "lisi"
我們可以看到,這 2 組消費者,都可以獲取同一批數據進行處理了。
這樣一來,就達到了多組消費者「訂閱」消費的目的。
3) 消息處理時異常,Stream 能否保證消息不丟失,重新消費?
除了上面拉取消息時用到了消息 ID,這里為了保證重新消費,也要用到這個消息 ID。
當一組消費者處理完消息后,需要執行 XACK 命令告知 Redis,這時 Redis 就會把這條消息標記為「處理完成」。
// group1下的 1618472043089-0 消息已處理完成
127.0.0.1:6379> XACK queue group1 1618472043089-0
如果消費者異常宕機,肯定不會發送 XACK,那么 Redis 就會依舊保留這條消息。
待這組消費者重新上線后,Redis 就會把之前沒有處理成功的數據,重新發給這個消費者。這樣一來,即使消費者異常,也不會丟失數據了。
// 消費者重新上線,0-0表示重新拉取未ACK的消息
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0
// 之前沒消費成功的數據,依舊可以重新消費
1) 1) "queue"
2) 1) 1) "1618472043089-0"
2) 1) "name"
2) "zhangsan"
2) 1) "1618472045158-0"
2) 1) "name"
2) "lisi"
4) Stream 數據會寫入到 RDB 和 AOF 做持久化嗎?
Stream 是新增加的數據類型,它與其它數據類型一樣,每個寫操作,也都會寫入到 RDB 和 AOF 中。
我們只需要配置好持久化策略,這樣的話,就算 Redis 宕機重啟,Stream 中的數據也可以從 RDB 或 AOF 中恢復回來。
5) 消息堆積時,Stream 是怎么處理的?
其實,當消息隊列發生消息堆積時,一般只有 2 個解決方案:
- 生產者限流:避免消費者處理不及時,導致持續積壓
- 丟棄消息:中間件丟棄舊消息,只保留固定長度的新消息
而 Redis 在實現 Stream 時,采用了第 2 個方案。
在發布消息時,你可以指定隊列的最大長度,防止隊列積壓導致內存爆炸。
// 隊列長度最大10000
127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan
"1618473015018-0"
當隊列長度超過上限后,舊消息會被刪除,只保留固定長度的新消息。
這么來看,Stream 在消息積壓時,如果指定了最大長度,還是有可能丟失消息的。
除了以上介紹到的命令,Stream 還支持查看消息長度(XLEN)、查看消費者狀態(XINFO)等命令,使用也比較簡單,你可以查詢官方文檔了解一下,這里就不過多介紹了。
好了,通過以上介紹,我們可以看到,Redis 的 Stream 幾乎覆蓋到了消息隊列的各種場景,是不是覺得很完美?
既然它的功能這么強大,這是不是意味着,Redis 真的可以作為專業的消息隊列中間件來使用呢?
但是還「差一點」,就算 Redis 能做到以上這些,也只是「趨近於」專業的消息隊列。
原因在於 Redis 本身的一些問題,如果把其定位成消息隊列,還是有些欠缺的。
到這里,就不得不把 Redis 與專業的隊列中間件做對比了。
下面我們就來看一下,Redis 在作隊列時,到底還有哪些欠缺?
六、與專業的消息隊列對比
其實,一個專業的消息隊列,必須要做到兩大塊:
- 消息不丟
- 消息可堆積
前面我們討論的重點,很大篇幅圍繞的是第一點展開的。
這里我們換個角度,從一個消息隊列的「使用模型」來分析一下,怎么做,才能保證數據不丟?
使用一個消息隊列,其實就分為三大塊:生產者、隊列中間件、消費者。
消息是否會發生丟失,其重點也就在於以下 3 個環節:
- 生產者會不會丟消息?
- 消費者會不會丟消息?
- 隊列中間件會不會丟消息?
1) 生產者會不會丟消息?
當生產者在發布消息時,可能發生以下異常情況:
- 消息沒發出去:網絡故障或其它問題導致發布失敗,中間件直接返回失敗
- 不確定是否發布成功:網絡問題導致發布超時,可能數據已發送成功,但讀取響應結果超時了
如果是情況 1,消息根本沒發出去,那么重新發一次就好了。
如果是情況 2,生產者沒辦法知道消息到底有沒有發成功?所以,為了避免消息丟失,它也只能繼續重試,直到發布成功為止。
生產者一般會設定一個最大重試次數,超過上限依舊失敗,需要記錄日志報警處理。
也就是說,生產者為了避免消息丟失,只能采用失敗重試的方式來處理。
但發現沒有?這也意味着消息可能會重復發送。
是的,在使用消息隊列時,要保證消息不丟,寧可重發,也不能丟棄。
那消費者這邊,就需要多做一些邏輯了。
對於敏感業務,當消費者收到重復數據數據時,要設計冪等邏輯,保證業務的正確性。
從這個角度來看,生產者會不會丟消息,取決於生產者對於異常情況的處理是否合理。
所以,無論是 Redis 還是專業的隊列中間件,生產者在這一點上都是可以保證消息不丟的。
2) 消費者會不會丟消息?
這種情況就是我們前面提到的,消費者拿到消息后,還沒處理完成,就異常宕機了,那消費者還能否重新消費失敗的消息?
要解決這個問題,消費者在處理完消息后,必須「告知」隊列中間件,隊列中間件才會把標記已處理,否則仍舊把這些數據發給消費者。
這種方案需要消費者和中間件互相配合,才能保證消費者這一側的消息不丟。
無論是 Redis 的 Stream,還是專業的隊列中間件,例如 RabbitMQ、Kafka,其實都是這么做的。
所以,從這個角度來看,Redis 也是合格的。
3) 隊列中間件會不會丟消息?
前面 2 個問題都比較好處理,只要客戶端和服務端配合好,就能保證生產端、消費端都不丟消息。
但是,如果隊列中間件本身就不可靠呢?
畢竟生產者和消費這都依賴它,如果它不可靠,那么生產者和消費者無論怎么做,都無法保證數據不丟。
在這個方面,Redis 其實沒有達到要求。
Redis 在以下 2 個場景下,都會導致數據丟失。
- AOF 持久化配置為每秒寫盤,但這個寫盤過程是異步的,Redis 宕機時會存在數據丟失的可能
- 主從復制也是異步的,主從切換時,也存在丟失數據的可能(從庫還未同步完成主庫發來的數據,就被提成主庫)
基於以上原因我們可以看到,Redis 本身的無法保證嚴格的數據完整性。
所以,如果把 Redis 當做消息隊列,在這方面是有可能導致數據丟失的。
再來看那些專業的消息隊列中間件是如何解決這個問題的?
像 RabbitMQ 或 Kafka 這類專業的隊列中間件,在使用時,一般是部署一個集群,生產者在發布消息時,隊列中間件通常會寫「多個節點」,以此保證消息的完整性。這樣一來,即便其中一個節點掛了,也能保證集群的數據不丟失。
也正因為如此,RabbitMQ、Kafka在設計時也更復雜。畢竟,它們是專門針對隊列場景設計的。
但 Redis 的定位則不同,它的定位更多是當作緩存來用,它們兩者在這個方面肯定是存在差異的。
最后,我們來看消息積壓怎么辦?
4) 消息積壓怎么辦?
因為 Redis 的數據都存儲在內存中,這就意味着一旦發生消息積壓,則會導致 Redis 的內存持續增長,如果超過機器內存上限,就會面臨被 OOM 的風險。
所以,Redis 的 Stream 提供了可以指定隊列最大長度的功能,就是為了避免這種情況發生。
但 Kafka、RabbitMQ 這類消息隊列就不一樣了,它們的數據都會存儲在磁盤上,磁盤的成本要比內存小得多,當消息積壓時,無非就是多占用一些磁盤空間,相比於內存,在面對積壓時也會更加「坦然」。
綜上,我們可以看到,把 Redis 當作隊列來使用時,始終面臨的 2 個問題:
- Redis 本身可能會丟數據
- 面對消息積壓,Redis 內存資源緊張
到這里,Redis 是否可以用作隊列,我想這個答案你應該會比較清晰了。
如果你的業務場景足夠簡單,對於數據丟失不敏感,而且消息積壓概率比較小的情況下,把 Redis 當作隊列是完全可以的。
而且,Redis 相比於 Kafka、RabbitMQ,部署和運維也更加輕量。
如果你的業務場景對於數據丟失非常敏感,而且寫入量非常大,消息積壓時會占用很多的機器資源,那么我建議你使用專業的消息隊列中間件。
七、總結
好了,總結一下。這篇文章我們從「Redis 能否用作隊列」這個角度出發,介紹了 List、Pub/Sub、Stream 在做隊列的使用方式,以及它們各自的優劣。
之后又把 Redis 和專業的消息隊列中間件做對比,發現 Redis 的不足之處。
最后,我們得出 Redis 做隊列的合適場景。
這里我也列了一個表格,總結了它們各自的優缺點。
后記
最后,我想和你再聊一聊關於「技術方案選型」的問題。
你應該也看到了,這篇文章雖然始於 Redis,但並不止於 Redis。
我們在分析 Redis 細節時,一直在提出問題,然后尋找更好的解決方案,在文章最后,又聊到一個專業的消息隊列應該怎么做。
其實,我們在討論技術選型時,就是一個關於如何取舍的問題。
而這里我想傳達給你的信息是,在面對技術選型時,不要不經過思考就覺得哪個方案好,哪個方案不好。
你需要根據具體場景具體分析,這里我把這個分析過程分為 2 個層面:
- 業務功能角度
- 技術資源角度
這篇文章所講到的內容,都是以業務功能角度出發做決策的。
但這里的第二點,從技術資源角度出發,其實也很重要。
技術資源的角度是說,你所處的公司環境、技術資源能否匹配這些技術方案。
這個怎么解釋呢?
簡單來講,就是你所在的公司、團隊,是否有匹配的資源能 hold 住這些技術方案。
我們都知道 Kafka、RabbitMQ 是非常專業的消息中間件,但它們的部署和運維,相比於 Redis 來說,也會更復雜一些。
如果你在一個大公司,公司本身就有優秀的運維團隊,那么使用這些中間件肯定沒問題,因為有足夠優秀的人能 hold 住這些中間件,公司也會投入人力和時間在這個方向上。
但如果你是在一個初創公司,業務正處在快速發展期,暫時沒有能 hold 住這些中間件的團隊和人,如果貿然使用這些組件,當發生故障時,排查問題也會變得很困難,甚至會阻礙業務的發展。
而這種情形下,如果公司的技術人員對於 Redis 都很熟,綜合評估來看,Redis 也基本可以滿足業務 90% 的需求,那當下選擇 Redis 未必不是一個好的決策。
所以,做技術選型不只是技術問題,還與人、團隊、管理、組織結構有關。
也正是因為這些原因,當你在和別人討論技術選型問題時,你會發現每個公司的做法都不相同。
畢竟每個公司所處的環境和文化不一樣,做出的決策當然就會各有差異。
如果你不了解這其中的邏輯,那在做技術選型時,只會趨於表面現象,無法深入到問題根源。
而一旦你理解了這個邏輯,那么你在看待這個問題時,不僅對於技術會有更加深刻認識,對技術資源和人的把握,也會更加清晰。
希望你以后在做技術選型時,能夠把這些因素也考慮在內,這對你的技術成長之路也是非常有幫助的。
轉:
https://mp.weixin.qq.com/s/jYldDKkxypj53NaogQj40Q
https://www.imooc.com/article/298523
https://zhuanlan.zhihu.com/p/60501638
https://juejin.cn/post/6844903968326287368