- 准備工作

- 3個虛擬機節點的構成如下 :

- 安裝步驟

- 操作過程
1、安裝包已經上傳至其中1個節點。

2、解壓縮安裝包
命令:unzip rocketmq-all-4.0.0-incubating-bin-release.zip
解壓縮之后如下:

3、 我這里將解壓縮之后的文件夾移動了位置,並修改了名字,以便后續操作。
命令: mv /home/hadmin/software/apache-rocketmq-all/ /home/hadmin/rocketmq
移動之后路徑如下:

4、修改配置文件
我這里已經將配置文件提前准備好了,只呈現以下配置文件的結果。
默認配置可以參考源碼:
Tips1:
autoCreateTopicEnable,建議線下開啟,線上關閉。
Tips2:
同一個機器上有多個Broker時,各個Broker的文件路徑要獨立。
■節點1(192.168.6.3)配置文件:
《broker-a-m.properties》
brokerClusterName=post brokerName=broker-a namesrvAddr=192.168.6.3:9876;192.168.6.4:9876 brokerId=0 listenPort=10911 brokerIP1=192.168.6.3 deleteWhen=04 fileReservedTime=72 brokerRole=ASYNC_MASTER storePathRootDir=/home/hadmin/data/rocketmq/rootdir-a-m storePathCommitLog=/home/hadmin/data/rocketmq/commitlog-a-m defaultTopicQueueNums=4 autoCreateTopicEnable=false autoCreateSubscriptionGroup=true flushDiskType=ASYNC_FLUSH
《broker-c-s.properties》
brokerClusterName=post brokerName=broker-c namesrvAddr=192.168.6.3:9876;192.168.6.4:9876 brokerId=1 listenPort=10920 brokerIP1=192.168.6.3 deleteWhen=04 fileReservedTime=72 brokerRole=SLAVE storePathRootDir=/home/hadmin/data/rocketmq/rootdir-c-s storePathCommitLog=/home/hadmin/data/rocketmq/commitlog-c-s defaultTopicQueueNums=4 autoCreateTopicEnable=false autoCreateSubscriptionGroup=true flushDiskType=ASYNC_FLUSH
■節點2(192.168.6.4)配置文件:
《broker-a-s.properties》
brokerClusterName=post brokerName=broker-a namesrvAddr=192.168.6.3:9876;192.168.6.4:9876 brokerId=1 listenPort=10920 deleteWhen=04 brokerIP1=192.168.6.4 fileReservedTime=72 brokerRole=SLAVE storePathRootDir=/home/hadmin/data/rocketmq/rootdir-a-s storePathCommitLog=/home/hadmin/data/rocketmq/commitlog-a-s defaultTopicQueueNums=4 autoCreateTopicEnable=false autoCreateSubscriptionGroup=true flushDiskType=ASYNC_FLUSH
《broker-b-m.properties》
brokerClusterName=post brokerName=broker-b namesrvAddr=192.168.6.3:9876;192.168.6.4:9876 brokerId=0 listenPort=10911 brokerIP1=192.168.6.4 deleteWhen=04 fileReservedTime=72 brokerRole=ASYNC_MASTER storePathRootDir=/home/hadmin/data/rocketmq/rootdir-b-m storePathCommitLog=/home/hadmin/data/rocketmq/commitlog-b-m defaultTopicQueueNums=4 autoCreateTopicEnable=false autoCreateSubscriptionGroup=true flushDiskType=ASYNC_FLUSH
■節點3(192.168.6.5)配置文件:
《broker-b-s.properties》
brokerClusterName=post brokerName=broker-b namesrvAddr=192.168.6.3:9876;192.168.6.4:9876 brokerId=1 listenPort=10920 brokerIP1=192.168.6.5 deleteWhen=04 fileReservedTime=72 brokerRole=SLAVE storePathRootDir=/home/hadmin/data/rocketmq/rootdir-b-s storePathCommitLog=/home/hadmin/data/rocketmq/commitlog-b-s defaultTopicQueueNums=4 autoCreateTopicEnable=false autoCreateSubscriptionGroup=true flushDiskType=ASYNC_FLUSH
《broker-c-m.properties》
brokerClusterName=post brokerName=broker-c namesrvAddr=192.168.6.3:9876;192.168.6.4:9876 brokerId=0 listenPort=10911 brokerIP1=192.168.6.5 deleteWhen=04 fileReservedTime=72 brokerRole=ASYNC_MASTER storePathRootDir=/home/hadmin/data/rocketmq/rootdir-c-m storePathCommitLog=/home/hadmin/data/rocketmq/commitlog-c-m defaultTopicQueueNums=4 autoCreateTopicEnable=false autoCreateSubscriptionGroup=true flushDiskType=ASYNC_FLUSH
5、啟動nameserver
啟動命令:nohup sh /home/hadmin/rocketmq/bin/mqnamesrv >/home/hadmin/rocketmq/logs/mqnamesrv.log 2>&1 &
注意:我這里將啟動日志重定義到了logs路徑下,需要提前手動創建logs文件夾,以便於統一管理日志,方便查看。
創建文件夾命令:mkdir /home/hadmin/rocketmq/logs
下圖中展示了啟動命令,日志中可以看到NameServer成功啟動的日志。
■NameServer - 節點1

■NameServer = 節點2

6、啟動Broker-a(Master位於節點1、Slave位於節點2)
broker-a分為Master和Slave,分別位於節點1和節點2上,需要分別啟動。
注意:需要根據啟動角色,為broker指定一個配置文件。
■broker-a的master - 節點1
命令: nohup sh /home/hadmin/rocketmq/bin/mqbroker -c /home/hadmin/rocketmq/conf/broker-a-m.properties >/home/hadmin/rocketmq/logs/broker-a-m.log 2>&1 &

■broker-a的slave - 節點2
命令:nohup sh /home/hadmin/rocketmq/bin/mqbroker -c /home/hadmin/rocketmq/conf/broker-a-s.properties >/home/hadmin/rocketmq/logs/broker-a-s.log 2>&1 &

■驗證broker-a:
broker-a啟動結束,這時候可以使用命令查看一下rocketmq集群狀態。
命令:sh /home/hadmin/rocketmq/bin/mqadmin clusterList -n 192.168.6.3:9876

7、啟動broker-b(Master位於節點2,、Slave位於節點3)
■broker-b的Master - 節點2
命令:nohup sh /home/hadmin/rocketmq/bin/mqbroker -c /home/hadmin/rocketmq/conf/broker-b-m.properties >/home/hadmin/rocketmq/logs/broker-b-m.log 2>&1 &

■broker-b的Slave - 節點3
命令:nohup sh /home/hadmin/rocketmq/bin/mqbroker -c /home/hadmin/rocketmq/conf/broker-b-s.properties >/home/hadmin/rocketmq/logs/broker-b-s.log 2>&1 &

■驗證broker-b
命令:sh /home/hadmin/rocketmq/bin/mqadmin clusterList -n 192.168.6.3:9876

8、 啟動broker-b(Master位於節點3,、Slave位於節點1)
■broker-c的Master - 節點3
命令:nohup sh /home/hadmin/rocketmq/bin/mqbroker -c /home/hadmin/rocketmq/conf/broker-c-m.properties >/home/hadmin/rocketmq/logs/broker-c-m.log 2>&1 &

■broker-c的Slave - 節點1
命令:nohup sh /home/hadmin/rocketmq/bin/mqbroker -c /home/hadmin/rocketmq/conf/broker-c-s.properties >/home/hadmin/rocketmq/logs/broker-c-s.log 2>&1 &

■驗證broker-c
命令:sh /home/hadmin/rocketmq/bin/mqadmin clusterList -n 192.168.6.3:9876

- 問題1:
啟動broker的時候提示內存不夠的錯誤。

解決方法:由於個人電腦配置不夠,無法為虛擬機申請更大的內存。所以,采用修改broker啟動內存的方式解決了。
修改文件路徑:{ROCKET_HOME}/bin/runbroker.sh
修改前:JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
修改后:JAVA_OPT="${JAVA_OPT} -server -Xms1024m -Xmx1536m -Xmn1546m"
如圖所示:

重新啟動broker,日志中沒有錯誤,jps進程中也可以看到broker正常啟動。


- 問題2:
在同一台機器上啟動多個broker的時候提示如下錯誤。

問題原因:
按照本文最開始的額圖片所示,同一台機器上會存在兩個Broker,如果不進行特殊指定,broker的默認端口是10911。
所以一台機器上啟動兩個broker時,第二個broker就會出現端口被占用的錯誤。
解決辦法:
修改rocketmq的配置文件,增加listenPort配置。配置之后如下所示:
→同一台機器的配置文件

→broker-a-m.properties

→broker-c-s.properties

使用jps查看一下進程是否有問題

在查看一下啟動日志是否有問題

最后使用clusterList命令來驗證一下集群健康狀態。
命令:sh bin/mqadmin clusterList -n 192.168.6.3:9876

成功實現了,如本文最開始圖片所示的安裝部署。
- 問題3:
同一台機器上啟動多個broker,在啟動第二個broker的時候報錯
RocketMq Lock failed,MQ already started;
解決辦法:
路徑沖突了,修改各種路徑即可。我這版修改了一下5個路徑之后,解決。
storePathRootDir=/home/radmin/data/rocketmq/rootdir-b-m storePathCommitLog=/home/radmin/data/rocketmq/commitlog-b-m storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-b-m storePathIndex=/home/radmin/data/rocketmq/index-b-m storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-b-m
- 使用程序測試
編寫了java程序代碼嘗試向集群中生產消息,程序代碼如下:
需要注意的是,如果autoCreateTopicEnable=false的時候,需要自己手動創建Topic。
命令樣例:sh $MQ_HOME/bin/mqadmin updateTopic -c post -n "192.168.1.80:9876;192.168.1.81:9876" -t QCH2 -r 4 -w 4
package post;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.UUID;
public class ProducerTest {
private static DefaultMQProducer producer = null;
public static void main(String[] args) {
System.out.print("[----------]Start");
boolean result = false;
try {
ProducerStart();
SendMessage("qch_20170706", "hello rocketmq!");
}finally {
producer.shutdown();
}
System.out.print("[----------]Succeed");
}
private static boolean ProducerStart() {
producer = new DefaultMQProducer("pro_qch_test");
producer.setNamesrvAddr("192.168.6.3:9876;192.168.6.4:9876");
producer.setInstanceName(UUID.randomUUID().toString());
producer.setVipChannelEnabled(false);
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
return false;
}
return true;
}
private static boolean SendMessage(String topic, String str) {
Message msg = new Message(topic, str.getBytes());
try {
producer.send(msg);
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
return false;
}
return true;
}
}
運行之后,日志中提示下面的錯誤:

問題原因:
因為broker部署在虛擬機,並且虛擬雙網卡,client無法正常連接服務端。
解決方法:
可以在broker的配置文件中配置brokerIP1(本機IP)屬性。
修改后配置文件如下圖所示:

修改之后,重新啟動rocketmq集群,運行生產者程序,確認正常結束。

然后,有嘗試這編寫消費者代碼,驗證是否可以正常消費。結果正常,這里貼一下代碼及結果日志。
package post;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.UUID;
public class ConsumerTest {
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("con_qch_test");
consumer.setInstanceName(UUID.randomUUID().toString());
consumer.setConsumeMessageBatchMaxSize(32);
consumer.setNamesrvAddr("192.168.6.3:9876;192.168.6.4:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(MessageExt me : list) {
System.out.print(new String(me.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
try {
consumer.subscribe("qch_20170706", "*");
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}

樣例代碼上傳到了git上了,有需要的可以去參考。
https://github.com/quchunhui/rocketmq_sample
--END--
