分析 Redis 是否適合作為消息隊列


本文為搬運,根據自己理解稍作修改,支持原創:https://mp.weixin.qq.com/s/G31OEGmi0OtTBGIJT8g4jQ

注 :文章最后關於技術選型方面的探討干貨滿滿,強烈建議一鍵三連

前言

  關於「把 Redis 當作隊列來用是否合適」的問題有些人表示贊成,也些人則反對。Redis 很輕量,用作隊列很方便,但是 Redis 會丟數據,所以很多人認為最好還是用專業的隊列中間件更穩妥。

 

what's the 隊列

場景一  

  如果業務需求足夠簡單,想把 Redis 當作隊列來使用,最先想到的肯定是 Redis 的 List 數據類型。因為 List 底層的實現就是一個鏈表,在頭部和尾部操作元素,時間復雜度都是 O(1),這意味着它非常符合消息隊列的模型。

如果把 List 當作隊列,可以這么來用。

127.0.0.1:6379> LPUSH queue msg1
(integer) 1
127.0.0.1:6379> LPUSH queue msg2
(integer) 2
生產者使用 LPUSH 發布消息
127.0.0.1:6379> RPOP queue
"msg1"
127.0.0.1:6379> RPOP queue
"msg2"
消費者使用 RPOP 拉取消息

 

相關模型如圖

 

  但這里有個小問題,當隊列中已經沒有消息了,消費者在執行 RPOP 時,會返回 NULL。 而代碼層面在編寫消費者邏輯時,一般是一個死循環,這個邏輯需要不斷地從隊列中拉取消息進行處理。如果此時隊列為空,消費者頻繁拉取消息,會造成CPU 空轉,不僅浪費 CPU 資源,還會對 Redis 造成壓力。

  為了解決這個問題,大部分人選擇的方案會是——當隊列為空時,程序休眠一會,再去嘗試拉取消息。

  但是這會帶來另一個問題:當消費者在休眠等待時,有新消息來了,那消費者處理新消息就會存在延遲。假設設置的休眠時間是 2s,那新消息最多存在 2s 的延遲。要想縮短這個延遲,只能減小休眠的時間。但休眠時間越小,又有可能引發 CPU 空轉問題。

  那如何做,既能及時處理新消息,還能避免 CPU 空轉呢?Redis 提供了「阻塞式」拉取消息的命令:BRPOP / BLPOP,這里的 B 指的是阻塞(Block)。

  使用 BRPOP 這種阻塞式方式拉取消息時,還支持傳入一個「超時時間」,如果設置為 0,則表示不設置超時,直到有新消息才返回,否則會在指定的超時時間后返回 NULL。這個方案不錯,既兼顧了效率,還避免了 CPU 空轉問題,一舉兩得。

注意:如果設置的超時時間太長,這個連接太久沒有活躍過,可能會被 Redis Server 判定為無效連接,之后 Redis Server 會強制把這個客戶端踢下線。所以,采用這種方案,客戶端要有重連機制。 

 

依然存在的缺點

  1. 不支持重復消費:消費者拉取消息后,這條消息就從 List 中刪除了,無法被其它消費者再次消費,即不支持多個消費者消費同一批數據
  2. 消息丟失:消費者拉取到消息后,如果發生異常宕機,那這條消息就丟失了

 

Redis 的發布/訂閱模型:Pub/Sub

   為了解決前面提到的第一個缺點, Redis 專門針對「發布/訂閱」這種隊列模型設計了命令,來完成多組生產者、消費者的場景下的發布、訂閱的操作。

 

 

場景二 

  一個業務中有 2 個消費者,同時消費同一批數據,就可以按照以下方式來實現

// 2個消費者 都訂閱一個隊列
127.0.0.1:6379> SUBSCRIBE queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1
SUBSCRIBE 命令,啟動 2 個消費者,並訂閱同一個隊列

此時,2 個消費者都會被阻塞住,等待新消息的到來。之后,再啟動一個生產者,發布一條消息。

127.0.0.1:6379> PUBLISH queue msg1
(integer) 1
生產者

這時,2 個消費者就會解除阻塞,收到生產者發來的新消息。

127.0.0.1:6379> SUBSCRIBE queue
// 收到新消息
1) "message"
2) "queue"
3) "msg1"
消費者受到消息

 

 

  使用 Pub/Sub 這種方案,既支持阻塞式拉取消息,還很好地滿足了多組消費者,消費同一批數據的業務需求。

  除此之外,Pub/Sub 還提供了匹配訂閱模式,允許消費者根據一定規則,訂閱多個自己感興趣的隊列。

場景三

// 訂閱符合規則的隊列
127.0.0.1:6379> PSUBSCRIBE queue.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "queue.*"
3) (integer) 1
消費者訂閱了 queue.* 相關的隊列消息

 

127.0.0.1:6379> PUBLISH queue.p1 msg1
(integer) 1
127.0.0.1:6379> PUBLISH queue.p2 msg2
(integer) 1
生產者分別向 queue.p1 和 queue.p2 發布消息
127.0.0.1:6379> PSUBSCRIBE queue.*
Reading messages... (press Ctrl-C to quit)
...
// 來自queue.p1的消息
1) "pmessage"
2) "queue.*"
3) "queue.p1"
4) "msg1"

// 來自queue.p2的消息
1) "pmessage"
2) "queue.*"
3) "queue.p2"
4) "msg2"
再看消費者,它就可以接收到這 2 個生產者的消息了

 

依然存在的缺點

 

   丟數據:

  1. 消費者下線,數據會丟失
  2. 不支持數據持久化,Redis 宕機,數據也會丟失
  3. 消息堆積,緩沖區溢出,消費者會被強制踢下線,數據也會丟失

 

Pub/Sub 在實現時非常簡單,它沒有基於任何數據類型,也沒有做任何的數據存儲,它只是單純地為生產者、消費者建立「數據轉發通道」,把符合規則的數據,從一端轉發到另一端。

一個完整的發布、訂閱消息處理流程是這樣的:

  1. 消費者訂閱指定隊列,Redis 就會記錄一個映射關系:隊列->消費者
  2. 生產者向這個隊列發布消息,那 Redis 就從映射關系中找出對應的消費者,把消息轉發給它

整個過程中,沒有任何的數據存儲,一切都是實時轉發的。這種設計方案,就導致了上面提到的那些問題。

  例如,如果一個消費者異常掛掉了,它再重新上線后,只能接收新的消息,在下線期間生產者發布的消息,因為找不到消費者,都會被丟棄掉。如果所有消費者都下線了,那生產者發布的消息,因為找不到任何一個消費者,也會全部丟棄。所以,在使用 Pub/Sub 時,一定要注意:消費者必須先訂閱隊列,生產者才能發布消息,否則消息會丟失。

  另外,因為 Pub/Sub 沒有基於任何數據類型實現,所以它也不具備數據持久化的能力。也就是說,Pub/Sub 的相關操作,不會寫入到 RDB 和 AOF 中,當 Redis 宕機重啟,Pub/Sub 的數據也會全部丟失。

  並且,Pub/Sub 在處理消息積壓時,也會丟數據。當消費者的速度,跟不上生產者時,就會導致數據積壓的情況發生。如果采用 List 當作隊列,消息積壓時,會導致這個鏈表很長,最直接的影響就是,Redis 內存會持續增長,直到消費者把所有數據都從鏈表中取出。但 Pub/Sub 的處理方式卻不一樣,當消息積壓時,有可能會導致消費失敗和消息丟失!

  每個消費者訂閱一個隊列時,Redis 都會在 Server 上給這個消費者在分配一個緩沖區,這個緩沖區其實就是一塊內存。當生產者發布消息時,Redis 先把消息寫到對應消費者的緩沖區中。之后,消費者不斷地從緩沖區讀取消息,處理消息。但是,這個緩沖區其實是有上限的(可配置),如果消費者拉取消息很慢,就會造成生產者發布到緩沖區的消息開始積壓,緩沖區內存持續增長。如果超過了緩沖區配置的上限,此時,Redis 就會強制把這個消費者踢下線。這時消費者就會消費失敗,也會丟失數據。

緩沖區默認配置

  • 32mb:緩沖區一旦超過 32MB,Redis 直接強制把消費者踢下線
  • 8mb + 60:緩沖區超過 8MB,並且持續 60 秒,Redis 也會把消費者踢下線

 

所以,Pub/Sub 在實際的應用場景中用得並不多,顯得很雞肋。目前只有哨兵集群和 Redis 實例通信時,采用了 Pub/Sub 的方案,因為哨兵正好符合即時通訊的業務場景。

 

Redis 新隊列 Stream

  Redis 作者在開發 Redis 期間,還另外開發了一個開源項目 disque。這個項目的定位,就是一個基於內存的分布式消息隊列中間件。但由於種種原因,這個項目一直不溫不火。

  終於,在 Redis 5.0 版本,作者把 disque 功能移植到了 Redis 中,並給它定義了一個新的數據類型:Stream

Stream 通過 XADD 和 XREAD 完成最簡單的生產、消費模型:

  • XADD:發布消息
  • XREAD:讀取消息

場景四

// *表示讓Redis自動生成消息ID
127.0.0.1:6379> XADD queue * name zhangsan
"1618469123380-0"
127.0.0.1:6379> XADD queue * name lisi
"1618469127777-0"
生產者使用XADD發布 2 條消息

 

// 從開頭讀取5條消息,0-0表示從開頭讀取
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0
1) 1) "queue"
   2) 1) 1) "1618469123380-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618469127777-0"
         2) 1) "name"
            2) "lisi"
消費者拉取消息
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0
(nil)
消費者拉取消息,沒有消息,Redis 會返回 NULL。

 

 

針對前面提到的消息隊列要求,Stream 都是如何解決的?

 

1. Stream 是否支持「阻塞式」拉取消息?

  可以的,在讀取消息時,只需要增加 BLOCK 參數即可。

// BLOCK 0 表示阻塞等待,不設置超時時間
127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0
View Code

 

2. Stream 是否支持發布 / 訂閱模式?

  沒問題,Stream 通過以下命令完成發布訂閱:

  • XGROUP:創建消費者組
  • XREADGROUP:在指定消費組下,開啟消費者拉取消息
127.0.0.1:6379> XADD queue * name zhangsan
"1618470740565-0"
127.0.0.1:6379> XADD queue * name lisi
"1618470743793-0"
首先,生產者依舊發布 2 條消息

 

之后,開啟 2 組消費者處理同一批數據

 

// group1的consumer開始消費,>表示拉取最新數據
127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
   2) 1) 1) "1618470740565-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618470743793-0"
         2) 1) "name"
            2) "lisi"
第一個消費組開始消費

 

// group2的consumer開始消費,>表示拉取最新數據
127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
   2) 1) 1) "1618470740565-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618470743793-0"
         2) 1) "name"
            2) "lisi"
第二個消費組開始消費

3. 消息處理時異常,Stream 能否保證消息不丟失,重新消費?

  除了上面拉取消息時用到了消息 ID,這里為了保證重新消費,也要用到這個消息 ID。

  當一組消費者處理完消息后,需要執行 XACK 命令告知 Redis,這時 Redis 就會把這條消息標記為處理完成。

// group1下的 1618472043089-0 消息已處理完成
127.0.0.1:6379> XACK queue group1 1618472043089-0
View Code

 

 

  如果消費者異常宕機,肯定不會發送 XACK,那么 Redis 就會依舊保留這條消息。待這組消費者重新上線后,Redis 就會把之前沒有處理成功的數據,重新發給這個消費者。這樣一來,即使消費者異常,也不會丟失數據了。

4. Stream 數據會寫入到 RDB 和 AOF 做持久化嗎?

  Stream 是新增加的數據類型,它與其它數據類型一樣,每個寫操作,也都會寫入到 RDB 和 AOF 中。

  只需要配置好持久化策略,這樣的話,就算 Redis 宕機重啟,Stream 中的數據也可以從 RDB 或 AOF 中恢復回來

5. 消息堆積時,Stream 是怎么處理的?

其實,當消息隊列發生消息堆積時,一般只有 2 個解決方案:

  1. 生產者限流:避免消費者處理不及時,導致持續積壓
  2. 丟棄消息:中間件丟棄舊消息,只保留固定長度的新消息

  而 Redis 在實現 Stream 時,采用了第 2 個方案。在發布消息時,可以指定隊列的最大長度,防止隊列積壓導致內存爆炸。當隊列長度超過上限后,舊消息會被刪除,只保留固定長度的新消息。這么來看,Stream 在消息積壓時,如果指定了最大長度,還是有可能丟失消息的。

// 隊列長度最大10000
127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan
"1618473015018-0"
View Code

 

 

依然存在的缺點

  可以看到,Redis 的 Stream 幾乎覆蓋到了消息隊列的各種場景,是不是覺得很完美?

  既然它的功能這么強大,這是不是意味着,Redis 真的可以作為專業的消息隊列中間件來使用呢?

  但是還差一點,就算 Redis 能做到以上這些,也只是趨近於專業的消息隊列。原因在於 Redis 本身的一些問題,如果把其定位成消息隊列,還是有些欠缺的。

  到這里,就不得不把 Redis 與專業的隊列中間件做對比了。

 

Redis 與專業的消息隊列對比

  一個專業的消息隊列,必須要做到兩大塊:

  1. 消息不丟
  2. 消息可堆積

前面討論的重點,很大篇幅圍繞第一點展開。

使用一個消息隊列,其實就分為三大塊:生產者、隊列中間件、消費者

消息是否會發生丟失,其重點也就在於以下 3 個環節:

  1. 生產者會不會丟消息?
  2. 消費者會不會丟消息?
  3. 隊列中間件會不會丟消息?

1) 生產者會不會丟消息?

當生產者在發布消息時,可能發生以下異常情況:

  1. 消息沒發出去:網絡故障或其它問題導致發布失敗,中間件直接返回失敗
  2. 不確定是否發布成功:網絡問題導致發布超時,可能數據已發送成功,但讀取響應結果超時了

如果是情況 1,消息根本沒發出去,那么重新發一次就好了。

如果是情況 2,生產者沒辦法知道消息到底有沒有發成功?所以,為了避免消息丟失,它也只能繼續重試,直到發布成功為止。

生產者一般會設定一個最大重試次數,超過上限依舊失敗,需要記錄日志報警處理。

  也就是說,生產者為了避免消息丟失,只能采用失敗重試的方式來處理。但這也意味着消息可能會重復發送。在使用消息隊列時,要保證消息不丟,寧可重發,也不能丟棄。

  那消費者這邊,就需要多做一些邏輯了。對於敏感業務,當消費者收到重復數據數據時,要設計冪等邏輯,保證業務的正確性。從這個角度來看,生產者會不會丟消息,取決於生產者對於異常情況的處理是否合理。

所以,無論是 Redis 還是專業的隊列中間件,生產者在這一點上都是可以保證消息不丟的。

 

2) 消費者會不會丟消息?

  消費者拿到消息后,還沒處理完成,就異常宕機了,那消費者還能否重新消費失敗的消息?

  要解決這個問題,消費者在處理完消息后,必須告知隊列中間件,隊列中間件才會把標記已處理,否則仍舊把這些數據發給消費者。這種方案需要消費者和中間件互相配合,才能保證消費者這一側的消息不丟。無論是 Redis 的 Stream,還是專業的隊列中間件,例如 RabbitMQ、Kafka,其實都是這么做的。

  所以,從這個角度來看,Redis 也是合格的。

 

3) 隊列中間件會不會丟消息?

  前面 2 個問題都比較好處理,只要客戶端和服務端配合好,就能保證生產端、消費端都不丟消息。

  但是,如果隊列中間件本身就不可靠呢?畢竟生產者和消費這都依賴它,如果它不可靠,那么生產者和消費者無論怎么做,都無法保證數據不丟。

  在這個方面,Redis 其實沒有達到要求。

Redis 在以下 2 個場景下,都會導致數據丟失。

  1. AOF 持久化配置為每秒寫盤,但這個寫盤過程是異步的,Redis 宕機時會存在數據丟失的可能
  2. 主從復制也是異步的,主從切換時,也存在丟失數據的可能(從庫還未同步完成主庫發來的數據,就被提成主庫)

基於以上原因我們可以看到,Redis 本身的無法保證嚴格的數據完整性

  所以,如果把 Redis 當做消息隊列,在這方面是有可能導致數據丟失的。

  再來看那些專業的消息隊列中間件是如何解決這個問題的?

  像 RabbitMQ 或 Kafka 這類專業的隊列中間件,在使用時,一般是部署一個集群,生產者在發布消息時,隊列中間件通常會寫「多個節點」,以此保證消息的完整性。這樣一來,即便其中一個節點掛了,也能保證集群的數據不丟失。也正因為如此,RabbitMQ、Kafka在設計時也更復雜。畢竟,它們是專門針對隊列場景設計的。

  但 Redis 的定位則不同,它的定位更多是當作緩存來用,它們兩者在這個方面肯定是存在差異的。

 

4) 消息積壓怎么辦?

  因為 Redis 的數據都存儲在內存中,這就意味着一旦發生消息積壓,則會導致 Redis 的內存持續增長,如果超過機器內存上限,就會面臨被 OOM 的風險。

  所以,Redis 的 Stream 提供了可以指定隊列最大長度的功能,就是為了避免這種情況發生。

  但 Kafka、RabbitMQ 這類消息隊列就不一樣了,它們的數據都會存儲在磁盤上,磁盤的成本要比內存小得多,當消息積壓時,無非就是多占用一些磁盤空間,相比於內存,在面對積壓時也會更加「坦然」。

綜上,我們可以看到,把 Redis 當作隊列來使用時,始終面臨的 2 個問題:

  1. Redis 本身可能會丟數據
  2. 面對消息積壓,Redis 內存資源緊張

到這里,Redis 是否可以用作隊列,答案應該會比較清晰。

  如果業務場景足夠簡單,對於數據丟失不敏感,而且消息積壓概率比較小的情況下,把 Redis 當作隊列是完全可以的。而且,Redis 相比於 Kafka、RabbitMQ,部署和運維也更加輕量。

  如果業務場景對於數據丟失非常敏感,而且寫入量非常大,消息積壓時會占用很多的機器資源,那最好使用專業的消息隊列中間件。

 

總結

 

 

  關於技術方案選型的問題。

  這篇文章雖然始於 Redis,但並不止於 Redis。

  在分析 Redis 細節時,一直在提出問題,然后尋找更好的解決方案,在文章最后,又聊到一個專業的消息隊列應該怎么做。

  其實,在討論技術選型時,就是一個關於如何取舍的問題。

  在面對技術選型時,不要不經過思考就覺得哪個方案好,哪個方案不好

需要根據具體場景具體分析,這里我把這個分析過程分為 2 個層面:

  1. 業務功能角度
  2. 技術資源角度

這篇文章所講到的內容,都是以業務功能角度出發做決策的。但這里的第二點,從技術資源角度出發,其實也很重要。技術資源的角度是說,你所處的公司環境、技術資源能否匹配這些技術方案

  簡單來講,就是所在的公司、團隊,是否有匹配的資源能 hold 住這些技術方案。都知道 Kafka、RabbitMQ 是非常專業的消息中間件,但它們的部署和運維,相比於 Redis 來說,也會更復雜一些。如果是大公司,公司本身就有優秀的運維團隊,那么使用這些中間件肯定沒問題,因為有足夠優秀的人能 hold 住這些中間件,公司也會投入人力和時間在這個方向上。但如果是一個初創公司,業務正處在快速發展期,暫時沒有能 hold 住這些中間件的團隊和人,如果貿然使用這些組件,當發生故障時,排查問題也會變得很困難,甚至會阻礙業務的發展。而這種情形下,如果公司的技術人員對於 Redis 都很熟,綜合評估來看,Redis 也基本可以滿足業務 90% 的需求,那當下選擇 Redis 未必不是一個好的決策。

  所以,做技術選型不只是技術問題,還與人、團隊、管理、組織結構有關。也正是因為這些原因,在技術選型問題時,會發現每個公司的做法都不相同。畢竟每個公司所處的環境和文化不一樣,做出的決策當然就會各有差異。如果不了解這其中的邏輯,那在做技術選型時,只會趨於表面現象,無法深入到問題根源。而一旦理解了這個邏輯,那么在看待這個問題時,不僅對於技術會有更加深刻認識,對技術資源和人的把握,也會更加清晰。

  希望我們以后在做技術選型時,能夠把這些因素也考慮在內,這對技術成長之路也是非常有幫助的。

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM