1. MQ簡單介紹
1.1 應用場景
應用解耦
系統的耦合性越高,容錯性就越低。以電商應用為例,用戶創建訂單后,如果耦合調用庫存系統、物流系統、支付系統,任何一個子系統出了故障或者因為升級等原因暫時不可用,都會造成下單操作異常,影響用戶使用體驗。
使用消息隊列解耦合,系統的耦合性就會降低了。比如物流系統發生故障,需要幾分鍾才能來修復,在這段時間內,物流系統要處理的數據被緩存到消息隊列中,用戶的下單操作正常完成。當物流系統回復后,補充處理存在消息隊列中的訂單消息即可,終端系統感知不到物流系統發生過幾分鍾故障。
流量的削峰填谷
應用系統如果遇到偶發性的流量的瞬間猛增,有可能會將系統壓垮。但是如果因此提高整體的系統性能,可能會使系統的復雜性和成本大大增高,有了消息隊列可以將大量請求緩存起來,分散到很長一段時間處理,平衡系統的處理性能,這樣可以大大提到系統的穩定性和用戶體驗。
數據分發
如果某個系統的數據變更,將需要將變更操作同步到其他系統中時,如果使用RPC調用的方式,將導致系統的可擴展性大大降低, 所以可以使用MQ的方式 進行通知消息, 消息發送方將無需知道哪些系統需要被通知
1.2 MQ的缺點
缺點包含以下幾點:
-
系統可用性降低
系統引入的外部依賴越多,系統穩定性越差。一旦MQ宕機,就會對業務造成影響。
如何保證MQ的高可用?
-
系統復雜度提高
MQ的加入大大增加了系統的復雜度,以前系統間是同步的遠程調用,現在是通過MQ進行異步調用。需要考慮更多的情況
如何保證消息沒有被重復消費?怎么處理消息丟失情況?那么保證消息傳遞的順序性?
-
一致性問題
A系統處理完業務,通過MQ給B、C、D三個系統發消息數據,如果B系統、C系統處理成功,D系統處理失敗。
如何保證消息數據處理的一致性?
1.3 各種MQ產品的比較
常見的MQ產品包括Kafka、ActiveMQ、RabbitMQ、RocketMQ。
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
開發語言 | java | erlang | java | scala |
單機吞吐量 | 萬級 | 萬級 | 十萬級 | 十萬級 |
時效性 | 毫秒級 | 微秒級 | 毫秒級 | 毫秒級 |
可用性 | 高(主從架構) | 高(主從架構) | 非常高(分布式架構) | 非常高(分布式架構) |
功能特性 | 成熟的產品,在很多公司使用,文檔豐富,各種協議支持的比較好 | 基於erlang語言開發,天生並發能力強,延時很低 | MQ功能比較完善,基於java開發,擴展性好 | 只支持主要的mq功能,為大數領域應用廣 |
2. RocketMq介紹
2.1 基本概念
2.1.1 消息(Message)
消息是指,消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬於一個主題。
2.1.2 主題(Topic)
-
Topic表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬於一個主題,是RocketMQ進行消息訂閱的基本單位。
-
一個生產者可以同時發送多種Topic的消息;而一個消費者只對某種特定的Topic感興趣,即只可以訂閱和消費一種Topic的消息。
2.1.3 標簽(Tag)
為消息設置的標簽,用於同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標簽。標簽能夠有效地保持代碼的清晰度和連貫性,並優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴展性。
Topic是消息的一級分類,Tag是消息的二級分類。
2.1.4 隊列(Queue)
-
存儲消息的物理實體。一個Topic中可以包含多個Queue,每個Queue中存放的就是該Topic的消息。一個Topic的Queue也被稱為一個Topic中消息的分區(Partition)。
-
一個Topic的Queue中的消息只能被一個消費者組中的一個消費者消費。一個Queue中的消息不允許同一個消費者組中的多個消費者同時消費。
和分片的區別:
分片不同於分區。在RocketMQ中,分片指的是存放相應Topic的Broker。每個分片中會創建出相應數量的分區,即Queue,每個Queue的大小都是相同的。
2.1.5 消息標識(MessageId/Key)
RocketMQ中每個消息擁有唯一的MessageId,且可以攜帶具有業務標識的Key,以方便對消息的查詢。不過需要注意的是,MessageId有兩個:在生產者send()消息時會自動生成一個MessageId(msgId),
當消息到達Broker后,Broker也會自動生成一個MessageId(offsetMsgId)。msgId、offsetMsgId與key都稱為消息標識。
- msgId:由producer端生成,其生成規則為:
producerIp + 進程pid + MessageClientIDSetter類的ClassLoader的hashCode +當前時間 + AutomicInteger自增計數器
- offsetMsgId:由broker端生成,其生成規則為:
brokerIp + 物理分區的offset(Queue中的偏移量)
- key:由用戶指定的業務相關的唯一標識
2.2 系統架構
RocketMQ架構上主要分為四部分構成:
2.2.1 Producer生產者
消息生產者,負責生產消息。Producer通過MQ的負載均衡模塊選擇相應的Broker集群隊列進行消息投遞,投遞的過程支持快速失敗並且低延遲。
RocketMQ中的消息生產者都是以生產者組(Producer Group)的形式出現的。生產者組是同一類生產者的集合,這類Producer可以發送相同Topic類型的消息。一個生產者組可以也可以同時發送多個主題的消息。
2.2.2 Consumer消費者
消息消費者,負責消費消息。一個消息消費者會從Broker服務器中獲取到消息,並對消息進行相關業務處理。
RocketMQ中的消息消費者都是以消費者組(Consumer Group)的形式出現的。
消費者組是同一類消費者的集合,這類Consumer消費的是同一個Topic類型的消息。
消費者組使得在消息消費方面,實現負載均衡(將一個Topic中的不同的Queue平均分配給同一個Consumer Group的不同的Consumer,注意,並不是將消息負載均衡)和容錯(一個Consmer掛了,該Consumer Group中的其它Consumer可以接着消費原Consumer消費的Queue)的目標變得非常容易。
如下圖所示:
消費者組中Consumer的數量應該小於等於訂閱Topic的Queue數量。如果超出Queue數量,則多出的Consumer將不能消費消息。不過,一個Topic類型的消息可以被多個消費者組同時消費。
如下圖所示:
注意,
1)消費者組只能消費一個Topic的消息,不能同時消費多個Topic消息
2)一個消費者組中的消費者必須訂閱完全相同的Topic
3)一個消費者組中的消費者可以消費一個topic下的多個queue,例如broker自動創建四個queue,但是只有一個消費者,則此消費者消費四個queue,但是一個topic下的queue,無法被同一個消費組下的多個消費者消費,例如四個queue,一個消費組五個消費者,則多出的一個則不工作
2.2.3 Name Server
1.功能介紹
NameServer是一個Broker與Topic路由的注冊中心,支持Broker的動態注冊與發現。RocketMQ的思想來自於Kafka,而Kafka是依賴了Zookeeper的。所以,在RocketMQ的早期版本,即在MetaQ v1.0與v2.0版本中,也是依賴於Zookeeper的。從MetaQ v3.0,即RocketMQ開始去掉了Zookeeper依賴,使用了自己的NameServer。
2.主要包括兩個功能:
-
Broker管理:接受Broker集群的注冊信息並且保存下來作為路由信息的基本數據;提供心跳檢測機制,檢查Broker是否還存活。
-
路由信息管理:每個NameServer中都保存着Broker集群的整個路由信息和用於客戶端查詢的隊列信息。Producer和Conumser通過NameServer可以獲取整個Broker集群的路由信息,從而進行消息的投遞和消費。
3.路由注冊:
NameServer通常也是以集群的方式部署,不過,NameServer是無狀態的,即NameServer集群中的各個節點間是無差異的,各節點間相互不進行信息通訊。那各節點中的數據是如何進行數據同步的呢?在Broker節點啟動時,輪詢NameServer列表,與每個NameServer節點建立長連接,發起注冊請求。在NameServer內部維護着⼀個Broker列表,用來動態存儲Broker的信息。
(注意,這是與其它像zk、Eureka、Nacos等注冊中心不同的地方。
這種NameServer的無狀態方式,有什么優缺點:
優點:NameServer集群搭建簡單,擴容簡單。
缺點:對於Broker,必須明確指出所有NameServer地址。否則未指出的將不會去注冊。也正因
為如此,NameServer並不能隨便擴容。因為,若Broker不重新配置,新增的NameServer對於
Broker來說是不可見的,其不會向這個NameServer進行注冊。 )**
Broker節點為了證明自己是活着的,為了維護與NameServer間的長連接,會將最新的信息以心跳包的方式上報給NameServer,每30秒發送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、 Broker名稱、Broker所屬集群名稱等等。NameServer在接收到心跳包后,會更新心跳時間戳,記錄這個Broker的最新存活時間。
4.路由剔除
由於Broker關機、宕機或網絡抖動等原因,NameServer沒有收到Broker的心跳,NameServer可能會將其從Broker列表中剔除。NameServer中有⼀個定時任務,每隔10秒就會掃描⼀次Broker表,查看每一個Broker的最新心跳時間戳距離當前時間是否超過120秒,如果超過,則會判定Broker失效,然后將其從Broker列表中剔除。
擴展:對於RocketMQ日常運維工作,例如Broker升級,需要停掉Broker的工作。OP需要怎么做?
OP需要將Broker的讀寫權限禁掉。一旦client(Consumer或Producer)向broker發送請求,都會收到broker的NO_PERMISSION響應,然后client會進行對其它Broker的重試。**
當OP觀察到這個Broker沒有流量后,再關閉它,實現Broker從NameServer的移除。
OP:運維工程師
SRE:Site Reliability Engineer,現場可靠性工程師**
5.路由發現
RocketMQ的路由發現采用的是Pull模型。當Topic路由信息出現變化時,NameServer不會主動推送給客戶端,而是客戶端定時拉取主題最新的路由。默認客戶端每30秒會拉取一次最新的路由。
擴展:
1)Push模型:推送模型。其實時性較好,是一個“發布-訂閱”模型,需要維護一個長連接。而長連接的維護是需要資源成本的。該模型適合於的場景: 實時性要求較高Client數量不多,Server數據變化較頻繁**
2)Pull模型:拉取模型。存在的問題是,實時性較差。**
3)Long Polling模型:長輪詢模型。其是對Push與Pull模型的整合,充分利用了這兩種模型的優勢,屏蔽了它們的劣勢。**
6.客戶端NameServer選擇策略
這里的客戶端指的是Producer與Consumer
客戶端在配置時必須要寫上NameServer集群的地址,那么客戶端到底連接的是哪個NameServer節點呢?客戶端首先會生產一個隨機數,然后再與NameServer節點數量取模,此時得到的就是所要連接的節點索引,然后就會進行連接。如果連接失敗,則會采用round-robin策略,逐個嘗試着去連接其它節點。
首先采用的是隨機策略進行的選擇,失敗后采用的是輪詢策略。
擴展:Zookeeper Client是如何選擇Zookeeper Server的?
簡單來說就是,經過兩次Shuffle(打亂),然后選擇第一台Zookeeper Server。詳細說就是,將配置文件中的zk server地址進行第一次Shuffle,然后隨機選擇一個。這個選擇出的一般都是一個hostname。然后獲取到該hostname對應的所有ip,再對這些ip進行第二次Shuffle,從Shuffle過的結果中取第一個server地址進行連接。
2.2.4 Broker
1.功能介紹
Broker充當着消息中轉角色,負責存儲消息、轉發消息。Broker在RocketMQ系統中負責接收並存儲從生產者發送來的消息,同時為消費者的拉取請求作准備。Broker同時也存儲着消息相關的元數據,包括消費者組消費進度偏移offset、主題、隊列等。
2.模塊構成
示意圖:
Remoting Module:整個Broker的實體,負責處理來自clients端的請求。而這個Broker實體則由以下模
塊構成。
Client Manager:客戶端管理器。負責接收、解析客戶端(Producer/Consumer)請求,管理客戶端。例
如,維護Consumer的Topic訂閱信息
Store Service:存儲服務。提供方便簡單的API接口,處理消息存儲到物理硬盤和消息查詢功能。
HA Service:高可用服務,提供Master Broker 和 Slave Broker之間的數據同步功能。
Index Service:索引服務。根據特定的Message key,對投遞到Broker的消息進行索引服務,同時也提
供根據Message Key對消息進行快速查詢的功能。
3.集群模式
為了增強Broker性能與吞吐量,Broker一般都是以集群形式出現的。各集群節點中可能存放着相同
Topic的不同Queue。不過,這里有個問題,如果某Broker節點宕機,如何保證數據不丟失呢?其解決
方案是,將每個Broker集群節點進行橫向擴展,即將Broker節點再建為一個HA集群,解決單點問題。
Broker節點集群是一個主從集群,即集群中具有Master與Slave兩種角色。Master負責處理讀寫操作請
求,Slave負責對Master中的數據進行備份。當Master掛掉了,Slave則會自動切換為Master去工作。所
以這個Broker集群是主備集群。一個Master可以包含多個Slave,但一個Slave只能隸屬於一個Master。
Master與Slave 的對應關系是通過指定相同的BrokerName、不同的BrokerId 來確定的。BrokerId為0表
示Master,非0表示Slave。每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信
息到所有NameServer。
2.2.5 工作流程
1.具體流程
1)啟動NameServer,NameServer啟動后開始監聽端口,等待Broker、Producer、Consumer連接。
2)啟動Broker時,Broker會與所有的NameServer建立並保持長連接,然后每30秒向NameServer定時
發送心跳包。
3)發送消息前,可以先創建Topic,創建Topic時需要指定該Topic要存儲在哪些Broker上,當然,在創
建Topic時也會將Topic與Broker的關系寫入到NameServer中。不過,這步是可選的,也可以在發送消
息時自動創建Topic。
4)Producer發送消息,啟動時先跟NameServer集群中的其中一台建立長連接,並從NameServer中獲
取路由信息,即當前發送的Topic消息的Queue與Broker的地址(IP+Port)的映射關系。然后根據算法
策略從隊選擇一個Queue,與隊列所在的Broker建立長連接從而向Broker發消息。當然,在獲取到路由
信息后,Producer會首先將路由信息緩存到本地,再每30秒從NameServer更新一次路由信息。
5)Consumer跟Producer類似,跟其中一台NameServer建立長連接,獲取其所訂閱Topic的路由信息,
然后根據算法策略從路由信息中獲取到其所要消費的Queue,然后直接跟Broker建立長連接,開始消費
其中的消息。Consumer在獲取到路由信息后,同樣也會每30秒從NameServer更新一次路由信息。不過
不同於Producer的是,Consumer還會向Broker發送心跳,以確保Broker的存活狀態。
2.Topic的創建模式
手動創建Topic時,有兩種模式:
集群模式:該模式下創建的Topic在該集群中,所有Broker中的Queue數量是相同的。
Broker模式:該模式下創建的Topic在該集群中,每個Broker中的Queue數量可以不同。
自動創建Topic時,默認采用的是Broker模式,會為每個Broker默認創建4個Queue。
3.讀/寫隊列
從物理上來講,讀/寫隊列是同一個隊列。所以,不存在讀/寫隊列數據同步問題。讀/寫隊列是邏輯上進行區分的概念。一般情況下,讀/寫隊列數量是相同的。例如,創建Topic時設置的寫隊列數量為8,讀隊列數量為4,此時系統會創建8個Queue,分別是0 1 2 3 4 5 6 7。Producer會將消息寫入到這8個隊列,但Consumer只會消費0 1 2 3這4個隊列中的消息,4 5 6 7中的消息是不會被消費到的。
再如,創建Topic時設置的寫隊列數量為4,讀隊列數量為8,此時系統會創建8個Queue,分別是0 1 2 3 4 5 6 7。Producer會將消息寫入到0 1 2 3 這4個隊列,但Consumer會消費0 1 2 3 4 5 6 7這8個隊列中的消息,但是4 5 6 7中是沒有消息的。此時假設Consumer Group中包含兩個Consuer,Consumer1消費0 1 2 3,而Consumer2消費4 5 6 7。但實際情況是,Consumer2是沒有消息可消費的。
也就是說,當讀/寫隊列數量設置不同時,總是有問題的。那么,為什么要這樣設計呢?
其這樣設計的目的是為了,方便Topic的Queue的縮容。
例如,原來創建的Topic中包含16個Queue,如何能夠使其Queue縮容為8個,還不會丟失消息?可以動態修改寫隊列數量為8,讀隊列數量不變。此時新的消息只能寫入到前8個隊列,而消費都消費的卻是16個隊列中的數據。當發現后8個Queue中的消息消費完畢后,就可以再將讀隊列數量動態設置為8。整個縮容過程,沒有丟失任何消息。
perm用於設置對當前創建Topic的操作權限:2表示只寫,4表示只讀,6表示讀寫。
3. 安裝 RocketMQ
官網:https://rocketmq.apache.org/docs/quick-start/
下載地址:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.0/rocketmq-all-4.9.0-bin-release.zip
下載最新版本,並解壓,(我這里下載的linux版本,也可以下載windows版)
目錄介紹:
- bin:啟動腳本,包括shell腳本和CMD腳本
- conf:實例配置文件 ,包括broker配置文件、logback配置文件等
- lib:依賴jar包,包括Netty、commons-lang、FastJSON等
2.1 啟動,關閉RocketMQ
啟動NameServer
# 1.啟動NameServer (nohup:不掛斷運行命令, &:后台運行, 默認日志重定向到nohup.out)
nohup sh bin/mqnamesrv &
# 2.查看啟動日志
tail -f ~/logs/rocketmqlogs/namesrv.log
啟動Broker
# 1.啟動Broker
nohup sh bin/mqbroker -n localhost:9876 &
# 2.查看啟動日志
tail -f ~/logs/rocketmqlogs/broker.log
默認的nameServer,broker啟動參數需要的內存比較大,有可能會運行失敗,所以需要修改runserver.sh,runbroker.sh 文件,如下
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
修改為:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn518m"
關閉服務
# 1.關閉名稱服務器
sh bin/mqshutdown namesrv
# 2.關閉Broker
sh bin/mqshutdown broker
2.2 測試RocketMQ
發送消息
# 1.設置環境變量
export NAMESRV_ADDR=localhost:9876
# 2.使用安裝包的Demo發送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
接收消息
# 1.設置環境變量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
4. 集群搭建
為了MQ服務的高可用,生產環境一般以集群的方式搭建
示意圖:
3.1 數據復制與刷盤策略
刷盤策略(不區分單機,集群)
刷盤策略指的是broker中消息的落盤方式,即消息發送到broker內存后消息持久化到磁盤的方式。分為同步刷盤與異步刷盤:
同步刷盤:當消息持久化到broker的磁盤后才回復消息寫入成功。
異步刷盤:當消息寫入到broker的內存后即表示消息寫入成功,無需等待消息持久化到磁盤。
1)異步刷盤策略會降低系統的寫入延遲,RT變小,提高了系統的吞吐量
2)消息寫入到Broker的內存,一般是寫入到了PageCache
3)對於異步 刷盤策略,消息會寫入到PageCache后立即返回成功ACK。但並不會立即做落盤操作,而是當PageCache到達一定量時會自動進行落盤。
復制策略(主從之間)
復制策略是Broker的Master與Slave間的數據同步方式。分為同步復制與異步復制:
同步復制:消息寫入master后,master會等待slave同步數據成功后才向producer返回成功ACK
異步復制:消息寫入master后,master立即向producer返回成功ACK,無需等待slave同步數據成
功
3.2 集群各節點回顧
- NameServer是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。
- Broker部署相對復雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有NameServer。
- Producer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer取Topic路由信息,並向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。
- Consumer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer取Topic路由信息,並向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。
3.3 集群模式
單Master模式
這種方式風險較大,一旦Broker重啟或者宕機時,會導致整個服務不可用。不建議線上環境使用,可以用於本地測試。
多Master模式
一個集群無Slave,全是Master,例如2個Master或者3個Master,這種模式的優缺點如下:
優點:配置簡單,單個Master宕機或重啟維護對應用無影響,在磁盤配置為RAID10時,即使機器宕機不可恢復情況下,由於RAID10磁盤非常可靠,消息也不會丟(異步刷盤丟失少量消息,同步刷盤一條不丟),性能最高;
缺點:單台機器宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受到影響。
多Master多Slave模式(master與slave異步同步)
每個Master配置一個Slave,有多對Master-Slave,HA采用異步復制方式,主備有短暫消息延遲(毫秒級),這種模式的優缺點如下:
優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,同時Master宕機后,消費者仍然可以從Slave消費,而且此過程對應用透明,不需要人工干預,性能同多Master模式幾乎一樣;
缺點:Master宕機,磁盤損壞情況下會丟失少量消息。
多Master多Slave模式(master與slave同步同步)
每個Master配置一個Slave,有多對Master-Slave,HA采用同步雙寫方式,即只有主備都寫成功,才向應用返回成功,這種模式的優缺點如下:
優點:數據與服務都無單點故障,Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高;
缺點:性能比異步復制模式略低(大約低10%左右),發送單個消息的RT會略高,且目前版本在主節點宕機后,備機不能自動切換為主機。
3.4 模擬搭建雙主-雙從(同步)方式集群
示意圖:
工作流程:
-
啟動NameServer,NameServer起來后監聽端口,等待Broker、Producer、Consumer連上來,相當於一個路由控制中心。
-
Broker啟動,跟所有的NameServer保持長連接,定時發送心跳包。心跳包中包含當前Broker信息(IP+端口等)以及存儲所有Topic信息。注冊成功后,NameServer集群中就有Topic跟Broker的映射關系。
-
收發消息前,先創建Topic,創建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動創建Topic。
-
Producer發送消息,啟動時先跟NameServer集群中的其中一台建立長連接,並從NameServer中獲取當前發送的Topic存在哪些Broker上,輪詢從隊列列表中選擇一個隊列,然后與隊列所在的Broker建立長連接從而向Broker發消息。
-
Consumer跟Producer類似,跟其中一台NameServer建立長連接,獲取當前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道,開始消費消息。
由於我是在一台機子上搭建,所以需要修改對應節點的端口號
3.3.1 配置nameServer端口信息
啟動多台nameServer,需要配置不同的端口號,可以在官方准備好的示例配置文件夾下配置: conf/2m-2s-sync
創建文件:namesrv1.properties
,namesrv2.properties
分別配置:
listenPort=9876
listenPort=9877
啟動:指定對應的配置文件
nohup sh mqnamesrv -c /home/app/server/rocketmq-all-4.9.0-bin-release/conf/2m-2s-sync/namesrv1.properties &
nohup sh mqnamesrv -c /home/app/server/rocketmq-all-4.9.0-bin-release/conf/2m-2s-sync/namesrv2.properties &
使用jps命令查看,兩個nameServer啟動成功
3.3.2 Master1+Slave1組合搭建
修改master1節點配置信息:broker-a.properties
# 所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣,主從就是以此名稱配對
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分號分割
namesrvAddr=localhost:9876;localhost:9877
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑,每個broker節點的路徑不能相同
storePathRootDir=/home/app/server/rocketmq/m1/store
#commitLog 存儲路徑
storePathCommitLog=/home/app/server/rocketmq/m1/store/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/home/app/server/rocketmq/m1/store/consumequeue
#消息索引存儲路徑
storePathIndex=/home/app/server/rocketmq/m1/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/home/app/server/rocketmq/m1/store/checkpoint
#abort 文件存儲路徑
abortFile=/home/app/server/rocketmq/m1/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步復制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
# 主從同步方式為同步
brokerRole=SYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
#master的刷盤機制為同步
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
修改slave1 節點配置文件:broker-a-s.properties
# 所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分號分割
namesrvAddr=localhost:9876;localhost:9877
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=11011
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/home/app/server/rocketmq/m2/store
#commitLog 存儲路徑
storePathCommitLog=/home/app/server/rocketmq/m2/store/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/home/app/server/rocketmq/m2/store/consumequeue
#消息索引存儲路徑
storePathIndex=/home/app/server/rocketmq/m2/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/home/app/server/rocketmq/m2/store/checkpoint
#abort 文件存儲路徑
abortFile=/home/app/server/rocketmq/m2/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步復制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=SLAVE
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
啟動腳本:
# 啟動master1 broker
nohup sh mqbroker -c /home/app/server/rocketmq-all-4.9.0-bin-release/conf/2m-2s-sync/broker-a.properties &
#啟動 slave1 broker
nohup sh mqbroker -c /home/app/server/rocketmq-all-4.9.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
3.3.3 Master2+Slave2組合搭建
修改master2節點配置信息:broker-b.properties
# 所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分號分割
namesrvAddr=localhost:9876;localhost:9877
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=20911
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/home/app/server/rocketmq/m4/store
#commitLog 存儲路徑
storePathCommitLog=/home/app/server/rocketmq/m4/store/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/home/app/server/rocketmq/m4/store/consumequeue
#消息索引存儲路徑
storePathIndex=/home/app/server/rocketmq/m4/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/home/app/server/rocketmq/m4/store/checkpoint
#abort 文件存儲路徑
abortFile=/home/app/server/rocketmq/m4/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步復制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
修改slave2 節點配置文件:broker-b-s.properties
# 所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分號分割
namesrvAddr=localhost:9876;localhost:9877
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=21011
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/home/app/server/rocketmq/m3/store
#commitLog 存儲路徑
storePathCommitLog=/home/app/server/rocketmq/m3/store/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/home/app/server/rocketmq/m3/store/consumequeue
#消息索引存儲路徑
storePathIndex=/home/app/server/rocketmq/m3/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/home/app/server/rocketmq/m3/store/checkpoint
#abort 文件存儲路徑
abortFile=/home/app/server/rocketmq/m3/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步復制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=SLAVE
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
啟動腳本:
# 啟動master1 broker
nohup sh mqbroker -c /home/app/server/rocketmq-all-4.9.0-bin-release/conf/2m-2s-sync/broker-b.properties &
#啟動 slave1 broker
nohup sh mqbroker -c /home/app/server/rocketmq-all-4.9.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
使用jps命令查看后台java進程,可以發現已經成功啟動兩個 NameServer 和四個BrokerServer 雙主雙從集群
5. mqadmin管理工具
進入RocketMQ安裝位置,在bin目錄下執行./mqadmin {command} {args}
,可以對 mq 服務進行各種操作包括如下幾個部分
5.1 Topic相關
名稱(command) | 含義 | 命令選項 (args) | 說明 |
---|---|---|---|
updateTopic | 創建更新Topic配置 | -b | Broker 地址,表示 topic 所在 Broker,只支持單台Broker,地址為ip:port |
-c | cluster 名稱,表示 topic 所在集群(集群可通過 clusterList 查詢) | ||
-h | 打印幫助 | ||
-n | NameServer服務地址,格式 ip:port | ||
-p | 指定新topic的讀寫權限( W=2|R=4|WR=6 ) | ||
-r | 可讀隊列數(默認為 8) | ||
-w | 可寫隊列數(默認為 8) | ||
-t | topic 名稱(名稱只能使用字符 [1]+$ ) | ||
deleteTopic | 刪除Topic | -c | cluster 名稱,表示刪除某集群下的某個 topic (集群 可通過 clusterList 查詢) |
-h | 打印幫助 | ||
-n | NameServer 服務地址,格式 ip:port | ||
-t | topic 名稱(名稱只能使用字符 [2]+$ ) | ||
topicList | 查看 Topic 列表信息 | -h | 打印幫助 |
-c | 不配置-c只返回topic列表,增加-c返回clusterName, topic, consumerGroup信息,即topic的所屬集群和訂閱關系,沒有參數 | ||
-n | NameServer 服務地址,格式 ip:port | ||
topicRoute | 查看 Topic 路由信息 | -t | topic 名稱 |
-h | 打印幫助 | ||
-n | NameServer 服務地址,格式 ip:port | ||
topicStatus | 查看 Topic 消息隊列offset | -t | topic 名稱 |
-h | 打印幫助 | ||
-n | NameServer 服務地址,格式 ip:port | ||
topicClusterList | 查看 Topic 所在集群列表 | -t | topic 名稱 |
-h | 打印幫助 | ||
-n | NameServer 服務地址,格式 ip:port | ||
updateTopicPerm | 更新 Topic 讀寫權限 | -t | topic 名稱 |
-h | 打印幫助 | ||
-n | NameServer 服務地址,格式 ip:port | ||
-b | Broker 地址,表示 topic 所在 Broker,只支持單台Broker,地址為ip:port | ||
-p | 指定新 topic 的讀寫權限( W=2|R=4|WR=6 ) | ||
-c | cluster 名稱,表示 topic 所在集群(集群可通過 clusterList 查詢),-b優先,如果沒有-b,則對集群中所有Broker執行命令 | ||
updateOrderConf | 從NameServer上創建、刪除、獲取特定命名空間的kv配置,目前還未啟用 | -h | 打印幫助 |
-n | NameServer 服務地址,格式 ip:port | ||
-t | topic,鍵 | ||
-v | orderConf,值 | ||
-m | method,可選get、put、delete | ||
allocateMQ | 以平均負載算法計算消費者列表負載消息隊列的負載結果 | -t | topic 名稱 |
-h | 打印幫助 | ||
-n | NameServer 服務地址,格式 ip:port | ||
-i | ipList,用逗號分隔,計算這些ip去負載Topic的消息隊列 | ||
statsAll | 打印Topic訂閱關系、TPS、積累量、24h讀寫總量等信息 | -h | 打印幫助 |
-n | NameServer 服務地址,格式 ip:port | ||
-a | 是否只打印活躍topic | ||
-t | 指定topic |
5.2 集群相關
名稱 | 含義 | 命令選項 | 說明 |
---|---|---|---|
clusterList | 查看集群信息,集群、BrokerName、BrokerId、TPS等信息 | -m | 打印更多信息 (增加打印出如下信息 #InTotalYest, #OutTotalYest, #InTotalToday ,#OutTotalToday) |
-h | 打印幫助 | ||
-n | NameServer 服務地址,格式 ip:port | ||
-i | 打印間隔,單位秒 | ||
clusterRT | 發送消息檢測集群各Broker RT。消息發往${BrokerName} Topic。 | -a | amount,每次探測的總數,RT = 總時間 / amount |
-s | 消息大小,單位B | ||
-c | 探測哪個集群 | ||
-p | 是否打印格式化日志,以|分割,默認不打印 | ||
-h | 打印幫助 | ||
-m | 所屬機房,打印使用 | ||
-i | 發送間隔,單位秒 | ||
-n | NameServer 服務地址,格式 ip:port |
5.3 Broker相關
名稱 | 含義 | 命令選項 | 說明 |
---|---|---|---|
updateBrokerConfig | 更新 Broker 配置文件,會修改Broker.conf | -b | Broker 地址,格式為ip:port |
-c | cluster 名稱 | ||
-k | key 值 | ||
-v | value 值 | ||
-h | 打印幫助 | ||
-n | NameServer 服務地址,格式 ip:port | ||
brokerStatus | 查看 Broker 統計信息、運行狀態(你想要的信息幾乎都在里面) | -b | Broker 地址,地址為ip:port |
-h | 打印幫助 | ||
-n | NameServer 服務地址,格式 ip:port | ||
brokerConsumeStats | Broker中各個消費者的消費情況,按Message Queue維度返回Consume Offset,Broker Offset,Diff,TImestamp等信息 | -b | Broker 地址,地址為ip:port |
-t | 請求超時時間 | ||
-l | diff閾值,超過閾值才打印 | ||
-o | 是否為順序topic,一般為false | ||
-h | 打印幫助 | ||
-n | NameServer 服務地址,格式 ip:port | ||
getBrokerConfig | 獲取Broker配置 | -b | Broker 地址,地址為ip:port |
-n | NameServer 服務地址,格式 ip:port | ||
wipeWritePerm | 從NameServer上清除 Broker寫權限 | -b | Broker 地址,地址為ip:port |
-n | NameServer 服務地址,格式 ip:port | ||
-h | 打印幫助 | ||
cleanExpiredCQ | 清理Broker上過期的Consume Queue,如果手動減少對列數可能產生過期隊列 | -n | NameServer 服務地址,格式 ip:port |
-h | 打印幫助 | ||
-b | Broker 地址,地址為ip:port | ||
-c | 集群名稱 | ||
cleanUnusedTopic | 清理Broker上不使用的Topic,從內存中釋放Topic的Consume Queue,如果手動刪除Topic會產生不使用的Topic | -n | NameServer 服務地址,格式 ip:port |
-h | 打印幫助 | ||
-b | Broker 地址,地址為ip:port | ||
-c | 集群名稱 | ||
sendMsgStatus | 向Broker發消息,返回發送狀態和RT | -n | NameServer 服務地址,格式 ip:port |
-h | 打印幫助 | ||
-b | BrokerName,注意不同於Broker地址 | ||
-s | 消息大小,單位B | ||
-c | 發送次數 | ||
5.4 消息相關
名稱 | 含義 | 命令選項 | 說明 |
queryMsgById | 根據offsetMsgId查詢msg,如果使用開源控制台,應使用offsetMsgId,此命令還有其他參數,具體作用請閱讀QueryMsgByIdSubCommand。 | -i | msgId |
-h | 打印幫助 | ||
-n | NameServer 服務地址,格式 ip:port | ||
queryMsgByKey | 根據消息 Key 查詢消息 | -k | msgKey |
-t | Topic 名稱 | ||
-h | 打印幫助 | ||
-n | NameServer 服務地址,格式 ip:port | ||
queryMsgByOffset | 根據 Offset 查詢消息 | -b | Broker 名稱,(這里需要注意 填寫的是 Broker 的名稱,不是 Broker 的地址,Broker 名稱可以在 clusterList 查到) |
-i | query 隊列 id | ||
-o | offset 值 | ||
-t | topic 名稱 | ||
-h | 打印幫助 | ||
-n | NameServer 服務地址,格式 ip:port | ||
queryMsgByUniqueKey | 根據msgId查詢,msgId不同於offsetMsgId,區別詳見常見運維問題。-g,-d配合使用,查到消息后嘗試讓特定的消費者消費消息並返回消費結果 | -h | 打印幫助 |
-n | NameServer 服務地址,格式 ip:port | ||
-i | uniqe msg id | ||
-g | consumerGroup | ||
-d | clientId | ||
-t | topic名稱 | ||
checkMsgSendRT | 檢測向topic發消息的RT,功能類似clusterRT | -h | 打印幫助 |
-n | NameServer 服務地址,格式 ip:port | ||
-t | topic名稱 | ||
-a | 探測次數 | ||
-s | 消息大小 | ||
sendMessage | 發送一條消息,可以根據配置發往特定Message Queue,或普通發送。 | -h | 打印幫助 |
-n | NameServer 服務地址,格式 ip:port | ||
-t | topic名稱 | ||
-p | body,消息體 | ||
-k | keys | ||
-c | tags | ||
-b | BrokerName | ||
-i | queueId | ||
consumeMessage | 消費消息。可以根據offset、開始&結束時間戳、消息隊列消費消息,配置不同執行不同消費邏輯,詳見ConsumeMessageCommand。 | -h | 打印幫助 |
-n | NameServer 服務地址,格式 ip:port | ||
-t | topic名稱 | ||
-b | BrokerName | ||
-o | 從offset開始消費 | ||
-i | queueId | ||
-g | 消費者分組 | ||
-s | 開始時間戳,格式詳見-h | ||
-d | 結束時間戳 | ||
-c | 消費多少條消息 | ||
printMsg | 從Broker消費消息並打印,可選時間段 | -h | 打印幫助 |
-n | NameServer 服務地址,格式 ip:port | ||
-t | topic名稱 | ||
-c | 字符集,例如UTF-8 | ||
-s | subExpress,過濾表達式 | ||
-b | 開始時間戳,格式參見-h | ||
-e | 結束時間戳 | ||
-d | 是否打印消息體 | ||
printMsgByQueue | 類似printMsg,但指定Message Queue | -h | 打印幫助 |
-n | NameServer 服務地址,格式 ip:port | ||
-t | topic名稱 | ||
-i | queueId | ||
-a | BrokerName | ||
-c | 字符集,例如UTF-8 | ||
-s | subExpress,過濾表達式 | ||
-b | 開始時間戳,格式參見-h | ||
-e | 結束時間戳 | ||
-p | 是否打印消息 | ||
-d | 是否打印消息體 | ||
-f | 是否統計tag數量並打印 | ||
resetOffsetByTime | 按時間戳重置offset,Broker和consumer都會重置 | -h | 打印幫助 |
-n | NameServer 服務地址,格式 ip:port | ||
-g | 消費者分組 | ||
-t | topic名稱 | ||
-s | 重置為此時間戳對應的offset | ||
-f | 是否強制重置,如果false,只支持回溯offset,如果true,不管時間戳對應offset與consumeOffset關系 | ||
-c | 是否重置c++客戶端offset |
5.5 消費者、消費組相關
名稱 | 含義 | 命令選項 | 說明 |
consumerProgress | 查看訂閱組消費狀態,可以查看具體的client IP的消息積累量 | -g | 消費者所屬組名 |
-s | 是否打印client IP | ||
-h | 打印幫助 | ||
-n | NameServer 服務地址,格式 ip:port | ||
consumerStatus | 查看消費者狀態,包括同一個分組中是否都是相同的訂閱,分析Process Queue是否堆積,返回消費者jstack結果,內容較多,使用者參見ConsumerStatusSubCommand | -h | 打印幫助 |
-n | NameServer 服務地址,格式 ip:port | ||
-g | consumer group | ||
-i | clientId | ||
-s | 是否執行jstack | ||
getConsumerStatus | 獲取 Consumer 消費進度 | -g | 消費者所屬組名 |
-t | 查詢主題 | ||
-i | Consumer 客戶端 ip | ||
-n | NameServer 服務地址,格式 ip:port | ||
-h | 打印幫助 | ||
updateSubGroup | 更新或創建訂閱關系 | -n | NameServer 服務地址,格式 ip:port |
-h | 打印幫助 | ||
-b | Broker地址 | ||
-c | 集群名稱 | ||
-g | 消費者分組名稱 | ||
-s | 分組是否允許消費 | ||
-m | 是否從最小offset開始消費 | ||
-d | 是否是廣播模式 | ||
-q | 重試隊列數量 | ||
-r | 最大重試次數 | ||
-i | 當slaveReadEnable開啟時有效,且還未達到從slave消費時建議從哪個BrokerId消費,可以配置備機id,主動從備機消費 | ||
-w | 如果Broker建議從slave消費,配置決定從哪個slave消費,配置BrokerId,例如1 | ||
-a | 當消費者數量變化時是否通知其他消費者負載均衡 | ||
deleteSubGroup | 從Broker刪除訂閱關系 | -n | NameServer 服務地址,格式 ip:port |
-h | 打印幫助 | ||
-b | Broker地址 | ||
-c | 集群名稱 | ||
-g | 消費者分組名稱 | ||
cloneGroupOffset | 在目標群組中使用源群組的offset | -n | NameServer 服務地址,格式 ip:port |
-h | 打印幫助 | ||
-s | 源消費者組 | ||
-d | 目標消費者組 | ||
-t | topic名稱 | ||
-o | 暫未使用 |
5.6 連接相關
名稱 | 含義 | 命令選項 | 說明 |
consumerConnec tion | 查詢 Consumer 的網絡連接 | -g | 消費者所屬組名 |
-n | NameServer 服務地址,格式 ip:port | ||
-h | 打印幫助 | ||
producerConnec tion | 查詢 Producer 的網絡連接 | -g | 生產者所屬組名 |
-t | 主題名稱 | ||
-n | NameServer 服務地址,格式 ip:port | ||
-h | 打印幫助 |
5.7 NameServer相關
名稱 | 含義 | 命令選項 | 說明 |
updateKvConfig | 更新NameServer的kv配置,目前還未使用 | -s | 命名空間 |
-k | key | ||
-v | value | ||
-n | NameServer 服務地址,格式 ip:port | ||
-h | 打印幫助 | ||
deleteKvConfig | 刪除NameServer的kv配置 | -s | 命名空間 |
-k | key | ||
-n | NameServer 服務地址,格式 ip:port | ||
-h | 打印幫助 | ||
getNamesrvConfig | 獲取NameServer配置 | -n | NameServer 服務地址,格式 ip:port |
-h | 打印幫助 | ||
updateNamesrvConfig | 修改NameServer配置 | -n | NameServer 服務地址,格式 ip:port |
-h | 打印幫助 | ||
-k | key | ||
-v | value |
5.8 其他
名稱 | 含義 | 命令選項 | 說明 |
startMonitoring | 開啟監控進程,監控消息誤刪、重試隊列消息數等 | -n | NameServer 服務地址,格式 ip:port |
-h | 打印幫助 |
6 可視化頁面監控平台
RocketMQ
有一個對其擴展的開源項目incubator-rocketmq-externals,這個項目中有一個子模塊叫rocketmq-console
,這個便是管理控制台項目了,先將incubator-rocketmq-externals拉到本地,因為我們需要自己對rocketmq-console
進行編譯打包運行。
下載(也可以直接下載zip包)
git clone https://github.com/apache/rocketmq-externals
在打包之前先要修改resource/application.properties文件,配置監控的 nameServer地址
rocketmq.config.namesrvAddr=192.168.25.135:9876;192.168.25.138:9876
打包:
cd rocketmq-console
mvn clean package -Dmaven.test.skip=true
運行生成在target下的jar包:
java -jar rocketmq-console-ng-2.0.0.jar
查看頁面:http://127.0.0.1:8080/#/cluster
,可以看到建群的相關信息