RocketMQ發送消息的三種方式:
同步發送(sync)
在消息完全發送完成之后才返回結果,此方式存在需要同步等待發送結果的時間代價。具有內部重試機制,即在主動聲明本次消息發送失敗之前,內部實現將重試一定次數,默認為2次。
rocketMQTemplate.syncSend("topic-name", "send sync message !");
異步發送(async)
消息發送后立刻返回,當消息完全完成發送后,會調用回調函數sendCallback來告知發送者本次發送是成功或者失敗。異步模式通常用於響應時間敏感業務場景,即承受不了同步發送消息時等待返回的耗時代價。和同步發送一樣具有內部重試機制。
rocketMQTemplate.asyncSend("topic-name", "send async message!", new SendCallback();
直接發送 (one-way)
發送端發送完消息后會立即返回,不會等待來自broker的ack來告知本次消息發送是否完全完成發送。吞吐量很大,存在消息丟失的風險,所以其適用於不重要的消息發送,比如日志收集。
rocketMQTemplate.sendOneWay("topic-name", "message");
RocketMQ使用:
發送消息代碼:
@Service
public class RocketMqService {
@Resource
private RocketMQTemplate rocketMQTemplate;
// 指定topic的同時,設置tag值,以便消費端可以根據tag值進行選擇性消費
SubscribeMessage msg = new SubscriberMessage();
rocketMQTemplate.syncSend(msg.getTopic()+ ":" +msg.getTags(), msg);
}
監聽消息代碼:
@Service
@RocketMQMessageListener(topic = "${rocketmq.topic}",
consumerGroup = "${rocketmq.group}", selectorExpression="${rocketmq. selectorExpression}")
public class CcuMessageListener implements RocketMQListener<CcuMessage> {
@Override
public void onMessage(CcuMessage message) {
String msgBody = new String(message.getBody());
}
}
RocketMQMessageListener注解默認selectorExpression為*,表示接收當前Topic下的所有數據。