RocketMq 集群方式搭建 步驟教學包教包會


mq集群方式搭建

有段時間沒寫這些技術文章了, 今天抽空寫一點,不然自己都快忘記了
這篇文章記錄了rocketmq 集群方式搭建的過程, 也是自己半天的成果記錄吧! 感興趣的朋友點個贊在走唄!

好了,廢話不多,下面開搞。

本文章參考https://blog.csdn.net/qq_35400008/article/details/82467562#comments 這個博客文章編寫

准備工作

第一步:關閉要搭建的所有機器的防火牆
第二步:每台機器執行下如下步驟

[root@ma01 ~]# vim /etc/sysconfig/selinux
......
SELINUX=disabled
[root@ma01~]# setenforce 0
[root@ma01~]# getenforce

第三步:所有機器裝好jdk, maven , zip , unzip , ssh 免密登錄

配置crt連接: https://blog.csdn.net/cmqwan/article/details/61932792
安裝maven參考老哥博客:  https://www.cnblogs.com/clicli/p/5866390.html
安裝zip,unzip參考: http://www.rpmfind.net/linux/rpm2html/search.php?query=zip&submit=Search+...&system=&arch=
安裝ssh參考: https://blog.csdn.net/m0_37590135/article/details/74275859
jdk自己百度哈, 很多參考博客的!

第四步: 如下命令是ssh機器之間copy用的命令

scp -r /home/administrator/test/ root@192.168.1.100:/root/

在這里插入圖片描述
第五步: 下載完成后, 解壓

unzip  rocketmq-all-4.4.0-bin-release.zip 

第六步:進入解壓后的文件夾rocketmq-bin4.4.0 , 在文件夾里面新建logs , data/store, data2/store 目錄
在這里插入圖片描述
第七步:安裝順序修改bin下面的幾個啟動文件, 因默認配置內存空間太大,本地啟動會報錯

1. vim runbroker.sh    對應地方更改為  -server -Xms512m -Xmx512m -Xmn256m
2. vim runserver.sh (同樣的道理) -server -Xms512m -Xmx512m -Xmn126m -XX:PermSize=128m -XX:MaxPermSize=320m
3. vim tools.sh           -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m

第八步:到rocketmq-bin4.4.0/conf/2m-2s-async 下 修改這四個文件
在這里插入圖片描述

注: 這里說明下哈, 我是用了三台機器,所有配置了130, 131和132一樣的,你們2台機器完全可以用,131和132配置一台就可以了哈,ip自行更改哈。

第九步: 130主機器修改如下配置文件, broker-a.properties broker-b-s.properties兩個文件 內容分別如下
broker-a.properties

brokerClusterName=RocketMQCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
##Broker 對外服務的監聽端口
listenPort=10911
#nameserver地址,分號分割
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
brokerIP1=192.168.175.130
storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data/store/commitlog
# 消費隊列存儲路徑存儲路徑
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data/store/consumequeue
#消息索引存儲路徑
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data/store/checkpoint
#abort 文件存儲路徑
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data/store/abort
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
# commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存300W條,根據業務情況調整
mapedFileSizeConsumeQueue=3000000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
#diskMaxUsedSpaceRatio=88

broker-b-s.properties

brokerClusterName=RocketMQCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10921
#nameserver地址,分號分割
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
brokerIP1=192.168.175.130
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data2/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/commitlog2
# 消費隊列存儲路徑存儲路徑
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/consumequeue2
#消息索引存儲路徑
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/index2
#checkpoint 文件存儲路徑
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/checkpoint2
#abort 文件存儲路徑
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/abort2
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
# commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存300W條,根據業務情況調整
mapedFileSizeConsumeQueue=3000000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
#diskMaxUsedSpaceRatio=88

第十步: 131, 132 機器只修改 broker-b.properties 和broker-a-s.properties 內容分別如下:
broker-b.properties

# limitations under the License.
brokerClusterName=RocketMQCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
#nameserver地址,分號分割
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
brokerIP1=192.168.175.131
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true

storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data/store/commitlog
# 消費隊列存儲路徑存儲路徑
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data/store/consumequeue
#消息索引存儲路徑
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data/store/checkpoint
#abort 文件存儲路徑
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
# commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存300W條,根據業務情況調整
mapedFileSizeConsumeQueue=3000000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
#diskMaxUsedSpaceRatio=88

broker-a-s.properties

# limitations under the License.
brokerClusterName=RocketMQCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

listenPort=10921
namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876
brokerIP1=192.168.175.131
#在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true

storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data2/store
storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/commitlog
# 消費隊列存儲路徑存儲路徑
storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/consumequeue
#消息索引存儲路徑
storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/checkpoint
#abort 文件存儲路徑
abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000

#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=120
# commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
#diskMaxUsedSpaceRatio=88

第十一步: 啟動


三台都執行:

nohup sh bin/mqnamesrv > ./logs/namesrvrun.log 2>&1 &

130機器執行: 
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-a.log 2>&1 &

nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-b-s.log 2>&1 &

131, 132 機器執行:
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties  -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-b.log 2>&1 &

nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-a-s.log 2>&1 &

執行之后,jps結果,有兩個brokerstartup就行了, 如果報錯的化,看下自己建的logs文件夾日志
在這里插入圖片描述

好了,到此rocketmq 基礎配置就搭建起來了,下面在講一講實戰代碼


導入依賴包

	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>4.4.0</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba.rocketmq</groupId>
			<artifactId>rocketmq-all</artifactId>
			<version>3.5.9</version>
			<type>pom</type>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.1.1</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>1.1.1</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

mq消息發送方


import java.io.UnsupportedEncodingException;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

/**
 * 消息發送者
 * @author LELE
 *
 */
public class Producer {

	public static void main(String[] args) throws MQClientException, InterruptedException {

		// 聲明並初始化一個producer
		// 需要一個producer group名字作為構造方法的參數,這里為producer1
		DefaultMQProducer producer = new DefaultMQProducer("producer1");
		producer.setVipChannelEnabled(false);
		// 設置NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔
		// NameServer的地址必須有
		// producer.setClientIP("xxxx");
		// producer.setInstanceName("Producer");
		producer.setNamesrvAddr("192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876");

		// 調用start()方法啟動一個producer實例
		producer.start();

		// 發送1條消息到Topic為TopicTest,tag為TagA,消息內容為“Hello RocketMQ”拼接上i的值
		try {
			for(int i=0;i<30000;i++) {
				// 封裝消息
				Message msg = new Message("TopicTest", // topic
						"TagA", // tag
						("Hello RocketMQ--------"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
				);
				// 調用producer的send()方法發送消息
				// 這里調用的是同步的方式,所以會有返回結果
				SendResult sendResult = producer.send(msg);
				// 打印返回結果
				System.out.println(sendResult);
			}
			
		} catch (RemotingException e) {
			e.printStackTrace();
		} catch (MQBrokerException e) {
			e.printStackTrace();
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		// 發送完消息之后,調用shutdown()方法關閉producer
		System.out.println("send success");
		producer.shutdown();
	}

}

消息消費者


import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * 消息接收者, 需要服務器啟動mq服務
 * @author LELE
 *
 */
public class Consumer {

	public static void main(String[] args) throws MQClientException {

		// 聲明並初始化一個consumer
		// 需要一個consumer group名字作為構造方法的參數,這里為consumer1
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
		// consumer.setVipChannelEnabled(false);
		// 同樣也要設置NameServer地址
		consumer.setNamesrvAddr("192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876");

		// 這里設置的是一個consumer的消費策略
		// CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息
		// CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍
		// CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

		// 設置consumer所訂閱的Topic和Tag,*代表全部的Tag
		consumer.subscribe("TopicTest", "*");

		// 設置一個Listener,主要進行消息的邏輯處理
		consumer.registerMessageListener(new MessageListenerConcurrently() {

			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

				MessageExt msg = msgs.get(0);

				if (msg.getTopic().equals("TopicTest")) {

					// 執行TopicTest1的消費邏輯

					System.out.println("TagA:" + new String(msg.getBody()));

				}

				System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()
						+ "----------------------------------------------------------------------------------");
				// 返回消費狀態
				// CONSUME_SUCCESS 消費成功
				// RECONSUME_LATER 消費失敗,需要稍后重新消費
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});
		// 調用start()方法啟動consumer
		consumer.start();
		System.out.println("Consumer Started.");

	}
}

啟動開始盡情玩耍吧,少年, 記得點贊哦!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM