引言
RabbitMQ的模型是生產者發送信息到 Broker (代理),消費者從 Broker 中取出信息。但是生產者怎么知道消息是否真的發送到 Broker 中了呢?Broker 又怎么知道消息到底有沒有被消費者消費?
如果由於網絡原因出現故障,生產者生產的消息未到達 Broker 或者 Broker 的消息被虛假消費,而它們又不知道,就會產生很嚴重的問題,如重復消費等。
RabbitMQ的消息確認流程

從圖中可以看出:
消息確認機制分為生產者確認和消費者確認
- ConfirmCallback 生產者
- ReturnCallback 生產者
- ACK 消費者
生產者確認
- 消息到達RabbitMQ的Exchange:Exchange向生產者發送Confirm確認。成功抑或失敗都會返回一個confirmCallback
- 消息成功達到Exchange,但是從Exchange投遞Queue失敗:向生產者返回一個returnCallback。只有失敗才會返回
消費者確認
- 消費者收到消息后需要對 RabbitMQ Server 進行消息 ACK 確認,RabbitMQ 根據確認信息決定是刪除隊列中的該信息還是重新發送
代碼實現
生產者確認
重點在於生產者重寫下面兩個方法
-
rabbitMQTemplate.setConfirmCallback
-
rabbitMQTemplate.setReturnCallback
-
開啟生產者消息確認
spring: rabbitmq: host: localhost port: 5672 virtual-host: / username: root password: root # 開啟兩個模式的生產者消息確認 publisher-confirm-type: simple publisher-returns: true -
聲明交換機、隊列,綁定交換機和隊列
@Configuration public class RabbitMQConfig { private static final String SB_TOPIC_EXCHANGE="sb_topic_exchange"; private static final String SB_TOPIC_QUEUE="sb_topic_queue1"; // 注入交換機 topic類型 @Bean("topicExchange") public Exchange topicExchange(){ return ExchangeBuilder.topicExchange(SB_TOPIC_EXCHANGE).durable(true) .autoDelete().build(); } // 聲明隊列 @Bean public Queue queue1(){ return QueueBuilder.durable(SB_TOPIC_QUEUE).build(); } // 綁定隊列和交換機 @Bean public Binding exchangQueue(@Qualifier("queue1") Queue queue, @Qualifier("topicExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("user.#").noargs(); } } -
創建消費者
@Component @RabbitListener(queues = "sb_topic_queue1") public class Consumer { @RabbitHandler public void testPublishConfirm(String msg) { System.out.println("收到的信息:"+msg); } } -
創建生產者
創建生產者發送消息到消息隊列,模擬兩種異常情況
@SpringBootTest class RabiitmqSpringbootApplicationTests { @Autowired RabbitTemplate template; @Test void testConfirmTrue() { // 設置confirm回調函數 template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) { if (b) System.out.println("消息發送成功"); else System.out.println("消息發送失敗"); } }); // 模擬生產者發送信息--正常情況 template.convertAndSend("sb_topic_exchange","user.info","日志級別:info;日志模塊:user;日志信息:*****"); } @Test void testConfirmFalse() { // 設置confirm回調函數 template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, java.lang.String s) { if (b) System.out.println("消息發送成功"); else System.out.println("消息發送失敗"); } }); // 模擬生產者發送信息 // 不存在的交換機--異常情況 template.convertAndSend("sb_topic_exchange_noexist","user.info","日志級別:info;日志模塊:user;日志信息:*****"); } @Test void testReturnFalse() { // 設置return回調函數 template.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int i, java.lang.String s, java.lang.String s1, java.lang.String s2) { System.out.println(message.toString()); System.out.println(s+"*********"); } }); template.setMandatory(true); // 模擬生產者發送信息 // 正確的交換機 錯誤的routekey -- 異常情況 template.convertAndSend("sb_topic_exchange","noexist.user.info","日志級別:info;日志模塊:user;日志信息:*****"); } }
消費者確認
重點在於消費者的下面兩個方法
- channel.basicAck 消費者簽收
- channel.basicNAck 消費者拒絕簽收
-
開啟消費者確認模式
spring: rabbitmq: host: localhost port: 5672 virtual-host: / username: root password: root # 設置消費端手動簽收 listener: direct: acknowledge-mode: manual simple: acknowledge-mode: manual -
創建消費者
/** * 注入消費者--手動簽到 */ @Component @RabbitListener(queues = "sb_topic_queue1") public class Consumer2 { @RabbitHandler public void testComsumer(String msg, Channel channel, Message message) throws InterruptedException, IOException { // 消費端設置手動簽收代碼 try { System.out.println(msg); // 正常簽收,mq收到此消息被正常簽收后即可從隊列中刪除vi信息 // 是喲了那個channel的方法 // 第一個參數是deliverytag 標識哪條信息 第二個參數是是否批量簽收 // int i=2/0; 模擬異常 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); System.out.println("消費者簽收了該信息,服務器你可以刪了"); }catch (Exception e){ // 異常拒絕簽收,讓mq重發此信息 System.out.println("該信息丟了,給我重發"); channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true); // 該信息丟了,但是不需要你重發 // channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false); } } } -
創建生產者
@SpringBootTest class RabiitmqSpringbootApplicationTests { @Autowired RabbitTemplate template; @Test void testConsumerAck() { template.convertAndSend("sb_topic_exchange","noexist.user.info","日志級別:info;日志模塊:user;日志信息:*****"); } }
