RocketMQ的基本使用


1、MQ 的基本介紹

MQ(Message Queue)消息隊列,是基礎數據結構中“先進先出”的一種數據結構。指把要傳輸的數據(消息)放在隊列中,用隊列機制來實現消息傳遞 —— 生產者產生消息並把消息放入隊列,然后由消費者去處理。消費者可以到指定隊列拉取消息,或者訂閱相應的隊列,由MQ服務端給其推送消息。

 

1.1、MQ的優點

消息隊列中間件是分布式系統中重要的組件,主要解決應用解耦,異步消息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。

 

MQ常見優點如下:

  • 應用解耦

一個業務需要多個模塊共同實現,或者一條消息有多個系統需要對應處理,只需要主業務完成以后,發送一條MQ,其余模塊消費MQ消息,即可實現業務,降低模塊之間的耦合。

系統的耦合性越高,容錯性就越低。以電商應用為例,用戶創建訂單后,如果耦合調用庫存系統、物流系統、支付系統,任何一個子系統出了故障或者因為升級等原因暫時不可用,都會造成下單操作異常,影響用戶使用體驗。

 

使用消息隊列解耦合,系統的耦合性就會提高了。比如物流系統發生故障,需要幾分鍾才能來修復,在這段時間內,物流系統將要處理的數據被緩存到消息隊列中,用戶的下單操作即正常完成。當物流系統回復后,補充處理存在消息隊列中的訂單消息即可,終端系統感知不到物流系統發生過幾分鍾故障。

                         

 

  • 流量削峰

高並發情況下,業務異步處理,提供高峰期業務處理能力,避免系統癱瘓。

應用系統如果遇到系統請求流量的瞬間猛增,有可能會將系統壓垮。有了消息隊列可以將大量請求緩存起來,分散到很長一段時間處理,這樣可以大大提到系統的穩定性和用戶體驗。

一般情況,為了保證系統的穩定性,如果系統負載超過閾值,就會阻止用戶請求,這會影響用戶體驗。而如果使用消息隊列將請求緩存起來,等待系統處理完畢后通知用戶下單完畢,這樣總不能下單體驗要好。

處於經濟考量目的:業務系統正常時段的QPS如果是1000,流量最高峰是10000,為了應對流量高峰配置高性能的服務器顯然不划算,這時可以使用消息隊列對峰值流量削峰。

                        

 

  • 數據分發

通過消息隊列可以讓數據在多個系統更加之間進行流通。數據的產生方不需要關心誰來使用數據,只需要將數據發送到消息隊列,數據使用方直接在消息隊列中直接獲取數據即可。                  

 

  • 異步

主業務執行結束后從屬業務通過MQ,異步執行,減低業務的響應時間,提高用戶體驗。

 

1.2、MQ的缺點

MQ 的缺點包含以下幾點:

  • 系統可用性降低

    系統引入的外部依賴越多,系統穩定性越差。一旦MQ宕機,就會對業務造成影響。

    問題:如何保證MQ的高可用?

  • 系統復雜度提高

    MQ的加入大大增加了系統的復雜度,以前系統間是同步的遠程調用,現在是通過MQ進行異步調用。

    問題:如何保證消息沒有被重復消費?怎么處理消息丟失情況?那么保證消息傳遞的順序性?

  • 一致性問題

    A系統處理完業務,通過MQ給B、C、D三個系統發消息數據,如果B系統、C系統處理成功,D系統處理失敗。

    問題:如何保證消息數據處理的一致性?

 

1.3、常見MQ產品及比較

常見的MQ產品包括Kafka、ActiveMQ、RabbitMQ、RocketMQ。

 

2、RocketMQ 的基本介紹

RocketMQ是阿里巴巴2016年MQ中間件,使用Java語言開發,在阿里內部,RocketMQ承接了例如“雙11”等高並發場景的消息流轉,能夠處理萬億級別的消息。

 

3、RocketMQ的基本使用

3.1、下載安裝 RocketMQ

環境要求:

  • Linux64位系統

  • JDK1.8(64位)

  • 如果是以源碼安裝的話,需要安裝Maven 3.2.x

 

提供下載鏈接:鏈接:https://pan.baidu.com/s/1V7eghMTMTi0ivD-CPICtjQ  提取碼:dzpm    該鏈接的文件解壓后在 資料/rocketmq安裝包 目錄里可以找到 rocketmq 的安裝包。

我們先在 Linux 系統上將 JDK 安裝好后,將 roketmq 上傳至 Linux 上,比如上傳至:/usr/mysoft 目錄下,然后解壓安裝包:

unzip rocketmq-all-4.4.0-bin-release.zip

實際上解壓完成后已相當於安裝完成,當然我們可以把這些解壓后的文件移到我們想要的目錄下,比如:/usr/local/rocketmq 目錄下,作為我們的軟件安裝目錄:

mv /usr/mysoft/rocketmq-all-4.4.0-bin-release/ /usr/local/rocketmq/

 

3.2、Rocketmq 的目錄介紹

rocketmq 解壓后目錄結構如下:

  • bin:啟動腳本,包括shell腳本和CMD腳本
  • conf:實例配置文件 ,包括broker配置文件、logback配置文件等
  • lib:依賴jar包,包括Netty、commons-lang、FastJSON等

 

3.3、啟動Rocketmq

首先進入 rocketmq 解壓后目錄的 bin 目錄下,該目錄存放着各個腳本文件。

  1. 啟動NameServer
# 啟動NameServer
nohup sh mqnamesrv &

示例:

 啟動后,可以查看啟動日志。如果沒有啟動過 nameserver ,該啟動日志文件是不存在的

# 查看啟動日志
tail -f ~/logs/rocketmqlogs/namesrv.log

 

  1. 啟動Broker
# 啟動Broker
nohup sh mqbroker -n localhost:9876 &

啟動完成后可以查看啟動日志:

# 查看啟動日志
tail -f ~/logs/rocketmqlogs/broker.log 

啟動完成后可通過 ps -ef | grep mq 命令查看進程,如果能找到 nameserver 和 broker 則證明兩個都啟動成功了:

或者可以通過 jps 命令來查看已啟動的 java 進程:

 

如果提示 jps 命令不存在,執行以下命令安裝 jps 命令:

yum install -y java-1.8.0-openjdk-devel.x86_64

# 安裝后查看jps命令是否存在
which jps

 

如果啟動 broker 后仍然找不到 broker 進程,或者是找不到 broker 的啟動日志,此時可能是啟動失敗了,可查看以下分析解決問題。

 

3.3.1、啟動不成功原因

在我們啟動 broker 后,可能會出現找不到 broker.log 日志文件的情況,這時是因為 broker 並沒有啟動成功。這是因為RocketMQ默認的虛擬機內存較大,啟動Broker可能因為內存不足而導致失敗,此時需要編輯如下兩個配置文件,將JVM內存調小一點:

# 編輯runbroker.sh和runserver.sh將JVM默認內存調小
vi runbroker.sh
vi runserver.sh

參考配置:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

如下為默認 JVM 內存大小

可以將上面配置調整為以下配置即可:

 調整完成后,重新執行啟動 broker 命令即可。

 

3.4、測試發送消息和接收消息

執行下面命令可發送消息:

#  設置環境變量
export NAMESRV_ADDR=localhost:9876
#  通過 bin 目錄下的 tools.sh 腳本,使用安裝包的Demo發送消息
sh tools.sh org.apache.rocketmq.example.quickstart.Producer

 

執行下面命令可接收消息:

#  設置環境變量
export NAMESRV_ADDR=localhost:9876
#  接收消息
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

執行接收消息命令后,進程不會自動退出,重新發送消息,可以看到接收消息進程可以自動接收到消息。

 

3.5、關閉rocketmq

在安裝目錄的 bin 目錄下執行以下消息可關閉 rocketmq:

#  關閉NameServer
sh mqshutdown namesrv
#  關閉Broker
sh mqshutdown broker

 

4、RocketMQ集群介紹

4.1、集群涉及各角色介紹(Producer、Consumer、Broker、NameServer、Topic、Message Queue)

  • Producer:消息的發送者;舉例:發信者
  • Consumer:消息接收者;舉例:收信者
  • Broker:暫存和傳輸消息;舉例:郵局
  • NameServer:管理Broker;舉例:各個郵局的管理機構
  • Topic:區分消息的種類;一個發送者可以發送消息給一個或者多個Topic;一個消息的接收者可以訂閱一個或者多個Topic消息
  • Message Queue:相當於是Topic的分區;用於並行發送和接收消息

 

4.2、 集群特點

  • NameServer是一個幾乎無狀態節點(即每個節點都是相同的),可集群部署,節點之間無任何信息同步。

  • Broker部署相對復雜,Broker分為Master(主節點)與Slave(從節點),一個Master可以對應多個Slave,但是一個Slave只能對應一個Master。Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerName相同則意味着是同一組的Broker,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有NameServer。

  • Producer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer取Topic路由信息,並向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。

  • Consumer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer取Topic路由信息,並向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。Consumer接收消息即可以主動從Broker拉取,也可以由Broker主動推送,所以Consumer需定時向Master、Slave發送心跳,以便讓Broker知道哪個Consumer節點是在線的,哪個是離線的。

NameServer 和 Producer、Consumer 的集群搭建比較簡單,因為多個節點之間無需信息同步,所以只要啟動多個節點集群也就搭建完成。Broker 集群搭建比較復雜,因為有主從節點,主從之間需要信息同步。

 

4.2.1、消費者組和生產者組(group)

RocketMQ 中在使用生產者或者消費者時,可以為他們指定分別指定組。同一組表示具有相同角色的生產者組合或消費者組合,稱為生產者組或消費者組。

生產者組和消費者組不一定要一樣,只要消費者訂閱和生產者一樣的主題 topic,就能接收到生產者的消息。

  • 生產者組:

通常具有同樣屬性(處理的消息種類-topic、以及消息處理邏輯流程—分布式多個客戶端)的一些producer可以歸為同一個group。生產者組的作用是在集群的情況下,一個生產者down之后,本地事務回滾后,可以繼續聯系該組下的另外一個生產者實例,不至於導致業務走不下去。

在事務消息機制中,如果發送某條消息的producer-A宕機,使得事務消息一直處於PREPARED狀態並超時,則broker會回查同一個group的其他producer,確認這條消息應該commit 還是 rollback。

  • 消費者組:

類似於前面提到的生產者組的完全相同的作用,消費者被組合在一起並命名消費組。通過消費者組可以實現消息消費的負載均衡和消息容錯目標。

 

4.3、集群模式

1)單Master模式

這種方式風險較大,一旦Broker重啟或者宕機時,會導致整個服務不可用。不建議線上環境使用,可以用於本地測試。

2)多Master模式

一個集群無Slave,全是Master,例如2個Master或者3個Master,這種模式的優缺點如下:

  • 優點:配置簡單,單個Master宕機或重啟維護對應用無影響,在磁盤配置為RAID10時,即使機器宕機不可恢復情況下,由於RAID10磁盤非常可靠,消息也不會丟(異步刷盤丟失少量消息,同步刷盤一條不丟),性能最高;
  • 缺點:單台機器宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受到影響。

3)多Master多Slave模式(異步)

每個Master配置一個Slave,有多對Master-Slave,HA采用異步復制方式,主備有短暫消息延遲(毫秒級)。異步復制也就是說在主節點寫入消息成功后就反饋給 producer 已寫入成功,然后主節點再同步消息到從節點。

這種模式的優缺點如下:

  • 優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,同時Master宕機后,消費者仍然可以從Slave消費,而且此過程對應用透明,不需要人工干預,性能同多Master模式幾乎一樣;
  • 缺點:Master宕機,磁盤損壞情況下會丟失少量消息。

4)多Master多Slave模式(同步)

每個Master配置一個Slave,有多對Master-Slave,HA采用同步雙寫方式,即只有主備都寫成功,才向應用返回成功,這種模式的優缺點如下:

  • 優點:數據與服務都無單點故障,Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高;
  • 缺點:性能比異步復制模式略低(大約低10%左右),發送單個消息的RT會略高,且目前版本在主節點宕機后,備機不能自動切換為主機。

 

5、mqadmin管理工具

RocketMQ 提供有控制台及一系列控制台命令,用於管理員對主題,集群,broker 等信息的管理。

RocketMQ 安裝目錄的 bin 目錄下有個mqadmin 腳本,可通過執行 ./mqadmin {command} {args} 命令來對集群、主題等信息進行一系列的管理操作。

 

5.1、mqadmin命令介紹

可參考:https://www.cnblogs.com/zyguo/p/4962425.html

 

6、集群監控平台搭建

RocketMQ有一個對其擴展的開源項目incubator-rocketmq-externals,這個項目中有一個子模塊叫rocketmq-console,這個便是管理控制台項目了,先將incubator-rocketmq-externals拉到本地,因為我們需要自己對rocketmq-console進行編譯打包運行。

下載項目:

git clone https://github.com/apache/rocketmq-externals

打包前需修改該項目的配置文件 src\main\resources\application.properties,修改配置如下:

rocketmq.config.namesrvAddr=192.168.32.130:9876;192.168.32.131:9876

編譯打包:

mvn clean package -Dmaven.test.skip=true

將項目上傳至服務器上,啟動rocketmq-console:

java -jar rocketmq-console-ng-1.0.0.jar

啟動成功后,我們就可以通過在服務器上的瀏覽器通過訪問http://localhost:8080進入控制台界面了,類似下圖,不過由於沒有發送過消息,所以圖表都是空的。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM