搭建RocketMQ
系統環境准備
- 64位操作系統,推薦使用Linux、Unix、MacOS
- 64位 JDK1.8+
- Maven 3.2.x
- 適用於Broker服務器的4g +可用磁盤
下載與搭建
下載
從Apache RocketMQ的官網【http://rocketmq.apache.org/ 】可以進入RocketMQ的下載地址,目前最新的版本為4.5.2【http://rocketmq.apache.org/release_notes/release-notes-4.5.2/ 】,下載Binary文件即可
搭建
-
解壓縮
unzip rocketmq-all-4.5.2-bin-release.zip
-
切換目錄
cd rocketmq-all-4.5.2-bin-release
-
啟動Name Server
nohup sh bin/mqnamesrv &
驗證是否啟動成功
tail -f ~/logs/rocketmqlogs/namesrv.log
如果啟動成功,可以看見如下日志:
2019-10-24 16:20:48 INFO FileWatchService - FileWatchService service started
2019-10-24 16:20:48 INFO main - The Name Server boot success. serializeType=JSON -
啟動Broker
nohup sh bin/mqbroker -n localhost:9876 &
驗證是否啟動成功
tail -f ~/logs/rocketmqlogs/broker.log
如果啟動成功,可以看見如下日志:
2019-10-25 10:46:16 INFO brokerOutApi_thread_2 - register broker[0]to name server localhost:9876 OK
PS:默認啟動Broker要求的內存非常的高,往往會因為內存不足的原因導致broker啟動失敗:
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
這時,需要調整bin文件夾中的 runbroker.sh文件,調整,修改:
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
發送消息與接收消息
發送消息【生產消息】
執行如下命令
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
可以看見如下輸出
SendResult [sendStatus=SEND_OK, msgId=C0A81F650E80511D50C07DE6317903E7, offsetMsgId=C0A81F6500002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=fanxuan-TM1604, queueId=3], queueOffset=249]
接收消息【消息消息】
執行如下命令
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
可以看見如下輸出
ConsumeMessageThread_20 Receive New Messages: [MessageExt [queueId=1, storeSize=180, queueOffset=174, sysFlag=0, bornTimestamp=1571971437603, bornHost=/192.168.31.101:49768, storeTimestamp=1571971437604, storeHost=/192.168.31.101:10911, msgId=C0A81F6500002A9F000000000001E9A6, commitLogOffset=125350, bodyCRC=1257100577, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1571971518424, UNIQ_KEY=C0A81F650E80511D50C07DE6302302B9, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 54, 57, 55], transactionId='null'}]]
關閉服務器
停止 broker:sh bin/mqshutdown broker
停止 nameserver:sh bin/mqshutdown namesrv
搭建RocketMQ控制台
下載源碼
git clone https://github.com/apache/rocketmq-externals.git
代碼修改
修改配置
修改rocketmq-externals/rocketmq-console/src/main/resources/application.properties文件:
- server.port:默認端口8080,容易與其他端口沖突,建議修改
- rocketmq.config.namesrvAddr:name server地址,localhost:9876
修改依賴
修改rocketmq-externals/rocketmq-console/pom.xml文件:
將rocketmq.version改為你自己的版本
PS:建議使用4.5.1,4.5.2版本會導致MQAdminExtImpl報錯
修改代碼
找到org.apache.rocketmq.console.service.impl.MessageServiceImpl文件,修改queryMessageByTopic方法中的
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
為:
RPCHook rpcHook = null;
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
另外需要引入包:
import org.apache.rocketmq.remoting.RPCHook;
編譯啟動
打開終端,切換到rocketmq-console目錄下,重新構建項目:
mvn clean package -DskipTests
編譯成功:
[INFO] --- maven-source-plugin:3.0.1:jar (attach-sources) @ rocketmq-console-ng ---
[INFO] Building jar: /home/fanxuan/Server/RocketMQ/rocketmq-externals/rocketmq-console/target/rocketmq-console-ng-1.0.1-sources.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 12.404 s
[INFO] Finished at: 2019-10-25T12:35:19+08:00
[INFO] ------------------------------------------------------------------------
在target文件夾中找到新編譯好的包,啟動:
java -jar rocketmq-console-ng-1.0.1.jar
啟動成功之后,瀏覽器,輸入localhost:8888【修改配置中修改的server.port值】
RocketMQ控制台說明
apache官方文檔:https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md 詳細介紹了RocketMQ使用文檔
運維
- 你可以修改這個服務使用的navesvr的地址(enter,更新)
- 你可以修改這個服務是否使用VIPChannel(如果你的mq server版本小於3.5.8,請設置不使用)
駕駛艙
- 查看broker的消息量(總量/5分鍾圖)
- 查看單一主題的消息量(總量/趨勢圖)
集群
- 查看集群的分布情況
- cluster與broker關系
- broker
- 查看broker具體信息/運行信息
- 查看broker配置信息
主題
- 展示所有的主題,可以通過搜索框進行過濾
- 篩選 普通/重試/死信 主題
- 添加/更新主題
- clusterName 創建在哪幾個cluster上
- brokerName 創建在哪幾個broker上
- topicName 主題名
- writeQueueNums 寫隊列數量
- readQueueNums 讀隊列數量
- perm //2是寫 4是讀 6是讀寫
- 狀態 查詢消息投遞狀態(投遞到哪些broker/哪些queue/多少量等)
- 路由 查看消息的路由(現在你發這個主題的消息會發往哪些broker,對應broker的queue信息)
- CONSUMER管理(這個topic都被哪些group消費了,消費情況何如)
- topic配置(查看變更當前的配置)
- 發送消息(向這個主題發送一個測試消息)
- 重置消費位點(分為在線和不在線兩種情況,不過都需要檢查重置是否成功)
- 刪除主題 (會刪除掉所有broker以及namesvr上的主題配置和路由信息)
消費者頁面
- 展示所有的消費組,可以通過搜索框進行過濾
- 刷新頁面/每隔五秒定時刷新頁面
- 按照訂閱組/數量/TPS/延遲 進行排序
- 添加/更新消費組
- clusterName 創建在哪幾個集群上
- brokerName 創建在哪幾個broker上
- groupName 消費組名字
- consumeEnable //是否可以消費 FALSE的話將無法進行消費
- consumeBroadcastEnable //是否可以廣播消費
- retryQueueNums //重試隊列的大小
- brokerId //正常情況從哪消費
- whichBrokerWhenConsumeSlowly//出問題了從哪消費
- 終端 在線的消費客戶端查看,包括版本訂閱信息和消費模式
- 消費詳情 對應消費組的消費明細查看,這個消費組訂閱的所有Topic的消費情況,每個queue對應的消費client查看(包括Retry消息)
- 配置 查看變更消費組的配置
- 刪除 在指定的broker上刪除消費組
發布管理頁面
- 通過Topic和Group查詢在線的消息生產者客戶端
- 信息包含客戶端主機 版本
消息查詢頁面
- 根據Topic和時間區間查詢 *由於數據量大 做多只會展示2000條,多的會被忽略
- 根據Topic和Key進行查詢
- 最多只會展示64條
- 根據消息主題和消息Id進行消息的查詢
- 消息詳情可以展示這條消息的詳細信息,查看消息對應到具體消費組的消費情況(如果異常,可以查看具體的異常信息)。可以向指定的消費組重發消息。
HTTPS 方式訪問Console
- HTTPS功能實際上是使用SpringBoot提供的配置功能即可完成,首先,需要有一個SSL KeyStore來存放服務端證書,可以使用本工程所提供的測試密鑰庫:
resources/rmqcngkeystore.jks, 它可以通過如下keytool命令生成
#生成庫並以rmqcngKey別名添加秘鑰
keytool -genkeypair -alias rmqcngKey -keyalg RSA -validity 3650 -keystore rmqcngkeystore.jks
#查看keystore內容
keytool -list -v -keystore rmqcngkeystore.jks
#轉換庫格式
keytool -importkeystore -srckeystore rmqcngkeystore.jks -destkeystore rmqcngkeystore.jks -deststoretype pkcs12
- 配置resources/application.properties, 打開SSL的相關選項, 啟動console后即開啟了HTTPS.
#設置https端口
server.port=8443
### SSL setting
#server.ssl.key-store=classpath:rmqcngkeystore.jks
#server.ssl.key-store-password=rocketmq
#server.ssl.keyStoreType=PKCS12
#server.ssl.keyAlias=rmqcngkey
登錄訪問Console
在訪問Console時支持按用戶名和密碼登錄控制台,在操作完成后登出。需要做如下的設置:
- 在Spring配置文件resources/application.properties中修改 開啟登錄功能
# 開啟登錄功能
rocketmq.config.loginRequired=true
# Dashboard文件目錄,登錄用戶配置文件所在目錄
rocketmq.config.dataPath=/tmp/rocketmq-console/data
- 確保${rocketmq.config.dataPath}定義的目錄存在,並且該目錄下創建登錄配置文件"users.properties", 如果該目錄下不存在此文件,則默認使用resources/users.properties文件。
users.properties文件格式為:
# 該文件支持熱修改,即添加和修改用戶時,不需要重新啟動console
# 格式, 每行定義一個用戶, username=password[,N] #N是可選項,可以為0 (普通用戶); 1 (管理員)
#定義管理員
admin=admin,1
#定義普通用戶
user1=user1
user2=user2
- 啟動控制台則開啟了登錄功能