RocketMQ學習之安裝部署及基礎講解


一、rocketMQ簡介

  RocektMQ是阿里巴巴在2012年開源的一個純java、分布式、隊列模型的第三代消息中間件。

  2016年11月11號,雙十一大促見證了RocketMQ低延遲存儲架構的成功試水,99.996%的延遲落在了10ms以內,極個別由於GC引發的停頓在50ms以內,其高性能、低延時和高可靠的特性承載了近年來雙十一17萬筆/秒的交易峰值,在整個生產鏈路上都有着穩定和出色的表現。其在同年捐贈給Apache后正式進入孵化期。並於2017年9月RocketMQ正式從Apache社區正式畢業,成為Apache頂級項目。

二、相關術語

  1、消息模型(Message Model)

  RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。Broker 在實際部署過程中對應一台服務器,每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲於不同的 Broker。Message Queue 用於存儲消息的物理地址,每個Topic中的消息地址存儲於多個 Message Queue 中。Consumer Group 由多個Consumer 實例構成。

  2、消息生產者(Producer)

  Producer:消息生產者,負責產生消息,一般由業務系統負責產生消息。

  Producer Group:一類Producer的集合名稱,這類Producer通常發送一類消息,且發送邏輯一致。

  一個消息生產者會把業務應用系統里產生的消息發送到broker服務器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker返回確認信息,單向發送不需要。

  3、消息消費者(Consumer)

  Consumer:消息消費者,負責消費消息,一般是后台系統負責異步消費。
  Push Consumer:Consumer的一種,應用通常向Consumer對象注冊一個Listener接口,一旦收到消息,Consumer對象立刻回調Listener接口方法。該消費模式一般實時性較高。
  Pull Consumer:Consumer的一種,應用通常主動調用Consumer的拉消息方法從Broker拉消息,主動權由應用控制。
  Consumer Group:一類Consumer的集合名稱,這類Consumer通常消費一類消息,且消費邏輯一致。

  4、代理服務器(Broker)

  消息中轉角色,負責存儲消息,轉發消息,一般也稱為Server。

  Master:Broker中的主節點。
  Slave:Broker中的副節點。

  5、名字服務(Name Server)

  名稱服務充當路由消息的提供者。生產者或消費者能夠通過名字服務查找各主題相應的Broker IP列表。多個Namesrv實例組成集群,集群中Nameserver互相獨立,彼此沒有通信關系,單台Nameserver掛掉,不影響其他Nameserver,即使全部掛掉,也不影響業務系統使用。而且Nameserver不會有頻繁的讀寫,所以性能開銷非常小,穩定性很高。

  6、主題(Topic)

  表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬於一個主題,是RocketMQ進行消息訂閱的基本單位。

  7、消息(Message)

  消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬於一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了通過Message ID和Key查詢消息的功能。

  8、消息隊列(Message Queue)

  在RocketMQ中,所有消息隊列都是持久化,長度無限的數據結構,所謂長度無限是指隊列中的每個存儲單元都是定長,訪問其中的存儲單元使用Offset來訪問,offset為java long類型,64位,理論上在100年內不會溢出,所以認為是長度無限,另外隊列中只保存最近幾天的數據,之前的數據會按照過期時間來刪除。也可以認為Message Queue是一個長度無限的數組,offset就是下標。

  9、消費類型

  廣播消費:一條消息被多個Consumer消費,即使這些Consumer屬於同一個Consumer Group,消息也會被Consumer Group中的每個Consumer都消費一次,廣播消費中的Consumer Group概念可以認為在消息划分方面無意義。

  集群消費:一個Consumer Group中的Consumer實例平均分攤消費消息。例如某個Topic有9條消息,其中一個Consumer Group有3個實例(可能是3個進程,或者3台機器),那么每個實例只消費其中的3條消息。

  10、消息順序

  普通順序消息:消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。主要指的是局部順序,即一類消息為滿足順序性,必須Producer單線程順序發送,且發送到同一個隊列,這樣Consumer就可以按照Producer發送的順序去消費消息。但是一旦發生通信異常,Broker重啟,由於隊列總數發生變化,哈希取模后定位的隊列會變化,產生短暫的消息順序不一致。

  嚴格順序消息:順序消息的一種,無論正常異常情況都能保證順序,但是犧牲了分布式Failover特性,即Broker集群中只要有一台機器不可用,則整個集群都不可用,服務可用性大大降低。如果服務器部署為同步雙寫模式,此缺陷可通過備機自動切換為主避免,不過仍然會存在幾分鍾的服務不可用。

  11、消息寫入機制

  異步復制:消息寫入master節點,再由master節點異步復制到slave節點,類似mysql中的master-slave機制。

  同步雙寫:消息同時寫入master節點和slave節點。

  12、持久化策略

  異步刷盤:Broker的一種持久化策略,消息寫入pagecache后,直接返回。由異步線程負責將pagecache寫入硬盤。

  同步刷盤:Broker的一種持久化策略,消息寫入pagecache后,由同步線程將pagecache寫入硬盤后,再返回。

三、RocketMQ集群部署模式

  RocketMQ作為消息中間件,其主要功能為消息的Publish/Subscribe。而Broker擔任的消息轉發和存儲功能,其部署方式有很多種:

  1、單Master

  優點:除了配置簡單沒什么優點。
  缺點:不可靠,該機器重啟或宕機,將導致整個服務不可用。

  2、多Master

  優點:配置簡單,性能最高。
  缺點:可能會有少量消息丟失,單台機器重啟或宕機期間,該機器下未被消費的消息在機器恢復前不可訂閱,影響消息實時性。

  3、異步多Master多Slave

  每個Master配一個Slave,有多對Master-Slave,集群采用異步復制方式,主備有短暫消息延遲,毫秒級。

  優點:性能同多Master幾乎一樣,實時性高,主備間切換對應用透明,不需人工干預。
  缺點:Master宕機或磁盤損壞時會有少量消息丟失。

  4、同步多Master多Slave

  每個Master配一個Slave,有多對Master-Slave,集群采用同步雙寫方式,主備都寫成功,向應用返回成功。

  優點:服務可用性與數據可用性非常高。
  缺點:性能比異步集群略低,當前版本主宕備不能自動切換為主。

四、RocketMQ架構

  首先我們來看看RocketMQ架構是怎樣的:

     

  接下來看看各個組件如何啟動: 

      

  最后看看各個組件如何協作:

      

 五、RocketMQ安裝部署

  1、單機部署

  首先我們來嘗試進行單機部署,本實例是在Mac環境下進行部署,Linux機器大致一致。

  1.1、官方推薦環境

  • 64bit OS, Linux/Unix/Mac is recommended;(Windows user see guide below)
  • 64bit JDK 1.8+;
  • Maven 3.2.x;
  • Git;
  • 4g+ free disk for Broker server

  1.2、軟件包獲取

  軟件包可以下載源文件進行編譯然后安裝,也可以直接下載二進制包進行解壓安裝,本文就以二進制包的安裝為例,版本:rocketmq-all-4.7.0-bin-release.zip

  下載地址為:http://rocketmq.apache.org/docs/quick-start/

  中文的GitHub地址:https://github.com/apache/rocketmq/tree/master/docs/cn

  1.3、JDK環境

  本文JDK版本選擇 1.8:

HoudeMacBook-Pro:~ ******$ java -version
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)

  注意選擇新的版本啟動可能出現以下問題:

Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Unrecognized VM option 'UseCMSCompactAtFullCollection'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

  1.4、rocketmq服務端安裝

  解壓安裝包:rocketmq-all-4.7.0-bin-release.zip

unzip rocketmq-all-4.7.0-bin-release.zip

  配置rocketmq的環境變量,在 ~/.bash_profile 最后加入:

export ROCKETMQ_HOME=/Volumes/work/rocketmq-all-4.7.0-bin-release
export PATH=$PATH::$ROCKETMQ_HOME/bin

  然后執行source命令使配置文件生效:

source ~/.bash_profile

  給下列命令可運行權限:

HoudeMacBook-Pro:~ houjing$ cd $ROCKETMQ_HOME/bin
HoudeMacBook-Pro:bin houjing$ pwd
/Volumes/work/rocketmq-all-4.7.0-bin-release/bin
HoudeMacBook-Pro:bin houjing$ chmod +x mqadmin mqbroker mqfiltersrv mqshutdown  mqnamesrv

  新建日志目錄:

HoudeMacBook-Pro:~ ******$ cd $ROCKETMQ_HOME
HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release ******$ mkdir logs

  1.5、啟動namesever

HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ nohup mqnamesrv 1>/Volumes/work/rocketmq-all-4.7.0-bin-release/logs/ng.log 2>/Volumes/work/rocketmq-all-4.7.0-bin-release/logs/ng-err.log &
HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ ps -ef|grep rocket
  501  3464  3224   0  8:54下午 ttys000    0:00.02 /bin/sh /Volumes/work/rocketmq-all-4.7.0-bin-release/bin/mqnamesrv
  501  3465  3464   0  8:54下午 ttys000    0:00.01 sh /Volumes/work/rocketmq-all-4.7.0-bin-release/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup
  501  3495  3465   0  8:54下午 ttys000    0:02.88 /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/bin/java -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC -verbose:gc -Xloggc:/Volumes/RAMDisk/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m -XX:-OmitStackTraceInFastThrow -XX:-UseLargePages -Djava.ext.dirs=/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext:/Volumes/work/rocketmq-all-4.7.0-bin-release/bin/../lib -cp .:/Volumes/work/rocketmq-all-4.7.0-bin-release/bin/../conf:/lib/tools.jar:/lib/dt.jar org.apache.rocketmq.namesrv.NamesrvStartup

  通過日志檢查是否啟動成功:

HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ tail -f /Volumes/work/rocketmq-all-4.7.0-bin-release/logs/ng.log
Create RAMDisk /Volumes/RAMDisk for gc logging on Darwin OS.
The Name Server boot success. serializeType=JSON

  1.6、啟動broker

  啟動broker,在啟動borker之前須要指定nameserver地址。例如本地單機部署 127.0.0.1 為所在服務器IP:

HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ export NAMESRV_ADDR=127.0.0.1:9876
HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ nohup mqbroker >/Volumes/work/rocketmq-all-4.7.0-bin-release/logs/mq.log &
[2] 3584
HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ tail -f /Volumes/work/rocketmq-all-4.7.0-bin-release/logs/mq.log
The broker[HoudeMacBook-Pro.local, 192.168.100.28:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

  1.7、模擬生產者生產

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ export NAMESRV_ADDR=127.0.0.1:9876
HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 21:14:59.611 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
SendResult [sendStatus=SEND_OK, msgId=C0A8641C0E694D7E188675DB340D0000, offsetMsgId=C0A8641C00002A9F000000000002BEB2, messageQueue=MessageQueue [topic=TopicTest, brokerName=HoudeMacBook-Pro.local, queueId=1], queueOffset=250]
SendResult [sendStatus=SEND_OK, msgId=C0A8641C0E694D7E188675DB34320001, offsetMsgId=C0A8641C00002A9F000000000002BF64, messageQueue=MessageQueue [topic=TopicTest, brokerName=HoudeMacBook-Pro.local, queueId=2], queueOffset=250]
SendResult [sendStatus=SEND_OK, msgId=C0A8641C0E694D7E188675DB34350002, offsetMsgId=C0A8641C00002A9F000000000002C016, messageQueue=MessageQueue [topic=TopicTest, brokerName=HoudeMacBook-Pro.local, queueId=3], queueOffset=250]

  1.8、模擬消費者進行消費

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 21:16:28.569 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=HoudeMacBook-Pro.local, queueId=1, storeSize=178, queueOffset=251, sysFlag=0, bornTimestamp=1592918100028, bornHost=/192.168.100.28:58376, storeTimestamp=1592918100029, storeHost=/192.168.100.28:10911, msgId=C0A8641C00002A9F000000000002C17A, commitLogOffset=180602, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=500, CONSUME_START_TIME=1592918188998, UNIQ_KEY=C0A8641C0E694D7E188675DB343C0004, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}]] 
ConsumeMessageThread_10 Receive New Messages: [MessageExt [brokerName=HoudeMacBook-Pro.local, queueId=1, storeSize=179, queueOffset=254, sysFlag=0, bornTimestamp=1592918100065, bornHost=/192.168.100.28:58376, storeTimestamp=1592918100066, storeHost=/192.168.100.28:10911, msgId=C0A8641C00002A9F000000000002C9D8, commitLogOffset=182744, bodyCRC=1659149091, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=500, CONSUME_START_TIME=1592918188998, UNIQ_KEY=C0A8641C0E694D7E188675DB34600010, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 54], transactionId='null'}]] 

  1.9、關閉RockectMQ

mqshutdown namesrv       #關閉nameserver
mqshutdown broker          #關閉broker
HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ mqshutdown namesrv
The mqnamesrv(3582) is running...
Send shutdown request to mqnamesrv(3582) OK
HoudeMacBook-Pro:rocketmq-all-4.7.0-bin-release houjing$ mqshutdown broker
The mqbroker(3589) is running...
Send shutdown request to mqbroker(3589) OK

  2、集群部署

  RocketMQ的多Master多Slave模式有兩種:異步復制和同步雙寫。

  異步復制:每個 Master 配置一個 Slave,有多對Master-Slave, HA采用異步復制方式,主備有短暫消息延遲,毫秒級。

      優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,因為Master 宕機后,消費者仍然可以從 Slave消費,此過程對應用透明。不需要人工干預。性能同多 Master 模式幾乎一樣。

      缺點: Master 宕機,磁盤損壞情況,會丟失少量消息。

  同步雙寫:每個 Master 配置一個 Slave,有多對Master-Slave, HA采用同步雙寫方式,主備都寫成功,向應用返回成功。

      優點:數據與服務都無單點, Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高。

      缺點:性能比異步復制模式略低,大約低10%左右,發送單個消息的 RT會略高。目前主宕機后,備機不能自動切換為主機,后續會支持自動切換功能。

  我們首先來部署一個異步復制的RocketMQ,在RocketMQ路徑下有兩個文件夾:
drwxr-xr-x   6 houjing  admin    192  3  4 09:59 2m-2s-async/
drwxr-xr-x   6 houjing  admin    192  3  4 09:59 2m-2s-sync/

  分別用於異步復制和同步雙寫配置,我們進入到目錄2m-2s-sync,可以看到4個broker文件,表示2Master2Slave的RocketMQ集群。

-rw-r--r--   1 houjing  admin  922  3  4 09:59 broker-a-s.properties -rw-r--r--   1 houjing  admin  928  3  4 09:59 broker-a.properties -rw-r--r--   1 houjing  admin  922  3  4 09:59 broker-b-s.properties -rw-r--r--   1 houjing  admin  928  3  4 09:59 broker-b.properties 192:2m-2s-sync houjing$ pwd
/Volumes/work/rocketmq-all-4.7.0-bin-release/conf/2m-2s-sync

  我們依次對4個properties進行配置配置:

#mq-master01節點配置/data/rocketmq/conf/2m-2s-sync/broker-a.properties
[root@mq-master01 ~]# vim /data/rocketmq/conf/2m-2s-sync/broker-a.properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣 例如:在a.properties 文件中寫 broker-a 在b.properties 文件中寫 broker-b
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,這里nameserver是單台,如果nameserver是多台集群的話,就用分號分割(即namesrvAddr=ip1:port1;ip2:port2;ip3:port3)
namesrvAddr=192.168.10.207:9876;
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數。由於是4個broker節點,所以設置為4
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/data/rocketmq/store
#commitLog 存儲路徑
storePathCommitLog=/data/rocketmq/store/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/data/rocketmq/store/consumequeue
#消息索引存儲路徑
storePathIndex=/data/rocketmq/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/data/rocketmq/store/checkpoint
#abort 文件存儲路徑
abortFile=/data/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 異步復制Master #- SYNC_MASTER 同步雙寫Master #- SLAVE
brokerRole=MASTER                      #要配置為MASTER或SLAVE的角色
#刷盤方式 #- ASYNC_FLUSH 異步刷盤 #- SYNC_FLUSH 同步刷盤
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false #發消息線程池數量
#sendMessageThreadPoolNums=128 #拉消息線程池數量 #pullMessageThreadPoolNums=128
 
#mq-master02節點配置的是/data/rocketmq/conf/2m-2s-sync/broker-b.properties #就下面三行配置不一樣,其他配置行都一樣!
[root@mq-master02 software]# vim /data/rocketmq/conf/2m-2s-sync/broker-b.properties     
......
brokerName=broker-b           
brokerId=0
brokerRole=MASTER  
 
#mq-slave01節點配置的是/data/rocketmq/conf/2m-2s-sync/broker-a-s.properties
[root@mq-slave01 software]# vim /data/rocketmq/conf/2m-2s-sync/broker-a-s.properties
......
brokerName=broker-a          #注意這一行的名稱要和master保持一致
brokerId=1                   #這個ID要跟master的不一致!
brokerRole=SLAVE             #要配置為從
 
#mq-slave02節點配置的是/data/rocketmq/conf/2m-2s-sync/broker-b-s.properties
[root@mq-slave02 software]# vim /data/rocketmq/conf/2m-2s-sync/broker-b-s.properties
......
brokerName=broker-b         #注意這一行的名稱要和master的保持一致
brokerId=1                  #這個ID要跟master的不一致
brokerRole=SLAVE            #要配置為從

  注意要創建存儲&日志文件,如果一台虛擬機部署多個需要用文件進行區分。

  修改日志配置文件,如:

mkdir -p /root/svr/rocketmq/logs
cd /root/svr/rocketmq/conf && sed -i 's#${user.home}#/root/svr/rocketmq#g'*.xml

  注意logback.*.xml配置文件中${user.home}需要替換自己指定的目錄

  同時要進行默認參數修改,runbroker.sh,runserver.sh啟動參數默認對jvm的堆內存設置比較大(不改啟動不起來),如果是虛擬機非線上環境需要改下參數,大小可以根據自己機器來決定
默認大小:

-Xms8g -Xmx8g -Xmn4g
改為:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"

  配置完成后跟單機部署一樣先啟動NameServer,然后啟動Broker即可。附RocketMQ常用命令:

需要切換到bin目錄下,即:
[root@mq-master01 ~]# cd /data/rocketmq/bin
[root@mq-master01 bin]#
 
獲取所有可用命令:
[root@mq-master01 bin]# sh mqadmin
 
查看幫助:
# sh mqadmin <command> -h
查詢Producer的網絡連接情況:
# sh mqadmin producerConnection -n localhost:9876 -g <producer-group> -t <producer-topic>
查詢Consumer的網絡連接情況:
# sh mqadmin consumerConnection -n localhost:9876 -g <consumer-group>
查詢Consumer的消費狀態:
# sh mqadmin consumerProgress -n localhost:9876 -g <consumer-group>
 
查詢消息是否發送成功
獲取指定Topic:
# sh mqadmin topicList -n localhost:9876 | grep <topicName>
查看Topic狀態:
# sh mqadmin topicStatus -n localhost:9876 -t <topicName>
根據offset獲取消息:
# sh sh mqadmin queryMsgByOffset -n localhost:9876 -b <broker-name> -i <queueId> -o <offset> -t <topicName>
根據offsetMsgId查詢消息:
# sh sh mqadmin queryMsgById -n localhost:9876 -i <offsetMsgId>
 
查詢消息是否被消費成功
查詢消息詳情:
# sh mqadmin queryMsgById -i {MsgId} -n {NameServerAddr}
查看Consumer Group訂閱了哪些TOPIC:
# sh mqadmin consumerProgress -g <ConsumerGroup> -n <NameServerAddr>
 
查詢TOPIC被哪些Consumer Group訂閱了
沒有查詢特定TOPIC訂閱情況,只能查詢所有后再過濾:
# sh mqadmin statsAll -n <NameServerAddr> | grep <TOPIC>
返回結果:#Topic #Consumer Group #InTPS #OutTPS #InMsg24Hour #OutMsg24Hour
 
關閉nameserver和所有的broker:
# sh mqshutdown namesrv
# sh mqshutdown broker
 
查看所有消費組group:
# sh mqadmin consumerProgress -n 192.168.23.159:9876
查看指定消費組(kevinGroupConsumer)下的所有topic數據堆積情況:
# sh mqadmin consumerProgress -n 192.168.23.159:9876 -g kevinGroupConsumer
查看所有topic :
# sh mqadmin topicList -n 192.168.23.159:9876
查看topic信息列表詳情統計
# sh mqadmin topicstatus -n 192.168.23.159:9876 -t myTopicTest1
新增topic
# sh mqadmin updateTopic –n 10.45.47.168 –c DefaultCluster –t ZTEExample
刪除topic
# sh mqadmin deleteTopic –n 10.45.47.168:9876 –c DefaultCluster –t ZTEExample

 五、RocketMQ Console安裝部署

   1、rocketmq-console介紹

  RocketMQ-Console是RocketMQ項目的擴展插件,是一個圖形化管理控制台,提供Broker集群狀態查看,Topic管理,Producer、Consumer狀態展示,消息查詢等常用功能,這個功能在安裝好RocketMQ后需要額外單獨安裝、運行。

  2、rocketmq-console下載、部署

  進入rocketmq-externals項目的GitHub地址,如下圖,可看到RocketMQ項目的諸多擴展項目,其中就包含我們需要下載的rocketmq-console。

   

  

// 克隆項目到本地
git clone  https://github.com/apache/rocketmq-externals.git

  3、進入項目文件夾並修改配置文件

  中文注釋是為了方便解釋,請刪除,不然打包報錯:Not allow chinese character !

// 進入目錄
$ cd rocketmq-externals/rocketmq-console/
// 修改maven項目的資源文件
$ vim src/main/resources/application.properties 
#管理后台訪問上下文路徑,默認為空,如果填寫,一定要前面加“/”,后面不要加,否則啟動報錯
server.contextPath=/rocketmq
#訪問端口
server.port=8080

### SSL setting  默認就行
#server.ssl.key-store=classpath:rmqcngkeystore.jks
#server.ssl.key-store-password=rocketmq
#server.ssl.keyStoreType=PKCS12
#server.ssl.keyAlias=rmqcngkey

#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
#logback配置文件路徑,先默認即可
logging.config=classpath:logback.xml #if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
#Name Server地址,修改成你自己的服務地址。多個地址用英文分號“;”隔開
rocketmq.config.namesrvAddr=localhost:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
#set the message track trace topic if you don't want use the default one
rocketmq.config.msgTrackTopicName=
rocketmq.config.ticketKey=ticket

#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
rocketmq.config.loginRequired=false

  注意: Name Server地址默認為空,可以在啟動項目后在后台配置;或者啟動服務的時候給出rocketmq.config.namesrvAddr參數值。

  4、將項目打成jar包,並運行jar文件

$ mvn clean package -Dmaven.test.skip=true
$ java -jar target/rocketmq-console-ng-1.0.0.jar
#如果配置文件沒有填寫Name Server的話,可以在啟動項目時指定namesrvAddr
$ java -jar target/rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr='localhost:9876'

#因為本文在打包時配置了namesrvAddr,故而執行如下命令
$ java -jar target/rocketmq-console-ng-1.0.0.jar

  5、IDEA上直接運行Console

   我們也可以直接在IntelliJ上啟動Console,console插件本身就是一個項目,可以直接在IDEA導入項目並運行,同上進行application.properties修改,然后找到app啟動類啟動,如下圖所示:

    

六、Console控制台的使用 

  瀏覽器訪問如下:

  

  如上圖所示,DashBoard展示了一些Broker和Topic的基本信息。

  1、NameSrvAdd配置

   

  NameServer除了可以在首次啟動時在Application.properties中指定,還可以在Console啟動后進行配置。

  2、集群Cluster

   

  3、 Topic主題

  主題可在此處進行增刪查改,其中TopicTest是我們RocketMQ的默認主題。

  

  3.1、添加Topic

  我們可以進行手動添加:

   

  3.2、Topic狀態

  RocketMQ自帶的默認Topic:TopicTest,默認有四個隊列。

  

  3.3、發送消息

  

   4、消息Message

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM