前言
在使用rabbitmq時,我們可以通過消息持久化來解決服務器因異常崩潰而造成的消息丟失。除此之外,我們還會遇到一個問題,當消息生產者發消息發送出去后,消息到底有沒有正確到達服務器呢?如果不進行特殊配置,默認情況下發送的消息是不會給生產者返回任何響應的,也就是默認情況下生產者並不知道消息是否正常到達了服務器。對於數據必達的需求,你肯定對消息的來龍去脈都有個了接,這種情況下就需要用到rabbitmq消息確認。
消息確認
rabbitmq消息確認分為生產者確認和消費者確認。
生產者消費確認提供了兩種機制:
- 通過事務機制實現
- 通過confirm機制實現
事務機制則用到channel.txSelect、channel.txCommit、channel.txRollback。可以參考下面AMQP協議流轉過程(參考Rabbitmq實戰指南)
事務機制在一條消息發送之后會阻塞發送端,以等待rabbitmq回應,之后才繼續發送下一條消息。所以相對來說事務機制的性能要差一些。事務機制會降低rabbitmq的吞吐量,所以又引入了另一種輕量級的方式:confirm機制。
生產者通過調用channel.confirmSelect將信道設置為confirm模式,之后Rabbitmq會返回Confirm.Select-Ok命令表示同意生產者將當前信道設置為confirm模式。所有被發送的后續消息都被ack或nack一次。類似如下代碼:
channel.confirmSelect()
channel.basicPublish("exchange","routingkey",null,"test".getBytes())
confirm機制流轉過程參考下圖(參考Rabbitmq實戰指南)
消費者確認
消費者在訂閱消息隊列時指定autoAck參數。當參數設置為false時rabbitmq會等待消費者顯式回復確認信號才會從內存或者磁盤種刪除這條消息。參數默認為true。當autoAck設置為false時,對於rabbitmq服務器而言,隊列中的消息分成了兩部分:一部分是等待投遞給消費者的消息、一部分是已經投遞給消費者的消息但是還沒有收到確認信號的消息。可通過RabbitMQ Web平台查看隊列中Ready和UnAck對應的數量。
消費者消息確認涉及到3個方法:channel.basicAck、channel.basicNack、channel.basicReject
SpringBoot集成rabbitmq下實現消息確認
springboot集成rabbitmq實現消息確認主要涉及兩個回調方法(ReturnCallback、ConfirmCallback)。這里消費者部分我用兩種方式來實現。一種是基於SimpleMessageListenerContainer。 另一種就是用RabbitListener注解實現。
1、application.yml
spring: rabbitmq: host: 192.168.80.128 port: 5672 username: admin password: admin virtual-host: / publisher-confirms: true publisher-returns: true listener: simple: acknowledge-mode: manual concurrency: 1 max-concurrency: 10 retry: enabled: true
2、配置文件(這里實現ReturnCallback、ConfirmCallback)
import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.lang.Nullable; @Configuration public class MqConfig { private Logger logger= LoggerFactory.getLogger(MqConfig.class); @Autowired RabbitTemplate rabbitTemplate; @Autowired ConnectionFactory connectionFactory; @Bean public Queue queue(){ return new Queue("testMq",true); //持久化隊列(默認值也是true) } @Bean public DirectExchange directExchange(){ return new DirectExchange("testMq",true,false); } @Bean Binding binding(Queue queue,DirectExchange directExchange){ return BindingBuilder.bind(queue).to(directExchange).with("testMq"); } /** * i->replyCode * s->replyText * s1->exchange * s2->routingKey * **/ //消息從交換器發送到隊列失敗時觸發 RabbitTemplate.ReturnCallback msgReturnCallback=new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { logger.info("消息:{},錯誤碼:{},失敗原因:{},交換器:{},路由key:{}",message.getMessageProperties().getCorrelationId(),i,s,s1,s2); } }; //消息發送到交換器時觸發 RabbitTemplate.ConfirmCallback msgConfirmCallback=new RabbitTemplate.ConfirmCallback() { @Override public void confirm(@Nullable CorrelationData correlationData, boolean b, @Nullable String s) { if(b){ logger.info("消息{}發送exchange成功",correlationData.getId()); }else{ logger.info("消息發送到exchange失敗,原因:{}",s); } } }; /*** * 消費者確認(方式二) * **/ @Bean public SimpleMessageListenerContainer listenerContainer(){ SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("testMq"); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(10); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { try{ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); logger.info("接收消息:{}",new String(message.getBody())); }catch (Exception ex){ //channel.basicReject //channel.basicNack } } }); return container; } /** * 生產者的回調都在這里 * **/ @Autowired public RabbitTemplate rabbitTemplate(){ //消息發送失敗后返回到隊列中 rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(msgReturnCallback); rabbitTemplate.setConfirmCallback(msgConfirmCallback); return rabbitTemplate; } }
另一種消費端實現方式
import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class MqConsumer { private Logger logger= LoggerFactory.getLogger(MqConsumer.class); @RabbitListener(queues = "testMq") public void handler(Message message,Channel channel){ try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); logger.info("接收消息:{}",new String(message.getBody())); } catch (IOException e) { e.printStackTrace(); } } }
3、消息生產者
消息發送時注意生成一個消息id。一開始沒用到這個參數,在消息接收時消費者會拋空指針異常
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.util.UUID; @Controller @RequestMapping("/rabbitMq") public class MqController { private Logger logger= LoggerFactory.getLogger(MqController.class); @Autowired RabbitTemplate rabbitTemplate; @RequestMapping("/sendMq") @ResponseBody public String sendMq(){ /** * 這里exchange、routingkey都叫testMq * **/ Object message=null; for(int i=0;i<10;i++){ logger.info("生產者:第{}條消息",i); CorrelationData correlationId=new CorrelationData(UUID.randomUUID().toString()); message="第"+i+"條消息"; rabbitTemplate.convertAndSend("testMq","testMq",message,correlationId); } return "sending..."; } }
從運行截圖中可以看到生產者和消費者都收到對應的回調消息。