mq集群方式搭建
有段時間沒寫這些技術文章了, 今天抽空寫一點,不然自己都快忘記了
這篇文章記錄了rocketmq 集群方式搭建的過程, 也是自己半天的成果記錄吧! 感興趣的朋友點個贊在走唄!
好了,廢話不多,下面開搞。
本文章參考https://blog.csdn.net/qq_35400008/article/details/82467562#comments 這個博客文章編寫
准備工作
第一步:關閉要搭建的所有機器的防火牆
第二步:每台機器執行下如下步驟
[root@ma01 ~]# vim /etc/sysconfig/selinux
......
SELINUX=disabled
[root@ma01~]# setenforce 0
[root@ma01~]# getenforce
第三步:所有機器裝好jdk, maven , zip , unzip , ssh 免密登錄
配置crt連接: https://blog.csdn.net/cmqwan/article/details/61932792
安裝maven參考老哥博客: https://www.cnblogs.com/clicli/p/5866390.html
安裝zip,unzip參考: http://www.rpmfind.net/linux/rpm2html/search.php?query=zip&submit=Search+...&system=&arch=
安裝ssh參考: https://blog.csdn.net/m0_37590135/article/details/74275859
jdk自己百度哈, 很多參考博客的!
第四步: 如下命令是ssh機器之間copy用的命令
scp -r /home/administrator/test/ root@192.168.1.100:/root/
第五步: 下載完成后, 解壓
unzip rocketmq-all-4.4.0-bin-release.zip
第六步:進入解壓后的文件夾rocketmq-bin4.4.0 , 在文件夾里面新建logs , data/store, data2/store 目錄
第七步:安裝順序修改bin下面的幾個啟動文件, 因默認配置內存空間太大,本地啟動會報錯
1. vim runbroker.sh 對應地方更改為 -server -Xms512m -Xmx512m -Xmn256m
2. vim runserver.sh (同樣的道理) -server -Xms512m -Xmx512m -Xmn126m -XX:PermSize=128m -XX:MaxPermSize=320m
3. vim tools.sh -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m
第八步:到rocketmq-bin4.4.0/conf/2m-2s-async 下 修改這四個文件
注: 這里說明下哈, 我是用了三台機器,所有配置了130, 131和132一樣的,你們2台機器完全可以用,131和132配置一台就可以了哈,ip自行更改哈。
第九步: 130主機器修改如下配置文件, broker-a.properties broker-b-s.properties兩個文件 內容分別如下
broker-a.properties
brokerClusterName=RocketMQCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
##Broker 對外服務的監聽端口
listenPort=10911
#nameserver地址,分號分割
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
brokerIP1=192.168.175.130
storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data/store/commitlog
# 消費隊列存儲路徑存儲路徑
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data/store/consumequeue
#消息索引存儲路徑
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data/store/checkpoint
#abort 文件存儲路徑
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data/store/abort
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
# commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存300W條,根據業務情況調整
mapedFileSizeConsumeQueue=3000000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
#diskMaxUsedSpaceRatio=88
broker-b-s.properties
brokerClusterName=RocketMQCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10921
#nameserver地址,分號分割
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
brokerIP1=192.168.175.130
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data2/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/commitlog2
# 消費隊列存儲路徑存儲路徑
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/consumequeue2
#消息索引存儲路徑
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/index2
#checkpoint 文件存儲路徑
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/checkpoint2
#abort 文件存儲路徑
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/abort2
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
# commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存300W條,根據業務情況調整
mapedFileSizeConsumeQueue=3000000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
#diskMaxUsedSpaceRatio=88
第十步: 131, 132 機器只修改 broker-b.properties 和broker-a-s.properties 內容分別如下:
broker-b.properties
# limitations under the License.
brokerClusterName=RocketMQCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
#nameserver地址,分號分割
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
brokerIP1=192.168.175.131
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data/store/commitlog
# 消費隊列存儲路徑存儲路徑
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data/store/consumequeue
#消息索引存儲路徑
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data/store/checkpoint
#abort 文件存儲路徑
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
# commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存300W條,根據業務情況調整
mapedFileSizeConsumeQueue=3000000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
#diskMaxUsedSpaceRatio=88
broker-a-s.properties
# limitations under the License.
brokerClusterName=RocketMQCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10921
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
brokerIP1=192.168.175.131
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data2/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/commitlog
# 消費隊列存儲路徑存儲路徑
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/consumequeue
#消息索引存儲路徑
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/checkpoint
#abort 文件存儲路徑
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
# commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
#diskMaxUsedSpaceRatio=88
第十一步: 啟動
三台都執行:
nohup sh bin/mqnamesrv > ./logs/namesrvrun.log 2>&1 &
130機器執行:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-a.log 2>&1 &
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-b-s.log 2>&1 &
131, 132 機器執行:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-b.log 2>&1 &
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-a-s.log 2>&1 &
執行之后,jps結果,有兩個brokerstartup就行了, 如果報錯的化,看下自己建的logs文件夾日志
好了,到此rocketmq 基礎配置就搭建起來了,下面在講一講實戰代碼
導入依賴包
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>3.5.9</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
mq消息發送方
import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* 消息發送者
* @author LELE
*
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 聲明並初始化一個producer
// 需要一個producer group名字作為構造方法的參數,這里為producer1
DefaultMQProducer producer = new DefaultMQProducer("producer1");
producer.setVipChannelEnabled(false);
// 設置NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔
// NameServer的地址必須有
// producer.setClientIP("xxxx");
// producer.setInstanceName("Producer");
producer.setNamesrvAddr("192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876");
// 調用start()方法啟動一個producer實例
producer.start();
// 發送1條消息到Topic為TopicTest,tag為TagA,消息內容為“Hello RocketMQ”拼接上i的值
try {
for(int i=0;i<30000;i++) {
// 封裝消息
Message msg = new Message("TopicTest", // topic
"TagA", // tag
("Hello RocketMQ--------"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
);
// 調用producer的send()方法發送消息
// 這里調用的是同步的方式,所以會有返回結果
SendResult sendResult = producer.send(msg);
// 打印返回結果
System.out.println(sendResult);
}
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
// 發送完消息之后,調用shutdown()方法關閉producer
System.out.println("send success");
producer.shutdown();
}
}
消息消費者
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
/**
* 消息接收者, 需要服務器啟動mq服務
* @author LELE
*
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 聲明並初始化一個consumer
// 需要一個consumer group名字作為構造方法的參數,這里為consumer1
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
// consumer.setVipChannelEnabled(false);
// 同樣也要設置NameServer地址
consumer.setNamesrvAddr("192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876");
// 這里設置的是一個consumer的消費策略
// CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息
// CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍
// CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 設置consumer所訂閱的Topic和Tag,*代表全部的Tag
consumer.subscribe("TopicTest", "*");
// 設置一個Listener,主要進行消息的邏輯處理
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("TopicTest")) {
// 執行TopicTest1的消費邏輯
System.out.println("TagA:" + new String(msg.getBody()));
}
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()
+ "----------------------------------------------------------------------------------");
// 返回消費狀態
// CONSUME_SUCCESS 消費成功
// RECONSUME_LATER 消費失敗,需要稍后重新消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 調用start()方法啟動consumer
consumer.start();
System.out.println("Consumer Started.");
}
}
啟動開始盡情玩耍吧,少年, 記得點贊哦!