由於篇幅原因,本次的源碼分析只限於Producer側的發送消息的核心邏輯,我會通過流程圖、代碼注釋、文字講解的方式來對源碼進行解釋,后續應該會專門開幾篇文章來做源碼分析。
這篇博客聊聊關於RocketMQ相關的東西,主要聊的點有RocketMQ的功能使用、RocketMQ的底層運行原理和部分核心邏輯的源碼分析。至於我們為什么要用MQ、使用MQ能夠為我們帶來哪些好處、MQ在社區有哪些實現、社區的各個MQ的優劣對比等等,我在之前的文章《消息隊列雜談》已經聊過了,如果需要了解的話可以回過頭去看看。
基礎概念
Broker
首先我們要知道,使用RocketMQ時我們經歷了什么。那就是生產者發送一條消息給RocketMQ,RocketMQ拿到這條消息之后將其持久化存儲起來,然后消費者去找MQ消費這條消息。

上圖中,RocketMQ被標識為了一個單點,但事實上肯定不是如此,對於可以隨時橫向擴展的服務來說,生產者向MQ生產消息的數量也會隨之而變化,所以一個合格成熟的MQ必然是要能夠處理這種情況的;而且MQ自身需要做到高可用,否則一旦這個單點宕機,那所有存儲在MQ中的消息就全部丟失且無法找回了。
所以在實際的生產環境中,肯定是會部署一個MQ的集群。而在RocketMQ中,這個“實例”有個專屬名詞,叫做Broker。並且,每個Broker都會部署一個Slave Broker,Master Broker會定時的向Slave Broker同步數據,形成一個Broker的主從架構。
那么問題來了,在微服務的架構中,部署的服務也存在多實例部署的情況,服務之間相互調用是通過注冊中心來獲取對應服務的實例列表的。
拿Spring Cloud舉例,服務通過Eureka注冊中心獲取到某個服務的全部實例,然后交給Ribbon,Ribbon聯動Eureka,從Eureka處獲取到服務實例的列表,然后通過負載均衡算法選出一個實例,最后發起請求。
同理,此時MQ中存在多個Broker實例,那生產者如何得知MQ集群中有多少Broker實例呢?自己應該連接哪個實例?
首先我們直接排除在代碼里Hard Code,具體原因我覺得應該不用再贅述了。RocketMQ是如何解決這個問題呢?這就是接下來我們要介紹的NameServer了。
NameServer
NameServer可以被簡單的理解為上一小節中提到的注冊中心,所有的Broker的在啟動的時候都會向NameServer進行注冊,將自己的信息上報。這些信息除了Broker的IP、端口相關數據,還有RocketMQ集群的路由信息,路由信息后面再聊。

有了NameServer,客戶端啟動之后會和NameServer交互,獲取到當前RocketMQ集群中所有的Broker信息、路由信息。這樣一來,生產者就知道自己需要連接的Broker信息了,就可以進行消息投遞。
那么問題來了,如果在運行過程中,如果某個Broker突然宕機,NameServer會如何處理?
這需要提到RocketMQ的這續約機制和故障感知機制。Broker在完成向NameServer的注冊之后,會每隔30秒向NameServer發送心跳進行續約;如果NameServer感知到了某個Broker超過了120秒都沒有發送心跳,則會認為這個Broker不可用,將其從自己維護的信息中移除。
這套機制,和Spring Cloud中的Eureka的實現如出一轍。Eureka中的Service在啟動之后也會向Eureka注冊自己,這樣一來其他的服務就可以向該服務發起請求,交換數據。Service每隔30秒會向Eureka發送心跳續約,如果某個Service超過了90秒沒有發送心跳,Eureka就會認為該服務宕機,將其從Eureka維護的注冊表中移除。
上面圖中我聊到了多實例部署,這個多實例部署和微服務中的多實例部署還不太一樣,微服務中,所有的服務都是無狀態的,可以橫向的擴展,而在RocketMQ中,每個Broker所存的數據可能都不一樣。
我們來看一下RocketMQ的簡單使用。
Message msg = new Message(
"TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
可以看到,Message的第一個參數,為當前這條消息指定了一個Topic,那Topic又是什么呢?
Topic
Topic是對發送到RocketMQ中的消息的邏輯分類,例如我們的訂單系統、積分系統、倉儲系統都會用到這個MQ,為了對其進行區分,我們就可以為不同的系統建立不同的Topic。
那為什么說是邏輯分區呢?因為RocketMQ在真實存儲中,並不是一個Broker就存儲一個Topic的數據,道理很簡單,如果當前這個Broker宕機,甚至極端情況磁盤壞了,那這個Topic的數據就會永久丟失。
所以在真實存儲中,消息是分布式的存儲在多個Broker上的,這這些分散在多個Broker上的存儲介質叫MessageQueue,如果你熟悉Kafka的底層原理,就知道這個跟Kafka中的Partition是同類的實現。

通過上圖可以看出,同一個Topic的數據,被分成了好幾份,分別存儲在不同的Broker上,那RocketMQ為什么要這么實現?
首先,一個Topic中如果只有一個Queue,那么消費者在消費時的速度必然受到影響;而如果一個Topic有很多個Queue,那么Consumer就可以將消費操作同時進行,從而扛住更多的並發。
除此之外,單台機器的資源是有限的。一個Topic的消息量可能會非常之巨大,一台機器的磁盤很快就會被塞滿。所以RocketMQ將一個Topic的數據分攤給了多台機器,進行分散存儲。其本質上就是一個數據分片存儲的一種機制。
所以我們知道了,發送到某個Topic的數據是分布式的存儲在多個Broker中的MessageQueue上的。
Broker消息存儲原理
那Producer發送到Broker中的消息,到底是以什么方式存儲的呢?答案是Commit Log,Broker收到消息,會將該消息采用順序寫入的方式,追加到磁盤上的Commit Log文件中,每個Commit Log大小為1G,如果寫滿了1G則會新建一個Commit Log繼續寫,Commit Log文件的特點是順序寫、隨機讀。

這就是最底層的存儲的方式,那么問題來了,Consumer來取消息的時候,Broker是如何從這一堆的Commit Log中找到相應的數據呢?眾所周知,一提到磁盤的I/O操作,就會聯想到耗時這兩個字,而RocketMQ的一大特點就是高吞吐,看似很矛盾,RocketMQ是如何做的呢?
答案是ConsumeQueue,Broker在寫入Commit Log的同時,還會將當前這條消息在Commit Log中的Offset、消息的Size和對應的Tag的Hash寫入到ConsumeQueue文件中。每個Message Queue會有相對應的ConsumeQueue文件存儲在磁盤上。
和Commit Log一樣,一個ConsumeQueue包含了30W條消息,每條消息的大小為20字節,所以每個ConsumeQueue文件的大小約為5.72M;當其寫滿了之后,會再新建一個ConsumeQueue文件繼續寫入。
ConsumeQueue是一種邏輯隊列,更是一種索引,讓Consumer來消費的時候可以快速的從磁盤文件中定位到這條消息。
看到這你可能會想,上面提到的Tag又是個什么東西?
Tag
Tag,標簽,用於對同一個Topic內的消息進行分類,為什么還需要對Topic進行消息類型划分呢?
舉一個極端的例子,某一個新的服務,需要去消費訂單系統的MQ,但是由於業務的特殊性,只需要去消費商品類型為數碼產品的訂單消息,如果沒有Tag,那么該Consumer就會去做判斷,該訂單消息是否是數碼產品類,如果不是,則丟棄,如果是則進行消費。
這樣一來,Consumer側就執行了大量的無用功。引入了Tag之后,Producer在生產消息的時候會給訂單打上Tag,Consumer進行消費的時候,可以配置只消費指定的Tag的消息。這樣一來就不需要Consumer自己去做這個事情了,RocketMQ會幫我們實現這個過濾。
那其過濾的原理是什么?首先在Broker側是通過消息中保存的Tag的Hash值進行過濾,然后Consumer側在去拉取消息的時候還需要再過濾一次。
為什么在Broker過濾了,還需要在Consumer側再過濾一次?因為Hash沖突,不同的Tag經過Hash算法之后可能會得到一樣的值,所以Consumer側在拉取消息的時候會通過字符串進行二次過濾。
Producer發送消息源碼分析
流程總覽
首先給出整個發送消息的大致流程,先熟悉這個流程看源碼,會更加的清晰一點。

初始化Prodcuer
還是按照下面這個例子出發。

首先我們會初始化一個DefaultMQProducer,RocketMQ會給這個Producer一個默認的實現DefaultMQProducerImpl
。然后producer.start()
會啟動一個線程池。
合法性校驗
接下來就是比較核心的producer.send(msg)
了,首先RocketMQ會調用checkMessage
來檢測發送的消息是否合法。

這些檢測包含了待發送的消息是否為空,Topic是否為空、Topic是否包含了非法的字符串、Topic的長度是否超過了最大限制127
,然后會去檢查Body是否符合發送要求,例如msg的Body是否為空、msg的Body是否超過了最大的限制等等,這里消息的Body最大不能超過4M。

調用發送消息
對於msg的Topic,RocketMQ會用NameSpace將其包裝一層,然后就會調用DefaultMQProducerImpl
中的sendDefaultImpl
默認實現,發送消息給Broker,默認的發送消息Timeout是3秒。

發送消息中,MQ會再次調用checkMessage
對消息的合法性再次進行檢查,然后就會去嘗試獲取Topic的詳細信息。
所有的Topic的信息都會存在一個叫topicPublishInfoTable
的 ConcurrentHashMap中,這個Map中Key就是Topic的字符串,而Value則是TopicPublishInfo
。
這個TopicPublishInfo中就包含了之前在基礎概念中提到的,從Broker中獲取到的相應的元數據,其中就包含了關鍵的MessageQueue和集群元數據,其基礎的結構如下。

messageQueueList
包含了該Topic下的所有的MessageQueue,每個MessageQueue的所屬Topic,每個MessageQueue所在的Broker的名稱以及專屬的queueId。
topicRouteData
包含了該Topic下的所有的Queue、Broker相關的數據。
獲取Topic詳細數據
在最終發送消息前,需要獲取到Topic的詳情,例如像Broker地址這樣的數據,Producer中是通過tryToFindTopicPublishInfo
方法獲取的,詳細的注釋我已經寫在了下圖中。

對於首次使用的Topic,在上面的Map肯定是不存在的。所以RocketMQ會將其加入到Map中去,並且調用方法updateTopicRouteInfoFromNameServer
從NameServer處獲取該Topic的元數據,將其一並寫入Map。初次之外,還會將路由信息、Broker的詳細信息分別放入topicRouteTable
和brokerAddrTable
中,這兩個都是Producer維護在內存中的ConcurrentHashMap。
獲取到了Topic的詳細信息之后,接下來會確認一個發送的重試次數timesTotal
,假設timesTotal為N,那么發送消息如果失敗就會重試N次。不過當且僅當發送失敗的時候才會進行重試,其余的case都不會,例如超時、或者沒有選擇到合適的MessageQueue。
這個重試的次數timesTotal
受到參數communicationMode
的影響;CommunicationMode
有三個值,分別是SYNC
、ASYNC
和ONEWAY
。RocketMQ默認的實現中,選擇了SYNC
同步。

通過代碼我們可以看到,如果是communicationMode
是SYNC
的話,timesTotal
的值為1+retryTimesWhenSendFailed
,而retryTimesWhenSendFailed
的值默認為2
,代表在消息發送失敗之后的重試次數。
這樣一來,如果我們選擇了SYNC
的方式,Producer在發送消息的時候默認的重試次數就為3。不過當且僅當發送失敗的時候才會進行重試,其余的case都不會。
MessageQueue選擇機制
我們之前聊過,一個Topic的數據是分片存儲在一個或者多個Broker上的,底層的存儲介質為MessageQueue,之前的圖中,我們沒有給出Producer是如何選擇具體發送到哪個MessageQueue,這里我們通過源碼來看一下。
Producer中是通過selectOneMessageQueue
來進行的Message Queue選擇,該方法通過Topic的詳細元數據和上次選擇的MessageQueue所在的Broker,來決定下一個的選擇。
核心的選擇邏輯
其核心的選擇邏輯是什么呢?用大白話來說,就是選出一個index,然后將其和當前Topic的MessageQueue數量取模。這個index在首次選擇的時候,肯定是沒有的, RocketMQ會搞一個隨機數出來。然后在該值的基礎上+1,因為為了通用,在外層看來,這個index上次已經用過了,所以每次獲取你都直接幫我+1就好了。

上圖就是MessageQueue最核心的、最底層的原則機制了。但是由於實際的業務情況十分復雜, RocketMQ在實現中還額外的做了很多的事情。
發送故障延遲下的選擇邏輯
在實際的選擇過程中,會判斷當前是否啟用了發送延遲故障,這個由變量sendLatencyFaultEnable
的值決定,其默認值是false,也就是默認是不開啟的,從代碼里我暫時沒找到其開啟的位置。
不過我們可以聊聊開啟之后,會發生什么。它同樣會開啟for循環,次數為MessageQueue的數量,計算拿到確定的Queue之后,會通過內存的一張表faultItemTable
去判斷當前這個Broker是否可用,該表是每次發送消息的時候都會去更新它。
如果當前沒有可用的Broker,則會觸發其兜底的邏輯,再選擇一個MessageQueue出來。

常規的選擇邏輯
如果當前發送故障延遲沒有啟用,則會走常規邏輯,同樣的會去for循環計算,循環中取到了MessageQueue之后會去判斷是否和上次選擇的MessageQueue屬於同一個Broker,如果是同一個Broker,則會重新選擇,直到選擇到不屬於同一個Broker的MessageQueue,或者直到循環結束。這也是為了將消息均勻的分發存儲,防止數據傾斜。

消息發送
最后就會調用Netty相關的組件,將消息發送出去了。
EOF
關於RocketMQ中的一些基礎的概念,和RocketMQ的Producer發送消息的源碼就先分析到這里,后續看緣分再分享其他部分的源碼吧。
好了以上就是本篇博客的全部內容了,如果你覺得這篇文章對你有幫助,還麻煩點個贊,關個注,分個享,留個言。
歡迎微信搜索關注【SH的全棧筆記】,查看更多相關文章
