1. 概念和特性
概念:介紹RocketMQ的基本概念模型
1 消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。
Broker 在實際部署過程中對應一台服務器,每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲於不同的 Broker。
Message Queue 用於存儲消息的物理地址,每個Topic中的消息地址存儲於多個 Message Queue 中。ConsumerGroup 由多個Consumer 實例構成。
2 消息生產者(Producer)
負責生產消息,一般由業務系統負責生產消息。
一個消息生產者會把業務應用系統里產生的消息發送到broker服務器。
RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。
同步和異步方式均需要Broker返回確認信息,單向發送不需要。
3 消息消費者(Consumer)
負責消費消息,一般是后台系統負責異步消費。
一個消息消費者會從Broker服務器拉取消息、並將其提供給應用程序。
從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。
4 主題(Topic)
表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬於一個主題,是RocketMQ進行消息訂閱的基本單位。
5 代理服務器(Broker Server)
消息中轉角色,負責存儲消息、轉發消息。
代理服務器在RocketMQ系統中負責接收從生產者發送來的消息並存儲、同時為消費者的拉取請求作准備。
代理服務器也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。
6 名字服務(Name Server)
名稱服務充當路由消息的提供者。
生產者或消費者能夠通過名字服務查找各主題相應的Broker IP列表。
多個Namesrv實例組成集群,但相互獨立,沒有信息交換。
7 拉取式消費(Pull Consumer)
Consumer消費的一種類型,應用通常主動調用Consumer的拉消息方法從Broker服務器拉消息、主動權由應用控制。一旦獲取了批量消息,應用就會啟動消費過程。
8 推動式消費(Push Consumer)
Consumer消費的一種類型,該模式下Broker收到數據后會主動推送給消費端,該消費模式一般實時性較高。
9 生產者組(Producer Group)
同一類Producer的集合,這類Producer發送同一類消息且發送邏輯一致。
如果發送的是事務消息且原始生產者在發送之后崩潰,則Broker服務器會聯系同一生產者組的其他生產者實例以提交或回溯消費。
10 消費者組(Consumer Group)
同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致。
消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。
要注意的是,消費者組的消費者實例必須訂閱完全相同的Topic。
RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。
11 集群消費(Clustering)
集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤消息。
12 廣播消費(Broadcasting)
廣播消費模式下,相同Consumer Group的每個Consumer實例都接收全量的消息。
13 普通順序消息(Normal Ordered Message)
普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。
14 嚴格順序消息(Strictly Ordered Message)
嚴格順序消息模式下,消費者收到的所有消息均是有順序的。
15 消息(Message)
消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬於一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。
系統提供了通過Message ID和Key查詢消息的功能。
16 標簽(Tag)
為消息設置的標志,用於同一主題下區分不同類型的消息。
來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標簽。
標簽能夠有效地保持代碼的清晰度和連貫性,並優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴展性。
特性:介紹RocketMQ實現的功能特性
1 訂閱與發布
消息的發布是指某個生產者向某個topic發送消息;
消息的訂閱是指某個消費者關注了某個topic中帶有某些tag的消息,進而從該topic消費數據。
2 消息順序
消息有序指的是一類消息消費時,能按照發送的順序來消費。
例如:一個訂單產生了三條消息分別是訂單創建、訂單付款、訂單完成。消費時要按照這個順序消費才能有意義,但是同時訂單之間是可以並行消費的。RocketMQ可以嚴格的保證消息有序。
順序消息分為全局順序消息與分區順序消息,全局順序是指某個Topic下的所有消息都要保證順序;部分順序消息只要保證每一組消息被順序消費即可。
- 全局順序
對於指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行發布和消費。
適用場景:性能要求不高,所有的消息嚴格按照 FIFO 原則進行消息發布和消費的場景 - 分區順序
對於指定的一個 Topic,所有消息根據 sharding key 進行區塊分區。 同一個分區內的消息按照嚴格的 FIFO 順序進行發布和消費。 Sharding key 是順序消息中用來區分不同分區的關鍵字段,和普通消息的 Key 是完全不同的概念。
適用場景:性能要求高,以 sharding key 作為分區字段,在同一個區塊中嚴格的按照 FIFO 原則進行消息發布和消費的場景。
3 消息過濾
RocketMQ的消費者可以根據Tag進行消息過濾,也支持自定義屬性過濾。
消息過濾目前是在Broker端實現的,優點是減少了對於Consumer無用消息的網絡傳輸,缺點是增加了Broker的負擔、而且實現相對復雜。
4 消息可靠性
RocketMQ支持消息的高可靠,影響消息可靠性的幾種情況:
- Broker非正常關閉
- Broker異常Crash
- OS Crash
- 機器掉電,但是能立即恢復供電情況
- 機器無法開機(可能是cpu、主板、內存等關鍵設備損壞)
- 磁盤設備損壞
1)、2)、3)、4) 四種情況都屬於硬件資源可立即恢復情況,RocketMQ在這四種情況下能保證消息不丟,或者丟失少量數據(依賴刷盤方式是同步還是異步)。
5)、6)屬於單點故障,且無法恢復,一旦發生,在此單點上的消息全部丟失。RocketMQ在這兩種情況下,通過異步復制,可保證99%的消息不丟,但是仍然會有極少量的消息可能丟失。通過同步雙寫技術可以完全避免單點,同步雙寫勢必會影響性能,適合對消息可靠性要求極高的場合,例如與Money相關的應用。注:RocketMQ從3.0版本開始支持同步雙寫。
5 至少一次
至少一次(At least Once)指每個消息必須投遞一次。
Consumer先Pull消息到本地,消費完成后,才向服務器返回ack,如果沒有消費一定不會ack消息,所以RocketMQ可以很好的支持此特性。
沒理解
6 回溯消費
回溯消費是指Consumer已經消費成功的消息,由於業務上需求需要重新消費,要支持此功能,Broker在向Consumer投遞成功消息后,消息仍然需要保留。並且重新消費一般是按照時間維度,例如由於Consumer系統故障,恢復后需要重新消費1小時前的數據,那么Broker要提供一種機制,可以按照時間維度來回退消費進度。RocketMQ支持按照時間回溯消費,時間維度精確到毫秒。
消費成功的消息,業務上如果需要重新消費,也是支持的。比如重新消費1小時前的數據。支持按照時間回溯消費,時間維度精確到毫秒。
7 事務消息
RocketMQ事務消息(Transactional Message)是指應用本地事務和發送消息操作可以被定義到全局事務中,要么同時成功,要么同時失敗。
RocketMQ的事務消息提供類似 X/Open XA 的分布事務功能,通過事務消息能達到分布式事務的最終一致。
8 定時消息
定時消息(延遲隊列)是指消息發送到broker后,不會立即被消費,等待特定時間投遞給真正的topic。
broker有配置項messageDelayLevel,默認值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18個level。可以配置自定義messageDelayLevel。注意,messageDelayLevel是broker的屬性,不屬於某個topic。發消息時,設置delayLevel等級即可:msg.setDelayLevel(level)。level有以下三種情況:
- level == 0,消息為非延遲消息
- 1<=level<=maxLevel,消息延遲特定時間,例如level==1,延遲1s
- level > maxLevel,則level== maxLevel,例如level==20,延遲2h
定時消息會暫存在名為SCHEDULE_TOPIC_XXXX的topic中,並根據delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一個queue只存相同延遲的消息,保證具有相同發送延遲的消息能夠順序消費。
broker會調度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。
需要注意的是,定時消息會在第一次寫入和調度寫入真實topic時都會計數,因此發送數量、tps都會變高。
9 消息重試
Consumer消費消息失敗后,要提供一種重試機制,令消息再消費一次。Consumer消費消息失敗通常可以認為有以下幾種情況:
- 由於消息本身的原因,例如反序列化失敗,消息數據本身無法處理(例如話費充值,當前消息的手機號被注銷,無法充值)等。這種錯誤通常需要跳過這條消息,再消費其它消息,而這條失敗的消息即使立刻重試消費,99%也不成功,所以最好提供一種定時重試機制,即過10秒后再重試。(跳過失敗的消息,消費下一條消息,失敗的消息定時重試)
- 由於依賴的下游應用服務不可用,例如db連接不可用,外系統網絡不可達等。遇到這種錯誤,即使跳過當前失敗的消息,消費其他消息同樣也會報錯。這種情況建議應用sleep 30s,再消費下一條消息,這樣可以減輕Broker重試消息的壓力。(應用休眠30秒,再消費下一條消息。怎么理解休眠30秒?)
RocketMQ會為每個消費組都設置一個Topic名稱為“%RETRY%+consumerGroup”的重試隊列(這里需要注意的是,這個Topic的重試隊列是針對消費組,而不是針對每個Topic設置的),用於暫時保存因為各種異常而導致Consumer端無法消費的消息。考慮到異常恢復起來需要一些時間,會為重試隊列設置多個重試級別,每個重試級別都有與之對應的重新投遞延時,重試次數越多投遞延時就越大。RocketMQ對於重試消息的處理是先保存至Topic名稱為“SCHEDULE_TOPIC_XXXX”的延遲隊列中,后台定時任務按照對應的時間進行Delay后重新保存至“%RETRY%+consumerGroup”的重試隊列中。
消費者消費消息支持重試。
RocketMQ為每個消費組設置一個重試隊列,用於暫時保存因為各種異常導致的Consumer端無法消費的消息。
具體做法是,先將消息保存到延遲隊列,延遲時間到了再重新保存到重試隊列
10 消息重投
生產者在發送消息時,同步消息失敗會重投,異步消息有重試,oneway沒有任何保證。
消息重投保證消息盡可能發送成功、不丟失,但可能會造成消息重復,消息重復在RocketMQ中是無法避免的問題。
消息重復在一般情況下不會發生,當出現消息量大、網絡抖動,消息重復就會是大概率事件。
另外,生產者主動重發、consumer負載變化也會導致重復消息。如下方法可以設置消息重試策略:
- retryTimesWhenSendFailed:同步發送失敗重投次數,默認為2,因此生產者會最多嘗試發送retryTimesWhenSendFailed + 1次。不會選擇上次失敗的broker,嘗試向其他broker發送,最大程度保證消息不丟。超過重投次數,拋出異常,由客戶端保證消息不丟。當出現RemotingException、MQClientException和部分MQBrokerException時會重投。
- retryTimesWhenSendAsyncFailed:異步發送失敗重試次數,異步重試不會選擇其他broker,僅在同一個broker上做重試,不保證消息不丟。
- retryAnotherBrokerWhenNotStoreOK:消息刷盤(主或備)超時或slave不可用(返回狀態非SEND_OK),是否嘗試發送到其他broker,默認false。十分重要消息可以開啟。(消息刷盤是為了做什么?持久化。不嘗試發給其他broker會有什么影響?持久化失敗時,brocker如果單點故障,也不嘗試發給其他brocker,勢必會導致部分消息故障后無法恢復到brocker。)
11 流量控制
生產者流控,因為broker處理能力達到瓶頸;消費者流控,因為消費能力達到瓶頸。
生產者流控:
- commitLog文件被鎖時間超過osPageCacheBusyTimeOutMills時,參數默認為1000ms,返回流控。
- 如果開啟transientStorePoolEnable == true,且broker為異步刷盤的主機,且transientStorePool中資源不足,拒絕當前send請求,返回流控。
- broker每隔10ms檢查send請求隊列頭部請求的等待時間,如果超過waitTimeMillsInSendQueue,默認200ms,拒絕當前send請求,返回流控。
- broker通過拒絕send 請求方式實現流量控制。
注意,生產者流控,不會嘗試消息重投。(即被流控擋住未發的消息就不會發了,不保證再次發送。)
消費者流控:
- 消費者本地緩存消息數超過pullThresholdForQueue時,默認1000。
- 消費者本地緩存消息大小超過pullThresholdSizeForQueue時,默認100MB。
- 消費者本地緩存消息跨度超過consumeConcurrentlyMaxSpan時,默認2000。
消費者流控的結果是降低拉取頻率。
12 死信隊列
死信隊列用於處理無法被正常消費的消息。
當一條消息初次消費失敗,消息隊列會自動進行消息重試;達到最大重試次數后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息,
此時,消息隊列 不會立刻將消息丟棄,而是將其發送到該消費者對應的特殊隊列中。
RocketMQ將這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message),將存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)。
在RocketMQ中,可以通過使用console控制台對死信隊列中的消息進行重發來使得消費者實例再次進行消費。(目的是為了盡可能保障消息能被消費)
2. 架構設計
1 技術架構
RocketMQ架構上主要分為四部分,如上圖所示:
-
Producer:消息發布的角色,支持分布式集群方式部署。Producer通過MQ的負載均衡模塊選擇相應的Broker集群隊列進行消息投遞,投遞的過程支持快速失敗並且低延遲。
-
Consumer:消息消費的角色,支持分布式集群方式部署。支持以push推,pull拉兩種模式對消息進行消費。同時也支持集群方式和廣播方式的消費,它提供實時消息訂閱機制,可以滿足大多數用戶的需求。
-
NameServer:NameServer是一個非常簡單的Topic路由注冊中心,其角色類似Dubbo中的zookeeper,支持Broker的動態注冊與發現。主要包括兩個功能:Broker管理,NameServer接受Broker集群的注冊信息並且保存下來作為路由信息的基本數據。然后提供心跳檢測機制,檢查Broker是否還存活;路由信息管理,每個NameServer將保存關於Broker集群的整個路由信息和用於客戶端查詢的隊列信息。然后Producer和Conumser通過NameServer就可以知道整個Broker集群的路由信息,從而進行消息的投遞和消費。NameServer通常也是集群的方式部署,各實例間相互不進行信息通訊。Broker是向每一台NameServer注冊自己的路由信息,所以每一個NameServer實例上面都保存一份完整的路由信息。當某個NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以動態感知Broker的路由的信息。
-
BrokerServer:Broker主要負責消息的存儲、投遞和查詢以及服務高可用保證,為了實現這些功能,Broker包含了以下幾個重要子模塊。
- Remoting Module:整個Broker的實體,負責處理來自clients端的請求。
- Client Manager:負責管理客戶端(Producer/Consumer)和維護Consumer的Topic訂閱信息
- Store Service:提供方便簡單的API接口處理消息存儲到物理硬盤和查詢功能。
- HA Service:高可用服務,提供Master Broker 和 Slave Broker之間的數據同步功能。
- Index Service:根據特定的Message key對投遞到Broker的消息進行索引服務,以提供消息的快速查詢。
2 部署架構
RocketMQ 網絡部署特點
-
NameServer是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。
-
Broker部署相對復雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave 的對應關系通過指定相同的BrokerName,不同的BrokerId 來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有NameServer。 注意:當前RocketMQ版本在部署架構上支持一Master多Slave,但只有BrokerId=1的從服務器才會參與消息的讀負載。
-
Producer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer獲取Topic路由信息,並向提供Topic 服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。
-
Consumer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer獲取Topic路由信息,並向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,消費者在向Master拉取消息時,Master服務器會根據拉取偏移量與最大偏移量的距離(判斷是否讀老消息,產生讀I/O),以及從服務器是否可讀等因素建議下一次是從Master還是Slave拉取。
結合部署架構圖,描述集群工作流程:
- 啟動NameServer,NameServer起來后監聽端口,等待Broker、Producer、Consumer連上來,相當於一個路由控制中心。
- Broker啟動,跟所有的NameServer保持長連接,定時發送心跳包。心跳包中包含當前Broker信息(IP+端口等)以及存儲所有Topic信息。注冊成功后,NameServer集群中就有Topic跟Broker的映射關系。
- 收發消息前,先創建Topic,創建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動創建Topic。
- Producer發送消息,啟動時先跟NameServer集群中的其中一台建立長連接,並從NameServer中獲取當前發送的Topic存在哪些Broker上,輪詢從隊列列表中選擇一個隊列,然后與隊列所在的Broker建立長連接從而向Broker發消息。
- Consumer跟Producer類似,跟其中一台NameServer建立長連接,獲取當前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道,開始消費消息。
設計
1 消息存儲
消息存儲是RocketMQ中最為復雜和最為重要的一部分,本節將分別從RocketMQ的消息存儲整體架構、PageCache與Mmap內存映射以及RocketMQ中兩種不同的刷盤方式三方面來分別展開敘述。
1.1 消息存儲整體架構
消息存儲架構圖中主要有下面三個跟消息存儲相關的文件構成。
(1) CommitLog:消息主體以及元數據的存儲主體,存儲Producer端寫入的消息主體內容,消息內容不是定長的。單個文件大小默認1G ,文件名長度為20位,左邊補零,剩余為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當第一個文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序寫入日志文件,當文件滿了,寫入下一個文件;
(2) ConsumeQueue:消息消費隊列,引入的目的主要是提高消息消費的性能,由於RocketMQ是基於主題topic的訂閱模式,消息消費是針對主題進行的,如果要遍歷commitlog文件中根據topic檢索消息是非常低效的。Consumer即可根據ConsumeQueue來查找待消費的消息。其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
consumequeue文件可以看成是基於topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結構,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue文件采取定長設計,每一個條目共20個字節,分別為8字節的commitlog物理偏移量、4字節的消息長度、8字節tag hashcode,單個文件由30W個條目組成,可以像數組一樣隨機訪問每一個條目,每個ConsumeQueue文件大小約5.72M;
(3) IndexFile:IndexFile(索引文件)提供了一種可以通過key或時間區間來查詢消息的方法。Index文件的存儲位置是:$HOME \store\index${fileName},文件名fileName是以創建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設計為在文件系統中實現HashMap結構,故rocketmq的索引文件其底層實現為hash索引。
在上面的RocketMQ的消息存儲整體架構圖中可以看出,RocketMQ采用的是混合型的存儲結構,即為Broker單個實例下所有的隊列共用一個日志數據文件(即為CommitLog)來存儲。
RocketMQ的混合型存儲結構(多個Topic的消息實體內容都存儲於一個CommitLog中)針對Producer和Consumer分別采用了數據和索引部分相分離的存儲結構,Producer發送消息至Broker端,然后Broker端使用同步或者異步的方式對消息刷盤持久化,保存至CommitLog中。
只要消息被刷盤持久化至磁盤文件CommitLog中,那么Producer發送的消息就不會丟失。正因為如此,Consumer也就肯定有機會去消費這條消息。
當無法拉取到消息后,可以等下一次消息拉取,同時服務端也支持長輪詢模式,如果一個消息拉取請求未拉取到消息,Broker允許等待30s的時間,只要這段時間內有新消息到達,將直接返回給消費端。
這里,RocketMQ的具體做法是,使用Broker端的后台服務線程—ReputMessageService不停地分發請求並異步構建ConsumeQueue(邏輯消費隊列)和IndexFile(索引文件)數據。
1.2 頁緩存與內存映射
頁緩存(PageCache)是OS對文件的緩存,用於加速對文件的讀寫。
一般來說,程序對文件進行順序讀寫的速度幾乎接近於內存的讀寫速度,主要原因就是由於OS使用PageCache機制對讀寫訪問操作進行了性能優化,將一部分的內存用作PageCache。
對於數據的寫入,OS會先寫入至Cache內,隨后通過異步的方式由pdflush內核線程將Cache內的數據刷盤至物理磁盤上。對於數據的讀取,如果一次讀取文件時出現未命中PageCache的情況,OS從物理磁盤上訪問讀取文件的同時,會順序對其他相鄰塊的數據文件進行預讀取。
在RocketMQ中,ConsumeQueue邏輯消費隊列存儲的數據較少,並且是順序讀取,在page cache機制的預讀取作用下,Consume Queue文件的讀性能幾乎接近讀內存,即使在有消息堆積情況下也不會影響性能。而對於CommitLog消息存儲的日志數據文件來說,讀取消息內容時候會產生較多的隨機訪問讀取,嚴重影響性能。如果選擇合適的系統IO調度算法,比如設置調度算法為“Deadline”(此時塊存儲采用SSD的話),隨機讀的性能也會有所提升。
另外,RocketMQ主要通過MappedByteBuffer對文件進行讀寫操作。其中,利用了NIO中的FileChannel模型將磁盤上的物理文件直接映射到用戶態的內存地址中(這種Mmap的方式減少了傳統IO將磁盤文件數據在操作系統內核地址空間的緩沖區和用戶應用程序地址空間的緩沖區之間來回進行拷貝的性能開銷),將對文件的操作轉化為直接對內存地址進行操作,從而極大地提高了文件的讀寫效率(正因為需要使用內存映射機制,故RocketMQ的文件存儲都使用定長結構來存儲,方便一次將整個文件映射至內存)。
1.3 消息刷盤
(1) 同步刷盤:如上圖所示,只有在消息真正持久化至磁盤后RocketMQ的Broker端才會真正返回給Producer端一個成功的ACK響應。同步刷盤對MQ消息可靠性來說是一種不錯的保障,但是性能上會有較大影響,一般適用於金融業務應用該模式較多。
(2) 異步刷盤:能夠充分利用OS的PageCache的優勢,只要消息寫入PageCache即可將成功的ACK返回給Producer端。消息刷盤采用后台異步線程提交的方式進行,降低了讀寫延遲,提高了MQ的性能和吞吐量。
2 通信機制
RocketMQ消息隊列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4個角色,基本通訊流程如下:
(1) Broker啟動后需要完成一次將自己注冊至NameServer的操作;隨后每隔30s時間定時向NameServer上報Topic路由信息。
(2) 消息生產者Producer作為客戶端發送消息時候,需要根據消息的Topic從本地緩存的TopicPublishInfoTable獲取路由信息。如果沒有則更新路由信息會從NameServer上重新拉取,同時Producer會默認每隔30s向NameServer拉取一次路由信息。
(3) 消息生產者Producer根據2)中獲取的路由信息選擇一個隊列(MessageQueue)進行消息發送;Broker作為消息的接收者接收消息並落盤存儲。
(4) 消息消費者Consumer根據2)中獲取的路由信息,並再完成客戶端的負載均衡后,選擇其中的某一個或者某幾個消息隊列來拉取消息並進行消費。
從上面1)~3)中可以看出在消息生產者, Broker和NameServer之間都會發生通信(這里只說了MQ的部分通信),因此如何設計一個良好的網絡通信模塊在MQ中至關重要,它將決定RocketMQ集群整體的消息傳輸能力與最終的性能。
rocketmq-remoting 模塊是 RocketMQ消息隊列中負責網絡通信的模塊,它幾乎被其他所有需要網絡通信的模塊(諸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依賴和引用。為了實現客戶端與服務器之間高效的數據請求與接收,RocketMQ消息隊列自定義了通信協議並在Netty的基礎之上擴展了通信模塊。
2.1 Remoting通信類結構
2.2 協議設計與編解碼
在Client和Server之間完成一次消息發送時,需要對發送的消息進行一個協議約定,因此就有必要自定義RocketMQ的消息協議。同時,為了高效地在網絡中傳輸消息和對收到的消息讀取,就需要對消息進行編解碼。在RocketMQ中,RemotingCommand這個類在消息傳輸過程中對所有數據內容的封裝,不但包含了所有的數據結構,還包含了編碼解碼操作。
Header字段 | 類型 | Request說明 | Response說明 |
---|---|---|---|
code | int | 請求操作碼,應答方根據不同的請求碼進行不同的業務處理 | 應答響應碼。0表示成功,非0則表示各種錯誤 |
language | LanguageCode | 請求方實現的語言 | 應答方實現的語言 |
version | int | 請求方程序的版本 | 應答方程序的版本 |
opaque | int | 相當於requestId,在同一個連接上的不同請求標識碼,與響應消息中的相對應 | 應答不做修改直接返回 |
flag | int | 區分是普通RPC還是onewayRPC得標志 | 區分是普通RPC還是onewayRPC得標志 |
remark | String | 傳輸自定義文本信息 | 傳輸自定義文本信息 |
extFields | HashMap<String, String> | 請求自定義擴展信息 | 響應自定義擴展信息 |
可見傳輸內容主要可以分為以下4部分:
(1) 消息長度:總長度,四個字節存儲,占用一個int類型;
(2) 序列化類型&消息頭長度:同樣占用一個int類型,第一個字節表示序列化類型,后面三個字節表示消息頭長度;
(3) 消息頭數據:經過序列化后的消息頭數據;
(4) 消息主體數據:消息主體的二進制字節數據內容;
2.3 消息的通信方式和流程
在RocketMQ消息隊列中支持通信的方式主要有同步(sync)、異步(async)、單向(oneway)
三種。其中“單向”通信模式相對簡單,一般用在發送心跳包場景下,無需關注其Response。這里,主要介紹RocketMQ的異步通信流程。
2.4 Reactor多線程設計
RocketMQ的RPC通信采用Netty組件作為底層通信庫,同樣也遵循了Reactor多線程模型,同時又在這之上做了一些擴展和優化。
上面的框圖中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多線程模型。一個 Reactor 主線程(eventLoopGroupBoss,即為上面的1)負責監聽 TCP網絡連接請求,建立好連接,創建SocketChannel,並注冊到selector上。RocketMQ的源碼中會自動根據OS的類型選擇NIO和Epoll,也可以通過參數配置),然后監聽真正的網絡數據。拿到網絡數據后,再丟給Worker線程池(eventLoopGroupSelector,即為上面的“N”,源碼中默認設置為3),在真正執行業務邏輯之前需要進行SSL驗證、編解碼、空閑檢查、網絡連接管理,這些工作交給defaultEventExecutorGroup(即為上面的“M1”,源碼中默認設置為8)去做。而處理業務操作放在業務線程池中執行,根據 RomotingCommand 的業務請求碼code去processorTable這個本地緩存變量中找到對應的 processor,然后封裝成task任務后,提交給對應的業務processor處理線程池來執行(sendMessageExecutor,以發送消息為例,即為上面的 “M2”)。從入口到業務邏輯的幾個步驟中線程池一直再增加,這跟每一步邏輯復雜性相關,越復雜,需要的並發通道越寬。
線程數 | 線程名 | 線程具體說明 |
---|---|---|
1 | NettyBoss_%d | Reactor 主線程 |
N | NettyServerEPOLLSelector_%d_%d | Reactor 線程池 |
M1 | NettyServerCodecThread_%d | Worker線程池 |
M2 | RemotingExecutorThread_%d | 業務processor處理線程池 |
3 消息過濾
RocketMQ分布式消息隊列的消息過濾方式有別於其它MQ中間件,是在Consumer端訂閱消息時再做消息過濾的。RocketMQ這么做是在於其Producer端寫入消息和Consumer端訂閱消息采用分離存儲的機制來實現的,Consumer端訂閱消息是需要通過ConsumeQueue這個消息消費的邏輯隊列拿到一個索引,然后再從CommitLog里面讀取真正的消息實體內容,所以說到底也是還繞不開其存儲結構。其ConsumeQueue的存儲結構如下,可以看到其中有8個字節存儲的Message Tag的哈希值,基於Tag的消息過濾正式基於這個字段值的。
主要支持如下2種的過濾方式
(1) Tag過濾方式:Consumer端在訂閱消息時除了指定Topic還可以指定TAG,如果一個消息有多個TAG,可以用||分隔。其中,Consumer端會將這個訂閱請求構建成一個 SubscriptionData,發送一個Pull消息的請求給Broker端。Broker端從RocketMQ的文件存儲層—Store讀取數據之前,會用這些數據先構建一個MessageFilter,然后傳給Store。Store從 ConsumeQueue讀取到一條記錄后,會用它記錄的消息tag hash值去做過濾,由於在服務端只是根據hashcode進行判斷,無法精確對tag原始字符串進行過濾,故在消息消費端拉取到消息后,還需要對消息的原始tag字符串進行比對,如果不同,則丟棄該消息,不進行消息消費。
(2) SQL92的過濾方式:這種方式的大致做法和上面的Tag過濾方式一樣,只是在Store層的具體過濾過程不太一樣,真正的 SQL expression 的構建和執行由rocketmq-filter模塊負責的。每次過濾都去執行SQL表達式會影響效率,所以RocketMQ使用了BloomFilter避免了每次都去執行。SQL92的表達式上下文為消息的屬性。
4 負載均衡
RocketMQ中的負載均衡都在Client端完成,具體來說的話,主要可以分為Producer端發送消息時候的負載均衡和Consumer端訂閱消息的負載均衡。
4.1 Producer的負載均衡
Producer端在發送消息的時候,會先根據Topic找到指定的TopicPublishInfo,在獲取了TopicPublishInfo路由信息后,RocketMQ的客戶端在默認方式下selectOneMessageQueue()方法會從TopicPublishInfo中的messageQueueList中選擇一個隊列(MessageQueue)進行發送消息。具體的容錯策略均在MQFaultStrategy這個類中定義。這里有一個sendLatencyFaultEnable開關變量,如果開啟,在隨機遞增取模的基礎上,再過濾掉not available的Broker代理。所謂的"latencyFaultTolerance",是指對之前失敗的,按一定的時間做退避。例如,如果上次請求的latency超過550Lms,就退避3000Lms;超過1000L,就退避60000L;如果關閉,采用隨機遞增取模的方式選擇一個隊列(MessageQueue)來發送消息,latencyFaultTolerance機制是實現消息發送高可用的核心關鍵所在。
4.2 Consumer的負載均衡
在RocketMQ中,Consumer端的兩種消費模式(Push/Pull)都是基於拉模式來獲取消息的,而在Push模式只是對pull模式的一種封裝,其本質實現為消息拉取線程在從服務器拉取到一批消息后,然后提交到消息消費線程池后,又“馬不停蹄”的繼續向服務器再次嘗試拉取消息。如果未拉取到消息,則延遲一下又繼續拉取。在兩種基於拉模式的消費方式(Push/Pull)中,均需要Consumer端在知道從Broker端的哪一個消息隊列—隊列中去獲取消息。因此,有必要在Consumer端來做負載均衡,即Broker端中多個MessageQueue分配給同一個ConsumerGroup中的哪些Consumer消費。
1、Consumer端的心跳包發送
在Consumer啟動后,它就會通過定時任務不斷地向RocketMQ集群中的所有Broker實例發送心跳包(其中包含了,消息消費分組名稱、訂閱關系集合、消息通信模式和客戶端id的值等信息)。Broker端在收到Consumer的心跳消息后,會將它維護在ConsumerManager的本地緩存變量—consumerTable,同時並將封裝后的客戶端網絡通道信息保存在本地緩存變量—channelInfoTable中,為之后做Consumer端的負載均衡提供可以依據的元數據信息。
2、Consumer端實現負載均衡的核心類—RebalanceImpl
在Consumer實例的啟動流程中的啟動MQClientInstance實例部分,會完成負載均衡服務線程—RebalanceService的啟動(每隔20s執行一次)。通過查看源碼可以發現,RebalanceService線程的run()方法最終調用的是RebalanceImpl類的rebalanceByTopic()方法,該方法是實現Consumer端負載均衡的核心。這里,rebalanceByTopic()方法會根據消費者通信類型為“廣播模式”還是“集群模式”做不同的邏輯處理。這里主要來看下集群模式下的主要處理流程:
(1) 從rebalanceImpl實例的本地緩存變量—topicSubscribeInfoTable中,獲取該Topic主題下的消息消費隊列集合(mqSet);
(2) 根據topic和consumerGroup為參數調用mQClientFactory.findConsumerIdList()方法向Broker端發送獲取該消費組下消費者Id列表的RPC通信請求(Broker端基於前面Consumer端上報的心跳包數據而構建的consumerTable做出響應返回,業務請求碼:GET_CONSUMER_LIST_BY_GROUP);
(3) 先對Topic下的消息消費隊列、消費者Id排序,然后用消息隊列分配策略算法(默認為:消息隊列的平均分配算法),計算出待拉取的消息隊列。這里的平均分配算法,類似於分頁的算法,將所有MessageQueue排好序類似於記錄,將所有消費端Consumer排好序類似頁數,並求出每一頁需要包含的平均size和每個頁面記錄的范圍range,最后遍歷整個range而計算出當前Consumer端應該分配到的記錄(這里即為:MessageQueue)。
(4) 然后,調用updateProcessQueueTableInRebalance()方法,具體的做法是,先將分配到的消息隊列集合(mqSet)與processQueueTable做一個過濾比對。
-
上圖中processQueueTable標注的紅色部分,表示與分配到的消息隊列集合mqSet互不包含。將這些隊列設置Dropped屬性為true,然后查看這些隊列是否可以移除出processQueueTable緩存變量,這里具體執行removeUnnecessaryMessageQueue()方法,即每隔1s 查看是否可以獲取當前消費處理隊列的鎖,拿到的話返回true。如果等待1s后,仍然拿不到當前消費處理隊列的鎖則返回false。如果返回true,則從processQueueTable緩存變量中移除對應的Entry;
-
上圖中processQueueTable的綠色部分,表示與分配到的消息隊列集合mqSet的交集。判斷該ProcessQueue是否已經過期了,在Pull模式的不用管,如果是Push模式的,設置Dropped屬性為true,並且調用removeUnnecessaryMessageQueue()方法,像上面一樣嘗試移除Entry;
最后,為過濾后的消息隊列集合(mqSet)中的每個MessageQueue創建一個ProcessQueue對象並存入RebalanceImpl的processQueueTable隊列中(其中調用RebalanceImpl實例的computePullFromWhere(MessageQueue mq)方法獲取該MessageQueue對象的下一個進度消費值offset,隨后填充至接下來要創建的pullRequest對象屬性中),並創建拉取請求對象—pullRequest添加到拉取列表—pullRequestList中,最后執行dispatchPullRequest()方法,將Pull消息的請求對象PullRequest依次放入PullMessageService服務線程的阻塞隊列pullRequestQueue中,待該服務線程取出后向Broker端發起Pull消息的請求。其中,可以重點對比下,RebalancePushImpl和RebalancePullImpl兩個實現類的dispatchPullRequest()方法不同,RebalancePullImpl類里面的該方法為空,這樣子也就回答了上一篇中最后的那道思考題了。
消息消費隊列在同一消費組不同消費者之間的負載均衡,其核心設計理念是在一個消息消費隊列在同一時間只允許被同一消費組內的一個消費者消費,一個消息消費者能同時消費多個消息隊列。
5 事務消息
Apache RocketMQ在4.3.0版中已經支持分布式事務消息,這里RocketMQ采用了2PC的思想來實現了提交事務消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息,如下圖所示。
5.1 RocketMQ事務消息流程概要
上圖說明了事務消息的大致方案,其中分為兩個流程:正常事務消息的發送及提交、事務消息的補償流程。
1.事務消息發送及提交:
(1) 發送消息(half消息)。
(2) 服務端響應消息寫入結果。
(3) 根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行)。
(4) 根據本地事務狀態執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)
2.補償流程:
(1) 對沒有Commit/Rollback的事務消息(pending狀態的消息),從服務端發起一次“回查”
(2) Producer收到回查消息,檢查回查消息對應的本地事務的狀態
(3) 根據本地事務狀態,重新Commit或者Rollback
其中,補償階段用於解決消息Commit或者Rollback發生超時或者失敗的情況。
5.2 RocketMQ事務消息設計
1.事務消息在一階段對用戶不可見
在RocketMQ事務消息的主要流程中,一階段的消息如何對用戶不可見。其中,事務消息相對普通消息最大的特點就是一階段發送的消息對用戶是不可見的。那么,如何做到寫入消息但是對用戶不可見呢?RocketMQ事務消息的做法是:如果消息是half消息,將備份原消息的主題與消息消費隊列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由於消費組未訂閱該主題,故消費端無法消費half類型的消息,然后RocketMQ會開啟一個定時任務,從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進行消費,根據生產者組獲取一個服務提供者發送回查事務狀態請求,根據事務狀態來決定是提交或回滾消息。
在RocketMQ中,消息在服務端的存儲結構如下,每條消息都會有對應的索引信息,Consumer通過ConsumeQueue這個二級索引來讀取消息實體內容,其流程如下:
RocketMQ的具體實現策略是:寫入的如果事務消息,對消息的Topic和Queue等屬性進行替換,同時將原來的Topic和Queue信息存儲到消息的屬性中,正因為消息主題被替換,故消息並不會轉發到該原主題的消息消費隊列,消費者無法感知消息的存在,不會消費。其實改變消息主題是RocketMQ的常用“套路”,回想一下延時消息的實現機制。
2.Commit和Rollback操作以及Op消息的引入
在完成一階段寫入一條對用戶不可見的消息后,二階段如果是Commit操作,則需要讓消息對用戶可見;如果是Rollback則需要撤銷一階段的消息。先說Rollback的情況。對於Rollback,本身一階段的消息對用戶是不可見的,其實不需要真正撤銷消息(實際上RocketMQ也無法去真正的刪除一條消息,因為是順序寫文件的)。但是區別於這條消息沒有確定狀態(Pending狀態,事務懸而未決),需要一個操作來標識這條消息的最終狀態。RocketMQ事務消息方案中引入了Op消息的概念,用Op消息標識事務消息已經確定的狀態(Commit或者Rollback)。如果一條事務消息沒有對應的Op消息,說明這個事務的狀態還無法確定(可能是二階段失敗了)。引入Op消息后,事務消息無論是Commit或者Rollback都會記錄一個Op操作。Commit相對於Rollback只是在寫入Op消息前創建Half消息的索引。
3.Op消息的存儲和對應關系
RocketMQ將Op消息寫入到全局一個特定的Topic中通過源碼中的方法—TransactionalMessageUtil.buildOpTopic();這個Topic是一個內部的Topic(像Half消息的Topic一樣),不會被用戶消費。Op消息的內容為對應的Half消息的存儲的Offset,這樣通過Op消息能索引到Half消息進行后續的回查操作。
4.Half消息的索引構建
在執行二階段Commit操作時,需要構建出Half消息的索引。一階段的Half消息由於是寫到一個特殊的Topic,所以二階段構建索引時需要讀取出Half消息,並將Topic和Queue替換成真正的目標的Topic和Queue,之后通過一次普通消息的寫入操作來生成一條對用戶可見的消息。所以RocketMQ事務消息二階段其實是利用了一階段存儲的消息的內容,在二階段時恢復出一條完整的普通消息,然后走一遍消息寫入流程。
5.如何處理二階段失敗的消息?
如果在RocketMQ事務消息的二階段過程中失敗了,例如在做Commit操作時,出現網絡問題導致Commit失敗,那么需要通過一定的策略使這條消息最終被Commit。RocketMQ采用了一種補償機制,稱為“回查”。Broker端對未確定狀態的消息發起回查,將消息發送到對應的Producer端(同一個Group的Producer),由Producer根據消息來檢查本地事務的狀態,進而執行Commit或者Rollback。Broker端通過對比Half消息和Op消息進行事務消息的回查並且推進CheckPoint(記錄那些事務消息的狀態是確定的)。
值得注意的是,rocketmq並不會無休止的的信息事務狀態回查,默認回查15次,如果15次回查還是無法得知事務狀態,rocketmq默認回滾該消息。
6 消息查詢
RocketMQ支持按照下面兩種維度(“按照Message Id查詢消息”、“按照Message Key查詢消息”)進行消息查詢。
6.1 按照MessageId查詢消息
RocketMQ中的MessageId的長度總共有16字節,其中包含了消息存儲主機地址(IP地址和端口),消息Commit Log offset。“按照MessageId查詢消息”在RocketMQ中具體做法是:Client端從MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封裝成一個RPC請求后通過Remoting通信層發送(業務請求碼:VIEW_MESSAGE_BY_ID)。Broker端走的是QueryMessageProcessor,讀取消息的過程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的記錄並解析成一個完整的消息返回。
6.2 按照Message Key查詢消息
“按照Message Key查詢消息”,主要是基於RocketMQ的IndexFile索引文件來實現的。RocketMQ的索引文件邏輯結構,類似JDK中HashMap的實現。索引文件的具體結構如下:
IndexFile索引文件為用戶提供通過“按照Message Key查詢消息”的消息索引查詢服務,IndexFile文件的存儲位置是:$HOME\store\index${fileName},文件名fileName是以創建時的時間戳命名的,文件大小是固定的,等於40+500W*4+2000W*20= 420000040個字節大小。如果消息的properties中設置了UNIQ_KEY這個屬性,就用 topic + “#” + UNIQ_KEY的value作為 key 來做寫入操作。如果消息設置了KEYS屬性(多個KEY以空格分隔),也會用 topic + “#” + KEY 來做索引。
其中的索引數據包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 這四個字段,一共20 Byte。NextIndex offset 即前面讀出來的 slotValue,如果有 hash沖突,就可以用這個字段將所有沖突的索引用鏈表的方式串起來了。Timestamp記錄的是消息storeTimestamp之間的差,並不是一個絕對的時間。整個Index File的結構如圖,40 Byte 的Header用於保存一些總的統計信息,4*500W的 Slot Table並不保存真正的索引數據,而是保存每個槽位對應的單向鏈表的頭。20*2000W 是真正的索引數據,即一個 Index File 可以保存 2000W個索引。
“按照Message Key查詢消息”的方式,RocketMQ的具體做法是,主要通過Broker端的QueryMessageProcessor業務處理器來查詢,讀取消息的過程就是用topic和key找到IndexFile索引文件中的一條記錄,根據其中的commitLog offset從CommitLog文件中讀取消息的實體內容。
MQ的應用場景
- 異步解耦(分布式異步操作,對應本地異步操作就是本地起線程做異步的事情)
- 流量削峰(大流量的緩沖)
- 日志處理
- 消息通信
RocketMQ環境搭建
安裝 RocketMQ
環境:
- 64位Linux/Unix/Mac
- JDK1.8.*
下載
http://rocketmq.apache.org/release_notes/release-notes-4.7.0/
一般下載二進制包即可
解壓
unzip rocketmq-all-4.7.0-bin-release.zip
進入RocketMQ目錄
cd rocketmq-all-4.7.0-bin-release
啟動Name Server
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
啟動Brocker
> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...
發送和接收消息
> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
關閉服務
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
RocketMQ 控制台安裝
下載
先到 GitHub 上下載 RocketMQ 控制台。在 Git 上下載下面的工程 rocketmq-console-1.0.0,GitHub 地址:
https://github.com/apache/rocketmq-externals/releases
解壓
unzip rocketmq-externals-rocketmq-console-1.0.0.zip
進入控制台根目錄
cd rocketmq-externals-rocketmq-console-1.0.0
修改配置文件
修改配置文件 rocketmq-console\src\main\resources\application.properties,修改完成,保存。需要修改的內容如下:
server.port=7777 #項目啟動后的端口號
#自己虛擬服務器 ip 地址
rocketmq.config.namesrvAddr=192.168.42.241:9876
#nameserv 的地址,注意防火牆要開啟 9876 端口
構建打包
mvn clean package -Dmaven.test.skip=true
啟動控制台
java -jar target/rocketmq-console-ng-1.0.0.jar
訪問控制台
部署
- 27服務器root密碼
192.168.150.27
Ewell@server
- rocketmq控制台
- 加端口:
firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --reload
- 查看對外開放的端口:
firewall-cmd --zone=public --list-ports
端口說明:
9876:nameserver
10911:broker
7777:web-console
- 日志所在目錄
/root/logs
下