本文主要描述使用Idea獲取rocketMQ源碼及源碼的讀取。
在演示搭建源碼環境前,先簡要描述一下RocketMQ的設計目標。
1、架構模式
和大多數消息中間件一樣,采用的是發布訂閱模式,基本組件包括:消息發送者、消息服務器(消息存儲)、消息消費和路由發現
2、順序消息
rocketMQ保證嚴格的順序消息(消息到達服務器的時間)
3、消息過濾
rocketMQ既支持在broker端的消息過濾,也支持在消費端的消息過濾
4、消息存儲
rocketMQ引入了內存機制,保證消息存儲的高性能;同時將所有的topic數據存在一個文件中,保證查找消息的高效率;同時為了避免文件積累,引入了文件過期機制和存儲空間報警機制。
5、消息高可用
rocketMQ使用同步刷盤保證了異常宕機情況下數據不丟失,異步刷盤可能有少量數據丟失;單點故障情況,如果開啟異步復制機制,則能保證只丟少量數據;后續會引入雙寫機制,以滿足可靠性極高的場合。
6、消息到底低延遲
RocketMQ在不發生消息堆積時,以長輪詢模式實現准實時的消息推送模式。
7、確保消息必須被消費一次
通過消息消費確認機制(ACK)來確保消息至少被消費一次(由於ACK信息可能會丟失等原因,所以rocket MQ無法做到准確的只被消費一次,因此有可能會有重復消費的可能)
8、回溯消息
rocketMQ支持按時間的消息回溯(已經被消費的數據,可以重新進行消費),可以精確到毫秒
9、消息堆積
rocketMQ使用的是磁盤存儲,同時使用了內存映射機制,並且存儲的物理文件為多個大小相等的邏輯文件,只要磁盤夠大,就可以無限存儲(實際上,如第4點所述,rocketMQ還提供了文件過期機制和存儲空間報警機制,默認保留3天)
10、定時消息
定時消息實際就是不立即消費的消息,到達一定時間后才會被消費,rocketMQ不支持任意進度的定時消息,而只zhi'chi特定的延遲級別(這是因為特定的延遲級別會有對應的隊列)
11、消息重試機制
消息如果被消費失敗,則會重新投遞消息
然后就是搭建閱讀源碼環境
1、獲取源碼
VCS --> Checkout from Version Control --> Git

輸入git地址:https://github.com/apache/rocketmq.git ,由於我是將項目fork到我本人的gitHub目錄下,所以我這里添加我自己的git地址:https://github.com/menglongdeye/rocketmq.git
gitHub reocketMQ項目地址:https://github.com/apache/rocketmq

然后選擇Maven

直接下一步

勾選JDK8,下一步

繼續下一步

繼續下一步

完成

編輯和下載依賴包


RocketMQ核心目錄說明:
(1)acl:訪問控制,包含用戶、資源、權限、角色等信息。
(2)broker:broker模塊(broker啟動進程)
(3)client:消息客戶端,包含消息生產者、消費者相關類
(4)common:公共包
(5)dev:開發者信息(非源碼)
(6)distribution:部署實例文件夾(非源碼)
(7)docs:存儲對各個模塊的說明和一些流程圖片,分為中文和英文兩種語音描述
(8)example:示例代碼
(9)filter:消息過濾基礎類
(10)logappender:日志實現相關類
(11)namesrv:namesrv相關實現類(NameSrv啟動進程)
(12)openmessaging:消息開放標准
(13)remoting:遠程通信模塊,基於Netty
(14)srvutil:服務器工具類
(15)store:消息存儲實現類相關
(16)style:checkstyle相關實現
(17)test:測試相關類
(18)tools:工具類,監控命令相關實現類
2、調試源碼
啟動nameSrv
配置ROCKETMQ_HOME


name 為 ROCKETMQ_HOME, value為源碼保存路徑(與源碼目錄同層級)

在 ROCKETMQ_HOME創建 conf、logs、store三個文件夾
將distribution項目中broker.conf、logback_namesrv.xml和logback_broker.xml復制到剛才創建的conf文件夾下。
修改broker.conf配置文件,新增以下配置項
# 存儲路徑
storePathRootDir=D:\\workSpace\\store
# commitlog存儲路徑
storePathCommitLog=D:\\workSpace\\store\\commitlog
# 消費隊列存儲路徑
storePathConsumeQueue=D:\\workSpace\\store\\comsumequeue
# 消息索引存儲位置
storePathIndex=D:\\workSpace\\store\\index
# checkpoint文件存儲位置
storeCheckPoint=D:\\workSpace\\store\\checkpoint
# abort文件存儲位置
abortFile=D:\\workSpace\\store\\abort
namesrvAddr=127.0.0.1:9876
啟動namesrv,當輸出 The Name Server boot success. serializeType=JSON 時,說明啟動成功。

3、啟動broker
配置ROCKETMQ_HOME和broker配置文件

啟動成功(會顯示鏈接nameSrv的地址)

4、使用rocketmq提供的樣例發送消息和消費消息
(1)發送消息
更改example\src\main\java\org\apache\rocketmq\example\quickstart\Producer.java 中 namesrv地址

啟動

(2)消費消息
啟動D:\workSpace\rocketMQ\example\src\main\java\org\apache\rocketmq\example\quickstart\Consumer.java(需要配置namesrv地址)

可以看到消息消費成功。
