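Installation
RocketMQ
RocketMQ is written in Java, so it needs a JDK to run and Maven to build it from source.
1. Install the JDK
Not covered here.
2. Install Maven
# Download the Maven 3.5.4 binary package (if this mirror no longer hosts 3.5.4, old releases are kept at https://archive.apache.org/dist/maven/maven-3/)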
wget http://mirrors.cnnic.cn/apache/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gz
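# Extract the archive
tar -zxvf apache-maven-3.5.4-bin.tar.gz
# Rename the directory
mv apache-maven-3.5.4 maven
# Enter the installation directory
cd maven/
# Print the full installation path
pwd
# Output: the full installation path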
/home/bingo/soft/maven
------------------------------------------------------------------------------------
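# Switch to the root account
su
# Open the environment variable configuration file
vim /etc/profile
# Add the Maven environment variables at a suitable place in the file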
```
# Maven
export MAVEN_HOME=/home/bingo/soft/maven
export PATH=$MAVEN_HOME/bin:$PATH
```
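# Reload the profile so that the environment variables take effect
source /etc/profile
# Check the Maven version (if a non-root account reports that mvn cannot be found, run source /etc/profile again in that session)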
mvn -version
```
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /home/bingo/soft/maven
Java version: 1.8.0_171, vendor: Oracle Corporation, runtime: /home/bingo/soft/java/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "4.18.0-240.15.1.el8_3.x86_64", arch: "amd64", family: "unix"
```
3. Install RocketMQ
# Download the RocketMQ source code
wget https://github.com/apache/rocketmq/archive/refs/tags/rocketmq-all-4.9.2.tar.gz
# Extract the archive
tar -zxvf rocketmq-all-4.9.2.tar.gz
# Rename the directory
mv rocketmq-rocketmq-all-4.9.2/ rocketmq-all
# Enter the source directory
cd rocketmq-all/
# Build RocketMQ from source with Maven
mvn -Prelease-all -DskipTests clean install -U
# Output like the following indicates a successful build
```
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Apache RocketMQ 4.9.2 4.9.2 ........................ SUCCESS [03:17 min]
[INFO] rocketmq-logging 4.9.2 ............................. SUCCESS [ 18.488 s]
[INFO] rocketmq-remoting 4.9.2 ............................ SUCCESS [ 4.688 s]
[INFO] rocketmq-common 4.9.2 .............................. SUCCESS [ 7.096 s]
[INFO] rocketmq-client 4.9.2 .............................. SUCCESS [ 9.334 s]
[INFO] rocketmq-store 4.9.2 ............................... SUCCESS [ 4.628 s]
[INFO] rocketmq-srvutil 4.9.2 ............................. SUCCESS [ 0.320 s]
[INFO] rocketmq-filter 4.9.2 .............................. SUCCESS [ 2.143 s]
[INFO] rocketmq-acl 4.9.2 ................................. SUCCESS [ 2.286 s]
[INFO] rocketmq-broker 4.9.2 .............................. SUCCESS [ 3.817 s]
[INFO] rocketmq-tools 4.9.2 ............................... SUCCESS [ 2.196 s]
[INFO] rocketmq-namesrv 4.9.2 ............................. SUCCESS [ 0.773 s]
[INFO] rocketmq-logappender 4.9.2 ......................... SUCCESS [ 1.330 s]
[INFO] rocketmq-test 4.9.2 ................................ SUCCESS [ 4.238 s]
[INFO] rocketmq-openmessaging 4.9.2 ....................... SUCCESS [ 1.898 s]
[INFO] rocketmq-example 4.9.2 ............................. SUCCESS [ 10.156 s]
[INFO] rocketmq-distribution 4.9.2 4.9.2 .................. SUCCESS [01:40 min]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 06:13 min
[INFO] Finished at: 2021-11-15T14:09:47+08:00
[INFO] ------------------------------------------------------------------------
```
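# Enter the distribution directory
cd distribution/
# Start the RocketMQ NameServer
nohup sh bin/mqnamesrv &
# This fails because the NameServer main class cannot be found (the last line below reads "Error: Could not find or load main class"): distribution/ is not the directory that holds the built package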
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
错误: 找不到或无法加载主类 org.apache.rocketmq.namesrv.NamesrvStartup
# Enter the directory that holds the built package (full path: /home/bingo/soft/mq/rocketmq/rocketmq-all/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2); relative to distribution/ it is:
cd target/rocketmq-4.9.2/rocketmq-4.9.2
# Start the RocketMQ NameServer
nohup sh bin/mqnamesrv &
# Started successfully
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
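# Start the RocketMQ broker
nohup sh bin/mqbroker -n localhost:9876 &
# This fails because the server does not have enough memory: the default startup scripts for NameServer and Broker reserve several gigabytes of heap (the log below shows a request for 8589934592 bytes, i.e. 8 GiB). Lower the heap settings and restart.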
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/bingo/soft/mq/rocketmq/rocketmq-all/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2/hs_err_pid1046990.log
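To put the number in the error log into perspective, here is a small sketch (Java, standard library only) that converts JVM-style heap sizes such as 8g or 512m into bytes. The class and method names are made up for illustration; the only point is that 8589934592 bytes is exactly 8 GiB, far more than the 1g or 512m heaps used later in this post.

```java
// Hypothetical helper for illustration; not part of RocketMQ.
final class HeapSizeDemo {

    /** Converts a JVM-style size such as "512m", "4g" or "1024k" into bytes. */
    static long toBytes(String size) {
        String s = size.trim().toLowerCase();
        char unit = s.charAt(s.length() - 1);
        if (Character.isDigit(unit)) {
            return Long.parseLong(s); // no suffix: plain bytes
        }
        long value = Long.parseLong(s.substring(0, s.length() - 1));
        switch (unit) {
            case 'k': return value << 10;
            case 'm': return value << 20;
            case 'g': return value << 30;
            default: throw new IllegalArgumentException("Unknown unit in: " + size);
        }
    }

    public static void main(String[] args) {
        long requested = 8589934592L; // value taken from the error log above
        System.out.println("Requested equals 8g: " + (requested == toBytes("8g")));
        System.out.println("512m in bytes: " + toBytes("512m"));
    }
}
```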
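# Stop the NameServer first
sh bin/mqshutdown namesrv
# Stop the broker
sh bin/mqshutdown broker
# Edit the NameServer startup script and lower the JVM heap settings (for example from 4g to 1g or 512m)
vim bin/runserver.sh
# Edit the Broker startup script and lower the JVM heap settings in the same way
vim bin/runbroker.sh
# Create a log directory (to make the logs easier to find)
mkdir logs
# Start the NameServer again
nohup ./bin/mqnamesrv > logs/namesrv.log 2>&1 &
# Started successfully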
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
# Start the broker again
nohup ./bin/mqbroker -n localhost:9876 > logs/broker.log 2>&1 &
# Started successfully; the broker listens on port 10911 and registers with the NameServer at localhost:9876
The broker[iZbp1c3u9bhuo790ircffmZ, 172.28.177.11:10911] boot success. serializeType=JSON and name server is localhost:9876
# List the Java processes
jps
# The broker and NameServer processes
1111315 BrokerStartup
1090788 NamesrvStartup
# List the listening ports
netstat -ntlp
# The NameServer listens on port 9876
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp6 0 0 :::9876 :::* LISTEN 1090788/java
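The same check can be made from Java, which is handy when netstat is not installed. The sketch below uses only the standard library; the class name is hypothetical, and the ports 9876 (NameServer) and 10911 (broker) are the ones that appear in the logs above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of RocketMQ.
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or unreachable
        }
    }

    public static void main(String[] args) {
        System.out.println("NameServer reachable: " + isListening("127.0.0.1", 9876, 1000));
        System.out.println("Broker reachable: " + isListening("127.0.0.1", 10911, 1000));
    }
}
```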
4. Test cases
Run the producer example:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
It fails with the following error:
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:679)
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:509)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.tryToFindTopicPublishInfo(DefaultMQProducerImpl.java:709)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:579)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1391)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1335)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:336)
at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67)
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:394)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1367)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1357)
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:622)
... 7 more
java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:679)
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:509)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.tryToFindTopicPublishInfo(DefaultMQProducerImpl.java:709)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:579)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1391)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1335)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:336)
at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67)
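The "connect to null" message means the client has no NameServer address: the NAMESRV_ADDR environment variable has not been set.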
export NAMESRV_ADDR=localhost:9876
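As far as I understand it, when no address is set in code the RocketMQ client falls back to the rocketmq.namesrv.addr system property and then to the NAMESRV_ADDR environment variable. The sketch below mimics that lookup order with the standard library only; the class is hypothetical and the lookup order is an assumption, not a copy of the client's code.

```java
// Hypothetical sketch; the real client is assumed to perform a similar lookup internally.
final class NameServerAddressDemo {

    /** Prefers the system property value and falls back to the environment value (may return null). */
    static String resolve(String systemProperty, String environmentVariable) {
        return systemProperty != null ? systemProperty : environmentVariable;
    }

    public static void main(String[] args) {
        String addr = resolve(System.getProperty("rocketmq.namesrv.addr"), System.getenv("NAMESRV_ADDR"));
        // When neither is set, addr is null, which matches the "connect to null" error above.
        System.out.println("NameServer address: " + addr);
    }
}
```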
Run the producer example again:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
The messages are produced as follows:
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
SendResult [sendStatus=SEND_OK, msgId=7F000001CE4F7D4991AD4CF361840000, offsetMsgId=AC1CB10B00002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F000001CE4F7D4991AD4CF361C10001, offsetMsgId=AC1CB10B00002A9F00000000000000BE, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F000001CE4F7D4991AD4CF361C60002, offsetMsgId=AC1CB10B00002A9F000000000000017C, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F000001CE4F7D4991AD4CF361D00003, offsetMsgId=AC1CB10B00002A9F000000000000023A, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F000001CE4F7D4991AD4CF361D30004, offsetMsgId=AC1CB10B00002A9F00000000000002F8, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=1], queueOffset=1]
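The offsetMsgId values in this output are not random. For an IPv4 broker in RocketMQ 4.x the value is, to my understanding, the hex encoding of 4 bytes of broker IP, 4 bytes of broker port, and 8 bytes of commit log offset. The decoder below is a hypothetical sketch built on that assumption, using only the standard library.

```java
// Hypothetical decoder for illustration; assumes the 32-hex-character IPv4 layout described above.
final class OffsetMsgIdDemo {

    /** Decodes an offsetMsgId into "ip:port@commitLogOffset". */
    static String decode(String offsetMsgId) {
        if (offsetMsgId.length() != 32) {
            throw new IllegalArgumentException("Expected 32 hex characters");
        }
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            // Each IP byte is two hex characters.
            ip.append(Integer.parseInt(offsetMsgId.substring(i * 2, i * 2 + 2), 16));
        }
        int port = Integer.parseInt(offsetMsgId.substring(8, 16), 16);
        long offset = Long.parseLong(offsetMsgId.substring(16, 32), 16);
        return ip + ":" + port + "@" + offset;
    }

    public static void main(String[] args) {
        // Second message from the producer output above.
        System.out.println(decode("AC1CB10B00002A9F00000000000000BE"));
    }
}
```

For the second message this yields 172.28.177.11:10911@190, which agrees with the broker address in the startup log and with the commitLogOffset=190 that the consumer reports for the same message.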
Run the consumer example:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
The messages are consumed as follows:
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=2, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1636987018689, bornHost=/172.28.177.11:59072, storeTimestamp=1636987018691, storeHost=/172.28.177.11:10911, msgId=AC1CB10B00002A9F00000000000000BE, commitLogOffset=190, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1636987042291, UNIQ_KEY=7F000001CE4F7D4991AD4CF361C10001, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=2, storeSize=190, queueOffset=1, sysFlag=0, bornTimestamp=1636987018710, bornHost=/172.28.177.11:59072, storeTimestamp=1636987018711, storeHost=/172.28.177.11:10911, msgId=AC1CB10B00002A9F00000000000003B6, commitLogOffset=950, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1636987042293, UNIQ_KEY=7F000001CE4F7D4991AD4CF361D60005, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53], transactionId='null'}]]
ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=2, storeSize=190, queueOffset=2, sysFlag=0, bornTimestamp=1636987018719, bornHost=/172.28.177.11:59072, storeTimestamp=1636987018722, storeHost=/172.28.177.11:10911, msgId=AC1CB10B00002A9F00000000000006AE, commitLogOffset=1710, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1636987042293, UNIQ_KEY=7F000001CE4F7D4991AD4CF361DF0009, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57], transactionId='null'}]]
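The body field in this output is printed as a list of byte values. A short standard-library sketch (the class name is hypothetical, and UTF-8 is assumed as the encoding) turns it back into text:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; assumes the body was encoded as UTF-8.
final class BodyDecodeDemo {

    /** Converts the byte values printed in the log back into a string. */
    static String decode(int... values) {
        byte[] bytes = new byte[values.length];
        for (int i = 0; i < values.length; i++) {
            bytes[i] = (byte) values[i];
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Body of the first consumed message above.
        System.out.println(decode(72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49));
    }
}
```

The first body decodes to "Hello RocketMQ 1"; the other two end in 5 and 9 respectively.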
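5. Console
Many blog posts claim that the console can simply be downloaded from the official GitHub extension repository, compiled, and run. In practice, neither the externals package nor the dashboard package compiled successfully for me.
After switching the externals package to the origin/release-rocketmq-console-1.0.0 branch, I found that the rocketmq-console project uses many deprecated methods.
In the end I got the console running by downloading a package that other users had already compiled. I hit quite a few pitfalls along the way, which are recorded here.
The compiled package has been uploaded to a network drive as a backup: link: https://pan.baidu.com/s/1aQUzvjhdTNTEMVDpF3rZWg password: fe2t
Start the console with the following command: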
nohup java -Xms512m -Xmx512m -jar rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr='127.0.0.1:9876' > ./logs/console.log 2>&1 &
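### If the NameServer is not specified through the external parameter --rocketmq.config.namesrvAddr='127.0.0.1:9876', the program starts but is not usable.
Open the page; the console is available at: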
http://localhost:8080/
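6. Application
This section uses rocketmq-spring-boot-starter under Spring Boot 2.x to integrate RocketMQ for producing and consuming messages.
Scenario: a user likes (praises) a video, and the producer sends the data (user ID + video ID) through the MQ to the consumer, which smooths traffic peaks and decouples the two services.
Producer
pom.xml dependencies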
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- The common module defines the request parameters (which double as the message body), the topic, and the consumer group -->
<dependency>
<groupId>com.bingo</groupId>
<artifactId>common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
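The common module itself is not shown in this post. Judging from the imports and the log output further down, it could look roughly like the sketch below. The field types and the consumer group name are assumptions; the topic name topic-praise is taken from the error log later in this post. The real project appears to use Lombok and public classes in separate files, which this single-file sketch replaces with plain Java.

```java
// Hypothetical reconstruction of the common module; anything marked "assumed" is not from the original project.
final class RocketMQConstants {
    static final String PRAISE_TOPIC = "topic-praise";                   // seen in the error log
    static final String PRAISE_CONSUMER_GROUP = "consumer-group-praise"; // assumed name

    private RocketMQConstants() {
    }
}

final class PraiseRequest {
    private Long videoId; // assumed type
    private Long userId;  // assumed type

    PraiseRequest() {
        // No-arg constructor so that a JSON library can deserialize the message body.
    }

    PraiseRequest(Long videoId, Long userId) {
        this.videoId = videoId;
        this.userId = userId;
    }

    Long getVideoId() { return videoId; }
    void setVideoId(Long videoId) { this.videoId = videoId; }
    Long getUserId() { return userId; }
    void setUserId(Long userId) { this.userId = userId; }

    @Override
    public String toString() {
        // Same shape as the Lombok-style output in the logs of this post.
        return "PraiseRequest(videoId=" + videoId + ", userId=" + userId + ")";
    }
}

final class CommonModuleDemo {
    public static void main(String[] args) {
        System.out.println(RocketMQConstants.PRAISE_TOPIC + " <- " + new PraiseRequest(123L, 456L));
    }
}
```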
application.yaml configuration file
server:
port: 10000
spring:
application:
name: producer
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: producer-group-praise
Producer code
package com.bingo.producer.controller;
import com.bingo.common.constants.RocketMQConstants;
import com.bingo.common.request.PraiseRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
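/**
 * @program: PraiseController
 * @description: Praise (like) endpoint
 * @author: Bingo
 * @create: 2021-11-16 12:16
 **/
@Slf4j
@RestController
public class PraiseController {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Praise (like) a video.
     *
     * RocketMQ offers three main ways to send a message: syncSend() (synchronous), asyncSend() (asynchronous) and sendOneWay().
     * sendOneWay() is also asynchronous; the difference is that it does not wait for the broker's acknowledgement,
     * so messages may be lost, but throughput is higher. Choose according to the business requirements.
     * Performance: sendOneWay > asyncSend > syncSend. RocketMQTemplate's send() method is synchronous (syncSend) by default.
     *
     * @param request the praise request, which is also used as the message body
     * @return a response whose body is always true
     */
    @PostMapping(value = "praise")
    public ResponseEntity<Boolean> praise(@RequestBody PraiseRequest request) {
        // Log message: "praise request parameters"
        log.info("点赞请求参数: {}", request);
        rocketMQTemplate.sendOneWay(RocketMQConstants.PRAISE_TOPIC, MessageBuilder.withPayload(request).build());
        return ResponseEntity.ok(Boolean.TRUE);
    }
}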
Consumer
pom.xml dependencies
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- The common module defines the request parameters (which double as the message body), the topic, and the consumer group -->
<dependency>
<groupId>com.bingo</groupId>
<artifactId>common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
application.yaml configuration file
server:
port: 10003
spring:
application:
name: consumer3
rocketmq:
name-server: 127.0.0.1:9876
Consumer code
package com.bingo.consumer.listener;
import com.bingo.common.constants.RocketMQConstants;
import com.bingo.common.request.PraiseRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * @program: PraiseListener
 * @description: Consumer listener for praise (like) messages
 * @author: Bingo
 * @create: 2021-11-16 12:29
 **/
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQConstants.PRAISE_TOPIC, consumerGroup = RocketMQConstants.PRAISE_CONSUMER_GROUP)
public class PraiseListener implements RocketMQListener<PraiseRequest> {

    @Override
    public void onMessage(PraiseRequest request) {
        // Log message: "consumer 1 consumed praise data"
        log.info("消费者1消费点赞数据: {}", request);
    }
}
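Testing the application
1. Start the producer and call the praise endpoint several times (the log prefix 点赞请求参数 means "praise request parameters")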
[exec-1] c.b.p.controller.PraiseController : 点赞请求参数: PraiseRequest(videoId=123, userId=456)
[exec-2] c.b.p.controller.PraiseController : 点赞请求参数: PraiseRequest(videoId=1234, userId=456)
[exec-3] c.b.p.controller.PraiseController : 点赞请求参数: PraiseRequest(videoId=12345, userId=456)
[exec-4] c.b.p.controller.PraiseController : 点赞请求参数: PraiseRequest(videoId=123456, userId=456)
[exec-5] c.b.p.controller.PraiseController : 点赞请求参数: PraiseRequest(videoId=1234567, userId=456)
[exec-6] c.b.p.controller.PraiseController : 点赞请求参数: PraiseRequest(videoId=12345678, userId=456)
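!!! When the praise endpoint was called to send a message, the client could not connect to the broker and reported the following error: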
2021-11-16 14:46:03.659 ERROR 4722 --- [io-10000-exec-1] o.a.r.spring.core.RocketMQTemplate : sendOneWay failed. destination:topic-praise, message:GenericMessage [payload=PraiseRequest(videoId=1234567891234567, userId=456), headers={id=eabf0a33-d4ca-0b03-8773-d3642c1570d0, timestamp=1637045160524}]
2021-11-16 14:46:03.676 ERROR 4722 --- [io-10000-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.messaging.MessagingException: Send [1] times, still failed, cost [3085]ms, Topic: topic-praise, BrokersSent: [iZbp1c3u9bhuo790ircffmZ]
See http://rocketmq.apache.org/docs/faq/ for further details.; nested exception is org.apache.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [3085]ms, Topic: topic-praise, BrokersSent: [iZbp1c3u9bhuo790ircffmZ]
See http://rocketmq.apache.org/docs/faq/ for further details.] with root cause
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to 172.28.177.11:10911 failed
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeOneway(NettyRemotingClient.java:554) ~[rocketmq-remoting-4.9.1.jar:4.9.1]
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:471) ~[rocketmq-client-4.9.1.jar:4.9.1]
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:431) ~[rocketmq-client-4.9.1.jar:4.9.1]
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:877) ~[rocketmq-client-4.9.1.jar:4.9.1]
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:607) ~[rocketmq-client-4.9.1.jar:4.9.1]
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendOneway(DefaultMQProducerImpl.java:1028) ~[rocketmq-client-4.9.1.jar:4.9.1]
at org.apache.rocketmq.client.producer.DefaultMQProducer.sendOneway(DefaultMQProducer.java:403) ~[rocketmq-client-4.9.1.jar:4.9.1]
at org.apache.rocketmq.spring.core.RocketMQTemplate.sendOneWay(RocketMQTemplate.java:911) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
at com.bingo.producer.controller.PraiseController.praise(PraiseController.java:40) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_301]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_301]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_301]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_301]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-5.3.12.jar:5.3.12]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067) ~[spring-webmvc-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) ~[spring-webmvc-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) ~[spring-webmvc-5.3.12.jar:5.3.12]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:681) ~[tomcat-embed-core-9.0.54.jar:4.0.FR]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.12.jar:5.3.12]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:764) ~[tomcat-embed-core-9.0.54.jar:4.0.FR]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.54.jar:9.0.54]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.12.jar:5.3.12]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.12.jar:5.3.12]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.12.jar:5.3.12]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.12.jar:5.3.12]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.12.jar:5.3.12]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.12.jar:5.3.12]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:895) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1722) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.54.jar:9.0.54]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_301]
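After going through a number of blog posts, I found that the RocketMQ conf/broker.conf file has to be modified and passed to the broker at startup.
Source: https://blog.csdn.net/u014786083/article/details/118714139
The modified file:
# Add this setting: the IP address the broker advertises to clients (here the server's public IP; the NameServer runs on the same host)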
brokerIP1=8.136.224.118
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
Restart the broker (stop the running one first with sh bin/mqshutdown broker):
nohup ./bin/mqbroker -n localhost:9876 -c conf/broker.conf > logs/broker.log 2>&1 &
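The stack trace above shows the client trying to reach 172.28.177.11:10911, the address the broker registered by default. That address lies in a private range, so a producer running outside the server's private network (presumably the case here) cannot connect to it, whereas the brokerIP1 value is a public address. The sketch below checks both with the standard library; the class name is hypothetical.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper for illustration; not part of RocketMQ.
final class AddressCheckDemo {

    /** Returns true if the literal IP address is in a private (site-local) range. */
    static boolean isPrivate(String ipLiteral) {
        try {
            // For a literal IP address, getByName only parses the text; no DNS lookup is made.
            return InetAddress.getByName(ipLiteral).isSiteLocalAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Not a valid IP literal: " + ipLiteral, e);
        }
    }

    public static void main(String[] args) {
        System.out.println("172.28.177.11 private: " + isPrivate("172.28.177.11"));
        System.out.println("8.136.224.118 private: " + isPrivate("8.136.224.118"));
    }
}
```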
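2. Start the consumer; the messages are consumed out of order (the log prefix 消费者1消费点赞数据 means "consumer 1 consumed praise data")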
[MessageThread_5] c.b.consumer.listener.PraiseListener : 消费者1消费点赞数据: PraiseRequest(videoId=123456, userId=456)
[MessageThread_3] c.b.consumer.listener.PraiseListener : 消费者1消费点赞数据: PraiseRequest(videoId=1234, userId=456)
[MessageThread_1] c.b.consumer.listener.PraiseListener : 消费者1消费点赞数据: PraiseRequest(videoId=12345, userId=456)
[MessageThread_6] c.b.consumer.listener.PraiseListener : 消费者1消费点赞数据: PraiseRequest(videoId=12345678, userId=456)
[MessageThread_2] c.b.consumer.listener.PraiseListener : 消费者1消费点赞数据: PraiseRequest(videoId=123, userId=456)
[MessageThread_4] c.b.consumer.listener.PraiseListener : 消费者1消费点赞数据: PraiseRequest(videoId=1234567, userId=456)
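3. Change the IDEA run configuration to start multiple consumers (Edit Configurations... -> uncheck "Single instance only" -> change the port in application.yaml; the same program can then be started several times on different ports)
4. Call the praise endpoint several more times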
[exec-8] c.b.p.controller.PraiseController : 点赞请求参数: PraiseRequest(videoId=123456789, userId=456)
[exec-9] c.b.p.controller.PraiseController : 点赞请求参数: PraiseRequest(videoId=1234567891, userId=456)
[exec-10] c.b.p.controller.PraiseController : 点赞请求参数: PraiseRequest(videoId=12345678912, userId=456)
[exec-1] c.b.p.controller.PraiseController : 点赞请求参数: PraiseRequest(videoId=123456789123, userId=456)
[exec-2] c.b.p.controller.PraiseController : 点赞请求参数: PraiseRequest(videoId=1234567891234, userId=456)
[exec-3] c.b.p.controller.PraiseController : 点赞请求参数: PraiseRequest(videoId=12345678912345, userId=456)
[exec-4] c.b.p.controller.PraiseController : 点赞请求参数: PraiseRequest(videoId=123456789123456, userId=456)
[exec-5] c.b.p.controller.PraiseController : 点赞请求参数: PraiseRequest(videoId=1234567891234567, userId=456)
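5. Messages consumed by consumer 10001 (10001, 10002 and 10003 are the ports; every instance runs the same code, so all of them log the prefix 消费者1)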
[MessageThread_7] c.b.consumer.listener.PraiseListener : 消费者1消费点赞数据: PraiseRequest(videoId=123456789123, userId=456)
[MessageThread_8] c.b.consumer.listener.PraiseListener : 消费者1消费点赞数据: PraiseRequest(videoId=1234567891234, userId=456)
[MessageThread_9] c.b.consumer.listener.PraiseListener : 消费者1消费点赞数据: PraiseRequest(videoId=12345678912345, userId=456)
[MessageThread_10] c.b.consumer.listener.PraiseListener : 消费者1消费点赞数据: PraiseRequest(videoId=1234567891234567, userId=456)
6. Messages consumed by consumer 10002
[MessageThread_1] c.b.consumer.listener.PraiseListener : 消费者1消费点赞数据: PraiseRequest(videoId=1234567891, userId=456)
[MessageThread_2] c.b.consumer.listener.PraiseListener : 消费者1消费点赞数据: PraiseRequest(videoId=123456789123456, userId=456)
7. Messages consumed by consumer 10003
[MessageThread_1] c.b.consumer.listener.PraiseListener : 消费者1消费点赞数据: PraiseRequest(videoId=123456789, userId=456)
[MessageThread_2] c.b.consumer.listener.PraiseListener : 消费者1消费点赞数据: PraiseRequest(videoId=12345678912, userId=456)
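The 4/2/2 split across the three consumers is consistent with how a consumer group shares a topic's queues. As a sketch, assume the topic has 4 queues (the quickstart output earlier shows queueId 0 to 3) and that queues are divided evenly among the consumers in the group, with the remainder going to the first ones. The class below is a simplified, hypothetical re-implementation of that idea using only the standard library, not the client's actual strategy class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified illustration of dividing queues evenly among consumers.
final class AverageAllocationDemo {

    /** Returns the queue ids assigned to the consumer at consumerIndex (0-based) out of consumerCount. */
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        int base = queueCount / consumerCount;      // queues every consumer gets
        int remainder = queueCount % consumerCount; // the first "remainder" consumers get one extra
        int size = base + (consumerIndex < remainder ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, remainder);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            result.add(start + i);
        }
        return result;
    }

    public static void main(String[] args) {
        for (int consumer = 0; consumer < 3; consumer++) {
            System.out.println("consumer " + consumer + " -> queues " + allocate(4, 3, consumer));
        }
    }
}
```

Under these assumptions one consumer owns two queues and the other two own one each. If the producer spreads 8 messages evenly over 4 queues, that gives 4, 2 and 2 messages, matching the logs above. Which instance ends up with two queues depends on how the client instances are ordered.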
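The code for this chapter has been uploaded to Gitee: springboot-rocketmq-demo
This completes the single-master RocketMQ setup and a simple application. Later posts will look at other features, such as cluster setup, message retry, duplicate message handling, ordered messages, scheduled messages, batch sending, transactional messages, message replay, and the various message filtering options.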