RocketMQ詳解(一)原理概覽


專題目錄

RocketMQ詳解(一)原理概覽

RocketMQ詳解(二)安裝使用詳解

RocketMQ詳解(三)啟動運行原理

RocketMQ詳解(四)核心設計原理

RocketMQ詳解(五)總結提高

一、引子

RocketMQ在MQ中的地位毋庸置疑,java開發者的首選、必會中間件。筆者在深度使用后,結合apache官網、github、源碼(版本4.8.0),總結出這個系列文章,供大家參考。本節稍顯枯燥,但是有必要讀。

自學飛機票:

1.rocketMQ官網

2.github RocketMQ   中文文檔

3.參數配置文檔

二、概覽

 RocketMQ是按照典型的生產-消費模型設計的。

2.1 部署架構圖

官網架構圖黑白色太單調,從網上下了張彩圖:

RocketMQ技術架構圖

 如上圖,RocketMQ主要由 Producer生產者、Consumer消費者、Broker代理、NameServer名稱服務器 四部分組成。

1.生產者(Producer)

負責發布消息,支持集群部署。Producer通過負載均衡選擇Broker集群隊列進行消息投遞。

2.消費者(Consumer)

負責消費消息,支持集群部署。支持以push推,pull拉兩種模式對消息進行消費。群組消費支持:集群方式和廣播方式(見2.3)。

3.代理服務器(Broker Server)

消息中轉角色,負責存儲消息、轉發消息。主要包含2個功能:

  • 接收從生產者發送來的消息並存儲、同時為消費者的拉取請求作准備。
  • 存儲消息相關的元數據:消費者組consumer Group、消費進度偏移offset、主題Topic、隊列消息Message Queue等。

4.名字服務(Name Server)

Name Server是Topic路由的注冊中心,支持Broker的動態注冊與發現。生產者或消費者能夠通過名字服務查找各主題相應的Broker IP列表。多個Namesrv實例組成集群,無狀態(每個實例數據一樣)。

 

2.2 名詞解釋

1.主題(Topic)

  表示一類消息的集合,每個主題包含若干條消息,是RocketMQ進行消息訂閱的基本單位。

2.標簽(Tag)

 為消息設置的標志,用於同一topic下區分不同類型的消息。可以根據topic+Tag實現消息的精細化生產和消費。

3.消息(Message)

消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬於一個主題。每個消息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了通過Message ID和Key查詢消息的功能。

4.拉取式消費(Pull Consumer)

  Consumer消費的一種類型,應用通常主動調用Consumer的拉消息方法從Broker服務器拉消息、主動權由應用控制。一旦獲取了批量消息,應用就會啟動消費過程。

5.推動式消費(Push Consumer)

 Consumer消費的一種類型,該模式下Broker收到數據后會主動推送給消費端,該消費模式一般實時性較高

6.生產者組(Producer Group)

  同一類Producer的集合,這類Producer發送同一類消息且發送邏輯一致。如果發送的是事務消息且原始生產者在發送之后崩潰,則Broker服務器會聯系同一生產者組的其他生產者實例以提交或回溯消費

7.消費者組(Consumer Group)

  同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致。要注意的是,消費者組的消費者實例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。

8.集群消費(Clustering)

集群消費模式下,一個消息只能被一個Consumer消費。

9.廣播消費(Broadcasting)

廣播消費模式下,相同Consumer Group的每個Consumer都接收全量的消息。

10.普通順序消息(Normal Ordered Message)

普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。(message queue見2.4 消息存儲模型)

11.嚴格順序消息(Strictly Ordered Message)

嚴格順序消息模式下,消費者收到的所有消息均是有順序的。

 

三、原理

3.1 總技術架構圖

3.2 啟動流程

 

四、特性

作為一個架構師,除了了解原理外還必須清晰知道一款軟件的的功能特性,以便后期技術選型。

4.1 業務特性

Apache RocketMQ上描述了6個業務特性:

  1. 低延遲
  2. 面向金融(可跟蹤)
  3. 行業支撐(萬億級消息驗證)
  4. 標准
  5. 大數據友好
  6. 支持大量消息堆積。

 

4.2 設計特性

Github上描述了12個設計特性。總結的挺好:

1.訂閱與發布

消息的發布是指某個生產者向某個topic發送消息;消息的訂閱是指某個消費者關注了某個topic中帶有某些tag的消息,進而從該topic消費數據。

2.消息順序

消息有序指的是一類消息消費時,能按照發送的順序來消費。順序消息分為全局順序消息與分區順序消息,全局順序是指某個Topic下的所有消息都要保證順序;部分順序消息只要保證每一組消息被順序消費即可。
- 全局順序
對於指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行發布和消費。
適用場景:性能要求不高,所有的消息嚴格按照 FIFO 原則進行消息發布和消費的場景
- 分區順序
對於指定的一個 Topic,所有消息根據 sharding key 進行區塊分區。 同一個分區內的消息按照嚴格的 FIFO 順序進行發布和消費。 Sharding key 是順序消息中用來區分不同分區的關鍵字段,和普通消息的 Key 是完全不同的概念。
適用場景:性能要求高,以 sharding key 作為分區字段,在同一個區塊中嚴格的按照 FIFO 原則進行消息發布和消費的場景。


3.消息過濾

RocketMQ的消費者可以根據Tag進行消息過濾,也支持自定義屬性過濾。消息過濾目前是在Broker端實現的,優點是減少了對於Consumer無用消息的網絡傳輸,缺點是增加了Broker的負擔、而且實現相對復雜。

4.消息可靠性

RocketMQ支持消息的高可靠,影響消息可靠性的幾種情況:
1) Broker非正常關閉
2) Broker異常Crash
3) OS Crash
4) 機器掉電,但是能立即恢復供電情況
5) 機器無法開機(可能是cpu、主板、內存等關鍵設備損壞)
6) 磁盤設備損壞

1)、2)、3)、4) 四種情況都屬於硬件資源可立即恢復情況,RocketMQ在這四種情況下能保證消息不丟,或者丟失少量數據(依賴刷盤方式是同步還是異步)。

5)、6)屬於單點故障,且無法恢復,一旦發生,在此單點上的消息全部丟失。RocketMQ在這兩種情況下,通過異步復制,可保證99%的消息不丟,但是仍然會有極少量的消息可能丟失。通過同步雙寫技術可以完全避免單點,同步雙寫勢必會影響性能,適合對消息可靠性要求極高的場合,例如與Money相關的應用。注:RocketMQ從3.0版本開始支持同步雙寫。

5.至少一次

至少一次(At least Once)指每個消息必須投遞一次。Consumer先Pull消息到本地,消費完成后,才向服務器返回ack,如果沒有消費一定不會ack消息,所以RocketMQ可以很好的支持此特性。

6.回溯消費

回溯消費是指Consumer已經消費成功的消息,由於業務上需求需要重新消費,要支持此功能,Broker在向Consumer投遞成功消息后,消息仍然需要保留。並且重新消費一般是按照時間維度,例如由於Consumer系統故障,恢復后需要重新消費1小時前的數據,那么Broker要提供一種機制,可以按照時間維度來回退消費進度。RocketMQ支持按照時間回溯消費,時間維度精確到毫秒。

7.事務消息

RocketMQ事務消息(Transactional Message)是指應用本地事務和發送消息操作可以被定義到全局事務中,要么同時成功,要么同時失敗。RocketMQ的事務消息提供類似 X/Open XA 的分布事務功能,通過事務消息能達到分布式事務的最終一致。

8.定時消息

定時消息(延遲隊列)是指消息發送到broker后,不會立即被消費,等待特定時間投遞給真正的topic。
broker有配置項messageDelayLevel,默認值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18個level。可以配置自定義messageDelayLevel。注意,messageDelayLevel是broker的屬性,不屬於某個topic。發消息時,設置delayLevel等級即可:msg.setDelayLevel(level)。level有以下三種情況:

- level == 0,消息為非延遲消息
- 1<=level<=maxLevel,消息延遲特定時間,例如level==1,延遲1s
- level > maxLevel,則level== maxLevel,例如level==20,延遲2h

定時消息會暫存在名為SCHEDULE_TOPIC_XXXX的topic中,並根據delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一個queue只存相同延遲的消息,保證具有相同發送延遲的消息能夠順序消費。broker會調度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。

需要注意的是,定時消息會在第一次寫入和調度寫入真實topic時都會計數,因此發送數量、tps都會變高。

9.消息重試(消費者)

Consumer消費消息失敗后,要提供一種重試機制,令消息再消費一次。Consumer消費消息失敗通常可以認為有以下幾種情況:
- 由於消息本身的原因,例如反序列化失敗,消息數據本身無法處理(例如話費充值,當前消息的手機號被注銷,無法充值)等。這種錯誤通常需要跳過這條消息,再消費其它消息,而這條失敗的消息即使立刻重試消費,99%也不成功,所以最好提供一種定時重試機制,即過10秒后再重試。
- 由於依賴的下游應用服務不可用,例如db連接不可用,外系統網絡不可達等。遇到這種錯誤,即使跳過當前失敗的消息,消費其他消息同樣也會報錯。這種情況建議應用sleep 30s,再消費下一條消息,這樣可以減輕Broker重試消息的壓力。

RocketMQ會為每個消費組都設置一個Topic名稱為“%RETRY%+consumerGroup”的重試隊列(這里需要注意的是,這個Topic的重試隊列是針對消費組,而不是針對每個Topic設置的),用於暫時保存因為各種異常而導致Consumer端無法消費的消息。考慮到異常恢復起來需要一些時間,會為重試隊列設置多個重試級別,每個重試級別都有與之對應的重新投遞延時,重試次數越多投遞延時就越大。RocketMQ對於重試消息的處理是先保存至Topic名稱為“SCHEDULE_TOPIC_XXXX”的延遲隊列中,后台定時任務按照對應的時間進行Delay后重新保存至“%RETRY%+consumerGroup”的重試隊列中。

10.消息重投(生產者)

生產者在發送消息時,同步消息失敗會重投,異步消息有重試,oneway沒有任何保證。消息重投保證消息盡可能發送成功、不丟失,但可能會造成消息重復,消息重復在RocketMQ中是無法避免的問題。消息重復在一般情況下不會發生,當出現消息量大、網絡抖動,消息重復就會是大概率事件。另外,生產者主動重發、consumer負載變化也會導致重復消息。如下方法可以設置消息重試策略:

- retryTimesWhenSendFailed:同步發送失敗重投次數,默認為2,因此生產者會最多嘗試發送retryTimesWhenSendFailed + 1次。不會選擇上次失敗的broker,嘗試向其他broker發送,最大程度保證消息不丟。超過重投次數,拋出異常,由客戶端保證消息不丟。當出現RemotingException、MQClientException和部分MQBrokerException時會重投。
- retryTimesWhenSendAsyncFailed:異步發送失敗重試次數,異步重試不會選擇其他broker,僅在同一個broker上做重試,不保證消息不丟。
- retryAnotherBrokerWhenNotStoreOK:消息刷盤(主或備)超時或slave不可用(返回狀態非SEND_OK),是否嘗試發送到其他broker,默認false十分重要消息可以開啟

11.流量控制

生產者流控,因為broker處理能力達到瓶頸;消費者流控,因為消費能力達到瓶頸。

生產者流控:
- commitLog文件被鎖時間超過osPageCacheBusyTimeOutMills時,參數默認為1000ms,返回流控。
- 如果開啟transientStorePoolEnable == true,且broker為異步刷盤的主機,且transientStorePool中資源不足,拒絕當前send請求,返回流控。
- broker每隔10ms檢查send請求隊列頭部請求的等待時間,如果超過waitTimeMillsInSendQueue,默認200ms,拒絕當前send請求,返回流控。
- broker通過拒絕send 請求方式實現流量控制。

注意,生產者流控,不會嘗試消息重投

消費者流控:
- 消費者本地緩存消息數超過pullThresholdForQueue時,默認1000
- 消費者本地緩存消息大小超過pullThresholdSizeForQueue時,默認100MB
- 消費者本地緩存消息跨度超過consumeConcurrentlyMaxSpan時,默認2000

消費者流控的結果是降低拉取頻率

12.死信隊列

死信隊列用於處理無法被正常消費的消息。當一條消息初次消費失敗,消息隊列會自動進行消息重試;達到最大重試次數后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息,此時,消息隊列 不會立刻將消息丟棄,而是將其發送到該消費者對應的特殊隊列中。

RocketMQ將這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message),將存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)。在RocketMQ中,可以通過使用console控制台(官方提供的后台管理WEB界面)對死信隊列中的消息進行重發來使得消費者實例再次進行消費。


總結:這12個特性中,需要關注的有:

 消息順序、消息過濾、事務消息、消息重試(消費者)、消息重投(生產者)、流量控制、死信隊列。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM