Installation
RocketMQ
RocketMQ is written in Java, so it needs a JDK runtime and a Maven build environment.
1. Install the JDK
Not covered here.
2. Install Maven
# Download the Maven 3.5.4 package
wget http://mirrors.cnnic.cn/apache/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gz
# Extract the package
tar -zxvf apache-maven-3.5.4-bin.tar.gz
# Rename the directory
mv apache-maven-3.5.4 maven
# Enter the installation directory
cd maven/
# Print the full installation path
pwd
# Full installation path
/home/bingo/soft/maven
------------------------------------------------------------------------------------
# Switch to the root account
su
# Open the environment variable configuration file
vim /etc/profile
# Add the Maven environment variables in the appropriate place
```
# Maven
export MAVEN_HOME=/home/bingo/soft/maven
export PATH=$MAVEN_HOME/bin:$PATH
```
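# Reload the configuration file so the environment variables take effect
source /etc/profile
# Check the Maven version (if a normal account reports that the mvn command is not found, run source /etc/profile again)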
mvn -version
```
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /home/bingo/soft/maven
Java version: 1.8.0_171, vendor: Oracle Corporation, runtime: /home/bingo/soft/java/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "4.18.0-240.15.1.el8_3.x86_64", arch: "amd64", family: "unix"
```
3. Install RocketMQ
# Download the RocketMQ source
wget https://github.com/apache/rocketmq/archive/refs/tags/rocketmq-all-4.9.2.tar.gz
# Extract the archive
tar -zxvf rocketmq-all-4.9.2.tar.gz
# Rename the directory
mv rocketmq-rocketmq-all-4.9.2/ rocketmq-all
# Enter the source directory
cd rocketmq-all/
# Build the RocketMQ source with mvn
mvn -Prelease-all -DskipTests clean install -U
# The following output indicates a successful build
```
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Apache RocketMQ 4.9.2 4.9.2 ........................ SUCCESS [03:17 min]
[INFO] rocketmq-logging 4.9.2 ............................. SUCCESS [ 18.488 s]
[INFO] rocketmq-remoting 4.9.2 ............................ SUCCESS [ 4.688 s]
[INFO] rocketmq-common 4.9.2 .............................. SUCCESS [ 7.096 s]
[INFO] rocketmq-client 4.9.2 .............................. SUCCESS [ 9.334 s]
[INFO] rocketmq-store 4.9.2 ............................... SUCCESS [ 4.628 s]
[INFO] rocketmq-srvutil 4.9.2 ............................. SUCCESS [ 0.320 s]
[INFO] rocketmq-filter 4.9.2 .............................. SUCCESS [ 2.143 s]
[INFO] rocketmq-acl 4.9.2 ................................. SUCCESS [ 2.286 s]
[INFO] rocketmq-broker 4.9.2 .............................. SUCCESS [ 3.817 s]
[INFO] rocketmq-tools 4.9.2 ............................... SUCCESS [ 2.196 s]
[INFO] rocketmq-namesrv 4.9.2 ............................. SUCCESS [ 0.773 s]
[INFO] rocketmq-logappender 4.9.2 ......................... SUCCESS [ 1.330 s]
[INFO] rocketmq-test 4.9.2 ................................ SUCCESS [ 4.238 s]
[INFO] rocketmq-openmessaging 4.9.2 ....................... SUCCESS [ 1.898 s]
[INFO] rocketmq-example 4.9.2 ............................. SUCCESS [ 10.156 s]
[INFO] rocketmq-distribution 4.9.2 4.9.2 .................. SUCCESS [01:40 min]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 06:13 min
[INFO] Finished at: 2021-11-15T14:09:47+08:00
[INFO] ------------------------------------------------------------------------
```
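# Enter the distribution directory
cd distribution/
# Start the RocketMQ NameServer
nohup sh bin/mqnamesrv &
# It fails because the NameServer startup main class cannot be found: this is not the actual build output directory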
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Error: Could not find or load main class org.apache.rocketmq.namesrv.NamesrvStartup
# Enter the actual build output directory, relative to distribution/ (full path: /home/bingo/soft/mq/rocketmq/rocketmq-all/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2)
cd target/rocketmq-4.9.2/rocketmq-4.9.2
# Start the RocketMQ NameServer
nohup sh bin/mqnamesrv &
# Started successfully
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
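# Start the RocketMQ broker
nohup sh bin/mqbroker -n localhost:9876 &
# It fails with an out-of-memory error: the NameServer and Broker startup scripts request several gigabytes of heap by default (the log below shows an allocation of 8589934592 bytes, i.e. 8 GiB, failing). Lower the memory settings and restart.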
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/bingo/soft/mq/rocketmq/rocketmq-all/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2/hs_err_pid1046990.log
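# Stop the NameServer first
sh bin/mqshutdown namesrv
# Command to stop the broker
sh bin/mqshutdown broker
# Edit the NameServer startup script and lower the heap size from 4g to 1g or 512m
vim bin/runserver.sh
# Edit the Broker startup script and lower its heap size to 1g or 512m as well
vim bin/runbroker.sh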
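To relate the byte count in the error log to the heap sizes edited in the scripts, here is a minimal sketch that converts JVM-style size strings to bytes. The class name `JvmSize` is hypothetical and exists only for this illustration; it is not part of RocketMQ.

```java
final class JvmSize {
    /** Converts a JVM-style size such as "8g", "512m" or "1024k" to bytes. */
    static long toBytes(String size) {
        String s = size.trim().toLowerCase();
        char unit = s.charAt(s.length() - 1);
        if (Character.isDigit(unit)) {
            return Long.parseLong(s); // no suffix: already a byte count
        }
        long value = Long.parseLong(s.substring(0, s.length() - 1));
        switch (unit) {
            case 'k': return value << 10;
            case 'm': return value << 20;
            case 'g': return value << 30;
            default: throw new IllegalArgumentException("Unknown unit in: " + size);
        }
    }

    public static void main(String[] args) {
        System.out.println(toBytes("8g"));   // 8589934592, the figure in the error log
        System.out.println(toBytes("1g"));   // 1073741824
        System.out.println(toBytes("512m")); // 536870912
    }
}
```

The failed allocation therefore corresponds to an 8g heap, which is why lowering the values to 1g or 512m lets the process start on a small server.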
# Create a log directory (to make the logs easier to find)
mkdir logs
# Start the NameServer again
nohup ./bin/mqnamesrv > logs/namesrv.log 2>&1 &
# Started successfully
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
# Start the broker again
nohup ./bin/mqbroker -n localhost:9876 > logs/broker.log 2>&1 &
# Started successfully; the broker registered with the NameServer at localhost:9876
The broker[iZbp1c3u9bhuo790ircffmZ, 172.28.177.11:10911] boot success. serializeType=JSON and name server is localhost:9876
# Check the processes
jps
# Broker and NameServer processes
1111315 BrokerStartup
1090788 NamesrvStartup
# Check the listening ports
netstat -ntlp
# The NameServer listens on port 9876
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp6 0 0 :::9876 :::* LISTEN 1090788/java
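The same check can be done from Java, which is handy on a client machine that has no shell access to the server. This is a minimal sketch using only the standard library; the class name `PortCheck` is hypothetical, and the ports are the ones shown in the logs above (9876 for the NameServer, 10911 for the broker).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable or timed out
        }
    }

    public static void main(String[] args) {
        System.out.println("NameServer 9876: " + isListening("localhost", 9876, 1000));
        System.out.println("Broker 10911: " + isListening("localhost", 10911, 1000));
    }
}
```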
4. Test cases
Run the producer test case
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
The error is as follows:
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:679)
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:509)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.tryToFindTopicPublishInfo(DefaultMQProducerImpl.java:709)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:579)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1391)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1335)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:336)
at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67)
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:394)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1367)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1357)
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:622)
... 7 more
java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:679)
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:509)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.tryToFindTopicPublishInfo(DefaultMQProducerImpl.java:709)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:579)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1391)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1335)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:336)
at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67)
This is because the NameServer address environment variable has not been set
export NAMESRV_ADDR=localhost:9876
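The "connect to null failed" message suggests the client ended up with no NameServer address at all. The sketch below shows one way such a lookup can work: a system property first, then the `NAMESRV_ADDR` environment variable. The property name `rocketmq.namesrv.addr` and the lookup order are my understanding of the client's behaviour and should be treated as assumptions; the class `NameServerAddress` is hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

final class NameServerAddress {
    /** Looks up the NameServer address: system property first, then environment variable. */
    static Optional<String> resolve(Properties systemProps, Map<String, String> env) {
        String fromProperty = systemProps.getProperty("rocketmq.namesrv.addr"); // assumed property name
        if (fromProperty != null && !fromProperty.isEmpty()) {
            return Optional.of(fromProperty);
        }
        String fromEnv = env.get("NAMESRV_ADDR");
        if (fromEnv != null && !fromEnv.isEmpty()) {
            return Optional.of(fromEnv);
        }
        return Optional.empty(); // nothing configured: a client would have no address to connect to
    }

    public static void main(String[] args) {
        Optional<String> addr = resolve(System.getProperties(), System.getenv());
        System.out.println(addr.map(a -> "NameServer address: " + a)
                .orElse("No NameServer address configured; run: export NAMESRV_ADDR=localhost:9876"));
    }
}
```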
Run the producer test case again
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
The messages are produced as follows:
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
SendResult [sendStatus=SEND_OK, msgId=7F000001CE4F7D4991AD4CF361840000, offsetMsgId=AC1CB10B00002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F000001CE4F7D4991AD4CF361C10001, offsetMsgId=AC1CB10B00002A9F00000000000000BE, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F000001CE4F7D4991AD4CF361C60002, offsetMsgId=AC1CB10B00002A9F000000000000017C, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F000001CE4F7D4991AD4CF361D00003, offsetMsgId=AC1CB10B00002A9F000000000000023A, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F000001CE4F7D4991AD4CF361D30004, offsetMsgId=AC1CB10B00002A9F00000000000002F8, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=1], queueOffset=1]
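The `offsetMsgId` values in this output are not random. As far as I know, for an IPv4 broker the 32 hex characters encode the broker IP (4 bytes), the broker port (4 bytes) and the commit log offset (8 bytes); treat that layout as an assumption. The sketch below decodes one of the IDs from the output under that assumption. The class `OffsetMsgId` is hypothetical and written only for this illustration.

```java
final class OffsetMsgId {
    final String host;
    final int port;
    final long commitLogOffset;

    private OffsetMsgId(String host, int port, long commitLogOffset) {
        this.host = host;
        this.port = port;
        this.commitLogOffset = commitLogOffset;
    }

    /** Decodes a 32-character hex ID, assumed layout: IPv4 (4 bytes) + port (4 bytes) + offset (8 bytes). */
    static OffsetMsgId decode(String hex) {
        if (hex == null || hex.length() != 32) {
            throw new IllegalArgumentException("Expected 32 hex characters");
        }
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16));
        }
        int port = Integer.parseInt(hex.substring(8, 16), 16);
        long offset = Long.parseLong(hex.substring(16, 32), 16);
        return new OffsetMsgId(ip.toString(), port, offset);
    }

    @Override
    public String toString() {
        return host + ":" + port + " @ offset " + commitLogOffset;
    }

    public static void main(String[] args) {
        // Second SendResult in the output above
        System.out.println(decode("AC1CB10B00002A9F00000000000000BE"));
        // prints: 172.28.177.11:10911 @ offset 190
    }
}
```

The decoded address matches the broker address printed at startup (172.28.177.11:10911), which is consistent with the assumed layout.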
Run the consumer test case
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
The messages are consumed as follows:
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=2, storeSize=190, queueOffset=0, sysFlag=0, bornTimestamp=1636987018689, bornHost=/172.28.177.11:59072, storeTimestamp=1636987018691, storeHost=/172.28.177.11:10911, msgId=AC1CB10B00002A9F00000000000000BE, commitLogOffset=190, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1636987042291, UNIQ_KEY=7F000001CE4F7D4991AD4CF361C10001, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=2, storeSize=190, queueOffset=1, sysFlag=0, bornTimestamp=1636987018710, bornHost=/172.28.177.11:59072, storeTimestamp=1636987018711, storeHost=/172.28.177.11:10911, msgId=AC1CB10B00002A9F00000000000003B6, commitLogOffset=950, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1636987042293, UNIQ_KEY=7F000001CE4F7D4991AD4CF361D60005, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53], transactionId='null'}]]
ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=iZbp1c3u9bhuo790ircffmZ, queueId=2, storeSize=190, queueOffset=2, sysFlag=0, bornTimestamp=1636987018719, bornHost=/172.28.177.11:59072, storeTimestamp=1636987018722, storeHost=/172.28.177.11:10911, msgId=AC1CB10B00002A9F00000000000006AE, commitLogOffset=1710, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1636987042293, UNIQ_KEY=7F000001CE4F7D4991AD4CF361DF0009, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57], transactionId='null'}]]
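The `body=[72, 101, ...]` field in this output is the message payload printed as raw byte values. A minimal sketch for turning it back into text, assuming the payload is UTF-8 (the class `BodyDecoder` is hypothetical):

```java
import java.nio.charset.StandardCharsets;

final class BodyDecoder {
    /** Converts byte values as printed in the consumer log back into a UTF-8 string. */
    static String decode(int... values) {
        byte[] raw = new byte[values.length];
        for (int i = 0; i < values.length; i++) {
            raw[i] = (byte) values[i];
        }
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Body of the first consumed message above
        System.out.println(decode(72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49));
        // prints: Hello RocketMQ 1
    }
}
```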
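5. Console
Many blog posts online say you can simply download the extension package from the official GitHub, build it, and run it. In practice, neither the externals package nor the dashboard package would build successfully.
After switching the externals package to the origin/release-rocketmq-console-1.0.0 branch, I found that the rocketmq-console project uses many deprecated methods.
In the end I got the console running by downloading a package that others had already built. There were quite a few pitfalls along the way, recorded here one by one.
The built package has been uploaded to a network drive as a backup: link: https://pan.baidu.com/s/1aQUzvjhdTNTEMVDpF3rZWg password: fe2t
Write a startup command to launch the console:
nohup java -Xms512m -Xmx512m -jar rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr='127.0.0.1:9876' > ./logs/console.log 2>&1 &
### If the NameServer is not specified with the external parameter --rocketmq.config.namesrvAddr='127.0.0.1:9876', the program starts successfully but is unusable.
Open the page to see the console: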
http://localhost:8080/
6. Application
Use rocketmq-spring-boot-starter under Spring Boot 2.x to integrate RocketMQ for producing and consuming messages.
Scenario: a user likes (praises) a video, and the producer sends the data (user ID + video ID) to the consumer through the MQ, for peak shaving and decoupling.
Producer
pom.xml dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- The common module defines the request parameters (also used as the message body), the Topic and the consumer group -->
    <dependency>
        <groupId>com.bingo</groupId>
        <artifactId>common</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>
</dependencies>
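The common module itself is not listed in this post. Below is a minimal sketch of what its two classes might look like, reconstructed from how they are used and logged elsewhere in the post. The topic value `topic-praise` appears in the error log further down; the consumer group value, the `Long` field types and the `CommonModuleSketch` wrapper are assumptions. In the real project these would be public classes in `com.bingo.common.constants` and `com.bingo.common.request`, likely with Lombok generating the accessors and `toString()`.

```java
final class CommonModuleSketch {
    public static void main(String[] args) {
        PraiseRequest request = new PraiseRequest(123L, 456L);
        System.out.println(RocketMQConstants.PRAISE_TOPIC + " <- " + request);
    }
}

final class RocketMQConstants {
    /** Topic name as seen in the sendOneWay error log. */
    static final String PRAISE_TOPIC = "topic-praise";
    /** Hypothetical value: the real consumer group name is not shown in the post. */
    static final String PRAISE_CONSUMER_GROUP = "consumer-group-praise";

    private RocketMQConstants() {
    }
}

final class PraiseRequest {
    private Long videoId; // assumed Long: the logs contain 16-digit values
    private Long userId;  // assumed Long

    PraiseRequest() {
        // no-arg constructor so the message body can be deserialized
    }

    PraiseRequest(Long videoId, Long userId) {
        this.videoId = videoId;
        this.userId = userId;
    }

    Long getVideoId() { return videoId; }
    void setVideoId(Long videoId) { this.videoId = videoId; }
    Long getUserId() { return userId; }
    void setUserId(Long userId) { this.userId = userId; }

    @Override
    public String toString() {
        // Same shape as the log output, e.g. PraiseRequest(videoId=123, userId=456)
        return "PraiseRequest(videoId=" + videoId + ", userId=" + userId + ")";
    }
}
```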
application.yaml configuration file
server:
  port: 10000
spring:
  application:
    name: producer
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: producer-group-praise
Producer program
package com.bingo.producer.controller;

import com.bingo.common.constants.RocketMQConstants;
import com.bingo.common.request.PraiseRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @program: PraiseController
 * @description: Praise (like) endpoint
 * @author: Bingo
 * @create: 2021-11-16 12:16
 **/
@Slf4j
@RestController
public class PraiseController {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Praise (like) a video.
     *
     * RocketMQ offers three main ways to send a message: syncSend() (synchronous), asyncSend() (asynchronous) and sendOneWay().
     * sendOneWay() is also asynchronous; the difference is that it does not wait for the Broker's acknowledgement,
     * so messages can be lost, but throughput is higher. Choose according to the business case.
     * Performance: sendOneWay > asyncSend > syncSend. RocketMQTemplate's send() method is synchronous (syncSend) by default.
     *
     * @param request the praise request (user ID and video ID)
     * @return true once the message has been handed to the producer
     */
    @PostMapping(value = "praise")
    public ResponseEntity<Boolean> praise(@RequestBody PraiseRequest request) {
        log.info("Praise request parameters: {}", request);
        // One-way send: the Broker's acknowledgement is not awaited
        rocketMQTemplate.sendOneWay(RocketMQConstants.PRAISE_TOPIC, MessageBuilder.withPayload(request).build());
        return ResponseEntity.ok(Boolean.TRUE);
    }
}
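The difference between the three send styles described in the controller's Javadoc can be illustrated with a toy model built on `CompletableFuture`. This is deliberately not the RocketMQ API: `deliver` is a hypothetical stand-in for a broker that acknowledges asynchronously, and the method names only mirror the RocketMQTemplate ones for readability.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class SendModesSketch {
    /** Hypothetical stand-in for a broker: acknowledges each message asynchronously. */
    static CompletableFuture<String> deliver(String message) {
        return CompletableFuture.supplyAsync(() -> "SEND_OK:" + message);
    }

    /** Synchronous style: block the caller until the acknowledgement arrives. */
    static String syncSend(String message) {
        return deliver(message).join();
    }

    /** Asynchronous style: return at once, the callback receives the acknowledgement later. */
    static CompletableFuture<Void> asyncSend(String message, Consumer<String> onSuccess) {
        return deliver(message).thenAccept(onSuccess);
    }

    /** One-way style: hand the message off and never look at the acknowledgement. */
    static void sendOneWay(String message) {
        deliver(message); // result intentionally ignored, so a failure would go unnoticed
    }

    public static void main(String[] args) {
        System.out.println("sync ack: " + syncSend("praise-1"));
        // join() here only keeps the demo alive until the callback has run
        asyncSend("praise-2", ack -> System.out.println("async ack: " + ack)).join();
        sendOneWay("praise-3");
        System.out.println("one-way: handed off, no ack to inspect");
    }
}
```

In this model the one-way call gives the caller nothing to inspect, which is the trade-off noted above: less waiting, but no way to detect a lost message.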
Consumer
pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- The common module defines the request parameters (also used as the message body), the Topic and the consumer group -->
    <dependency>
        <groupId>com.bingo</groupId>
        <artifactId>common</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>
</dependencies>
application.yaml configuration file
server:
  port: 10003
spring:
  application:
    name: consumer3
rocketmq:
  name-server: 127.0.0.1:9876
Consumer program
package com.bingo.consumer.listener;

import com.bingo.common.constants.RocketMQConstants;
import com.bingo.common.request.PraiseRequest;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * @program: PraiseListener
 * @description: Listener that consumes praise (like) messages
 * @author: Bingo
 * @create: 2021-11-16 12:29
 **/
@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQConstants.PRAISE_TOPIC, consumerGroup = RocketMQConstants.PRAISE_CONSUMER_GROUP)
public class PraiseListener implements RocketMQListener<PraiseRequest> {

    @Override
    public void onMessage(PraiseRequest request) {
        // Called once per message delivered to this consumer group member
        log.info("Consumer 1 consumed praise data: {}", request);
    }
}
Program test
1. Start the producer program and call the praise endpoint several times
[exec-1] c.b.p.controller.PraiseController : Praise request parameters: PraiseRequest(videoId=123, userId=456)
[exec-2] c.b.p.controller.PraiseController : Praise request parameters: PraiseRequest(videoId=1234, userId=456)
[exec-3] c.b.p.controller.PraiseController : Praise request parameters: PraiseRequest(videoId=12345, userId=456)
[exec-4] c.b.p.controller.PraiseController : Praise request parameters: PraiseRequest(videoId=123456, userId=456)
[exec-5] c.b.p.controller.PraiseController : Praise request parameters: PraiseRequest(videoId=1234567, userId=456)
[exec-6] c.b.p.controller.PraiseController : Praise request parameters: PraiseRequest(videoId=12345678, userId=456)
!!! When the praise endpoint was called to send a message, the broker could not be reached. The error is as follows:
2021-11-16 14:46:03.659 ERROR 4722 --- [io-10000-exec-1] o.a.r.spring.core.RocketMQTemplate : sendOneWay failed. destination:topic-praise, message:GenericMessage [payload=PraiseRequest(videoId=1234567891234567, userId=456), headers={id=eabf0a33-d4ca-0b03-8773-d3642c1570d0, timestamp=1637045160524}]
2021-11-16 14:46:03.676 ERROR 4722 --- [io-10000-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.messaging.MessagingException: Send [1] times, still failed, cost [3085]ms, Topic: topic-praise, BrokersSent: [iZbp1c3u9bhuo790ircffmZ]
See http://rocketmq.apache.org/docs/faq/ for further details.; nested exception is org.apache.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [3085]ms, Topic: topic-praise, BrokersSent: [iZbp1c3u9bhuo790ircffmZ]
See http://rocketmq.apache.org/docs/faq/ for further details.] with root cause
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to 172.28.177.11:10911 failed
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeOneway(NettyRemotingClient.java:554) ~[rocketmq-remoting-4.9.1.jar:4.9.1]
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:471) ~[rocketmq-client-4.9.1.jar:4.9.1]
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:431) ~[rocketmq-client-4.9.1.jar:4.9.1]
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:877) ~[rocketmq-client-4.9.1.jar:4.9.1]
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:607) ~[rocketmq-client-4.9.1.jar:4.9.1]
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendOneway(DefaultMQProducerImpl.java:1028) ~[rocketmq-client-4.9.1.jar:4.9.1]
at org.apache.rocketmq.client.producer.DefaultMQProducer.sendOneway(DefaultMQProducer.java:403) ~[rocketmq-client-4.9.1.jar:4.9.1]
at org.apache.rocketmq.spring.core.RocketMQTemplate.sendOneWay(RocketMQTemplate.java:911) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]
at com.bingo.producer.controller.PraiseController.praise(PraiseController.java:40) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_301]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_301]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_301]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_301]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-5.3.12.jar:5.3.12]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067) ~[spring-webmvc-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) ~[spring-webmvc-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.12.jar:5.3.12]
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) ~[spring-webmvc-5.3.12.jar:5.3.12]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:681) ~[tomcat-embed-core-9.0.54.jar:4.0.FR]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.12.jar:5.3.12]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:764) ~[tomcat-embed-core-9.0.54.jar:4.0.FR]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.54.jar:9.0.54]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.12.jar:5.3.12]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.12.jar:5.3.12]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.12.jar:5.3.12]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.12.jar:5.3.12]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.12.jar:5.3.12]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.12.jar:5.3.12]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197) ~[tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:895) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1722) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) [tomcat-embed-core-9.0.54.jar:9.0.54]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.54.jar:9.0.54]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_301]
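The stack trace shows the client trying to reach the broker at 172.28.177.11:10911, the address the broker registered with the NameServer; this appears to be an internal address that the client could not reach. After going through a number of blog posts, I found that RocketMQ's conf/broker.conf has to be modified and passed to the broker at startup.
Original link: https://blog.csdn.net/u014786083/article/details/118714139
Modify it as follows:
# Add this setting: the IP address the broker advertises to clients (here the same host as the NameServer)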
brokerIP1=8.136.224.118
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
Restart the broker
nohup ./bin/mqbroker -n localhost:9876 -c conf/broker.conf > logs/broker.log 2>&1 &
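Before restarting, it can help to confirm which address the broker will advertise. The sketch below reads broker.conf-style text with `java.util.Properties` and prints `brokerIP1` together with the port. The class `BrokerConfSketch` is hypothetical; the `listenPort` key and its default of 10911 are assumptions based on the port shown in the broker startup log.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class BrokerConfSketch {
    /** Returns "ip:port" as the broker would advertise it, based on broker.conf-style text. */
    static String advertisedAddress(String confText) {
        Properties conf = new Properties();
        try {
            conf.load(new StringReader(confText)); // handles "key=value" and "key = value"
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String ip = conf.getProperty("brokerIP1");
        if (ip == null) {
            throw new IllegalStateException("brokerIP1 is not set; the broker would pick an address itself");
        }
        String port = conf.getProperty("listenPort", "10911"); // assumed key and default
        return ip.trim() + ":" + port.trim();
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
                "brokerIP1=8.136.224.118",
                "brokerClusterName = DefaultCluster",
                "brokerName = broker-a",
                "brokerId = 0");
        System.out.println(advertisedAddress(conf)); // prints: 8.136.224.118:10911
    }
}
```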
2. Start the consumer program; the messages are consumed out of order
[MessageThread_5] c.b.consumer.listener.PraiseListener : Consumer 1 consumed praise data: PraiseRequest(videoId=123456, userId=456)
[MessageThread_3] c.b.consumer.listener.PraiseListener : Consumer 1 consumed praise data: PraiseRequest(videoId=1234, userId=456)
[MessageThread_1] c.b.consumer.listener.PraiseListener : Consumer 1 consumed praise data: PraiseRequest(videoId=12345, userId=456)
[MessageThread_6] c.b.consumer.listener.PraiseListener : Consumer 1 consumed praise data: PraiseRequest(videoId=12345678, userId=456)
[MessageThread_2] c.b.consumer.listener.PraiseListener : Consumer 1 consumed praise data: PraiseRequest(videoId=123, userId=456)
[MessageThread_4] c.b.consumer.listener.PraiseListener : Consumer 1 consumed praise data: PraiseRequest(videoId=1234567, userId=456)
3. Change the IDEA run configuration to start multiple consumers (how: Edit Configurations... -> uncheck "Single instance only" -> change the port in application.yaml; the same program can then be started several times on different ports)
4. Call the praise endpoint several times
[exec-8] c.b.p.controller.PraiseController : Praise request parameters: PraiseRequest(videoId=123456789, userId=456)
[exec-9] c.b.p.controller.PraiseController : Praise request parameters: PraiseRequest(videoId=1234567891, userId=456)
[exec-10] c.b.p.controller.PraiseController : Praise request parameters: PraiseRequest(videoId=12345678912, userId=456)
[exec-1] c.b.p.controller.PraiseController : Praise request parameters: PraiseRequest(videoId=123456789123, userId=456)
[exec-2] c.b.p.controller.PraiseController : Praise request parameters: PraiseRequest(videoId=1234567891234, userId=456)
[exec-3] c.b.p.controller.PraiseController : Praise request parameters: PraiseRequest(videoId=12345678912345, userId=456)
[exec-4] c.b.p.controller.PraiseController : Praise request parameters: PraiseRequest(videoId=123456789123456, userId=456)
[exec-5] c.b.p.controller.PraiseController : Praise request parameters: PraiseRequest(videoId=1234567891234567, userId=456)
5. Consumer 10001 consumes messages (10001, 10002 and 10003 are the ports)
[MessageThread_7] c.b.consumer.listener.PraiseListener : Consumer 1 consumed praise data: PraiseRequest(videoId=123456789123, userId=456)
[MessageThread_8] c.b.consumer.listener.PraiseListener : Consumer 1 consumed praise data: PraiseRequest(videoId=1234567891234, userId=456)
[MessageThread_9] c.b.consumer.listener.PraiseListener : Consumer 1 consumed praise data: PraiseRequest(videoId=12345678912345, userId=456)
[MessageThread_10] c.b.consumer.listener.PraiseListener : Consumer 1 consumed praise data: PraiseRequest(videoId=1234567891234567, userId=456)
6. Consumer 10002 consumes messages
[MessageThread_1] c.b.consumer.listener.PraiseListener : Consumer 1 consumed praise data: PraiseRequest(videoId=1234567891, userId=456)
[MessageThread_2] c.b.consumer.listener.PraiseListener : Consumer 1 consumed praise data: PraiseRequest(videoId=123456789123456, userId=456)
7. Consumer 10003 consumes messages
[MessageThread_1] c.b.consumer.listener.PraiseListener : Consumer 1 consumed praise data: PraiseRequest(videoId=123456789, userId=456)
[MessageThread_2] c.b.consumer.listener.PraiseListener : Consumer 1 consumed praise data: PraiseRequest(videoId=12345678912, userId=456)
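The three consumers received 4, 2 and 2 of the eight messages rather than an even share. One likely reason is that consumers in the same group divide the topic's queues among themselves, not individual messages. The sketch below shows a simple "average" division of queues; it is a simplified illustration, not RocketMQ's own allocation code, and the queue count of 4 is an assumption (the quickstart output earlier shows queueId 0 to 3 for TopicTest). The class `AverageAllocationSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class AverageAllocationSketch {
    /** Returns the queue IDs assigned to one consumer when queues are split as evenly as possible. */
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        int base = queueCount / consumerCount;      // queues every consumer gets
        int remainder = queueCount % consumerCount; // the first `remainder` consumers get one extra
        int size = base + (consumerIndex < remainder ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, remainder);
        List<Integer> queues = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            queues.add(start + i);
        }
        return queues;
    }

    public static void main(String[] args) {
        int queueCount = 4;    // assumed number of queues for the topic
        int consumerCount = 3; // the three instances on ports 10001 to 10003
        for (int i = 0; i < consumerCount; i++) {
            System.out.println("consumer " + i + " -> queues " + allocate(queueCount, consumerCount, i));
        }
        // prints:
        // consumer 0 -> queues [0, 1]
        // consumer 1 -> queues [2]
        // consumer 2 -> queues [3]
    }
}
```

With four queues and three consumers, one consumer owns two queues and so tends to see more messages. The exact split in the logs also depends on which queue each message landed in, which the logs do not show.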
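The code for this chapter has been uploaded to Gitee: springboot-rocketmq-demo
This completes the single-Master RocketMQ setup and a simple application. Next I will continue to look at its other features, such as cluster setup, message retry, duplicate-message handling, ordered messages, scheduled messages, batch sending, transactional messages, message replay, and the various message filtering methods.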