RocketMQ簡單學習和使用


RocketMQ發送消息的三種方式:

同步發送(sync

在消息完全發送完成之后才返回結果,此方式存在需要同步等待發送結果的時間代價。具有內部重試機制,即在主動聲明本次消息發送失敗之前,內部實現將重試一定次數,默認為2次。

rocketMQTemplate.syncSend("topic-name", "send sync message !");

 

異步發送(async

消息發送后立刻返回,當消息完全完成發送后,會調用回調函數sendCallback來告知發送者本次發送是成功或者失敗。異步模式通常用於響應時間敏感業務場景,即承受不了同步發送消息時等待返回的耗時代價。和同步發送一樣具有內部重試機制。

rocketMQTemplate.asyncSend("topic-name", "send async message!", new SendCallback();

 

直接發送 one-way

發送端發送完消息后會立即返回,不會等待來自brokerack來告知本次消息發送是否完全完成發送。吞吐量很大,存在消息丟失的風險,所以其適用於不重要的消息發送,比如日志收集。

rocketMQTemplate.sendOneWay("topic-name", "message");

 

RocketMQ使用

 

發送消息代碼:

 

@Service

 

public class RocketMqService {

 

@Resource

 

private RocketMQTemplate rocketMQTemplate;

 

// 指定topic的同時,設置tag值,以便消費端可以根據tag值進行選擇性消費

 

SubscribeMessage msg = new SubscriberMessage();

 

rocketMQTemplate.syncSend(msg.getTopic()+ ":" +msg.getTags(), msg);

 

}

 

 

 

監聽消息代碼:

 

@Service

 

@RocketMQMessageListener(topic = "${rocketmq.topic}",

 

consumerGroup = "${rocketmq.group}", selectorExpression="${rocketmq. selectorExpression}")

 

public class CcuMessageListener implements RocketMQListener<CcuMessage> {

 

@Override

 

public void onMessage(CcuMessage message) {

 

String msgBody = new String(message.getBody());

 

}

 

}

 

RocketMQMessageListener注解默認selectorExpression*,表示接收當前Topic下的所有數據。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM