RocketMQ總結整理
https://blog.csdn.net/asdf08442a/article/details/54882769
SpringBoot集成RocketMQ
依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
yml
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: my-group
produce
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestSpringRocketMQ {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void send() {
/**
* 同步發送 sync
* 發送消息采用同步模式,這種方式只有在消息完全發送完成之后才返回結果,此方式存在需要同步等待發送結果的時間代價。
* 這種方式具有內部重試機制,即在主動聲明本次消息發送失敗之前,內部實現將重試一定次數,默認為2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。
* 發送的結果存在同一個消息可能被多次發送給給broker,這里需要應用的開發者自己在消費端處理冪等性問題。
*/
SendResult syncSend = rocketMQTemplate.syncSend("my-topic", "syncSend");
System.out.println(syncSend);
/**
* 異步發送 async
* 發送消息采用異步發送模式,消息發送后立刻返回,當消息完全完成發送后,會調用回調函數sendCallback來告知發送者本次發送是成功或者失敗。
* 異步模式通常用於響應時間敏感業務場景,即承受不了同步發送消息時等待返回的耗時代價。同同步發送一樣,異步模式也在內部實現了重試機制,
* 默認次數為2次(DefaultMQProducer#getRetryTimesWhenSendAsyncFailed})。發送的結果同樣存在同一個消息可能被多次發送給給broker,需要應用的開發者自己在消費端處理冪等性問題。
*/
rocketMQTemplate.asyncSend("my-topic", "syncSend", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("send successful");
}
@Override
public void onException(Throwable throwable) {
System.out.println("send fail; {}" + throwable.getMessage());
}
});
/**
* 直接發送 one-way
* 采用one-way發送模式發送消息的時候,發送端發送完消息后會立即返回,不會等待來自broker的ack來告知本次消息發送是否完全完成發送。
* 這種方式吞吐量很大,但是存在消息丟失的風險,所以其適用於不重要的消息發送,比如日志收集
*/
rocketMQTemplate.sendOneWay("my-topic", "sendOneWay");
rocketMQTemplate.convertAndSend("my-topic", "convertAndSend");
}
}
consume
@Component
@RocketMQMessageListener(
topic = "my-topic",
consumerGroup = "my-group",
selectorExpression = "*"
)
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("接收到消息 -> " + msg);
}
}