1.簡介
RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability. It is the third generation distributed messaging middleware open sourced by Alibaba in 2012. On November 21, 2016, Alibaba donated RocketMQ to the Apache Software Foundation. Next year, on February 20, the Apache Software Foundation announced Apache RocketMQ as a Top-Level Project.
-- quote from Wikipedia
RocketMQ是一個輕量級、高可用、低延時的消息中間件,能實現消息的存儲,消息的失敗重試,批量消息處理,延時消息處理等特性,在各種消息中間件中表現優異。
2.RocketMQ的組成部分
RocketMQ一共包含四個部分:Name Server,Brokers,Producers,Consumers 。這幾個部分都能單獨的水平擴展。
-
NameServer Cluster
輕量級的組件,主要用於服務的發現,能夠讀寫路由信息以及全局的信息,支持高速的存儲;
-
Broker Cluster
輕量級的Topic與Queue的機制去管理數據的存儲,方便實現容錯需要2到3個數據分片的提供。客戶端可以向Broker推送或者拉取數據,同時也支持災難恢復和數據統計;
-
Producer Cluster
Producer能夠進行分布式部署,推送消息到Brokers能進行均衡的分配,還具有高可用,低延時的特性;
-
Consumer Cluster
Consumer 也可以進行分布式的部署,推送和拉取消息,還能實時的訂閱消息,從集群中消費消息,還能實現消息的廣播。
3.RocketMQ的部署
服務器配置:
// 我們這里安裝兩台服務器,都是master,也就是兩台master:
192.168.2.128 nameServer1,brokerServer1
192.168.2.135 nameServer2,brokerServer2
添加Host文件:
vim /etc/hosts
// 添加如下配置
192.168.2.128 rocketmq-nameserver1
192.168.2.128 rocketmq-master1
192.168.2.135 rocketmq-nameserver2
192.168.2.135 rocketmq-master2
// 重啟服務
service network restart
這里我們部署的是RocketMQ的3.2.6的版本,最新的版本可以參閱官網的地址。下載地址如下:
https://archive.apache.org/dist/rocketmq/3.2.6/rocketmq-all-3.2.6-source-release.zip
解壓相關的包:
cd /usr/local
wget https://archive.apache.org/dist/rocketmq/3.2.6/rocketmq-all-3.2.6-source-release.zip
unzip rocketmq-all-3.2.6-source-release.zip
mv rocketmq-all-3.2.6-source-release rocketmq-all-3.2.6
cd rocketmq-all-3.2.6/
// maven打包需要先配置maven環境 這里也可以在window的編譯器中打包相關
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/apache-rocketmq
// 找到tar.gz
tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local
mv alibaba-rocketmq alibaba-rocketmq-3.2.6
ln -s alibaba-rocketmq-3.2.6 rocketmq
創建存儲路徑:
mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index
RocketMq配置文件【兩台機器】
vim /usr/local/rocketmq/conf/2m-noslave/broker-a.properties
vim /usr/local/rocketmq/conf/2m-noslave/broker-b.properties
相關配置文件模板如下:
broker-a.properties:
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a|broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分號分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/data/rocketmq/store
#commitLog 存儲路徑
storePathCommitLog=/data/rocketmq/store/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/data/rocketmq/store/consumequeue
#消息索引存儲路徑
storePathIndex=/data/rocketmq/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/data/rocketmq/store/checkpoint
#abort 文件存儲路徑
abortFile=/data/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步復制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
broker-b.properties:
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a|broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分號分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/data/rocketmq/store
#commitLog 存儲路徑
storePathCommitLog=/data/rocketmq/store/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/data/rocketmq/store/consumequeue
#消息索引存儲路徑
storePathIndex=/data/rocketmq/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/data/rocketmq/store/checkpoint
#abort 文件存儲路徑
abortFile=/data/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步復制Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
修改日志配置文件【兩台機器】
mkdir -p /usr/local/rocketmq/logs
cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
修改啟動NameServer【兩台機器】
vim /usr/local/rocketmq/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -
XX:PermSize=128m -XX:MaxPermSize=320m"
vim /usr/local/rocketmq/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -
XX:PermSize=128m -XX:MaxPermSize=320m"
構建步驟如下:
①Start Name Server
cd /usr/local/rocketmq/bin
nohup sh bin/mqnamesrv &
tail -200f ~/logs/rocketmqlogs/namesrv.log
②Start Broker
// 啟動BrokerServerA:
cd /usr/local/rocketmq/bin
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties &
// 可用jps查看啟動是否成功:
jps
// 啟動BrokerServerB:
cd /usr/local/rocketmq/bin
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-b.properties &
4.部署RocketMQ Console
將rocketmq-web-console 部署到webapps目錄中。
// 下載源碼地址:
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
// 下載后需要修改config.properties
rocketmq.namesrv.addr=192.168.2.128:9876;192.168.2.135:9876
// 使用maven打成war包
// 部署在tomcat如下目錄:
/usr/local/apache-tomcat-7.0.65/webapps/
// 啟動tomcat,並訪問:
http://IP地址:8080/rocketmq-web-console/cluster/list.do
// 既可以查看相應的集群狀況
5.Java調用RocketMQ
pom.xml:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.6.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>3.0.10</version>
<type>pom</type>
</dependency>
Producer:
public class Producer {
public static void main(String[] args) throws Exception {
// 1.分組
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
// 2.服務器集群地址
producer.setNamesrvAddr("192.168.2.128:9876;192.168.2.135:9876");
producer.setInstanceName("producer");
producer.start();
for (int i = 0; i < 1; i++) {
Thread.sleep(1000);
Message message = new Message(
"abc_topic",
"tagA",
("abc--" + i).getBytes()
);
SendResult sendResult = producer.send(message);
System.out.println(sendResult.toString());
System.out.println("producer send:"+message);
}
producer.shutdown();
}
}
Consumer:
public class Consumer {
private static Map<String,String> map = new HashMap<>();
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
consumer.setNamesrvAddr("192.168.2.128:9876;192.168.2.135:9876");
consumer.setInstanceName("consumer");
consumer.subscribe(
"abc_topic",
"tagA"
);
consumer.registerMessageListener(
new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
String msgId = messageExt.getMsgId();
byte[] body = messageExt.getBody();
System.out.println("msgId:" + msgId + ",body:" + new String(body));
}
if(map.get("try")!=null){
System.out.println("重復消費");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
System.out.println("開始報錯");
try {
int w = 1/0;
} catch (Exception e) {
e.printStackTrace();
// map.put("try","1");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 消費狀態 1:代表消費成功 2.代表消費失敗
map.put("try","1");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();
System.out.println("Consumer Started.");
}
}