RocketMq基礎 看這一篇就夠了


RocketMQ

  • 編譯安裝
  • HelloWorld

官方網站

http://rocketmq.apache.org

GitHub

https://github.com/apache/rocketmq

Quick Start

Linux下使用Maven編譯源碼安裝

Rocketmq4.6+需要jdk1.8環境編譯和運行

各版本要求

Version Client Broker NameServer
4.0.0-incubating >=1.7 >=1.8 >=1.8
4.1.0-incubating >=1.6 >=1.8 >=1.8
4.2.0 >=1.6 >=1.8 >=1.8
4.3.x >=1.6 >=1.8 >=1.8
4.4.x >=1.6 >=1.8 >=1.8
4.5.x >=1.6 >=1.8 >=1.8
4.6.x >=1.6 >=1.8 >=1.8

1.從GitHub上下載源碼並上傳到服務器

2.在Linux上安裝Maven

下載Maven

wget https://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz

添加阿里雲鏡像

修改maven/conf目錄下的settings.xml

mirrors節點下添加

<mirror> 
    <id>aliyun-maven</id> 
    <mirrorOf>*</mirrorOf> 
    <name>aliyun maven</name> 
    <url>http://maven.aliyun.com/nexus/content/groups/public</url> 
</mirror>

配置maven環境變量

修改/etc/profile

export M2_HOME=/usr/local/maven
export PATH=$PATH:$M2_HOME/bin

配置java環境變量

export JAVA_HOME="/usr/java/jdk1.8.0_181-amd64"
export CLASS_PATH="$JAVA_HOME/lib"
export PATH=".$PATH:$JAVA_HOME/bin"

環境變量修完執行source /etc/profile立即生效

進入rocketmq主目錄編譯項目

mvn -Prelease-all -DskipTests clean install -U

3.啟動nameserver

bin目錄下執行

./mqnamesrv

正常提示

4.啟動Broker

./mqbroker -n localhost:9876

正常提示

5.測試消息發送

使用tool.sh腳本執行測試程序

bin目錄下執行

./tools.sh org.apache.rocketmq.example.quickstart.Producer

提示如下表示成功

6.接受消息

./tools.sh org.apache.rocketmq.example.quickstart.Consumer

控制台rocketmq-console編譯安裝

下載

https://github.com/apache/rocketmq-externals

中文指南

https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md

上傳到服務器並解壓縮

編譯

進入rocketmq-console目錄

執行編譯

mvn clean package -Dmaven.test.skip=true

啟動

編譯成功后在rocketmq-console/target目錄下執行rocketmq-console-ng-1.0.1.jar

啟動時,直接動態添加nameserver地址或編輯application.properties添加屬性

java -jar rocketmq-console-ng-1.0.1.jar --rocketmq.config.namesrvAddr=127.0.0.1:9876

啟動成功后訪問服務器8080端口即可

pom.xml依賴

 <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.6.1</version>
 </dependency>

安裝啟動常見錯誤

編譯時包無法在mirror上找到 提示502錯誤

原因:網絡不好或maven倉庫服務器出錯

重試即可,或者歡迎鏡像倉庫

發送失敗提示connect to null failed

 ./tools.sh org.apache.rocketmq.example.quickstart.Producer
22:49:02.470 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to
 null failed

原因:不知道nameserver在哪兒

tools腳本中添加

export NAMESRV_ADDR=localhost:9876

啟動broker失敗 Cannot allocate memory

原因:jvm啟動初始化內存分配大於物理內存

[root@node-113b bin]# ./mqbroker -n localhost:9876
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed
; error='Cannot allocate memory' (errno=12)#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/rocketmq/bin/hs_err_pid1997.log

修改啟動腳本中的jvm參數

runbroker.sh broker

runserver.sh nameserver

默認數值給的都很大,改小即可

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=100m"

啟動broker成功但提示:Failed to obtain the host name

[root@node-113b bin]# ./mqbroker -n localhost:9876
22:30:42.307 [main] ERROR RocketmqCommon - Failed to obtain the host name
java.net.UnknownHostException: node-113b: node-113b: No address associated with hostname
	at java.net.InetAddress.getLocalHost(InetAddress.java:1505) ~[na:1.8.0_181]
	at org.apache.rocketmq.common.BrokerConfig.localHostName(BrokerConfig.java:189) [rocketmq-common-4.6
.1.jar:4.6.1]	at org.apache.rocketmq.common.BrokerConfig.<init>(BrokerConfig.java:38) [rocketmq-common-4.6.1.jar:4
.6.1]	at org.apache.rocketmq.broker.BrokerStartup.createBrokerController(BrokerStartup.java:110) [rocketmq
-broker-4.6.1.jar:4.6.1]	at org.apache.rocketmq.broker.BrokerStartup.main(BrokerStartup.java:58) [rocketmq-broker-4.6.1.jar:4
.6.1]Caused by: java.net.UnknownHostException: node-113b: No address associated with hostname
	at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) ~[na:1.8.0_181]
	at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928) ~[na:1.8.0_181]
	at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323) ~[na:1.8.0_181]
	at java.net.InetAddress.getLocalHost(InetAddress.java:1500) ~[na:1.8.0_181]
	... 4 common frames omitted
The broker[DEFAULT_BROKER, 192.168.150.213:10911] boot success. serializeType=JSON and name server is localh
ost:9876

原因:無法解析當前的主機名

hosts里添加映射即可

192.168.150.213 node-113b

linux日期校准

安裝ntpdate

yum install ntpdate

ntpdate ntp1.aliyun.com

RocketMQ 功能 大綱

主流的MQ有很多,比如ActiveMQ、RabbitMQ、RocketMQ、Kafka、ZeroMQ等。

之前阿里巴巴也是使用ActiveMQ,隨着業務發展,ActiveMQ IO 模塊出現瓶頸,后來阿里巴巴 通過一系列優化但是還是不能很好的解決,之后阿里巴巴把注意力放到了主流消息中間件kafka上面,但是kafka並不能滿足他們的要求,尤其是低延遲和高可靠性。

所以RocketMQ是站在巨人的肩膀上(kafka)MetaQ的內核,又對其進行了優化讓其更滿足互聯網公司的特點。它是純Java開發,具有高吞吐量、高可用性、適合大規模分布式系統應用的特點。 RocketMQ目前在阿里集團被廣泛應用於交易、充值、流計算、消息推送、日志流式處理、binglog分發等場景。

RocketMQ介紹

  • 消息隊列應用場景
  • rocketmq中的角色
    • nameserver
  • 官網解讀

消息發送

  • 同步發送

  • 異步發送

  • 單向發送

  • 消息批量發送

  • 消息結構

  • 消息發送流程

消息存儲

  • 存儲方式
  • 發送消息時存儲流程
  • 存儲文件與內存映射
  • 刷盤機制
  • 文件恢復與過期刪除機制
  • 索引

消息消費

  • 消息訂閱
  • 消息拉取和推送
  • 消息處理隊列
  • 順序消費
  • 定時消息機制
  • 消息過濾TAG與sql92
  • FilterServer過濾機制
  • 並發消息消費
  • 消費負載與算法
  • 消費者動態添加
  • 消息消費過程
  • ACK
  • 消費進度與offset

Rocketmq集群 HA

  • 集群搭建
  • 主從同步復制原理
  • 讀寫分離機制

整合

  • 使用spring
  • SpringCloud整合

監控與運維

  • rocketmq-console監控平台
  • 命令行運維 MQAdmin
  • 自定義日志

消息隊列介紹

消息隊列是《數據結構》中先進先出的一種數據結構,在當前的架構中,作為中間件提供服務。

消息中間件功能

應用解耦

AB應用不在互相依賴

流量削峰

流量達到高峰的時候,通常使用限流算法來控制流量涌入系統,避免系統被擊癱,但是這種方式損失了一部分請求

此時可以使用消息中間件來緩沖大量的請求,勻速消費,當消息隊列中堆積消息過多時,我們可以動態上線增加消費端,來保證不丟失重要請求。

大數據處理

消息中間件可以把各個模塊中產生的管理員操作日志、用戶行為、系統狀態等數據文件作為消息收集到主題中

數據使用方可以訂閱自己感興趣的數據內容互不影響,進行消費

異構系統

跨語言

RocketMQ 角色

broker

  • Broker面向producer和consumer接受和發送消息
  • 向nameserver提交自己的信息
  • 是消息中間件的消息存儲、轉發服務器。
  • 每個Broker節點,在啟動時,都會遍歷NameServer列表,與每個NameServer建立長連接,注冊自己的信息,之后定時上報。
broker集群
  • Broker高可用,可以配成Master/Slave結構,Master可寫可讀,Slave只可以讀,Master將寫入的數據同步給Slave。
    • 一個Master可以對應多個Slave,但是一個Slave只能對應一個Master
    • Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義BrokerId為0表示Master,非0表示Slave
  • Master多機負載,可以部署多個broker
    • 每個Broker與nameserver集群中的所有節點建立長連接,定時注冊Topic信息到所有nameserver。

producer

  • 消息的生產者
  • 通過集群中的其中一個節點(隨機選擇)建立長連接,獲得Topic的路由信息,包括Topic下面有哪些Queue,這些Queue分布在哪些Broker上等
  • 接下來向提供Topic服務的Master建立長連接,且定時向Master發送心跳

consumer

消息的消費者,通過NameServer集群獲得Topic的路由信息,連接到對應的Broker上消費消息。

注意,由於Master和Slave都可以讀取消息,因此Consumer會與Master和Slave都建立連接。

nameserver

底層由netty實現,提供了路由管理、服務注冊、服務發現的功能,是一個無狀態節點

nameserver是服務發現者,集群中各個角色(producer、broker、consumer等)都需要定時想nameserver上報自己的狀態,以便互相發現彼此,超時不上報的話,nameserver會把它從列表中剔除

nameserver可以部署多個,當多個nameserver存在的時候,其他角色同時向他們上報信息,以保證高可用,

NameServer集群間互不通信,沒有主備的概念

nameserver內存式存儲,nameserver中的broker、topic等信息默認不會持久化

為什么不用zookeeper?:rocketmq希望為了提高性能,CAP定理,客戶端負載均衡

對比JSM中的Topic和Queue

Topic是一個邏輯上的概念,實際上Message是在每個Broker上以Queue的形式記錄。

對應到JMS中的topic實現是由客戶端來完成的

        consumer.setMessageModel(MessageModel.BROADCASTING);

以上為RocketMq 基礎知識,重點理解RocketMq 的組成和特性。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM