Apache RocketMQ(官網地址:http://rocketmq.apache.org)是由阿里巴巴集團開源的大型消息隊列,現在已經貢獻給了Apache開源基金會,同時是一個分布式消息傳遞和流媒體平台,具有低延遲、高性能、可靠性、萬億級容量和靈活的可擴展性。(Github官網地址:https://github.com/apache/rocketmq)
1. 加入RocketMQ依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.2.0</version> </dependency>
2. 配置RocketMQ服務信息
xc:
rocketmq:
consumer:
PushConsumer: PushConsumer
producer:
producerGroup: Producer
namesrvAddr: 172.19.25.168:9876
3. 編寫生產者和消費者
這里以發送10條簡單消息為例,創建一個生產者,這里使用的是默認生產者DefaultMQProducer,在構建生產者的時候使用構造方法設置生產者的組名。使用setNamesrvAddr()方法設置NameServer,如果有多個NameServer,就使用逗號分隔。這里需要注意一點,生產者對象只調用一次start方法即可,不需要每次都調用。在構建消息體時設置topic和tags。
@Component public class RocketMQSender { @Value("${xc.rocketmq.producer.producerGroup}") private String producerGroup; @Value("${xc.rocketmq.namesrvAddr}") private String namesrvAddr; private static final Logger log = LoggerFactory.getLogger(RocketMQSender.class); public void defaultMQProducer() { DefaultMQProducer producer = new DefaultMQProducer(producerGroup); producer.setVipChannelEnabled(false); producer.setNamesrvAddr(namesrvAddr); try { producer.start(); Message message = new Message("TopicTest", "push", "【發送消息】".getBytes()); StopWatch stop = new StopWatch(); stop.start(); for (int i = 0; i < 10; i++) { SendResult result = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 1); log.info("發送響應:MsgId:" + result.getMsgId() + ",發送狀態:" + result.getSendStatus()); } stop.stop(); log.info("----------------發送十條消息耗時:" + stop.getTotalTimeMillis()); } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } } }
接下來編寫一個消費者,其中的設置與生產者類似
@Component public class RocketMQReceiver { @Value("${xc.rocketmq.consumer.PushConsumer}") private String consumerGroup; @Value("${xc.rocketmq.namesrvAddr}") private String namesrvAddr; private static final Logger log = LoggerFactory.getLogger(RocketMQReceiver.class); //@PostContruct是spring框架的注解,在方法上加該注解會在項目啟動的時候執行該方法,也可以理解為在spring容器初始化的時候執行該方法。 @PostConstruct public void defaultMQPushConsumer() { //消費者的組名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); //指定NameServer地址,多個地址以 ; 隔開 consumer.setNamesrvAddr(namesrvAddr); try { //訂閱PushTopic下Tag為push的消息 consumer.subscribe("TopicTest", "push"); //設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 //如果非第一次啟動,那么按照上次消費的位置繼續消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> { try { for (MessageExt messageExt : list) { //輸出消息內容 log.info("messageExt: " + messageExt); String messageBody = new String(messageExt.getBody()); //輸出消息內容 log.info("【defaultMQPushConsumer消費響應】:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody); } } catch (Exception e) { e.printStackTrace(); //稍后再試 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } //消費成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } //@PostContruct是spring框架的注解,在方法上加該注解會在項目啟動的時候執行該方法,也可以理解為在spring容器初始化的時候執行該方法。 @PostConstruct public void defaultMQPushConsumer2() { //消費者的組名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("aaa"); //指定NameServer地址,多個地址以 ; 隔開 consumer.setNamesrvAddr(namesrvAddr); try { //訂閱PushTopic下Tag為push的消息 consumer.subscribe("TopicTest", "push"); //設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 //如果非第一次啟動,那么按照上次消費的位置繼續消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> { try { for (MessageExt messageExt : list) { //輸出消息內容 log.info("---- messageExt: " + messageExt); String messageBody = new String(messageExt.getBody()); //輸出消息內容 log.info("----【defaultMQPushConsumer2消費響應】:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody); } } catch (Exception e) { e.printStackTrace(); //稍后再試 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } //消費成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }
最后編寫一個RocketMQController類來調用生產者發送消息
@RestController public class RocketMQController { @Autowired private RocketMQSender rocketMQSender; @GetMapping("testRocketmq") public void testRocketmq() { rocketMQSender.defaultMQProducer(); } }
啟動項目后,在瀏覽器中訪問http://localhost:8080/testRocketmq
文章來源:Spring Boot 2實戰之旅 9.3 RocketMQ消息隊列
源碼:https://gitee.com/caoyeoo0/xc-springboot/tree/mq%2FRocketMQ/