1.准备
首先要在本地安装RocketMQ,安装教程参考 RocketMQ安装。
2.创建SpringBoot项目集成RocketMQ
首先创建SpringBoot项目
1)pom引入依赖如下
<dependencies>
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
2)application.properties配置文件如下
server.port=8088 rocketmq.producer.groupName=ProducerGroup rocketmq.producer.namesrvAddr=127.0.0.1:9876 rocketmq.producer.instanceName=ProducerGroup rocketmq.producer.topic=topic2020 rocketmq.producer.tag=test rocketmq.producer.maxMessageSize=131072 rocketmq.producer.sendMsgTimeout=10000 rocketmq.consumer.namesrvAddr=127.0.0.1:9876 rocketmq.consumer.groupName=ConsumerGroup rocketmq.consumer.topic=topic2020 rocketmq.consumer.tag=test rocketmq.consumer.consumeThreadMin=20 rocketmq.consumer.consumeThreadMax=64
3)创建MessageProcessor消息处理接口
package com.springboot.message; import org.apache.rocketmq.common.message.MessageExt; public interface MessageProcessor { boolean handle(MessageExt messageExt); }
4)实现MessageProcessorImpl消息处理类
package com.springboot.message.impl; import com.springboot.message.MessageProcessor; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Service; /** * Description:监听消息处理类 */ @Service public class MessageProcessorImpl implements MessageProcessor { @Override public boolean handle(MessageExt messageExt) { // 收到的body(消息体),字节类型,需转为String
String result = new String(messageExt.getBody()); System.out.println("监听到了消息,消息为:"+ result); return true; } }
5)创建MessageListen消息监听类
package com.springboot.message; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; /** * Description:监听类 */ @Component public class MessageListen implements MessageListenerConcurrently { @Autowired private MessageProcessor messageProcessor; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt ext = list.get(0); boolean result = messageProcessor.handle(ext); if (!result) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
6)创建消息生产者RocketMQProducer
package com.springboot.consumer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Description:生产者配置 */ @Configuration @Slf4j public class RocketMQProducer { @Value("${rocketmq.producer.groupName}") private String groupName; @Value("${rocketmq.producer.namesrvAddr}") private String nameserAddr; @Value("${rocketmq.producer.instanceName}") private String instanceName; @Value("${rocketmq.producer.maxMessageSize}") private int maxMessageSize; @Value("${rocketmq.producer.sendMsgTimeout}") private int sendMsgTimeout; @Bean(initMethod = "start", destroyMethod = "shutdown") public DefaultMQProducer getRocketMQProducer() { DefaultMQProducer producer = new DefaultMQProducer(groupName); producer.setNamesrvAddr(nameserAddr); producer.setInstanceName(instanceName); producer.setMaxMessageSize(maxMessageSize); producer.setSendMsgTimeout(sendMsgTimeout); producer.setVipChannelEnabled(false); log.info("================>生产者创建完成,ProducerGroupName{}<================", groupName); return producer; } }
(@Bean(initMethod = "start", destroyMethod = "shutdown"),在bean注解中开启/销毁生成者)
7)创建消息消费者RocketMQConsumer
package com.springboot.consumer; import com.springboot.message.MessageListen; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Description:消费者配置 */ @Configuration @Slf4j public class RocketMQConsumer { @Autowired private MessageListen messageListen; @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.groupName}") private String groupName; @Value("${rocketmq.consumer.topic}") private String topic; @Value("${rocketmq.consumer.tag}") private String tag; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Bean(initMethod = "start", destroyMethod = "shutdown") public DefaultMQPushConsumer getRocketMQConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.setVipChannelEnabled(false); // 我们自己实现的监听类 consumer.registerMessageListener(messageListen); try { consumer.subscribe(topic,tag); log.info("================>消费者创建完成,ConsumerGroupName{}<================",groupName); log.info("============>消费者监听开始,groupName:{},topic:{}<============",groupName,topic); } catch (MQClientException e) { log.error("消费者启动失败"); e.printStackTrace(); } return consumer; } }
(@Bean(initMethod = "start", destroyMethod = "shutdown"),在bean注解中开启/销毁消费者)
8)创建controller类,RocketMqController
package com.springboot.controller; import com.springboot.consumer.RocketMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.text.SimpleDateFormat; import java.util.Date; /** * Description: */ @RestController public class RocketMqController { @Autowired @Qualifier("rocketMQProducer") RocketMQProducer rocketMQProducer; @GetMapping("/test") public void TestSend() { DefaultMQProducer producer = rocketMQProducer.getRocketMQProducer(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String body = "hi RocketMQ, now is " + sdf.format(new Date()) + "."; Message message = new Message("topic2020", "test", body.getBytes()); try { producer.send(message); } catch (Exception e) { e.printStackTrace(); } } }
3.测试
1)首先启动本地RocketMQ(启动参考)
2)启动主启动类,在浏览器输入 localhost:8088/test,可以看到消费者已经创建并开始监听,并且已经消费消息
3)可视化图看出,在浏览器输入 localhost:7777