文章很長,建議收藏起來,慢慢讀! 高並發 發燒友社群:瘋狂創客圈 為小伙伴奉上以下珍貴的學習資源:
-
瘋狂創客圈 經典升級 : 極致經典 《 Java 高並發 三部曲 》 面試必備 + 大廠必備 + 漲薪必備
-
瘋狂創客圈 經典圖書 : 《Netty Zookeeper Redis 高並發實戰》 面試必備 + 大廠必備 +漲薪必備 免費領
-
瘋狂創客圈 經典圖書 : 《SpringCloud、Nginx高並發核心編程》 面試必備 + 大廠必備 + 漲薪必備 免費領
-
瘋狂創客圈 資源寶庫: Java 必備 百度網盤資源大合集 價值>1000元 【免費取 】
RocketMQ簡介
rocketMQ作為一款分布式的消息中間件,RocketMQ作為一款分布式的消息中間件(阿里的說法是不遵循任何規范的,所以不能完全用JMS的那一套東西來看它),經歷了Metaq1.x、Metaq2.x的發展和淘寶雙十一的洗禮,在功能和性能上遠超ActiveMQ。
- 1.要知道RocketMQ原生就是支持分布式的,而ActiveMQ原生存在單點性。
- 2.RocketMQ可以保證嚴格的消息順序,而ActiveMQ無法保證!
- 3.RocketMQ提供億級消息的堆積能力,這不是重點,重點是堆積了億級的消息后,依然保持寫入低延遲!
- 4.豐富的消息拉取模式(Push or Pull) Push好理解,比如在消費者端設置Listener回調;而Pull,控制權在於應用,即應用需要主動的調用拉消息方法從Broker獲取消息,這里面存在一個消費位置記錄的問題(如果不記錄,會導致消息重復消費)。
RocketMQ是什么

- RocketMQ是一個隊列模型的消息中間件,具有高性能、高可靠、高實時、分布式特點。
- Producer、Consumer、隊列都可以分布式。
- Producer 向一些隊列輪流發送消息,隊列集合稱為 Topic,Consumer 如果做廣播消費,則一個 consumer 實例消費這個 Topic 對應的所有隊列,如果做集群消費,則多個 Consumer 實例平均消費這個 topic 對應的隊列集合。
- 能夠保證嚴格的消息順序
- 提供豐富的消息拉取模式
- 高效的訂閱者水平擴展能力
- 實時的消息訂閱機制
- 億級消息堆積能力
- 較少的依賴
選擇RocketMQ的理由
強調集群無單點,可擴展,任意一點高可用,水平可擴展
方便集群配置,而且容易擴展(橫向和縱向),通過slave的方式每一點都可以實現高可用
支持上萬個隊列,順序消息
順序消費是實現在同一隊列的,如果高並發的情況就需要隊列的支持,rocketmq可以滿足上萬個隊列同時存在
任性定制你的消息過濾
rocketmq提供了兩種類型的消息過濾,也可以說三種可以通過topic進行消息過濾、可以通過tag進行消息過濾、還可以通過filter的方式任意定制過濾
消息的可靠性(無Buffer,持久化,容錯,回溯消費)
消息無buffer就不用擔心buffer回滿的情況,rocketmq的所有消息都是持久化的,生產者本身可以進行錯誤重試,發布者也會按照時間階梯的方式進行消息重發,消息回溯說的是可以按照指定的時間進行消息的重新消費,既可以向前也可以向后(前提條件是要注意消息的擦除時間)
海量消息堆積能力,消息堆積后,寫入低延遲
針對於provider需要配合部署方式,對於consumer,如果是集群方式一旦master返現消息堆積會向consumer下發一個重定向指令,此時consumer就可以從slave進行數據消費了
分布式事務
我個人感覺rocketmq對這一塊說的不是很清晰,而且官方也說現在這塊存在缺陷(會令系統pagecache過多),所以線上建議還是少用為好
消息失敗重試機制
針對provider的重試,當消息發送到選定的broker時如果出現失敗會自動選擇其他的broker進行重發,默認重試三次,當然重試次數要在消息發送的超時時間范圍內。
針對consumer的重試,如果消息因為各種原因沒有消費成功,會自動加入到重試隊列,一般情況如果是因為網絡等問題連續重試也是照樣失敗,所以rocketmq也是采用階梯重試的方式。
定時消費
除了上面的配置,在發送消息是也可以針對message設置setDelayTimeLevel
活躍的開源社區
現在rocketmq成為了apache的一款開源產品,活躍度也是不容懷疑的
成熟度(經過雙十一考驗)
針對本身的成熟度,我們看看這么多年的雙十一就可想而知了
RocketMQ 邏輯結構

Broker
- Broker即是物理上的概念,也是邏輯上的概念。多個物理Broker通過IP:PORT區分,多個邏輯Broker通過BrokerName區分。
- 多個邏輯Broker組成Cluster。
- Broker與Topic是多對多的關系。
- Broker自身包含一個使用10911端口的NettyServer、一個10909的NettyServer,以及一個NettyClient。
- HA通過10912端口提供服務,用於Broker內部各個部分的數據傳輸。
- Broker是最重要的部分,包括持久化消息、Broker集群一致性(HA)、保存歷史消息、保存Consumer消費偏移量、索引創建等。
- Producer發送來的消息最終會通過CommitLog序列化到硬盤,執行序列化邏輯的類為AppendMessageCallback接口的實現類。
- Broker序列化消息是順序寫,序列化文件保存在userHome/store/commitlog目錄下,文件名為總偏移量。
- 默認為異步刷盤、提交日志單個文件1個G、單個consumer隊列文件為不到6M
消費者組(Consumer Group)
在正式開始說消費之前,我們首先要明白一個概念,就是消費組
消費者組(Consumer Group)是一類消費者的集合,這類消費者通常消費同一類消息並且消費邏輯一致,所以將這些消費者分組在一起。消費者組與生產者組類似,都是將相同角色的消費者分組在一起並命名的。
分組是一個很精妙的概念設計,RocketMQ正是通過這種分組機制,實現了天然的消息負載均衡。在消費消息時,通過消費者組實現了將消息分發到多個消費者服務器實例
設置消費者的名字是在代碼中實現的,如下:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("stock_consumer_group");
舉個例子
某個主題有9條消息,其中一個消費者組有3個實例(3個進程或3台機器),那么每個實例將均攤3條消息,這也意味着我們可以很方便地通過增加機器來實現水平擴展。
如果還不理解的話,我們看下面這張圖,由訂單系統來的消息,被庫存和積分兩個組所分配,每個組就是一個消費組

消費者組(Consumer Group)可以用來表示一個消費消息應用,一個 Consumer Group 下包含多個 Consumer 實例,可以是多台機器,也可 以是多個進程,或者是一個進程的多個 Consumer 對象。一個 Consumer Group 下的多個 Consumer 以均攤 方式消費消息,如果設置為廣播方式,那么這個 Consumer Group 下的每個實例都消費全量數據。
生產者組 Producer Group
用來表示一個發送消息應用,一個 Producer Group 下包含多個 Producer 實例,可以是多台機器,也可以 是一台機器的多個進程,或者一個進程的多個 Producer 對象。一個 Producer Group 可以發送多個 Topic 消息,Producer Group 作用如下:
- 標識一類 Producer
- 可以通過運維工具查詢這個發送消息應用下有多個 Producer 實例
- 發送分布式事務消息時,如果 Producer 中途意外宕機,Broker 會主動回調 Producer Group 內的任意 一台機器來確認事務狀態。
RocketMQ 核心組件圖
RocketMQ是開源的消息中間件,它主要由NameServer,Producer,Broker,Consumer四部分構成。

NameServer
NameServer主要負責Topic和路由信息的管理,類似zookeeper。
Broker
消息中轉角色,負責存儲消息,轉發消息。
Consumer
消息消費者,負責消息消費,一般是后台系統負責異步消費。
Producer
消息生產者,負責產生消息,一般由業務系統負責產生消息。
RokcetMQ 物理部署圖

物理概念
NameServer
NameServer是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。
Broker
Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與Name Server集群中的所有節點建立長連接,定時注冊Topic信息到所有Name Server。
Producer
Producer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。
Producer與Name Server關系
1)連接 單個Producer和一台NameServer保持長連接,如果該NameServer掛掉,生產者會自動連接下一個NameServer,直到有可用連接為止,並能自動重連。
2)輪詢時間 默認情況下,生產者每隔30秒從NameServer獲取所有Topic的最新隊列情況,這意味着某個Broker如果宕機,生產者最多要30秒才能感知,在此期間,
發往該broker的消息發送失敗。
3)心跳 與nameserver沒有心跳
Consumer
Consumer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。
1、Name Server是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。
2、每個Broker與Name Server集群中的所有節點建立長連接,定時注冊Topic信息到所有Name Server。
3、Producer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息。
4、Consumer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息。
Broker 詳解
1、Broker與Name Server關系
1)連接 單個Broker和所有Name Server保持長連接。
2)心跳
心跳間隔:每隔30秒向所有NameServer發送心跳,心跳包含了自身的Topic配置信息。
心跳超時:NameServer每隔10秒,掃描所有還存活的Broker連接,若某個連接2分鍾內沒有發送心跳數據,則斷開連接。
3)斷開:當Broker掛掉;NameServer會根據心跳超時主動關閉連接,一旦連接斷開,會更新Topic與隊列的對應關系,但不會通知生產者和消費者。
2、 負載均衡
一個Topic分布在多個Broker上,一個Broker可以配置多個Topic,它們是多對多的關系。
如果某個Topic消息量很大,應該給它多配置幾個Queue,並且盡量多分布在不同Broker上,減輕某個Broker的壓力。
3 、可用性
由於消息分布在各個Broker上,一旦某個Broker宕機,則該Broker上的消息讀寫都會受到影響。
所以RocketMQ提供了Master/Slave的結構,Salve定時從Master同步數據,如果Master宕機,則Slave提供消費服務,但是不能寫入消息,此過程對應用透明,由RocketMQ內部解決。
有兩個關鍵點:
思考1一旦某個broker master宕機,生產者和消費者多久才能發現?
受限於Rocketmq的網絡連接機制,默認情況下最多需要30秒,因為消費者每隔30秒從nameserver獲取所有topic的最新隊列情況,這意味着某個broker如果宕機,客戶端最多要30秒才能感知。
思考2 master恢復恢復后,消息能否恢復。
消費者得到Master宕機通知后,轉向Slave消費,但是Slave不能保證Master的消息100%都同步過來了,因此會有少量的消息丟失。但是消息最終不會丟的,一旦Master恢復,未同步過去的消息會被消費掉。
RocketMQ的領域模型

Message
代表一條消息,使用MessageId唯一識別,用戶在發送時可以設置messageKey,便於之后查詢和跟蹤。一個 Message 必須指定 Topic,相當於寄信的地址。Message 還有一個可選的 Tag 設置,以便消費端可以基於 Tag 進行過濾消息。也可以添加額外的鍵值對,例如你需要一個業務 key 來查找 Broker 上的消息,方便在開發過程中診斷問題。
Topic
- Topic表示消息的第一級類型,比如一個電商系統的消息可以分為:交易消息、物流消息等。一條消息必須有一個Topic。
- 最細粒度的訂閱單位,一個Group可以訂閱多個Topic的消息。
Tag
Tag表示消息的第二級類型,比如交易消息又可以分為:交易創建消息,交易完成消息等。RocketMQ提供2級消息分類,方便靈活控制。
標簽可以被認為是對 Topic 進一步細化。一般在相同業務模塊中通過引入標簽來標記不同用途的消息。
Queue
Topic和Queue是1對多的關系,一個Topic下可以包含多個Queue,主要用於負載均衡。發送消息時,用戶只指定Topic,Producer會根據Topic的路由信息選擇具體發到哪個Queue上。Consumer訂閱消息時,會根據負載均衡策略決定訂閱哪些Queue的消息。
消息的物理管理單位。一個Topic下可以有多個Queue,Queue的引入使得消息的存儲可以分布式集群化,具有了水平擴展能力。
在 RocketMQ 中,所有消息隊列都是持久化,長度無限的數據結構,所謂長度無限是指隊列中的每個存儲單元都是定長,訪問其中的存儲單元使用 Offset 來訪問,offset 為 java long 類型,64 位,理論上在 100年內不會溢出,所以認為是長度無限。
Queue和消費者的 ConsumeQueue物理概念一 一對應, ConsumeQueue是一個長度無限的數組,Offset 就是下標。
Offset
RocketMQ在存儲消息時會為每個Topic下的每個Queue生成一個消息的索引文件,每個Queue都對應一個Offset記錄當前Queue中消息條數。
Producer
消息生產者,位於用戶的進程內,Producer通過NameServer獲取所有Broker的路由信息,根據負載均衡策略選擇將消息發到哪個Broker,然后調用Broker接口提交消息。
Producer Group
生產者組,簡單來說就是多個發送同一類消息的生產者稱之為一個生產者組。
Consumer
消息消費者,位於用戶進程內。Consumer通過NameServer獲取所有broker的路由信息后,向Broker發送Pull請求來獲取消息數據。Consumer可以以兩種模式啟動,廣播(Broadcast)和集群(Cluster),廣播模式下,一條消息會發送給所有Consumer,集群模式下消息只會發送給一個Consumer。
Consumer Group
消費者組,和生產者類似,消費同一類消息的多個 Consumer 實例組成一個消費者組。
NameServer
NameServer可以看作是RocketMQ的注冊中心,它管理兩部分數據:集群的Topic-Queue的路由配置;Broker的實時配置信息。其它模塊通過Nameserv提供的接口獲取最新的Topic配置和路由信息。
Producer/Consumer:通過查詢接口獲取Topic對應的Broker的地址信息Broker: 注冊配置信息到NameServer, 實時更新Topic信息到NameServer
RocketMQ 消息存儲設計原理圖
RocketMQ存儲邏輯對象層
- 該層主要包含了RocketMQ數據文件存儲直接相關的三個模型類IndexFile、ConsumerQueue和CommitLog。
- IndexFile為索引數據文件提供訪問服務,ConsumerQueue為邏輯消息隊列提供訪問服務,CommitLog則為消息存儲的日志數據文件提供訪問服務。
- 這三個模型類也是構成了RocketMQ存儲層的整體結構。

CommitLog
消息存儲文件,所有消息主題的消息都存儲在 CommitLog 文件中。
Commitlog 文件存儲的邏輯視圖如圖所示

ConsumeQueue
消息消費隊列,消息到達 CommitLog 文件后,將異步轉發到 消費隊列,供消息消費者消費。ConsumeQueue存儲格式如下:

- 單個 ConsumeQueue 文件中默認包含 30 萬個條目,單個文件的長度為 30w × 20 字節, 單個 ConsumeQueue 文件可以看出是一個 ConsumeQueue 條目的數組,其下標為 ConsumeQueue 的邏輯偏移量,消息消費進度存儲的偏移量 即邏輯偏移量。
- ConsumeQueue 即為 Commitlog 文件的索引文件, 其構建機制是當消息到達 Commitlog 文件后, 由專門的線程 產生消息轉發任務,從而構建消息消費隊列文件與下文提到的索引文件。
為什么需要 ConsumeQueue ?
RocketMQ的消息都是按照先來后到,順序的存儲在CommitLog中的,而消費者通常只關心某個Topic下的消息。順序的查找CommitLog肯定是不現實的,我們可以構建一個索引文件,里面存放着某個Topic下面所有消息在CommitLog中的位置,這樣消費者獲取消息的時候,只需要先查找這個索引文件,然后再去CommitLog中獲取消息就 OK了。這個索引文件,就是我們的ComsumerQueue。
IndexFile
消息索引文件,主要存儲消息 Key 與 Offset 的對應關系。
lndexFile 總共包含 lndexHeader、 Hash 槽、 Hash 條目。
消息消費隊列是RocketMQ專門為消息訂閱構建的索引文件,提高根據主題與消息隊 列檢索消息的速度 ,另外 RocketMQ 引入了 Hash 索引機制為消息建立索引, HashMap 的設 計包含兩個基本點 : Hash 槽與 Hash 沖突的鏈表結構。 RocketMQ 索引文件布局如圖所示

存儲分析的實例
1、流程圖
我們由簡單到復雜的來理解,它的一些核心概念

這個圖很好理解,消息先發到Topic,然后消費者去Topic拿消息。只是Topic在這里只是個概念,那它到底是怎么存儲消息數據的呢,這里就要引入Broker概念。
2、Topic的存儲
Topic是一個邏輯上的概念,實際上Message是在每個Broker上以Queue的形式記錄。

從上面的圖片可以總結下幾條結論。
1、消費者發送的Message會在Broker中的Queue隊列中記錄。
2、一個Topic的數據可能會存在多個Broker中。
3、一個Broker存在多個Queue。
4、單個的Queue也可能存儲多個Topic的消息。
也就是說每個Topic在Broker上會划分成幾個邏輯隊列,每個邏輯隊列保存一部分消息數據,但是保存的消息數據實際上不是真正的消息數據,而是指向commit log的消息索引。
Queue不是真正存儲Message的地方,真正存儲Message的地方是在CommitLog。
如圖(盜圖)

左邊的是CommitLog。這個是真正存儲消息的地方。RocketMQ所有生產者的消息都是往這一個地方存的。
右邊是ConsumeQueue。這是一個邏輯隊列。和上文中Topic下的Queue是一一對應的。消費者是直接和ConsumeQueue打交道。ConsumeQueue記錄了消費位點,這個消費位點關聯了commitlog的位置。所以即使ConsumeQueue出問題,只要commitlog還在,消息就沒丟,可以恢復出來。還可以通過修改消費位點來重放或跳過一些消息。
事務狀態服務
存儲每條消息的事務狀態。
定時消息服務
每一個延遲級別對應一個消息消費隊列,存儲延遲隊列的消息拉取進度。
RMQ文件存儲模型層

封裝的文件內存映射層
- RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel兩種方式完成數據文件的讀寫。
- 其中,采用MappedByteBuffer這種內存映射磁盤文件的方式完成對大文件的讀寫,在RocketMQ中將該類封裝成MappedFile類。
- 這里,每一種類的單個文件均由MappedFile類提供讀寫操作服務(其中,MappedFile類提供了順序寫/隨機讀、內存數據刷盤、內存清理等和文件相關的服務)。
磁盤存儲層
主要指的是部署RocketMQ服務器所用的磁盤。這里,需要考慮不同磁盤類型(如SSD或者普通的HDD)特性以及磁盤的性能參數(如IOPS、吞吐量和訪問時延等指標)對順序寫/隨機讀操作帶來的影響。
RocketMQ中消息刷盤
在RocketMQ中消息刷盤主要可以分為同步刷盤和異步刷盤兩種。
同步刷盤

- 在返回寫成功狀態時,消息已經被寫入磁盤。
- 具體流程是,消息寫入內存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執行完成后喚醒等待的線程,返回消息寫成功的狀態。
- 一般只用於金融場景。
異步刷盤
在返回寫成功狀態時,消息可能只是被寫入了內存的PAGECACHE,寫操作的返回快,吞吐量大;當內存里的消息量積累到一定程度時,統一觸發寫磁盤操作,快速寫入。
消息在系統中流轉圖

1.Producer 發送消息,消息從 socket 進入 java 堆。
2.Producer 發送消息,消息從 java 堆轉入 PAGACACHE,物理內存。
3.Producer 發送消息,由異步線程刷盤,消息從 PAGECACHE 刷入磁盤。
4.Consumer 拉消息(正常消費),消息直接從 PAGECACHE(數據在物理內存)轉入 socket,到達 consumer, 不經過 java 堆。這種消費場景最多,線上 96G 物理內存,按照 1K 消息算,可以在物理內存緩存 1 億條消 息。
5.Consumer 拉消息(異常消費),消息直接從 PAGECACHE(數據在虛擬內存)轉入 socket。
6.Consumer 拉消息(異常消費),由於 Socket 訪問了虛擬內存,產生缺頁中斷,此時會產生磁盤 IO,從磁 盤 Load 消息到 PAGECACHE,然后直接從 socket 發出去。
7.同 5 一致。
8.同 6 一致。
RocketMQ的消費模式
RocketMQ的消費模式有2種: 集群消費(CLUSTERING) 、 廣播消費(BROADCASTING)。源碼如下:
1 public enum MessageModel {
2 BROADCASTING("BROADCASTING"),
3 CLUSTERING("CLUSTERING");
4
5 private String modeCN;
6
7 private MessageModel(String modeCN) {
8 this.modeCN = modeCN;
9 }
10
11 public String getModeCN() {
12 return this.modeCN;
13 }
14 }
集群消費(CLUSTERING)
集群消費(CLUSTERING)是指: 一個ConsumerGroup中的Consumer實例平均分攤消費消息。例如某個Topic有9條消息,其中一個ConsumerGroup有3個實例(可能是3個進程,或者3台機器),那么每個實例只消費其中部分,消費完的消息不能被其他實例消費。
例如:某個Topic有9條消息,有3個消費者,廣播模式就是每個消費者都收到9條消息,集群模式就是消費者平均分攤9條消息

其實,對於RocketMQ而言,通過ConsumeGroup的機制,實現了天然的消息負載均衡!通俗點來說,RocketMQ中的消息通過ConsumeGroup實現了將消息分發到C1/C2/C3/……的機制,這意味着我們將非常方便的通過加機器來實現水平擴展!
至於消息分發到C1/C2/C3,其實也是可以設置策略的:

默認的分配算法是AllocateMessageQueueAveragely

還有另外一種平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分攤每一條queue,只是以環狀輪流分queue的形式,如下圖:

廣播消費(BROADCASTING)
**廣播消費 **,類似於ActiveMQ中的發布訂閱模式,消息會發給Consume Group中的每一個消費者進行消費。 由於廣播模式下要求一條消息需要投遞到一個消費組下面每一個消費者實例,所以也就沒有消息被分攤消費的說法。
代碼核心
//設置廣播消費模式就是在消費者這里設置一下,其余的代碼不變
consumer.setMessageModel(MessageModel.BROADCASTING);
廣播消費(BROADCASTING)下,一條消息被多個consumer消費,即使這些consumer屬於同一個ConsumerGroup,消息也會被ConsumerGroup中的每個Consumer都消費一次,廣播消費中ConsumerGroup概念可以認為在消息划分方面無意義。
RocketMQ-廣播消費模式設置:

RocketMQ源碼: 路由中心
早期的rocketmq版本的路由功能是使用zookeeper實現的,后來rocketmq為了追求性能,自己實現了一個性能更高效且實現簡單的路由中心 NameServer,
可以通過部署多個 NameServer 路由節點實現高可用,但它們 NameServer 之間並不能互相通信,這也就會導致在某一個時刻各個路由節點間的數據並不完全相同,但數據某個時刻不一致並不會導致消息發送不了,這也是rocketmq 追求簡單高效的一個做法。
Nameserver的源碼
Nameserver的源碼很簡單,整個NameServer總共就由這么幾個類組成:

其中NamesrvStartup為啟動類,NamesrvController為核心控制器,RouteInfoManager為路由信息表。
路由信息表與心跳(路由注冊)
路由注冊即是Broker向Nameserver注冊的過程,它們是通過Broker的心跳功能實現的,
我們知道RouteInfoManager為路由信息表,先來看看Nameserver到底存儲了哪些信息:
public class org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
-
topicQueueTable:Topic消息隊列路由信息,
包括topic所在的broker名稱,讀隊列數量,寫隊列數量,同步標記等信息,rocketmq根據topicQueueTable的信息進行負載均衡消息發送。
-
brokerAddrTable:Broker節點信息
包括brokername,所在集群名稱,還有主備節點信息。
-
clusterAddrTable:Broker集群信息
存儲了集群中所有的Brokername。
-
brokerLiveTable:Broker狀態信息
Nameserver每次收到Broker的心跳包就會更新該信息。
這里也先講一下rocketmq是基於訂閱發布機制,我之前也寫過一篇文章《rocketmq的消費模式》,我們可知一個Topic擁有多個消息隊列,如果不指定隊列的數量,一個Broker會為每個Topic創建4個讀隊列和4個寫隊列,多個Broker組成集群,Broker會通過發送心跳包將自己的信息注冊到路由中心,路由中心brokerLiveTable存儲Broker的狀態,它會根據Broker的心跳包更新Broker狀態信息。
步驟一:Broker發送心跳包
org.apache.rocketmq.broker.BrokerController#start:
public void start() throws Exception {
// 初次啟動,這里會強制執行發送心跳包
this.registerBrokerAll(true, false, true);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
}
Broker在核心控制器啟動時,會強制發送一次心跳包,接着創建一個定時任務,定時向路由中心發送心跳包。
org.apache.rocketmq.broker.BrokerController#registerBrokerAll:
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
// 創建一個topic包裝類
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
// 這里比較有趣,如果該broker沒有讀寫權限,那么會新建一個臨時的topicConfigTable,再set進包裝類
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
// 判斷是否該Broker是否需要發送心跳包
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
// 執行發送心跳包
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
該方法是Broker執行發送心跳包的核心控制方法,它主要做了topic的包裝類封裝操作,判斷Broker此時是否需要執行發送心跳包,但我查了下org.apache.rocketmq.common.BrokerConfig#forceRegister字段的值永遠等於true,也就是該判斷永遠為true,即每次都需要發送心跳包。
我們定位到needRegister遠程調用到路由中心的方法:
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#isBrokerTopicConfigChanged:
public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
DataVersion prev = queryBrokerTopicConfig(brokerAddr);
return null == prev || !prev.equals(dataVersion);
}
public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
if (prev != null) {
return prev.getDataVersion();
}
return null;
}
發現,Broker是否需要發送心跳包由該Broker在路由中心org.apache.rocketmq.namesrv.routeinfo.BrokerLiveInfo#dataVersion決定,如果dataVersion為空或者當前dataVersion不等於brokerLiveTable存儲的brokerLiveTable,Broker就需要發送心跳包。
步驟二:Nameserver處理心跳包
Nameserver的netty服務監聽收到心跳包之后,會調用到路由中心以下方法進行處理:
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker:
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
// 獲取集群下所有的Broker,並將當前Broker加入clusterAddrTable,由於brokerNames是Set結構,並不會重復
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
// 獲取Broker信息,如果是首次注冊,那么新建一個BrokerData並加入brokerAddrTable
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
// 這里判斷Broker是否是已經注冊過
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
// 如果是Broker是Master節點嗎,並且Topic信息更新或者是首次注冊,那么創建更新topic隊列信息
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 更新BrokerLiveInfo狀態信息
BrokerLiveInfo prevBrokerLiveInfo =
this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
該方法是處理Broker心跳包的最核心方法,它主要做了對RouteInfoManager路由信息的一些更新操作,包括對clusterAddrTable、brokerAddrTable、topicQueueTable、brokerLiveTable等路由信息。
NamesrvStartup啟動類
知道了這幾個類的功能之后,我們就直接定位到NamesrvStartup啟動類的啟動方法

Namesrv顧名思義就是名稱服務,是沒有狀態可橫向擴展的服務。廢話不多說了,直接貼代碼。。
public static NamesrvController main0(String[] args) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
NettySystemConfig.socketSndbufSize = 4096;// socket發送緩沖區大小
}
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
NettySystemConfig.socketRcvbufSize = 4096;// Socket接收緩沖區大小
}
try {
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());//構建Options,有h代表help,n代表namesrvAddr
//Options加上c代表configFile,p代表printConfigItem
//解析得到commandLine
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);//監聽端口是9876
if (commandLine.hasOption('c')) {//有配置文件
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);//加載配置文件到prop
MixAll.properties2Object(properties, namesrvConfig);//根據prop文件解析,給namesrvConfig填充對應的值
MixAll.properties2Object(properties, nettyServerConfig);//根據prop文件解析,給nettyServerConfig填充對應的值
namesrvConfig.setConfigStorePath(file);//設置config存儲路徑
System.out.printf("load config properties file OK, " + file + "%n");
in.close();
}
}
if (commandLine.hasOption('p')) {//打印namesrvConfig和nettyServerConfig的非靜態,非this開頭的字段
MixAll.printObjectProperties(null, namesrvConfig);
MixAll.printObjectProperties(null, nettyServerConfig);
System.exit(0);
}
//再根據commandLine得到一個prop,再給namesrvConfig填充對應的值
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {//默認NamesrvConfig.rocketmqHome為空,且配置,參數中RocketMQHome為空的話拋異常
System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV + " variable in your environment to match the location of the RocketMQ installation%n");
System.exit(-2);
}
//配置Logback
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
//打印namesrvConfig和nettyServerConfig的非靜態,非this開頭的字段
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
//根據namesrvConfig,nettyServerConfig構造NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
boolean initResult = controller.initialize();//初始化NamesrvController
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();//shutdown的時候 NamesrvController也shutdown
return null;
}
}));
controller.start();//啟動服務
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf(tip + "%n");
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
RocketMQ的部署
部署方式
單Master模式
只有一個 Master節點
優點:配置簡單,方便部署
缺點:這種方式風險較大,一旦Broker重啟或者宕機時,會導致整個服務不可用,不建議線上環境使用
多Master模式
一個集群無 Slave,全是 Master,例如 2 個 Master 或者 3 個 Master
優點:配置簡單,單個Master 宕機或重啟維護對應用無影響,在磁盤配置為RAID10 時,即使機器宕機不可恢復情況下,由與 RAID10磁盤非常可靠,消息也不會丟(異步刷盤丟失少量消息,同步刷盤一條不丟)。性能最高。多 Master 多 Slave 模式,異步復制
缺點:單台機器宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受到受到影響
多Master多Slave模式(異步復制)---本文稍后以這種方式部署集群為例
每個 Master 配置一個 Slave,有多對Master-Slave, HA,采用異步復制方式,主備有短暫消息延遲,毫秒級。
優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,因為Master 宕機后,消費者仍然可以從 Slave消費,此過程對應用透明。不需要人工干預。性能同多 Master 模式幾乎一樣。
缺點: Master 宕機,磁盤損壞情況,會丟失少量消息。
多Master多Slave模式(同步雙寫)---文中會說明補充此集群配置,線上使用的話,推薦使用此模式集群
每個 Master 配置一個 Slave,有多對Master-Slave, HA采用同步雙寫方式,主備都寫成功,向應用返回成功。
優點:數據與服務都無單點, Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高
缺點:性能比異步復制模式略低,大約低 10%左右,發送單個消息的 RT會略高。目前主宕機后,備機不能自動切換為主機,后續會支持自動切換功能
單機模式部署
2. 下載相關軟件
3. 配置環境
3.1. 解壓RocketMQ
unzip /usr/local/rocketmq-all-4.4.0-bin-release.zip -d /usr/local/rocketmq
3.2. 配置環境變量
vi /etc/profile
export JAVA_HOME=/opt/soft/jdk1.8.0_231
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export MAVEN_HOME=/opt/soft/apache-maven-3.5.4
export PATH=$PATH:$MAVEN_HOME/bin
export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.4.0-bin-release
export PATH=$ROCKETMQ_HOME/bin:$PATH
或者使用追加的命令
cat >> /etc/profile <<EOF
export SCAFFOLD_DB_HOST=cdh1
export SCAFFOLD_DB_USER=root
export SCAFFOLD_DB_PSW=wTJJJJZZZ@SSDDC
export SCAFFOLD_REDIS_HOST=cdh1
export SCAFFOLD_REDIS_PSW=123456
export SCAFFOLD_EUREKA_ZONE_HOSTS=http://cdh1:7777/eureka/
export SCAFFOLD_ROCKETMQ_HOSTS=172.26.9.107:2181
export SCAFFOLD_ZOOKEEPER_HOSTS=cdh1:2181
export LC_ALL=en_US.UTF-8
export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.4.0-bin-release
export JAVA_HOME=/usr/local/java/jdk1.8.0_121
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$ROCKETMQ_HOME/bin:$JAVA_HOME/bin:/usr/bin:/usr/local/mysql/bin:/root/bin
EOF
3.3. 刷新配置文件
source /etc/profile
3.4. 驗證環境是否正確
java -version
java version "1.8.0_231"
Java(TM) SE Runtime Environment (build 1.8.0_231-b11)
4. RocketMQ啟動和關閉
4.1 創建數據存儲目錄
mkdir -p /usr/local/rocketmq/data
4.2 創建日志目錄
mkdir -p /usr/local/rocketmq/logs
4.3 修改broker.conf文件
vim /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/broker.conf
修改內容如下:
# 所屬集群名字(同一主從下: Master 和 Slave 名稱要一致)
brokerClusterName = DefaultCluster
# Broker 名字,注意此處不同的配置文件填寫的不一樣
brokerName = broker-a
# 0 表示 Master,> 0 表示 Slave
brokerId = 0
# Broker 對外服務的監聽端口, 如果一台機器上啟動了多個Broker,則要設置不同的端口號,避免沖突
listenPort = 10911
# nameServer地址,如果nameServer是多台集群的話,就用分號分割,比如
# namesrvAddr = 192.168.0.1:9876;192.168.0.2:9876
# namesrvAddr = 127.0.0.1:9876
namesrvAddr = 192.168.56.121:9876
brokerIP1 = 192.168.56.121
# 是否允許 Broker 自動創建Topic,建議線上關閉
autoCreateTopicEnable = true
# 是否允許 Broker 自動創建訂閱組,建議線上關閉
autoCreateSubscriptionGroup = true
# 與fileReservedTime參數呼應,表明在幾點做消息刪除動作,默認值04表示凌晨4點
deleteWhen = 04
# 在磁盤上保存消息的時長,單位是小時,自動刪除超時的消息
fileReservedTime = 48
# brokerRole有3種:SYNC_MASTER、ASYNC_MASTER、SLAVE
# 關鍵詞 SYNC 和 ASYNC 表示 Master 和 Slave 之間同步消息的機制
# SYNC 的意思是當 Slave 和 Master 消息同步完成后,再返回發送成功的狀態
brokerRole = ASYNC_MASTER
# 刷盤方式 ASYNC_FLUSH 異步刷盤; SYNC_FLUSH 同步刷盤
flushDiskType = ASYNC_FLUSH
#數據存儲位置
storePathRootDir = /usr/local/rocketmq/data
storePathCommitLog = /usr/local/rocketmq/logs
4.4 修改啟動腳本JVM內存大小
4.4.1 vim /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
4.4.2 vim /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn128m"
4.5 啟動namesrv 和 broker
[root@master bin]# nohup mqnamesrv -n 192.168.56.121:9876 &
[1] 2937
[root@master bin]# nohup: 忽略輸入並把輸出追加到"nohup.out"
[root@master bin]# nohup sh mqbroker -n 192.168.56.121:9876 autoCreateTopicEnable=true -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/broker.conf &
[2] 2968
[root@master bin]# nohup: 忽略輸入並把輸出追加到"nohup.out"
# 查看broker啟動配置:
[root@master bin]# sh mqbroker -m
[root@master bin]# jps
3030 Jps
2940 NamesrvStartup
2972 BrokerStartup
命令匯總
mqnamesrv -n 172.26.9.107
nohup mqnamesrv -n 172.26.9.107 &
nohup sh mqbroker -n 172.26.9.107:9876 autoCreateTopicEnable=true -c
nohup sh mqbroker -n 172.26.9.107:9876 autoCreateTopicEnable=true -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/broker.conf &
mqbroker -m
jps
設置開機啟動
#!/bin/bash
# THIS FILE IS ADDED FOR COMPATIBILITY PURPOSES
#
# It is highly advisable to create own systemd services or udev rules
# to run scripts during boot instead of using this file.
#
# In contrast to previous versions due to parallel execution during boot
# this script will NOT be run after all other services.
#
# Please note that you must run 'chmod +x /etc/rc.d/rc.local' to ensure
# that this script will be executed during boot.
touch /var/lock/subsys/local
#start redis
/usr/local/redis/bin/redis-server /usr/local/redis/redis.conf
#start nginx
#/usr/bin/su - root -c "/usr/local/openresty/nginx/sbin/nginx -c /usr/local/openresty/nginx/conf/nginx.conf"
# 啟動 zookeeper
/usr/bin/su - root -c "/work/zookeeper/zookeeper_01/bin/zkServer.sh start"
sleep 5s
/usr/bin/su - root -c "/work/zookeeper/zookeeper_02/bin/zkServer.sh start"
# 啟動 Nacos
/usr/bin/su - root -c "/work/nacos/bin/startup.sh -m standalone"
#start springcloud
sleep 10s
/usr/bin/su - root -c "/work/cloud-eureka-1.0-SNAPSHOT/bin/start.sh start"
sleep 20s
/usr/bin/su - root -c "/work/cloud-config-1.0-SNAPSHOT/bin/start.sh start"
# 啟動 kafka
#/usr/bin/su - root -c "nohup /work/kafka_2.11-1.0.2/bin/kafka-server-start.sh /work/kafka_2.11-1.0.2/config/server.properties 2>&1 &"
#啟動 sentinel
/usr/bin/su - root -c "nohup java -server -Xms64m -Xmx256m -Dserver.port=8849 -Dcsp.sentinel.dashboard.server=localhost:8849 -Dproject.name=sentinel-dashboard -jar /work/sentinel-dashboard-1.7.1.jar 2>&1 &"
sleep 2s
# 啟動 rocketmq namenode
#/usr/bin/su - root -c "nohup mqnamesrv -n 192.168.56.121:9876 &"
sleep 30s
# 啟動 rocketmq broker
#/usr/bin/su - root -c "nohup sh mqbroker -n 192.168.56.121:9876 autoCreateTopicEnable=true -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/broker.conf &"
4.6 查看啟動日志記錄
tail -200f ~/logs/rocketmqlogs/namesrv.log
tail -200f ~/logs/rocketmqlogs/broker.log
4.7 消息發送和消費測試
# 設置NameServer地址
export NAMESRV_ADDR=192.168.56.121:9876
/usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/distribution/target/apache-rocketmq
# 測試發送端
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
# 測試消費端
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
4.8 關閉namesrv 和 broker
[root@master bin]# sh mqshutdown broker
The mqbroker(2972) is running...
Send shutdown request to mqbroker(2972) OK
[root@master bin]# sh mqshutdown namesrv
The mqnamesrv(2940) is running...
Send shutdown request to mqnamesrv(2940) OK
5. rocketmq-console安裝和使用
5.1 從github上clone代碼
git clone https://github.com/apache/rocketmq-externals.git
5.2 用Maven編譯源碼
rocketmq-externals/rocketmq-console
mvn clean package -Dmaven.test.skip=true
5.3 生成的jar文件目錄
rocketmq-externals/rocketmq-console/target/rocketmq-console-ng-1.0.1.jar
5.4 執行jar文件
復制到/usr/local/rocketmq/,然后執行:
java -jar /usr/local/rocketmq/rocketmq-console-ng-1.0.1.jar --rocketmq.config.namesrvAddr='192.168.56.121:9876'
5.5 關閉防火牆
# 關閉防火牆
systemctl stop firewalld.service
# 查看防火牆狀態
firewall-cmd --state
5.6 訪問鏈接
http://192.168.100.129:8080/#/

6. Java示例代碼
6.1 生產者示例代碼
private void startProducer() {
//指定NameServer地址
producer.setNamesrvAddr(CDH_1_9876); //修改為自己的
producer.setInstanceName("Instance1");
producer.setRetryTimesWhenSendFailed(3);
/**
* Producer對象在使用之前必須要調用start初始化,初始化一次即可
* 注意:切記不可以在每次發送消息時,都調用start方法
*/
try {
producer.start();
System.out.println("product start ...");
} catch (MQClientException e) {
e.printStackTrace();
}
}
6.2 消費者示例代碼
private void startConsumer() {
//指定NameServer地址,多個地址以 ; 隔開
consumer.setNamesrvAddr(CDH_1_9876); //修改為自己的
// consumer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改為自己的
/**
* 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
* 如果非第一次啟動,那么按照上次消費的位置繼續消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
try {
consumer.subscribe(TOPIC_TEST, "*");
} catch (MQClientException e) {
e.printStackTrace();
return;
}
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (int i = 0; i < msgs.size(); i++) {
MessageExt msg = msgs.get(i);
System.out.println(msg.getTopic() + " " + msg.getTags() + " " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
try {
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
完整的代碼:
package com.crazymaker.springcloud.stock.controller;
import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.result.RestOut;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@RequestMapping("/api/crazymaker/rockmq/")
@Api(tags = "消息管理")
public class RockmqMessageController implements ApplicationContextAware {
public static final String TOPIC_TEST = "TopicTest";
public static final String CDH_1_9876 = "192.168.56.121:9876";
// public static final String CDH_1_9876 = "cdh1:9876";
DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
/**
* Consumer Group,非常重要的概念,后續會慢慢補充
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo");
@PostMapping("/msg/send/v1")
@ApiOperation(value = "發送rockmq消息")
public RestOut<String> simpleSend(@RequestBody String content) {
try {
//構建消息
Message msg = new Message(TOPIC_TEST /* Topic */,
"TagA" /* Tag */,
(content).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//發送同步消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
throw BusinessException.builder().errMsg(e.getMessage()).build();
}
return RestOut.success("發送完成");
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
startProducer();
startConsumer();
}
private void startProducer() {
//指定NameServer地址
producer.setNamesrvAddr(CDH_1_9876); //修改為自己的
producer.setInstanceName("Instance1");
producer.setRetryTimesWhenSendFailed(3);
/**
* Producer對象在使用之前必須要調用start初始化,初始化一次即可
* 注意:切記不可以在每次發送消息時,都調用start方法
*/
try {
producer.start();
System.out.println("product start ...");
} catch (MQClientException e) {
e.printStackTrace();
}
}
private void startConsumer() {
//指定NameServer地址,多個地址以 ; 隔開
consumer.setNamesrvAddr(CDH_1_9876); //修改為自己的
// consumer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改為自己的
/**
* 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
* 如果非第一次啟動,那么按照上次消費的位置繼續消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
try {
consumer.subscribe(TOPIC_TEST, "*");
} catch (MQClientException e) {
e.printStackTrace();
return;
}
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (int i = 0; i < msgs.size(); i++) {
MessageExt msg = msgs.get(i);
System.out.println(msg.getTopic() + " " + msg.getTags() + " " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
try {
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
swagger發送界面

6.3 控制台截圖

集群模式部署
此處就RocketMQ的多Master多Slave的模式在Linux服務器部署案例進行詳細的說明,如系統部署結構圖所示。
1本次部署環境
Linux服務器192.168.162.235、192.168.162.236兩台(下文均簡稱235、236),詳細部署環境示意表如下:


2 編輯Hosts
分別修改235 和236 的hosts 文件
sudo vim /etc/hosts
IP NAME
192.168.162.235 nameserver1
192.168.162.235 master1
192.168.162.235 master1-slave1
192.168.162.236 nameserver2
192.168.162.236 master2
192.168.162.236 master2-slave2
注:修改hosts 文件需獲得sudo 權限,本機用戶是rocketMQ非root用戶, 故申請了堡壘機權限(即獲得root權限)。
3 下載官方源碼
下載官方RocketMQ壓縮包,下載地址:http://rocketmq.apache.org/release_notes/release-notes-4.2.0/,並選擇Download the 4.2.0 release 選項的 rocketmq-all-4.2.0-bin-release.zip下載。(其他如source為需要自己編譯的版本)
4 上傳到Linux並解壓
分別上傳rocketmq-all-4.2.0-bin-release.zip到235和236服務器的/home/rocketMQ/ZHF/rocketMQ-2m2s/目錄下:
cd /home/rocketMQ/ZHF/rocketMQ-2m2s/tar –zxvf rocketmq-all-4.2.0-bin-release.zip
- 創建持久化存儲目錄
Master目錄設置:
mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store
mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store/commitlog
mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store/consumequeue
mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store/index
Slave目錄設置:
mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store-s
mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/commitlog
mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/consumequeue
mkdir /home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/index
6 RocketMQ配置文件
235服務器設置:
sudo vim /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-a.properties
sudo vim /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-b-s.properties
236服務器設置:
sudo vim /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-b.properties
sudo vim /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-a-s.properties
broker-a.properties文件配置
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#brokerClusterName=DefaultCluster
#brokerName=broker-a
#brokerId=0
#deleteWhen=04
#fileReservedTime=48
#brokerRole=ASYNC_MASTER
#flushDiskType=ASYNC_FLUSH
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分號分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=10911
haListenPort=10912
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=18
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/home/rocketMQ/ZHF/rocketMQ-2m2s/store
#commitLog 存儲路徑
storePathCommitLog=/home/rocketMQ/ZHF/rocketMQ-2m2s/store/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/home/rocketMQ/ZHF/rocketMQ-2m2s/store/consumequeue
#消息索引存儲路徑
storePathIndex=/home/rocketMQ/ZHF/rocketMQ-2m2s/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/home/rocketMQ/ZHF/rocketMQ-2m2s/store/checkpoint
#abort 文件存儲路徑
abortFile=/home/rocketMQ/ZHF/rocketMQ-2m2s/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步復制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
#強制指定本機IP,需要根據每台機器進行修改。官方介紹可為空,系統默認自動識別,但多網卡時IP地址可能讀取錯誤
brokerIP1=192.168.162.235
broker-a-s.properties文件配置
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#brokerClusterName=DefaultCluster
#brokerName=broker-b
#brokerId=1
#deleteWhen=04
#fileReservedTime=48
#brokerRole=SLAVE
#flushDiskType=ASYNC_FLUSH
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分號分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=10923
haListenPort=10924
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=18
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/home/rocketMQ/ZHF/rocketMQ-2m2s/store-s
#commitLog 存儲路徑
storePathCommitLog=/home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/consumequeue
#消息索引存儲路徑
storePathIndex=/home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/index
#checkpoint 文件存儲路徑
storeCheckpoint=/home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/checkpoint
#abort 文件存儲路徑
abortFile=/home/rocketMQ/ZHF/rocketMQ-2m2s/store-s/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步復制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=SLAVE
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
#強制指定本機IP,需要根據每台機器進行修改。官方介紹可為空,系統默認自動識別,但多網卡時IP地址可能讀取錯誤
brokerIP1=192.168.162.235
broker-b.properties文件配置
參考broker-a.properties
broker-b-s.properties文件配置
參考broker-a-s.properties
7 啟動參數設置
RocketMQ啟動文件位於/home/rocketMQ/ZHF/rocketMQ-2m2s/bin/目錄下,Linux中nameserver啟動文件為:mqnamesrv,broker啟動文件為:mqbroker,mqnamesrv和mqbroker啟動文件分別調用了runserver.sh和runbroker.sh文件,這兩個文件分別設置了nameserver和broker的啟動內存,目前內存啟動參數分別為nameserver啟動內存4G,最大內存4G,新生代2G,broker啟動內存8G,最大內存8G,新生代4G。
8 端口及防火牆設置
RokcetMQ啟動默認使用3個端口9875,10911,10912,三個端口分別代表nameserver服務器端口,broker端口,broker HA端口。需注意的是在多Master多Slave模式下10911和10912是Master的使用端口,但Slave端口的設置與Master的端口不同,具體端口約束為:Slave - Master > 2,否則可能導致同一台服務器無法同時啟動Master和Slave。
如果服務器啟動了防火牆,為了端口不被屏蔽,需將Master和Slave對應端口加入到iptables表以開放對應端口號,添加完成后重啟防火牆。命令行開放端口操作如下:
分別打開235和236終端,在root用戶下執行命令:
開放端口:
/sbin/iptables -A INPUT -m state --state NEW -m tcp -p tcp --dport 9876 -j ACCEPT
/sbin/iptables -A INPUT -m state --state NEW -m tcp -p tcp --dport 10911 -j ACCEPT
/sbin/iptables -A INPUT -m state --state NEW -m tcp -p tcp --dport 10912 -j ACCEPT
/sbin/iptables -A INPUT -m state --state NEW -m tcp -p tcp --dport 10923 -j ACCEPT
/sbin/iptables -A INPUT -m state --state NEW -m tcp -p tcp --dport 10924 -j ACCEPT
保存:
/etc/rc.d/init.d/iptables save
重啟:
/etc/init.d/iptables restart
查看端口開放情況:
/sbin/iptables -L -n
9 啟動Nameserver
分別啟動235、236的Nameserver
cd /home/rocketMQ/ZHF/rocketMQ-2m2s/bin/nohup sh mqnamesrv &
10.啟動Broker
235上Master啟動:
nohup sh mqbroker -c /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-a.properties
236上Master啟動:
nohup sh mqbroker -c /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-b.properties
235上對應236Master的Slave啟動:
nohup sh mqbroker -c /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-b-s.properti
236上對應235Master的Slave啟動:
nohup sh mqbroker -c /home/rocketMQ/ZHF/rocketMQ-2m2s/conf/2m-2s-async/broker-a-s.properti
至此,Nameserver、Broker啟動完成,可以用jobs命令查看當前運行進程,如下是服務端相關shutdown,即在bin目錄下:
sh mqshutdown namesrvsh
sh mqshutdown broker
六、RocketMQ監控平台部署
Apache版的RocketMQ管理界面部署工具可以從github上下載源碼,地址:https://github.com/apache/rocketmq-externals,部署流程如下:
- 修改配置文件,關聯rocketMQ集群到管理界面
首先解壓並進入解壓后rockemq-externals-master目錄rocketmq-externals-master/rocketmq-externals-master/rocketmq-console/src/main/resources,修改目錄下application.properties配置文件內容如下圖:

- 編譯rocketmq-console
mvn clean package -Dmaven.test.skip=true
編譯需用maven命令進行編譯,如下圖,顯示BIUD SUCCESS,則編譯成功,成功后會在rocketmq-externals-master/rocketmq-console/target目錄下產生一個rocketmq-console-ng-1.0.0.jar文件。

- 將編譯好的rocketmq-console-ng-0.0.jar包上傳linux服務器
這里上傳服務器地址為192.168.162.235,路徑為:/home/rocketMQ/ZHF/
- 運行jar包
java -jar target/rocketmq-console-ng-1.0.0.jar
運行顯示下圖則啟動成功:
- 訪問管理界面
瀏覽器輸入http://192.168.162.235:8080/回車顯示監控界面如下:

原文:https://blog.csdn.net/tubunanhai/article/details/81738416
RocketMQ的開發快速入門
1、引入 rocketmq-client
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.1.0-incubating</version>
</dependency>
2、編寫Producer
DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
//指定NameServer地址
producer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改為自己的
/**
* Producer對象在使用之前必須要調用start初始化,初始化一次即可
* 注意:切記不可以在每次發送消息時,都調用start方法
*/
producer.start();
for (int i = 0; i < 997892; i++) {
try {
//構建消息
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("測試RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//發送同步消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
3、編寫Consumer
/**
* Consumer Group,非常重要的概念,后續會慢慢補充
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo");
//指定NameServer地址,多個地址以 ; 隔開
consumer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改為自己的
/**
* 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
* 如果非第一次啟動,那么按照上次消費的位置繼續消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg:msgs){
String msgbody = new String(msg.getBody(), "utf-8");
System.out.println(" MessageBody: "+ msgbody);//輸出消息內容
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
4、說明
各位根據自己的環境,修改NamesrvAddr的值,我的集群請參考:RocketMQ集群部署配置。稍后通過RocketMQ管控台就可以看到之前搭建的多Master多Slave模式,異步復制集群模式。
5、通過RocketMQ管控台
rocketmq-console-ng獲取方式為:rocketmq-console-ng,之后通過mavne進行編譯獲取jar,命令如下:
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar
得到rocketmq-console-ng-1.0.0.jar之后,找到rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties文件,根據自己的NamesrvAddr進行修改rocketmq.config.namesrvAddr的值。
直接啟動:
java -jar rocketmq-console-ng-1.0.0.jar

管控台是基於springboot的,的確springboot非常方便和非常火了,所以有必要去學習下springboot了(其實還是spring系列,所以spring也必要深入學習下),稍后通過管控台進行觀察運行。
6、運行觀察
一個好的習慣是先運行Consumer,之后在運行Producer,之后通過rocketmq-console-ng管控台觀察

運行完成之后,的確broker-a的數據加上broker-b的數據量就等於我們發送的數據量,而且slave的數量也master的數量也是一致的,效果如下:

查看發送這些數據,2台機器的磁盤情況如下:


到目前位置,關於RocketMQ快速入門就結束了,未完待續……
參考:
https://www.pianshen.com/article/1215649056/
https://blog.csdn.net/weiwenhou/article/details/100869824
https://www.cnblogs.com/qdhxhz/p/11094624.html
https://blog.csdn.net/linyaogai/article/details/77876078
https://www.pianshen.com/article/1215649056/
https://blog.csdn.net/weiwenhou/article/details/100869824
https://www.cnblogs.com/qdhxhz/p/11094624.html
https://blog.csdn.net/linyaogai/article/details/77876078
回到◀瘋狂創客圈▶
瘋狂創客圈 - Java高並發研習社群,為大家開啟大廠之門
