本文只針對springboot整合rabbitmq的消息防丟失,話不多說,上干貨....
設置發送mq消息不丟失實現思路
第一步,要對隊列,消息以及交換機進行持久化操作(保存到物理磁盤中)
因為mq消息默認是保存在內存中
交換機我們在聲明的時候可以進行持久化
@Bean(EX_BUYING_ADDPOINTUSER)
public Exchange EX_BUYING_ADDPOINTUSER(){
return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTUSER).durable(true).build();
}
解析:
durable(true)表示對當前交換進行持久化
隊列持久化
@Bean
public Queue queue(){
return new Queue(ORDER_TACK);
}
解析:
當前new的過程中如果只有一個參數則表示默認的就是已經持久化了
源碼:
public Queue(String name) {
this(name, true, false, false);
}
注意:
消息持久化,不需要設置的,我們的消息是保存在隊列中,隊列如果說是持久化的,那么我們的消息就是持久化的。
confirm機制
confirm模式需要基於channel進行設置, 一旦某條消息被投遞到隊列之后,消息隊列就會發送一個確認信息給生產者,如果隊列與消息是可持久化的, 那么確認消息會等到消息成功寫入到磁盤之后發出.
confirm的性能高,主要得益於它是異步的.生產者在將第一條消息發出之后等待確認消息的同時也可以繼續發送后續的消息.當確認消息到達之后,就可以通過回調方法處理這條確認消息. 如果MQ服務宕機了,則會返回nack消息. 生產者同樣在回調方法中進行后續處理。
思路是把要發送的消息先放一份到redis中 ,當消息發到了交換機exchange中完成就回返回ack為true,那么就完成發送刪除redis的消息,否則就從redis取出消息再次發送.直到發送成功....
//增強rabbitmq,代替原來發送消息的一個類,可以防止丟失數據
@Component
public class ConfirmMessageSender implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedisTemplate redisTemplate;
public static final String MESSAGE_CONFIRM_KEY="message_confirm_";
//有參構造
public ConfirmMessageSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
//rabbitmq設置提交回調
rabbitTemplate.setConfirmCallback(this);
}
//接收消息服務器返回的通知的 ,成功的通知和失敗通知,第二步
/**
*
* @param correlationData 用來保證消息的唯一
* @param ack 應答 true表示成功的通知,false失敗的通知
* @param cause 原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
//成功通知,表示我們數據已經發送成功並且持久化到了磁盤中
//刪除redis中的相關數據
redisTemplate.delete(correlationData.getId());
redisTemplate.delete(MESSAGE_CONFIRM_KEY+correlationData.getId());
}else{
//失敗通知 ,沒有持久化到磁盤中
//從redis中獲取剛才的消息內容
Map<String,String> map = (Map<String,String>)redisTemplate.opsForHash().entries(MESSAGE_CONFIRM_KEY+correlationData.getId());
//重新發送
String exchange = map.get("exchange");
String routingkey = map.get("routingKey");
String message = map.get("message");
//再次發送,直到持久化到磁盤為止,每次發送都帶着唯一標識
rabbitTemplate.convertAndSend(exchange,routingkey, JSON.toJSONString(message),correlationData);
}
}
//自定義消息發送方法,第一步,當執行該方法的之后,去調用confirm
public void sendMessage(String exchange,String routingKey,String message){
/**
* 重點是 CorrelationData 對象,每個發送的消息都需要配備一個 CorrelationData 相關數據對象,CorrelationData 對象內部只有一個 id 屬性,用來表示當前消息唯一性。
*/
//設置消息的唯一標識並存入到redis中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//把消息緩存到redis中 ,目的是備份
redisTemplate.opsForValue().set(correlationData.getId(),message);
//將本次發送消息的相關元數據保存到redis中
Map<String,String> map = new HashMap<>();
map.put("exchange",exchange);
map.put("routingKey",routingKey);
map.put("message",message);
//把元數據緩存到redis中,保證消息的唯一
redisTemplate.opsForHash().putAll(MESSAGE_CONFIRM_KEY+correlationData.getId(),map);
//攜帶着本次消息的唯一標識,進行數據發送
rabbitTemplate.convertAndSend(exchange,routingKey,message,correlationData);
}
}
獲取獲取mq中的消息不丟失?
在application.yml中設置手動應答:
rabbitmq:
host: 192.168.200.128
listener:
simple:
acknowledge-mode: manual #手動
在監聽類中設置手動應答:
簡單來說就是當業務邏輯處理沒問題就執行channel.basicAck的方法,來返回消費完成,如果出現問題了 就執行channel.basicNack的方法,消息會回到原有的隊列,重新的發送,一直到消息發送業務邏輯執行成功
@Component
public class ConsumeListener {
@RabbitListener(queues = RabbitMQConfig.SECKILL_ORDER_KEY)
public void receiveSecKillOrderMessage(Channel channel, Message message){
try {
.....邏輯處理........
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
e.printStackTrace();
//返回失敗通知
//第一個boolean true所有消費者都會拒絕這個消息,false代表只有當前消費者拒絕
//第二個boolean false當前消息會進入到死信隊列,true重新回到原有隊列中,默認回到頭部
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
} catch (IOException e) {
e.printStackTrace();
}
}
}
流量削峰:保證在同一時間內流量進行削弱,然后放行過來
在監聽類中設置:
channel.basicQos(300);
這個300是官方給出的值,代表每次在mq中抓取300個消息消費,
太大會影響服務器的性能,太小就回浪費
希望對大家有幫助...