RocketMQ之消息中間件需要解決的問題


消息中間件需要解決哪些問題

1.Publish/Subscribe(發布訂閱)

發布訂閱是消息中間件最基本的功能

2.Message Priority(消息優先級)

在消息隊列中,每條消息都有不同的優先級,優先級高的先投遞。

由於rocketmq的所有消息都是持久化的,按照優先級排序開銷會非常大,所以不支持持久化。但是可以配置一個優先級高的隊列和一個普通的隊列,將不同的消息發送到不同的隊列。

優先級問題可以分為兩類:

  1. 只要達到優先級目的即可,不需要嚴格划分優先級。通常將優先級划分為高、中、低等幾個等級。每個優先級用不同的topic表示。發送消息通過不同的topic來表示優先級。缺點是對業務的優先級做了妥協。
  2. 嚴格的優先級。如果讓MQ解決此問題,會對MQ的性能造成很多影響。這里要確保一點,業務上是否確實需 要這種嚴格的優先級,如果將優先級壓縮成幾個,對業務的影響有多大?

3.Message Order(消息有序)

消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了 3 條消息,分別是訂單創 建,訂單付款,訂單完成。消費時,要按照這個順序消費才能有意義。但是同時訂單之間是可以並行消費的。

rocketmq嚴格保證消息有序性。

4.Message Filter(消息過濾)

  • Broker端消息過濾

    在broker中按照consumer的要求過濾,優點是減少了對應consumer的無用消息的傳輸。但增加了broker的負擔,使得實現變復雜。

  • Consumer端消息過濾

    這種過濾方式可由應用完全自定義實現,但是缺點是很多無用的消息要傳輸到 consumer端。

5.Message Persistence(消息持久化)

消息中間件通常采用幾種方式持久化:

  1. 持久化到數據庫
  2. 持久化到KV存儲
  3. 文件記錄形式的持久化,例如kafka、rocketmq
  4. 對內存數據做持久化鏡像

前三種持久化方式都具有將內存隊列 buffer 進行擴展的能力,后一種則當broker掛掉重啟后仍然能將之前內存的數據恢復出來。

rocketmq參考了kafka的持久化方式,充分利用Linux文件系統內存cache來提高性能。

6.Message Reliablity(消息可靠性)

影響消息可靠性的幾種情況:

  1. broker正常關閉
  2. broker異常crash
  3. OS crash
  4. 機器掉電,但能立即恢復供電情況
  5. 機器不能開機(硬件損壞)
  6. 磁盤設備損壞

前面1-4四種情況都屬於硬件資源可立即恢復的情況。rocketmq在這四種情況下能保證消息不丟,或丟失少量數據(依賴刷盤方式是同步還是異步)。

5-6兩種情況屬於單點故障,且不能恢復,一旦發生,在此單點上的消息全部丟失。rocketmq在這兩種情況下,通過異步復制,可保證99%的消息不丟。通過同步雙寫技術可以完全避免單點,但會影響性能,適合對消息可靠性要求極高的場景,如與錢相關的應用。

7.Low Latency Messaging(低延遲)

在消息不堆積情況下,消息到達 broker 后,能立刻到達 consumer。

rocketmq使用長輪詢 pulll 方式,可保證消息非常實時,消息實時性不低於 push。

8.At least Once(至少一次) 

指每個消息必須投遞一次

rocketmq的consumer先pull消息到本地,消息完成后,才向服務器返回ask。如果沒有消費,一定不會ask消息。

9.Exactly Only Once(只有一次)

  • 發送消息階段不允許重復發送
  • 消費消息階段不允許重復消費

只有兩個條件都滿足,才能認為消息是去除的。而要實現以上兩點,在分布式環境下,無疑會產生巨大開銷。

rocketmq並不保證此特性,而是要求在業務上去重,即消費消息做到冪等性。

10.Broker 的 Buffer

broker中的buffer通常指broker中一個隊列中的內存buffer大小。如果buffer滿了怎么辦?

CORBA Notification 規范中處理方式:

  • RejectNewEvents:拒絕新來的消息,向producer返回錯誤碼
  • 按照特定策略丟棄已有消息

rockermq的隊列都是持久化磁盤,buffer大小是磁盤容量,且數據定期清理。

11.回溯消費

回溯消息指consumer成功消費的消息由於業務上的需求,需要重新消費。要支持此功能,broker在向consumer投遞消息后,消息仍要保留。。並且重新消費一般是按照時間維度,例如由於 Consumer 系統故障, 恢復后需要重新消費 1 小時前的數據,那么 Broker 要提供一種機制,可以按照時間維度來回退消費進度。

rocketmq支持按照時間回溯消息,時間維度精確到毫秒,可以向前回溯,也可以向后回溯。

12.消息堆積

消息中間件的主要功能是異步解耦,擋住前端的數據洪峰,保證后端系統的穩定性。這要求消息中間件具有一定的消息堆積能力。

消息堆積分兩種:

  • 一種是消息堆積在內存buffer中,一旦超過內存buffer,可以根據丟棄策略來丟棄消息。這種消息堆積能力主要在於內存buffer的大小。且消息堆積后,性能下降不會太大。因為內存中數據多少對於對外提供的訪問能力影響有限。
  • 第二種是消息堆積在持久化存儲系統中,如:數據庫,KV存儲,文件記錄形式。當消息不能在內存cache中命中時,要不可避免的訪問磁盤,從而產生大量讀IO,讀IO的吞吐量直接決定了消息堆積后的訪問能力

13.分布式事務

rocketmq采用第一階段發送Prepared消息時,拿到消息的offset,第二階段通過offset訪問消息,並修改狀態。offset就是數據的地址。

rocketmq實現事務方式,沒有通過KV存儲做,而是通過offset方式,存在一個顯著缺陷,即通過offset更改數據,會令系統的臟頁過多。

14.定時消息

定時消息指消息放到broker后,不能立即被consumer消費,需到特定時間點或等待特定時間后才能被消費。

如果支持任意時間精度,在broker層,就必須做消息排序,涉及到持久化,排序就會產生大量性能開銷。

rocketmq支持定時消息,但不支持任意精度,只支持特定level,如:5s,10s,1m等

15.消息重試

consumer消費消息失敗后,要提供一種重試機制,令消息再消費一次。

consumer消費消息失敗有幾種情況:

  1. 由於消息本身原因,如反序列化失敗,消息數據本身沒法處理(話費充值,當前手機號被注銷,不能充值)等。這種錯誤通常需要跳過這條消息,再消費其他消息,這條失敗的消息即使立刻重試消費,基本不可能成功,所以最好提供一種定時重試機制。
  2. 由於依賴的下游應用用服務不可用,如數據庫連接不可用,網絡原因等。這種錯誤即使跳過當前失敗的消息,消費其他消息同樣會報錯。

這種情況建議應用sleep 30s,再消費下條消息,從而減輕broker重試消息的壓力。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM