SpringBoot快速集成RocketMQ


RocketMQ總結整理

https://blog.csdn.net/asdf08442a/article/details/54882769

SpringBoot集成RocketMQ

依賴

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.0</version>
        </dependency>

yml

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: my-group

produce

@RunWith(SpringRunner.class)
@SpringBootTest
public class TestSpringRocketMQ {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void send() {
        /**
         * 同步發送 sync
         * 發送消息采用同步模式,這種方式只有在消息完全發送完成之后才返回結果,此方式存在需要同步等待發送結果的時間代價。
         * 這種方式具有內部重試機制,即在主動聲明本次消息發送失敗之前,內部實現將重試一定次數,默認為2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。
         * 發送的結果存在同一個消息可能被多次發送給給broker,這里需要應用的開發者自己在消費端處理冪等性問題。
         */
        SendResult syncSend = rocketMQTemplate.syncSend("my-topic", "syncSend");
        System.out.println(syncSend);
        /**
         * 異步發送 async
         * 發送消息采用異步發送模式,消息發送后立刻返回,當消息完全完成發送后,會調用回調函數sendCallback來告知發送者本次發送是成功或者失敗。
         * 異步模式通常用於響應時間敏感業務場景,即承受不了同步發送消息時等待返回的耗時代價。同同步發送一樣,異步模式也在內部實現了重試機制,
         * 默認次數為2次(DefaultMQProducer#getRetryTimesWhenSendAsyncFailed})。發送的結果同樣存在同一個消息可能被多次發送給給broker,需要應用的開發者自己在消費端處理冪等性問題。
         */
        rocketMQTemplate.asyncSend("my-topic", "syncSend", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("send successful");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("send fail; {}" + throwable.getMessage());
            }
        });

        /**
         * 直接發送 one-way
         * 采用one-way發送模式發送消息的時候,發送端發送完消息后會立即返回,不會等待來自broker的ack來告知本次消息發送是否完全完成發送。
         * 這種方式吞吐量很大,但是存在消息丟失的風險,所以其適用於不重要的消息發送,比如日志收集
         */
        rocketMQTemplate.sendOneWay("my-topic", "sendOneWay");

        rocketMQTemplate.convertAndSend("my-topic", "convertAndSend");
    }
}

consume

@Component
@RocketMQMessageListener(
        topic = "my-topic",
        consumerGroup = "my-group",
        selectorExpression = "*"
)
public class SpringConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        System.out.println("接收到消息 -> " + msg);
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM