CentOS7 安裝 RocketMQ 實踐和小示例


CentOS7 安裝 RocketMQ 實踐和小示例

 

1、通過 SSH 工具(比如 XShell)連接到 CentOS7 服務器上;

2、進入到 /usr/local 目錄中:
  cd /usr/local

3、下載二進制版的 rocketmq:
  wget http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip

4、將下載下來的 rocketmq-all-4.4.0-bin-release.zip 解壓:
  unzip rocketmq-all-4.4.0-bin-release.zip
得到
  rocketmq-all-4.4.0-bin-release

5、將 rocketmq-all-4.4.0-bin-release 更名為 rocketmq-all-4.4.0:
  mv rocketmq-all-4.4.0-bin-release rocketmq-all-4.4.0

6、進入到 rocketmq-all-4.4.0 的 bin 目錄中(後續命令均在此目錄下執行):
  cd rocketmq-all-4.4.0/bin

7、打開【環境配置】文件 profile:
  vim /etc/profile
在文件末尾加入如下配置:
  export ROCKET_MQ_HOME=/usr/local/rocketmq-all-4.4.0
  export NAMESRV_ADDR=localhost:9876
  export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$MAVEN_HOME/bin:$ROCKET_MQ_HOME/bin:$PATH
  保存並關閉 profile:
  按 ESC,輸入 :wq! 回車
  使 profile 立即生效:
    source /etc/profile

8、我們先看下 rocketmq 當前的狀態:
  ps aux | grep rocketmq

9、在當前目錄(bin)中創建客戶端(即 Broker)配置文件 broker.properties:
  sh mqbroker -m > broker.properties

10、修改配置文件 broker.properties,將 brokerIP1 的值改為我們機器的“公網 IP”,這里是關鍵,並將本次生成此配置文件的時間數據刪除:
  namesrvAddr=localhost:9876
  brokerIP1=公網IP
  brokerName=iZ2zehfhto8e2w4t58a282Z
  brokerClusterName=DefaultCluster
  brokerId=0
  autoCreateTopicEnable=true
  autoCreateSubscriptionGroup=true
  msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
  traceTopicEnable=false
  rejectTransactionMessage=false
  fetchNamesrvAddrByAddressServer=false
  transactionTimeOut=6000
  transactionCheckMax=15
  transactionCheckInterval=60000
  aclEnable=false
  storePathRootDir=/root/store
  storePathCommitLog=/root/store/commitlog
  flushIntervalCommitLog=500
  commitIntervalCommitLog=200
  flushCommitLogTimed=false
  deleteWhen=04
  fileReservedTime=72
  maxTransferBytesOnMessageInMemory=262144
  maxTransferCountOnMessageInMemory=32
  maxTransferBytesOnMessageInDisk=65536
  maxTransferCountOnMessageInDisk=8
  accessMessageInMemoryMaxRatio=40
  messageIndexEnable=true
  messageIndexSafe=false
  haMasterAddress=
  brokerRole=ASYNC_MASTER
  flushDiskType=ASYNC_FLUSH
  cleanFileForciblyEnable=true
  transientStorePoolEnable=false

11、創建日志的存放目錄(如果目錄已存在則忽略):
  mkdir -p /data/log/rocketMQ

12、啟動服務端(即 NameServer):
  nohup sh mqnamesrv > /data/log/rocketMQ/server.log &

13、啟動客戶端(即 Broker):

  nohup sh mqbroker -n localhost:9876 -c broker.properties autoCreateTopicEnable=true > /data/log/rocketMQ/client.log &
  注意:這里的地址是 localhost:9876 而非公網 IP。

14、看下 rocketmq 的狀態:
  jps
  出現如下進程則表明服務端(NamesrvStartup)和客戶端(BrokerStartup)均啟動成功:
  2992 Jps
  28282 BrokerStartup
  27484 NamesrvStartup
  也可以通過 ps aux | grep rocketmq 來查看。

15、rocketmq 的配置類:
package com.smbea.rocketMQ;

/**
 * RocketMQ connection settings referenced by the producer and consumer examples.
 *
 * <p>This is a pure constants holder: it is {@code final} and cannot be instantiated.
 *
 * @author hapday
 * @date 2019-05-15 22:47:14
 * @since 0.0.1
 */
public final class RocketMQConfig {

    /** Public IP address; combined with {@link #PORT} to form the name-server address. */
    public static final String IP = "10.11.12.13";

    /** Port; the default is 9876. */
    public static final int PORT = 9876;

    /** Group name. */
    public static final String GROUP_NAME = "rocketMQ-group-hapday";

    /** Instance name. */
    public static final String INSTANCE_NAME = "rocketMQ-instance-hapday";

    /** Topic name. */
    public static final String TOPIC_NAME = "rocketMQ-topic-hapday";

    /** Tag name. */
    public static final String TAG_NAME = "rocketMQ-tag-hapday";

    /** Not instantiable: this class only exposes constants. */
    private RocketMQConfig() {
    }

}

16、數據生產者:
package com.smbea.rocketMQ;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.artup.common.utils.CommonUtils;

import lombok.extern.slf4j.Slf4j;

/**
 * Message producer example: starts a {@link DefaultMQProducer}, sends a fixed number of
 * text messages to the topic/tag defined in {@link RocketMQConfig}, then shuts down.
 *
 * @author hapday
 * @date 2019-05-15 22:48:13
 * @since 0.0.1
 */
@Slf4j
public class RocketMQProducer {

    /** Number of messages sent by one run of {@link #main(String[])}. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Configures and starts the producer, sends the demo messages and always shuts the
     * producer down afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer(RocketMQConfig.GROUP_NAME);
        producer.setNamesrvAddr(RocketMQConfig.IP + ":" + RocketMQConfig.PORT);
        producer.setInstanceName(RocketMQConfig.INSTANCE_NAME);
        producer.setVipChannelEnabled(false); // disable the VIP channel

        try {
            producer.start(); // start the producer
            sendMessages(producer);
        } catch (MQClientException e) {
            // Only start() can reach this handler (sendMessages handles its own failures),
            // so nothing is sent through a producer that never started.
            log.error("", e);
        } finally {
            producer.shutdown(); // shut the producer down even if sending failed unexpectedly
        }
    }

    /**
     * Sends {@link #MESSAGE_COUNT} messages, logging the result of each successful send.
     * A failed send is logged and the loop moves on; an interrupt stops the loop.
     *
     * @param producer an already started producer
     */
    private static void sendMessages(DefaultMQProducer producer) {
        // Common prefix of every message body.
        String content = CommonUtils.getCurrentDateAndTime().concat(" 大家好!我是消息隊列 RocketMQ - ");

        for (int index = 0; index < MESSAGE_COUNT; index++) {
            // A fresh Message per send, so no state attached to one message carries over to
            // the next. NOTE(review): the client may tag a Message with per-message
            // properties during send(); confirm against the client version in use.
            Message message = new Message();
            message.setTopic(RocketMQConfig.TOPIC_NAME);
            message.setTags(RocketMQConfig.TAG_NAME);
            // Encode explicitly as UTF-8 instead of the platform default charset, so the
            // bytes do not depend on the machine the producer runs on.
            message.setBody(content.concat(String.valueOf(index))
                    .getBytes(java.nio.charset.StandardCharsets.UTF_8));

            try {
                SendResult sendResult = producer.send(message); // send the message
                // Logged only on success, so a failed send never reports a stale result.
                log.debug("響應結果:{}", sendResult);
            } catch (MQClientException | RemotingException | MQBrokerException e) {
                log.error("", e);
            } catch (InterruptedException e) {
                log.error("", e);
                Thread.currentThread().interrupt(); // restore the interrupt flag for callers
                break;
            }
        }
    }

}

17、數據消費者:
package com.smbea.rocketMQ;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import lombok.extern.slf4j.Slf4j;

/**
 * Message consumer example: a {@link DefaultMQPushConsumer} that subscribes to the
 * topic/tag defined in {@link RocketMQConfig} and logs the body of every message received.
 *
 * @author hapday
 * @date 2019-05-15 22:48:32
 * @since 0.0.1
 */
@Slf4j
public class RocketMQConsumer {

    /**
     * Configures the consumer, registers the listener, subscribes and starts consuming.
     * If subscribing or starting fails, the error is logged and the method returns
     * without reporting the consumer as started.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMQConfig.GROUP_NAME);
        consumer.setNamesrvAddr(RocketMQConfig.IP + ":" + RocketMQConfig.PORT);
        consumer.setInstanceName(RocketMQConfig.INSTANCE_NAME);
        consumer.setVipChannelEnabled(false); // disable the VIP channel

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
                    ConsumeConcurrentlyContext context) {
                for (MessageExt received : messages) {
                    // Decode explicitly as UTF-8 instead of the platform default charset,
                    // so the text does not depend on the machine the consumer runs on.
                    log.debug("消息內容:{}",
                            new String(received.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        });

        try {
            consumer.subscribe(RocketMQConfig.TOPIC_NAME, RocketMQConfig.TAG_NAME); // subscribe
            consumer.start(); // start the consumer
        } catch (MQClientException e) {
            log.error("", e);
            return; // do not claim the consumer is running when subscribe/start failed
        }

        log.debug("【消費者】已啟動。");
    }

}

 

18、關閉服務:
  18.1 關閉 broker 服務: sh mqshutdown broker
  18.2 關閉 namesrv: sh mqshutdown namesrv


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM