1.RocketMQ的下載與配置
到官網選擇想要的版本下載即可,https://rocketmq.apache.org/release_notes/
下載速度會比較慢,這里提供目前最新版本4.9.3的壓縮包,https://wwm.lanzouw.com/ijBoC01bi2yh
壓縮包選擇路徑解壓即可
配置一個環境變量:ROCKETMQ_HOME,變量值為RocketMQ的安裝路徑,並把這個變量配置到path中,類似於JAVA_HOME
建議配置環境變量:NAMESRV_ADDR,變量值為127.0.0.1:9876。不配的話啟動mqbroker之前需要cmd進入RocketMQ安裝目錄下的bin目錄set一下這個變量
在bin目錄下先后啟動mqnamesrv和mqbroker
啟動成功會分別有如下顯示
2.整合
pom.xml中添加依賴,如下所示
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <scope>compile</scope> </dependency>
配置application.yaml,具體如下
server: port: 80 rocketmq: name-server: localhost:9876 producer: group: group_1
編寫controller層
package me.xiaomaju.controller; import me.xiaomaju.service.RocketMQService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/msg") public class RockerMQController { @Autowired private RocketMQService rocketMQService; @RequestMapping("{id}") public void sendMessage(@PathVariable String id){ rocketMQService.sendMessage(id); } }
編寫service層,消息異步發送的邏輯
package me.xiaomaju.service.impl;
import me.xiaomaju.service.RocketMQService;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class RocketMQImpl implements RocketMQService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void sendMessage(String id) {
System.out.println("已進入處理隊列,id:" + id);
rocketMQTemplate.asyncSend("id_user", id, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("消息已成功發送:" + sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("消息發送異常,發送失敗");
}
});
}
}
編寫listener層,消息消費的邏輯
package me.xiaomaju.listener; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "id_user",consumerGroup = "group_1") public class MsgListenter implements RocketMQListener<String> { @Override public void onMessage(String id) { System.out.println("消息已消費,id:"+id); } }
此時服務正常啟動,瀏覽器輸入 http://localhost/msg/xmj0001 回車
控制台打印如下
完畢