使用RocketMQ 發送簡單消息


Apache RocketMQ(官網地址:http://rocketmq.apache.org)是由阿里巴巴集團開源的大型消息隊列,現在已經貢獻給了Apache開源基金會,同時是一個分布式消息傳遞和流媒體平台,具有低延遲、高性能、可靠性、萬億級容量和靈活的可擴展性。(Github官網地址:https://github.com/apache/rocketmq)

 

1. 加入RocketMQ依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.2.0</version>
</dependency>

2. 配置RocketMQ服務信息

xc:
  rocketmq:
    consumer:
      PushConsumer: PushConsumer
    producer:
      producerGroup: Producer
    namesrvAddr: 172.19.25.168:9876

3. 編寫生產者和消費者

這里以發送10條簡單消息為例,創建一個生產者,這里使用的是默認生產者DefaultMQProducer,在構建生產者的時候使用構造方法設置生產者的組名。使用setNamesrvAddr()方法設置NameServer,如果有多個NameServer,就使用逗號分隔。這里需要注意一點,生產者對象只調用一次start方法即可,不需要每次都調用。在構建消息體時設置topic和tags。

@Component
public class RocketMQSender {
    @Value("${xc.rocketmq.producer.producerGroup}")
    private String producerGroup;
    @Value("${xc.rocketmq.namesrvAddr}")
    private String namesrvAddr;
    private static final Logger log = LoggerFactory.getLogger(RocketMQSender.class);

    public void defaultMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setVipChannelEnabled(false);
        producer.setNamesrvAddr(namesrvAddr);
        try {
            producer.start();
            Message message = new Message("TopicTest", "push", "【發送消息】".getBytes());
            StopWatch stop = new StopWatch();
            stop.start();
            for (int i = 0; i < 10; i++) {
                SendResult result = producer.send(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, 1);
                log.info("發送響應:MsgId:" + result.getMsgId() + ",發送狀態:" + result.getSendStatus());
            }
            stop.stop();
            log.info("----------------發送十條消息耗時:" + stop.getTotalTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }

}

接下來編寫一個消費者,其中的設置與生產者類似

@Component
public class RocketMQReceiver {
    @Value("${xc.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;
    @Value("${xc.rocketmq.namesrvAddr}")
    private String namesrvAddr;
    private static final Logger log = LoggerFactory.getLogger(RocketMQReceiver.class);

    //@PostContruct是spring框架的注解,在方法上加該注解會在項目啟動的時候執行該方法,也可以理解為在spring容器初始化的時候執行該方法。
    @PostConstruct
    public void defaultMQPushConsumer() {
        //消費者的組名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

        //指定NameServer地址,多個地址以 ; 隔開
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            //訂閱PushTopic下Tag為push的消息
            consumer.subscribe("TopicTest", "push");

            //設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
            //如果非第一次啟動,那么按照上次消費的位置繼續消費
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {
                        //輸出消息內容
                        log.info("messageExt: " + messageExt);
                        String messageBody = new String(messageExt.getBody());
                        //輸出消息內容
                        log.info("【defaultMQPushConsumer消費響應】:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    //稍后再試
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                //消費成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //@PostContruct是spring框架的注解,在方法上加該注解會在項目啟動的時候執行該方法,也可以理解為在spring容器初始化的時候執行該方法。
    @PostConstruct
    public void defaultMQPushConsumer2() {
        //消費者的組名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("aaa");

        //指定NameServer地址,多個地址以 ; 隔開
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            //訂閱PushTopic下Tag為push的消息
            consumer.subscribe("TopicTest", "push");

            //設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
            //如果非第一次啟動,那么按照上次消費的位置繼續消費
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {
                        //輸出消息內容
                        log.info("---- messageExt: " + messageExt);
                        String messageBody = new String(messageExt.getBody());
                        //輸出消息內容
                        log.info("----【defaultMQPushConsumer2消費響應】:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    //稍后再試
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                //消費成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

最后編寫一個RocketMQController類來調用生產者發送消息

@RestController
public class RocketMQController {

    @Autowired
    private RocketMQSender rocketMQSender;

    @GetMapping("testRocketmq")
    public void testRocketmq() {
        rocketMQSender.defaultMQProducer();
    }

}

啟動項目后,在瀏覽器中訪問http://localhost:8080/testRocketmq

 

文章來源:Spring Boot 2實戰之旅 9.3 RocketMQ消息隊列

源碼:https://gitee.com/caoyeoo0/xc-springboot/tree/mq%2FRocketMQ/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM