如何保證rabbitmq消息零丟失?
我們從三個角色開始分析
1.生產者發送消息不丟失
生產者發消息到rabbitmq的網絡傳輸過程中丟失了
以及消息發送到了rabbitmq但是mq內部出錯,沒有保存
上面的問題有兩種方案
第一種:rabbitmq支持事務消息,通過開啟事務->發送消息->異常捕獲並回滾->發送成功提交事務的方式保證消息發送mq成功, 但是有個弊端,這種方式是同步的,會導致消息的吞吐量下降,一般不使用這種方式
第二種:rabbitmq的channel開啟confirm,其實就是回調機制,發送完消息后不用管,讓rabbitmq通知你消息是發送成功還是失敗,這種方式是異步的,對消息的吞吐量沒什么影響,主要使用這種方法.
2.rabbitmq消息保存失敗
rabbitmq接收到消息之后暫存在內存之中,如果在消費者還沒有消費的時候,消息還在內存中,rabbitmq宕機了,那么內存中的消息就會丟失
一種方案:
rabbitmq對queue設置持久化,就是寫一條消息就直接存儲到磁盤上
3.消費者消費消息失敗
消費者默認是autoAck,就是收到消息就自動提交ack,這就導致消息還沒處理完,消費者宕機了,那么正在處理的消息就丟失了,恢復了之后,消費者會拉取新的消息
一種方案:
消費者關閉自動提交,改為手動ack,等消息全部處理完畢再提交ack,通知rabbitmq消息處理完畢,再發新的消息過來;
下面是整合spring的rabbitmq生產者代碼實現:由於rabbitmq的隊列持久化設置在管理平台就可以操作,消費者設置手動提交也比較簡單,主要貼上生產者的代碼實現
配置發送模板
package cn.picclife.cust.rrd.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; /** * rabbitmq 配置 * @ClassName: RabbitMqConfig * @Description: 初始化Rabbitmq * @author bin.zhao */ @Configuration public class RabbitMqConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.virtual-host}") private String vHost; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.queue}") private String queue; @Value("${spring.rabbitmq.exchange}") private String exchange; @Value("${spring.rabbitmq.routing.key}") private String rountingKey; /** * 針對消費者配置 * 1. 設置交換機類型 * 2. 將隊列綁定到交換機 */ //創建隊列,如果已創建好,就不用寫 // @Bean // public Queue topicQueue(){ // return new Queue(this.queue,true);//創建隊列並持久化 // } // //創建交換機 // @Bean // public TopicExchange topicExchange(){ // return new TopicExchange(this.exchange,true,false); // } // // //創建綁定 // @Bean // public Binding topicBinding(){ // return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(this.rountingKey); // } // @Bean(name = "MQConnectionFactory") public CachingConnectionFactory connectionFactory() throws Exception { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setUsername(this.username); connectionFactory.setPassword(this.password); connectionFactory.setAddresses(this.host); connectionFactory.setVirtualHost(this.vHost); //消息是否投遞到exchange成功 connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Scope//默認單例模式 @Bean(name = "groupRabbitTemplate") public RabbitTemplate rabbitTemplate( //直接使用注解,把連接工廠注入到模板中,防止找不到,導致發消息到mq失敗 @Qualifier("MQConnectionFactory") CachingConnectionFactory connectionFactory) throws Exception { RabbitTemplate template = new RabbitTemplate(connectionFactory); return template; } }
發送消息
/**
* @description 發送消息 * @date 2020 */ @Component public class AppRabbitMQ implements RabbitTemplate.ReturnCallback,RabbitTemplate.ConfirmCallback {private static final String EXCHANGE = ResourceUtils.getResource("config").getValue("spring.rabbitmq.exchange"); private static final String ROUTING_KEY = ResourceUtils.getResource("config").getValue("spring.rabbitmq.routing.key"); @Autowired private RabbitTemplate groupRabbitTemplate;/** *發送消息*/ public void sendMessage(String message) throws Exception { //設置由於網絡問題導致的連接Rabbitmq失敗的重試策略 RetryTemplate retryTemplate = new RetryTemplate(); retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3)); //發送之前可以先把消息保存到數據庫 groupRabbitTemplate.setEncoding("UTF-8");
groupRabblitTemplate.setMandatory(true); //true當消息無法被正常送達的時候被返回給生產者,false丟棄 groupRabbitTemplate.setConfirmCallback(this);//ack回調 groupRabbitTemplate.setReturnCallback(this);//回退回調 try {//消息發送帶上correlationData這個對象中保存有消息的唯一id,以便在數據庫中查找消息或者從緩存中獲取消息為了發送失敗從新發送. groupRabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY, JSONObject.toJSONString(message), correlationData); logger.info("消息發送,id:{}",correlationData.getId()); Thread.sleep(100);//不讓線程直接結束,等待回調函數confirm,如果不等,會直接異常,因為rabbitmq找不到回調方法 }catch (Exception e){ logger.error("發送消息失敗:{}",ExceptionUtils.getStackTrace(e));
//可以重試發送消息,我這里直接保存到數據庫,后續定時任務掃描表格進行補發
//記錄失敗消息到失敗數據表,並且更新消息表狀態為發送失敗 }finally { message=null;//強引用設置為null,便於gc回收 } }
/** @Description: 用於定時任務的消息發送
* @param:
* @date: 2020
* @return: void
*/
public void sendMessageOfTimeTask(String message){
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
CorrelationData correlationData = new CorrelationData(message.getId());
groupRabbitTemplate.setEncoding("UTF-8");
//true當消息無法被正常送達的時候被返回給生產者,false丟棄
groupRabbitTemplate.setMandatory(true);//設置手工ack確認,
groupRabbitTemplate.setConfirmCallback(this);//ack回調
groupRabbitTemplate.setReturnCallback(this);//回退回調
groupRabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,JSONObject.toJSONString(message),correlationData);
try {
Thread.sleep(100);//線程休眠,為了不讓方法直接結束,回調函數無法正常回調confirm方法
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
message=null;//強引用設置為null,便於gc回收
}
}
/** * 如果消息沒有到exchange,則confirm回調,ack=false * 如果消息到達exchange,則confirm回調,ack=true */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info("消息回調confirm函數:{},ack:{},cause:{}", JSONObject.toJSONString(correlationData),ack,cause); if (ack) { //消費成功更新數據庫記錄為已發送狀態 } else { logger.info("推送消息失敗,id:{},原因:{}",correlationData.getId(),cause); //記錄失敗消息到失敗數據表,並且更新消息表狀態為發送失敗 } } /** * exchange到queue成功,則不回調return * exchange到queue失敗,則回調return(需設置mandatory=true,否則不回回調,消息就丟了) */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("return--message:" + new String(message.getBody()) + ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey); //記錄失敗的消息id,更新數據庫失敗表 String messgage = new String(message.getBody()); } }