一、rocketMQ簡介
RocektMQ是阿里巴巴在2012年開源的一個純java、分布式、隊列模型的第三代消息中間件。
2016年11月11號,雙十一大促見證了RocketMQ低延遲存儲架構的成功試水,99.996%的延遲落在了10ms以內,極個別由於GC引發的停頓在50ms以內,其高性能、低延時和高可靠的特性承載了近年來雙十一17萬筆/秒的交易峰值,在整個生產鏈路上都有着穩定和出色的表現。其在同年捐贈給Apache后正式進入孵化期。並於2017年9月RocketMQ正式從Apache社區正式畢業,成為Apache頂級項目。
二、相關術語
1、消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。Broker 在實際部署過程中對應一台服務器,每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲於不同的 Broker。Message Queue 用於存儲消息的物理地址,每個Topic中的消息地址存儲於多個 Message Queue 中。Consumer Group 由多個Consumer 實例構成。
2、消息生產者(Producer)
Producer:消息生產者,負責產生消息,一般由業務系統負責產生消息。
Producer Group:一類Producer的集合名稱,這類Producer通常發送一類消息,且發送邏輯一致。
一個消息生產者會把業務應用系統里產生的消息發送到broker服務器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker返回確認信息,單向發送不需要。
3、消息消費者(Consumer)
Consumer:消息消費者,負責消費消息,一般是后台系統負責異步消費。
Push Consumer:Consumer的一種,應用通常向Consumer對象注冊一個Listener接口,一旦收到消息,Consumer對象立刻回調Listener接口方法。該消費模式一般實時性較高。
Pull Consumer:Consumer的一種,應用通常主動調用Consumer的拉消息方法從Broker拉消息,主動權由應用控制。
Consumer Group:一類Consumer的集合名稱,這類Consumer通常消費一類消息,且消費邏輯一致。
4、代理服務器(Broker)
消息中轉角色,負責存儲消息,轉發消息,一般也稱為Server。
Master:Broker中的主節點。
Slave:Broker中的副節點。
5、名字服務(Name Server)
名稱服務充當路由消息的提供者。生產者或消費者能夠通過名字服務查找各主題相應的Broker IP列表。多個Namesrv實例組成集群,集群中Nameserver互相獨立,彼此沒有通信關系,單台Nameserver掛掉,不影響其他Nameserver,即使全部掛掉,也不影響業務系統使用。而且Nameserver不會有頻繁的讀寫,所以性能開銷非常小,穩定性很高。
6、主題(Topic)
表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬於一個主題,是RocketMQ進行消息訂閱的基本單位。
7、消息(Message)
消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬於一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了通過Message ID和Key查詢消息的功能。
8、消息隊列(Message Queue)
在RocketMQ中,所有消息隊列都是持久化,長度無限的數據結構,所謂長度無限是指隊列中的每個存儲單元都是定長,訪問其中的存儲單元使用Offset來訪問,offset為java long類型,64位,理論上在100年內不會溢出,所以認為是長度無限,另外隊列中只保存最近幾天的數據,之前的數據會按照過期時間來刪除。也可以認為Message Queue是一個長度無限的數組,offset就是下標。
9、消費類型
廣播消費:一條消息被多個Consumer消費,即使這些Consumer屬於同一個Consumer Group,消息也會被Consumer Group中的每個Consumer都消費一次,廣播消費中的Consumer Group概念可以認為在消息划分方面無意義。
集群消費:一個Consumer Group中的Consumer實例平均分攤消費消息。例如某個Topic有9條消息,其中一個Consumer Group有3個實例(可能是3個進程,或者3台機器),那么每個實例只消費其中的3條消息。
10、消息順序
普通順序消息:消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。主要指的是局部順序,即一類消息為滿足順序性,必須Producer單線程順序發送,且發送到同一個隊列,這樣Consumer就可以按照Producer發送的順序去消費消息。但是一旦發生通信異常,Broker重啟,由於隊列總數發生變化,哈希取模后定位的隊列會變化,產生短暫的消息順序不一致。
嚴格順序消息:順序消息的一種,無論正常異常情況都能保證順序,但是犧牲了分布式Failover特性,即Broker集群中只要有一台機器不可用,則整個集群都不可用,服務可用性大大降低。如果服務器部署為同步雙寫模式,此缺陷可通過備機自動切換為主避免,不過仍然會存在幾分鍾的服務不可用。
11、消息寫入機制
異步復制:消息寫入master節點,再由master節點異步復制到slave節點,類似mysql中的master-slave機制。
同步雙寫:消息同時寫入master節點和slave節點。
12、持久化策略
異步刷盤:Broker的一種持久化策略,消息寫入pagecache后,直接返回。由異步線程負責將pagecache寫入硬盤。
同步刷盤:Broker的一種持久化策略,消息寫入pagecache后,由同步線程將pagecache寫入硬盤后,再返回。
三、RocketMQ集群部署模式
RocketMQ作為消息中間件,其主要功能為消息的Publish/Subscribe。而Broker擔任的消息轉發和存儲功能,其部署方式有很多種:
1、單Master
優點:除了配置簡單沒什么優點。
缺點:不可靠,該機器重啟或宕機,將導致整個服務不可用。
2、多Master
優點:配置簡單,性能最高。
缺點:可能會有少量消息丟失,單台機器重啟或宕機期間,該機器下未被消費的消息在機器恢復前不可訂閱,影響消息實時性。
3、異步多Master多Slave
每個Master配一個Slave,有多對Master-Slave,集群采用異步復制方式,主備有短暫消息延遲,毫秒級。
優點:性能同多Master幾乎一樣,實時性高,主備間切換對應用透明,不需人工干預。
缺點:Master宕機或磁盤損壞時會有少量消息丟失。
4、同步多Master多Slave
每個Master配一個Slave,有多對Master-Slave,集群采用同步雙寫方式,主備都寫成功,向應用返回成功。
優點:服務可用性與數據可用性非常高。
缺點:性能比異步集群略低,當前版本主宕備不能自動切換為主。
四、RocketMQ架構
首先我們來看看RocketMQ架構是怎樣的:
接下來看看各個組件如何啟動:
最后看看各個組件如何協作:
五、RocketMQ安裝部署
1、單機部署
首先我們來嘗試進行單機部署,本實例是在Mac環境下進行部署,Linux機器大致一致。
1.1、官方推薦環境
- 64bit OS, Linux/Unix/Mac is recommended;(Windows user see guide below)
- 64bit JDK 1.8+;
- Maven 3.2.x;
- Git;
- 4g+ free disk for Broker server
1.2、軟件包獲取
軟件包可以下載源文件進行編譯然后安裝,也可以直接下載二進制包進行解壓安裝,本文就以二進制包的安裝為例,版本:rocketmq-all-4.7.0-bin-release.zip。
下載地址為:http://rocketmq.apache.org/docs/quick-start/
中文的GitHub地址:https://github.com/apache/rocketmq/tree/master/docs/cn
1.3、JDK環境
本文JDK版本選擇 1.8:
HoudeMacBook-Pro:~ ******$ java -version java version "1.8.0_131" Java(TM) SE Runtime Environment (build 1.8.0_131-b11) Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
注意選擇新的版本啟動可能出現以下問題:
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release. Unrecognized VM option 'UseCMSCompactAtFullCollection' Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit.
1.4、rocketmq服務端安裝
解壓安裝包:rocketmq-all-4.7.0-bin-release.zip。
unzip rocketmq-all-4.7.0-bin-release.zip
配置rocketmq的環境變量,在 ~/.bash_profile 最后加入:
export ROCKETMQ_HOME=/Volumes/work/rocketmq-all-4.7.0-bin-release export PATH=$PATH::$ROCKETMQ_HOME/bin
然后執行source命令使配置文件生效:
source ~/.bash_profile
給下列命令可運行權限:
HoudeMacBook-Pro:~ houjing$ cd $ROCKETMQ_HOME/bin HoudeMacBook-Pro:bin houjing$ pwd /Volumes/work/rocketmq-all-4.7.0-bin-release/bin HoudeMacBook-Pro:bin houjing$ chmod +x mqadmin mqbroker mqfiltersrv mqshutdown mqnamesrv
新建日志目錄:
HoudeMacBook-Pro:~ ******$ cd $ROCKETMQ_HOME HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release ******$ mkdir logs
1.5、啟動namesever
HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ nohup mqnamesrv 1>/Volumes/work/rocketmq-all-4.7.0-bin-release/logs/ng.log 2>/Volumes/work/rocketmq-all-4.7.0-bin-release/logs/ng-err.log & HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ ps -ef|grep rocket 501 3464 3224 0 8:54下午 ttys000 0:00.02 /bin/sh /Volumes/work/rocketmq-all-4.7.0-bin-release/bin/mqnamesrv 501 3465 3464 0 8:54下午 ttys000 0:00.01 sh /Volumes/work/rocketmq-all-4.7.0-bin-release/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup 501 3495 3465 0 8:54下午 ttys000 0:02.88 /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/bin/java -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC -verbose:gc -Xloggc:/Volumes/RAMDisk/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m -XX:-OmitStackTraceInFastThrow -XX:-UseLargePages -Djava.ext.dirs=/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext:/Volumes/work/rocketmq-all-4.7.0-bin-release/bin/../lib -cp .:/Volumes/work/rocketmq-all-4.7.0-bin-release/bin/../conf:/lib/tools.jar:/lib/dt.jar org.apache.rocketmq.namesrv.NamesrvStartup
通過日志檢查是否啟動成功:
HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ tail -f /Volumes/work/rocketmq-all-4.7.0-bin-release/logs/ng.log Create RAMDisk /Volumes/RAMDisk for gc logging on Darwin OS. The Name Server boot success. serializeType=JSON
1.6、啟動broker
啟動broker,在啟動borker之前須要指定nameserver地址。例如本地單機部署 127.0.0.1 為所在服務器IP:
HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ export NAMESRV_ADDR=127.0.0.1:9876 HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ nohup mqbroker >/Volumes/work/rocketmq-all-4.7.0-bin-release/logs/mq.log & [2] 3584 HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ tail -f /Volumes/work/rocketmq-all-4.7.0-bin-release/logs/mq.log The broker[HoudeMacBook-Pro.local, 192.168.100.28:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
1.7、模擬生產者生產
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ export NAMESRV_ADDR=127.0.0.1:9876 HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 21:14:59.611 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. SendResult [sendStatus=SEND_OK, msgId=C0A8641C0E694D7E188675DB340D0000, offsetMsgId=C0A8641C00002A9F000000000002BEB2, messageQueue=MessageQueue [topic=TopicTest, brokerName=HoudeMacBook-Pro.local, queueId=1], queueOffset=250] SendResult [sendStatus=SEND_OK, msgId=C0A8641C0E694D7E188675DB34320001, offsetMsgId=C0A8641C00002A9F000000000002BF64, messageQueue=MessageQueue [topic=TopicTest, brokerName=HoudeMacBook-Pro.local, queueId=2], queueOffset=250] SendResult [sendStatus=SEND_OK, msgId=C0A8641C0E694D7E188675DB34350002, offsetMsgId=C0A8641C00002A9F000000000002C016, messageQueue=MessageQueue [topic=TopicTest, brokerName=HoudeMacBook-Pro.local, queueId=3], queueOffset=250]
1.8、模擬消費者進行消費
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 21:16:28.569 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework Consumer Started. ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=HoudeMacBook-Pro.local, queueId=1, storeSize=178, queueOffset=251, sysFlag=0, bornTimestamp=1592918100028, bornHost=/192.168.100.28:58376, storeTimestamp=1592918100029, storeHost=/192.168.100.28:10911, msgId=C0A8641C00002A9F000000000002C17A, commitLogOffset=180602, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=500, CONSUME_START_TIME=1592918188998, UNIQ_KEY=C0A8641C0E694D7E188675DB343C0004, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}]] ConsumeMessageThread_10 Receive New Messages: [MessageExt [brokerName=HoudeMacBook-Pro.local, queueId=1, storeSize=179, queueOffset=254, sysFlag=0, bornTimestamp=1592918100065, bornHost=/192.168.100.28:58376, storeTimestamp=1592918100066, storeHost=/192.168.100.28:10911, msgId=C0A8641C00002A9F000000000002C9D8, commitLogOffset=182744, bodyCRC=1659149091, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=500, CONSUME_START_TIME=1592918188998, UNIQ_KEY=C0A8641C0E694D7E188675DB34600010, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 54], transactionId='null'}]]
1.9、關閉RockectMQ
mqshutdown namesrv #關閉nameserver
mqshutdown broker #關閉broker
HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ mqshutdown namesrv The mqnamesrv(3582) is running... Send shutdown request to mqnamesrv(3582) OK HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ mqshutdown broker The mqbroker(3589) is running... Send shutdown request to mqbroker(3589) OK
2、集群部署
RocketMQ的多Master多Slave模式有兩種:異步復制和同步雙寫。
異步復制:每個 Master 配置一個 Slave,有多對Master-Slave, HA采用異步復制方式,主備有短暫消息延遲,毫秒級。
優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,因為Master 宕機后,消費者仍然可以從 Slave消費,此過程對應用透明。不需要人工干預。性能同多 Master 模式幾乎一樣。
缺點: Master 宕機,磁盤損壞情況,會丟失少量消息。
同步雙寫:每個 Master 配置一個 Slave,有多對Master-Slave, HA采用同步雙寫方式,主備都寫成功,向應用返回成功。
優點:數據與服務都無單點, Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高。
缺點:性能比異步復制模式略低,大約低10%左右,發送單個消息的 RT會略高。目前主宕機后,備機不能自動切換為主機,后續會支持自動切換功能。
drwxr-xr-x 6 houjing admin 192 3 4 09:59 2m-2s-async/ drwxr-xr-x 6 houjing admin 192 3 4 09:59 2m-2s-sync/
分別用於異步復制和同步雙寫配置,我們進入到目錄2m-2s-sync,可以看到4個broker文件,表示2Master2Slave的RocketMQ集群。
-rw-r--r-- 1 houjing admin 922 3 4 09:59 broker-a-s.properties -rw-r--r-- 1 houjing admin 928 3 4 09:59 broker-a.properties -rw-r--r-- 1 houjing admin 922 3 4 09:59 broker-b-s.properties -rw-r--r-- 1 houjing admin 928 3 4 09:59 broker-b.properties 192:2m-2s-sync houjing$ pwd /Volumes/work/rocketmq-all-4.7.0-bin-release/conf/2m-2s-sync
我們依次對4個properties進行配置配置:
#mq-master01節點配置/data/rocketmq/conf/2m-2s-sync/broker-a.properties
[root@mq-master01 ~]# vim /data/rocketmq/conf/2m-2s-sync/broker-a.properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣 例如:在a.properties 文件中寫 broker-a 在b.properties 文件中寫 broker-b
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,這里nameserver是單台,如果nameserver是多台集群的話,就用分號分割(即namesrvAddr=ip1:port1;ip2:port2;ip3:port3)
namesrvAddr=192.168.10.207:9876;
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數。由於是4個broker節點,所以設置為4
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/data/rocketmq/store
#commitLog 存儲路徑
storePathCommitLog=/data/rocketmq/store/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/data/rocketmq/store/consumequeue
#消息索引存儲路徑
storePathIndex=/data/rocketmq/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/data/rocketmq/store/checkpoint
#abort 文件存儲路徑
abortFile=/data/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 異步復制Master #- SYNC_MASTER 同步雙寫Master #- SLAVE
brokerRole=MASTER #要配置為MASTER或SLAVE的角色
#刷盤方式 #- ASYNC_FLUSH 異步刷盤 #- SYNC_FLUSH 同步刷盤
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false #發消息線程池數量
#sendMessageThreadPoolNums=128 #拉消息線程池數量 #pullMessageThreadPoolNums=128
#mq-master02節點配置的是/data/rocketmq/conf/2m-2s-sync/broker-b.properties #就下面三行配置不一樣,其他配置行都一樣!
[root@mq-master02 software]# vim /data/rocketmq/conf/2m-2s-sync/broker-b.properties
......
brokerName=broker-b
brokerId=0
brokerRole=MASTER
#mq-slave01節點配置的是/data/rocketmq/conf/2m-2s-sync/broker-a-s.properties
[root@mq-slave01 software]# vim /data/rocketmq/conf/2m-2s-sync/broker-a-s.properties
......
brokerName=broker-a #注意這一行的名稱要和master保持一致
brokerId=1 #這個ID要跟master的不一致!
brokerRole=SLAVE #要配置為從
#mq-slave02節點配置的是/data/rocketmq/conf/2m-2s-sync/broker-b-s.properties
[root@mq-slave02 software]# vim /data/rocketmq/conf/2m-2s-sync/broker-b-s.properties
......
brokerName=broker-b #注意這一行的名稱要和master的保持一致
brokerId=1 #這個ID要跟master的不一致
brokerRole=SLAVE #要配置為從
注意要創建存儲&日志文件,如果一台虛擬機部署多個需要用文件進行區分。
修改日志配置文件,如:
mkdir -p /root/svr/rocketmq/logs cd /root/svr/rocketmq/conf && sed -i 's#${user.home}#/root/svr/rocketmq#g'*.xml
注意logback.*.xml配置文件中${user.home}需要替換自己指定的目錄
同時要進行默認參數修改,runbroker.sh,runserver.sh啟動參數默認對jvm的堆內存設置比較大(不改啟動不起來),如果是虛擬機非線上環境需要改下參數,大小可以根據自己機器來決定
默認大小:
-Xms8g -Xmx8g -Xmn4g 改為: JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
配置完成后跟單機部署一樣先啟動NameServer,然后啟動Broker即可。附RocketMQ常用命令:
需要切換到bin目錄下,即: [root@mq-master01 ~]# cd /data/rocketmq/bin [root@mq-master01 bin]# 獲取所有可用命令: [root@mq-master01 bin]# sh mqadmin 查看幫助: # sh mqadmin <command> -h 查詢Producer的網絡連接情況: # sh mqadmin producerConnection -n localhost:9876 -g <producer-group> -t <producer-topic> 查詢Consumer的網絡連接情況: # sh mqadmin consumerConnection -n localhost:9876 -g <consumer-group> 查詢Consumer的消費狀態: # sh mqadmin consumerProgress -n localhost:9876 -g <consumer-group> 查詢消息是否發送成功 獲取指定Topic: # sh mqadmin topicList -n localhost:9876 | grep <topicName> 查看Topic狀態: # sh mqadmin topicStatus -n localhost:9876 -t <topicName> 根據offset獲取消息: # sh sh mqadmin queryMsgByOffset -n localhost:9876 -b <broker-name> -i <queueId> -o <offset> -t <topicName> 根據offsetMsgId查詢消息: # sh sh mqadmin queryMsgById -n localhost:9876 -i <offsetMsgId> 查詢消息是否被消費成功 查詢消息詳情: # sh mqadmin queryMsgById -i {MsgId} -n {NameServerAddr} 查看Consumer Group訂閱了哪些TOPIC: # sh mqadmin consumerProgress -g <ConsumerGroup> -n <NameServerAddr> 查詢TOPIC被哪些Consumer Group訂閱了 沒有查詢特定TOPIC訂閱情況,只能查詢所有后再過濾: # sh mqadmin statsAll -n <NameServerAddr> | grep <TOPIC> 返回結果:#Topic #Consumer Group #InTPS #OutTPS #InMsg24Hour #OutMsg24Hour 關閉nameserver和所有的broker: # sh mqshutdown namesrv # sh mqshutdown broker 查看所有消費組group: # sh mqadmin consumerProgress -n 192.168.23.159:9876 查看指定消費組(kevinGroupConsumer)下的所有topic數據堆積情況: # sh mqadmin consumerProgress -n 192.168.23.159:9876 -g kevinGroupConsumer 查看所有topic : # sh mqadmin topicList -n 192.168.23.159:9876 查看topic信息列表詳情統計 # sh mqadmin topicstatus -n 192.168.23.159:9876 -t myTopicTest1 新增topic # sh mqadmin updateTopic –n 10.45.47.168 –c DefaultCluster –t ZTEExample 刪除topic # sh mqadmin deleteTopic –n 10.45.47.168:9876 –c DefaultCluster –t ZTEExample
五、RocketMQ Console安裝部署
1、rocketmq-console介紹
RocketMQ-Console是RocketMQ項目的擴展插件,是一個圖形化管理控制台,提供Broker集群狀態查看,Topic管理,Producer、Consumer狀態展示,消息查詢等常用功能,這個功能在安裝好RocketMQ后需要額外單獨安裝、運行。
2、rocketmq-console下載、部署
進入rocketmq-externals項目的GitHub地址,如下圖,可看到RocketMQ項目的諸多擴展項目,其中就包含我們需要下載的rocketmq-console。
// 克隆項目到本地 git clone https://github.com/apache/rocketmq-externals.git
3、進入項目文件夾並修改配置文件
中文注釋是為了方便解釋,請刪除,不然打包報錯:Not allow chinese character !
// 進入目錄 $ cd rocketmq-externals/rocketmq-console/ // 修改maven項目的資源文件 $ vim src/main/resources/application.properties #管理后台訪問上下文路徑,默認為空,如果填寫,一定要前面加“/”,后面不要加,否則啟動報錯 server.contextPath=/rocketmq #訪問端口 server.port=8080 ### SSL setting 默認就行 #server.ssl.key-store=classpath:rmqcngkeystore.jks #server.ssl.key-store-password=rocketmq #server.ssl.keyStoreType=PKCS12 #server.ssl.keyAlias=rmqcngkey #spring.application.index=true spring.application.name=rocketmq-console spring.http.encoding.charset=UTF-8 spring.http.encoding.enabled=true spring.http.encoding.force=true #logback配置文件路徑,先默認即可 logging.config=classpath:logback.xml #if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876 #Name Server地址,修改成你自己的服務地址。多個地址用英文分號“;”隔開 rocketmq.config.namesrvAddr=localhost:9876 #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true rocketmq.config.isVIPChannel= #rocketmq-console's data path:dashboard/monitor rocketmq.config.dataPath=/tmp/rocketmq-console/data #set it false if you don't want use dashboard.default true rocketmq.config.enableDashBoardCollect=true #set the message track trace topic if you don't want use the default one rocketmq.config.msgTrackTopicName= rocketmq.config.ticketKey=ticket #Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required rocketmq.config.loginRequired=false
注意: Name Server地址默認為空,可以在啟動項目后在后台配置;或者啟動服務的時候給出rocketmq.config.namesrvAddr參數值。
4、將項目打成jar包,並運行jar文件
$ mvn clean package -Dmaven.test.skip=true $ java -jar target/rocketmq-console-ng-1.0.0.jar #如果配置文件沒有填寫Name Server的話,可以在啟動項目時指定namesrvAddr $ java -jar target/rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr='localhost:9876' #因為本文在打包時配置了namesrvAddr,故而執行如下命令 $ java -jar target/rocketmq-console-ng-1.0.0.jar
5、IDEA上直接運行Console
我們也可以直接在IntelliJ上啟動Console,console插件本身就是一個項目,可以直接在IDEA導入項目並運行,同上進行application.properties修改,然后找到app啟動類啟動,如下圖所示:
六、Console控制台的使用
瀏覽器訪問如下:
如上圖所示,DashBoard展示了一些Broker和Topic的基本信息。
1、NameSrvAdd配置
NameServer除了可以在首次啟動時在Application.properties中指定,還可以在Console啟動后進行配置。
2、集群Cluster
3、 Topic主題
主題可在此處進行增刪查改,其中TopicTest是我們RocketMQ的默認主題。
3.1、添加Topic
我們可以進行手動添加:
3.2、Topic狀態
RocketMQ自帶的默認Topic:TopicTest,默認有四個隊列。
3.3、發送消息
4、消息Message