前面我們介紹了消息中間件的優勢和選型,但是選擇消息中間件時還需要考慮幾個問題,支持的消息類型、如何保證消息不丟失、
消息冪等性的保證,下面我們逐個介紹:
1 常見消息類型
常見的消息類型包括無序消息、有序消息和延時消息三種。
1.1 無序消息
概念:無序消息即沒有順序的消息,具體的:producer 只管發送消息,consumer 只管接收消息,消息和消息之間的順序並沒有保證,可能先發送的消息先消費,也可能先發送的消息后消費。舉個簡單例子,producer 依次發送 order id 為 1、2、3 的消息到 broker,consumer 接到的消息順序有可能是 1、2、3,也有可能是 2、1、3 等情況,這就是普通消息。
優勢:因為不需要保證消息的順序,所以消息可以大規模並發地發送和消費,吞吐量很高,適合大部分場景。
1.2 有序消息
概念:按照一定的先后順序的消息類型,舉個例子,producer 依次發送 order id 為 1、2、3 的消息到 broker,consumer 接到的消息順序也就是 1、2、3 ,而不會出現普通消息那樣的 2、1、3 等情況。但是RocketMQ只是保證的是消息發送有序,無法保證消費有序。
分類:有序消息分為全局有序消息和局部有序消息兩種類型。
對於全局有序消息,將對應該類型的topic的queue數變為1,這樣一來,只要 producer 按照 1、2、3、4 的順序去發送消息,那么 consumer 自然也就按照 1、2、3、4 的順序去消費,這就是全局有序消息。但由於一個topic只有一個queue,即使我們有多個producer實例和consumer實例也很難提高消息吞吐量,效率低下。
對於局部有序消息,實際上可看作吞吐量和有序之間的折中方案,以訂單消息為例,訂單消息可以再細分為訂單創建、訂單付款、訂單完成等消息,這些消息都有相同的 orderId。對於某一個orderId只有順序處理才符合業務邏輯。但不同orderId的消息是可以並行的,不會影響到業務。這時候就常見做法就是將orderId進行處理,將orderId相同的消息發送到topicB的同一個queue,不同orderId的消息發送到不同的queue中,從而實現局部有序。具體的,假設我們topicB有2個queue,那么我們可以對id取余,奇數的發往queue0,偶數的發往queue1,消費者按照queue消費時,就能保證queue0里面的消息有序消費,queue1里面的消息有序消費。由於一個topic可以有多個queue,假設queue數是n,理論上性能就是全局有序的n倍。
1.3 延時消息
概念:延時消息,簡單來說就是當producer將消息發送到broker后,會延時一定時間后才投遞給consumer進行消費。RcoketMQ的延時等級為:1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。level=0,表示不延時。level=1,表示 1 級延時,對應延時 1s。level=2 表示 2 級延時,對應5s,以此類推。
場景:適用於消息生產和消費之間有時間窗口要求的場景。比如說我們網購時,下單之后是有一個支付時間,超過這個時間未支付,系統就應該自動關閉該筆訂單。那么在訂單創建的時候就會就需要發送一條延時消息(延時15分鍾)后投遞給consumer,consumer接收消息后再對訂單的支付狀態進行判斷是否關閉訂單。
實現:Broker將延時消息以指定topic(SCHEDULE_TOPIC_XXXX)將消息進行持久化,同時定時任務ScheduleMessageService通過不斷讀取該topic的queueId,判斷延時時間達到后進行消息的還原處理,當消息被還原后就可以被消費者消費。
2 消息冪等性
正常情況下,消費者消費完畢后,會發送一個確認消息給消息隊列,消息隊列就知道該消息被消費了,就會將該消息從消息隊列中刪除。RocketMQ返回一個CONSUME_SUCCESS成功標志,讓消息隊列知道自己已經消費過了。消息冪等性實際上是保證消息不被重復消費,但是消息的冪等性無法由消息側來保證,應該由業務方結合具體的業務來保證,具體分析如下:
- 發送消息后服務端應答過程中發生網絡閃斷,發送端意識到發送失敗再次推送消息;
- 訂閱消息收到后應答過程中發生網絡閃斷,服務端意識到消費失敗再次推送消息;
- RocketMQ的broker或客戶端重啟、擴容時會觸發Rebalance,此時訂閱端可能會收到重復消息
業務方保證消息冪等的建議:以業務的唯一標識Msg key作為冪等處理的依據,將收到消息的Msg key與所存儲的Msg key集合做對比,如果不存在則接收消息並存儲Msg Key,否則則忽略消息。
3 消息重試機制
消息重試機制的目的是為了避免消息丟失。要掌握重試機制,需要先了解重試隊列和死信隊列,如下:
- 重試隊列:如果consumer端因各種異常導致本次消費失敗,為防止該消息丟失而需要將其重新保存到broker中,保存這種因異常無法正常消費的消息隊列稱為重試隊列,重試隊列的名稱有特定規則:%RETRY%+GroupID,需要注意的是重試隊列是針對Group級別的而非topic級別的。但是考慮到消息重試有一定的時間間隔,重試次數越多投遞延時越大(默認重試隊列數為1個,最大重試次數為16次),因此,RocketMQ對重試消息的處理是先保存至Topic名為"SCHEDULETOPIXXX"的延遲隊列中,后台定時任務按照對應的時間進行Delay后重新保存至對應重試隊列中投遞。
- 死信隊列:當超過重試次數后若消息仍然無法被消費,為了避免消息不會被無故的丟失,會將消息保存到死信隊列中。其中,死信隊列是針對Group級別的,命名規則為:"%DLQ%+GroupId"
以上闡述的是消費端相關的重試機制,實際上,RocketMQ支持生產端和消費端兩類重試機制,具體的:
- 生產端:如果由於網絡抖動等原因,生產端程向Broker發送消息時沒有成功,即發送超時期間沒有收到Broker的ACK,此時RocketMQ會自動進行重試。
- 消費端:消費者消費消息后,需要給Broker返回消費狀態(SUCCESS/LATER)。消費端重試包括異常重試和超時重試兩種方式,其中,異常重試指的是消費端邏輯出現異常導致返回LATER的狀態,那么broker就會在一段時間后嘗試重試,超時重試指的是consumer端處理時間太長,broker認為consumer消費超時,此時會發起超時重試。