1.准備
首先要在本地安裝RocketMQ,安裝教程參考 RocketMQ安裝。
2.創建SpringBoot項目集成RocketMQ
首先創建SpringBoot項目
1)pom引入依賴如下
<dependencies>
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
2)application.properties配置文件如下
server.port=8088 rocketmq.producer.groupName=ProducerGroup rocketmq.producer.namesrvAddr=127.0.0.1:9876 rocketmq.producer.instanceName=ProducerGroup rocketmq.producer.topic=topic2020 rocketmq.producer.tag=test rocketmq.producer.maxMessageSize=131072 rocketmq.producer.sendMsgTimeout=10000 rocketmq.consumer.namesrvAddr=127.0.0.1:9876 rocketmq.consumer.groupName=ConsumerGroup rocketmq.consumer.topic=topic2020 rocketmq.consumer.tag=test rocketmq.consumer.consumeThreadMin=20 rocketmq.consumer.consumeThreadMax=64
3)創建MessageProcessor消息處理接口
package com.springboot.message; import org.apache.rocketmq.common.message.MessageExt; public interface MessageProcessor { boolean handle(MessageExt messageExt); }
4)實現MessageProcessorImpl消息處理類
package com.springboot.message.impl; import com.springboot.message.MessageProcessor; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Service; /** * Description:監聽消息處理類 */ @Service public class MessageProcessorImpl implements MessageProcessor { @Override public boolean handle(MessageExt messageExt) { // 收到的body(消息體),字節類型,需轉為String
String result = new String(messageExt.getBody()); System.out.println("監聽到了消息,消息為:"+ result); return true; } }
5)創建MessageListen消息監聽類
package com.springboot.message; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; /** * Description:監聽類 */ @Component public class MessageListen implements MessageListenerConcurrently { @Autowired private MessageProcessor messageProcessor; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt ext = list.get(0); boolean result = messageProcessor.handle(ext); if (!result) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
6)創建消息生產者RocketMQProducer
package com.springboot.consumer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Description:生產者配置 */ @Configuration @Slf4j public class RocketMQProducer { @Value("${rocketmq.producer.groupName}") private String groupName; @Value("${rocketmq.producer.namesrvAddr}") private String nameserAddr; @Value("${rocketmq.producer.instanceName}") private String instanceName; @Value("${rocketmq.producer.maxMessageSize}") private int maxMessageSize; @Value("${rocketmq.producer.sendMsgTimeout}") private int sendMsgTimeout; @Bean(initMethod = "start", destroyMethod = "shutdown") public DefaultMQProducer getRocketMQProducer() { DefaultMQProducer producer = new DefaultMQProducer(groupName); producer.setNamesrvAddr(nameserAddr); producer.setInstanceName(instanceName); producer.setMaxMessageSize(maxMessageSize); producer.setSendMsgTimeout(sendMsgTimeout); producer.setVipChannelEnabled(false); log.info("================>生產者創建完成,ProducerGroupName{}<================", groupName); return producer; } }
(@Bean(initMethod = "start", destroyMethod = "shutdown"),在bean注解中開啟/銷毀生成者)
7)創建消息消費者RocketMQConsumer
package com.springboot.consumer; import com.springboot.message.MessageListen; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Description:消費者配置 */ @Configuration @Slf4j public class RocketMQConsumer { @Autowired private MessageListen messageListen; @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.groupName}") private String groupName; @Value("${rocketmq.consumer.topic}") private String topic; @Value("${rocketmq.consumer.tag}") private String tag; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Bean(initMethod = "start", destroyMethod = "shutdown") public DefaultMQPushConsumer getRocketMQConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.setVipChannelEnabled(false); // 我們自己實現的監聽類 consumer.registerMessageListener(messageListen); try { consumer.subscribe(topic,tag); log.info("================>消費者創建完成,ConsumerGroupName{}<================",groupName); log.info("============>消費者監聽開始,groupName:{},topic:{}<============",groupName,topic); } catch (MQClientException e) { log.error("消費者啟動失敗"); e.printStackTrace(); } return consumer; } }
(@Bean(initMethod = "start", destroyMethod = "shutdown"),在bean注解中開啟/銷毀消費者)
8)創建controller類,RocketMqController
package com.springboot.controller; import com.springboot.consumer.RocketMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.text.SimpleDateFormat; import java.util.Date; /** * Description: */ @RestController public class RocketMqController { @Autowired @Qualifier("rocketMQProducer") RocketMQProducer rocketMQProducer; @GetMapping("/test") public void TestSend() { DefaultMQProducer producer = rocketMQProducer.getRocketMQProducer(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String body = "hi RocketMQ, now is " + sdf.format(new Date()) + "."; Message message = new Message("topic2020", "test", body.getBytes()); try { producer.send(message); } catch (Exception e) { e.printStackTrace(); } } }
3.測試
1)首先啟動本地RocketMQ(啟動參考)
2)啟動主啟動類,在瀏覽器輸入 localhost:8088/test,可以看到消費者已經創建並開始監聽,並且已經消費消息
3)可視化圖看出,在瀏覽器輸入 localhost:7777