SpringBoot整合RocketMQ


1.RocketMQ的下載與配置

到官網選擇想要的版本下載即可,https://rocketmq.apache.org/release_notes/

下載速度會比較慢,這里提供目前最新版本4.9.3的壓縮包,https://wwm.lanzouw.com/ijBoC01bi2yh


壓縮包選擇路徑解壓即可

配置一個環境變量:ROCKETMQ_HOME,變量值為RocketMQ的安裝路徑,並把這個變量配置到path中,類似於JAVA_HOME

建議配置環境變量:NAMESRV_ADDR,變量值為127.0.0.1:9876。不配的話啟動mqbroker之前需要cmd進入RocketMQ安裝目錄下的bin目錄set一下這個變量

在bin目錄下先后啟動mqnamesrv和mqbroker

啟動成功會分別有如下顯示


 2.整合

pom.xml中添加依賴,如下所示

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.1</version>
        </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <scope>compile</scope> </dependency>

配置application.yaml,具體如下

server:
  port: 80

rocketmq:
  name-server: localhost:9876
  producer:
    group: group_1

編寫controller層

package me.xiaomaju.controller;

import me.xiaomaju.service.RocketMQService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/msg")
public class RockerMQController {

    @Autowired
    private RocketMQService rocketMQService;

    @RequestMapping("{id}")
    public void sendMessage(@PathVariable  String id){
        rocketMQService.sendMessage(id);
    }

}

編寫service層,消息異步發送的邏輯

package me.xiaomaju.service.impl;

import me.xiaomaju.service.RocketMQService;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RocketMQImpl implements RocketMQService {

@Autowired
private RocketMQTemplate rocketMQTemplate;

@Override
public void sendMessage(String id) {
System.out.println("已進入處理隊列,id:" + id);
rocketMQTemplate.asyncSend("id_user", id, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("消息已成功發送:" + sendResult);
}

@Override
public void onException(Throwable throwable) {
System.out.println("消息發送異常,發送失敗");
}
});
}
}

編寫listener層,消息消費的邏輯

package me.xiaomaju.listener;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "id_user",consumerGroup = "group_1")
public class MsgListenter implements RocketMQListener<String> {

    @Override
    public void onMessage(String id) {
        System.out.println("消息已消費,id:"+id);
    }
}

此時服務正常啟動,瀏覽器輸入 http://localhost/msg/xmj0001 回車

控制台打印如下

完畢


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM