一、可靠性問題分析
在前面說過消息的傳遞過程中有三個對象參與分別是:生產者、RabbitMQ(broker)、消費者;接下來就是要圍繞這三個對象來分析消息在傳遞過程中會在哪些環節出來可靠性問題;
1.1、生產者丟失消息
生產者發送消息到broker時,要保證消息的可靠性,主要的方案有以下2種:
1.事務
2.confirm機制
1.1.1、事務
RabbitMQ提供了事務功能,也即在生產者發送數據之前開啟RabbitMQ事務,然后再發送消息,如果消息沒有成功發送到RabbitMQ,那么就拋出異常,然后進行事務回滾,回滾之后再重新發送消息,如果RabbitMQ接收到了消息,那么進行事務提交,再開始發送下一條數據。
優點
保證消息一定能夠發送到RabbitMQ中,發送端不會出現消息丟失的情況;
缺點
事務機制是阻塞(同步)的,每次發送消息必須要等到mq回應之后才能繼續發送消息,比較耗費性能,會導致吞吐量降下來
1.1.2、confirm模式
基於事務的特性,作為補償,RabbitMQ添加了消息確認機制,也即confirm機制。confirm機制和事務機制最大的不同就是事務是同步的,confirm是異步的,發送完一個消息后可以繼續發送下一個消息,mq接收到消息后會異步回調接口告知消息接收結果。生產者開啟confirm模式后,每次發送的消息都會分配一個唯一id,如果消息成功發送到了mq中,那么就會返回一個ack消息,表示消息接收成功,反之會返回一個nack,告訴你消息接收失敗,可以進行重試。依據這個機制,我們可以維護每個消息id的狀態,如果超過一定時間還是沒有接收到mq的回調,那么就重發消息。
1.1.3、confirm模式代碼演示
其實這塊代碼在前面幾篇文章的代碼中有體現過;下面以springboot集成的方式再演示一種
pom.xml文件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--web包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
application.yml
spring: rabbitmq: #rabbitmq 連接配置 publisher-confirm-type: correlated # 開啟confirm確認模式 host: 192.168.0.1 port: 5672 username: admin password: admin server: port: 8081
@Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { /*** * @param correlationData 相關配置信息 * @param ack exchange交換機 是否成功收到了消息。true 成功,false代表失敗 * @param cause 失敗原因 * */ @Override public void confirm(CorrelationData correlationData ,boolean ack ,String cause) { if (ack){ //消息發送成功 System.out.println ("消息發送成功到交換機"); }else{ System.out.println ("發送失敗"+cause); } } }
/** * 定義隊列和交換機 */ @Configuration public class QueueConfig { @Bean(name="confirmTestExchange") public FanoutExchange confirmTestExchange(){ return new FanoutExchange("confirmTestExchange",true,false); } @Bean(name = "confirmTestQueue") public Queue confirmTestQueue(){ return new Queue("confirm_test_queue",true,false,false); } @Bean public Binding confirmTestFanoutExcangeAndQueue(@Qualifier("confirmTestQueue")Queue queue,@Qualifier("confirmTestExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } }
@RestController @RequestMapping(value = "/producer") @CrossOrigin public class Producer { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ConfirmCallbackService confirmCallbackService; @Autowired private ReturnCallbackService returnCallbackService; @GetMapping public void producer(){ rabbitTemplate.setConfirmCallback ( confirmCallbackService ); rabbitTemplate.convertAndSend ( "confirmTestExchange","","測試RabbitTemplate功能" ); } }

上面演示了消息投放到交換機的案例,下面演示一個消息從 exchange–>queue 投遞失敗則會返回一個 returnCallback的案例;生產端通過實現ReturnCallback接口,啟動消息失敗返回,消息路由不到隊列時會觸發該回調接口
spring: rabbitmq: #rabbitmq 連接配置 publisher-confirm-type: correlated # 開啟confirm確認模式 publisher-returns: true #開啟退回模式 host: 192.168.0.1 port: 5672 username: admin password: admin server: port: 8081
rabbitTemplate.setMandatory(true);
@Component public class ReturnCallbackService implements RabbitTemplate.ReturnCallback { /** * * @param message 消息對象 * @param i 錯誤碼 * @param s 錯誤信息 * @param s1 交換機 * @param s2 路由鍵 */ @Override public void returnedMessage(Message message ,int i ,String s ,String s1 ,String s2) { System.out.println("消息對象:" + message); System.out.println("錯誤碼:" + i); System.out.println("錯誤信息:" + s); System.out.println("消息使用的交換器:" + s1); System.out.println("消息使用的路由key:" + s2); //業務代碼處理 } }
yml配置
spring: rabbitmq: #rabbitmq 連接配置 publisher-confirm-type: correlated # 開啟confirm確認模式 publisher-returns: true #開啟退回模式 host: 124.71.33.75 port: 5672 username: admin password: ghy20200707rabbitmq server: port: 8081
public void producerLose(){ /** *確保消息發送失敗后可以重新返回到隊列中 */ rabbitTemplate.setMandatory(true); /** * 消息投遞確認模式 */ rabbitTemplate.setConfirmCallback(confirmCallbackService); /** * 消息投遞到隊列失敗回調處理 */ rabbitTemplate.setReturnCallback(returnCallbackService); CorrelationData correlationData = new CorrelationData("id_"+System.currentTimeMillis()+""); //發送消息 rabbitTemplate.convertAndSend("directExchange", "RabbitTemplate","測試RabbitTemplate功能" ,correlationData); }
測試接口

1.2、消費者丟失消息
其實在生產者和消費者中間,rabbitmq也是會丟失消息的,解決方案就是持久化存儲,這個方案在前面有講過;所以在這里就跳過;下面直接說消息確認機制ack,ack指Acknowledge確認。 表示消費端收到消息后的確認方式
- AcknowledgeMode.NONE:不確認
- AcknowledgeMode.AUTO:自動確認
- AcknowledgeMode.MANUAL:手動確認
spring: rabbitmq: #rabbitmq 連接配置 publisher-confirm-type: correlated # 開啟confirm確認模式 publisher-returns: true #開啟退回模式 host: 124.71.33.75 port: 5672 username: admin password: ghy20200707rabbitmq listener: simple: acknowledge-mode: manual #手動確認 server: port: 8081
/** * 消費者消息確認機制 */ @Component @RabbitListener(queues = "confirm_test_queue") public class ReceiverMessage { @RabbitHandler public void processHandler(String msg,Channel channel,Message message) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("消息內容" + new String(message.getBody())); //TODO 具體業務邏輯 // 手動簽收[參數1:消息投遞序號,參數2:批量簽收] channel.basicAck(deliveryTag, true); } catch (Exception e) { //拒絕簽收[參數1:消息投遞序號,參數2:批量拒絕,參數3:是否重新加入隊列] channel.basicNack(deliveryTag, true, true); } } }
要想測試異常很簡單,在代碼加一個報錯語句就可以測試了,我這里就不搞事了;
二、消費端限流
2.1、TTL
三、死信隊列
- 隊列消息長度到達限制;
- 消費者拒接消費消息,basicNack/basicReject,並且不把消息重新放入原目標隊列,requeue=false;
- 原隊列存在消息過期設置,消息到達超時時間未被消費;
四、延遲隊列
