〇、什么是消息隊列
一、應答模式
1.什么是應答?
消息投遞到交換器(exchange)中,交換器給我們的反饋,是保障消息投遞成功的一種機制。
2.測試
配置:
1 #選擇確認類型為交互
2 spring.rabbitmq.publisher-confirm-type=correlated
測試方法:
1 @Test 2 /** 3 * the test is testing confirm-function in rabbitmq 4 */ 5 void messageSendTestWithConfirm(){ 6 7 /* 8 * 設置消息確認回調方法; 9 * @ack 為true時,表示投遞成功;為false表示投遞失敗; 10 * @CorrelationData 為自定義反饋信息; 11 * @cause 為投遞失敗的原因; 12 */ 13 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { 14 @Override 15 public void confirm(CorrelationData correlationData, boolean ack, String cause) { 16 if(!ack){ 17 logger.error("correlationData:"+ correlationData); 18 logger.error("ack:"+ack); 19 logger.error("cause:"+cause); 20 } 21 } 22 }); 23 24 //消息內容 25 Map<String,String> map = new HashMap<>(); 26 map.put("message","testing confire function"); 27 28 //設置自定義反饋消息 29 String uuid = UUID.randomUUID().toString(); 30 logger.info("消息唯一ID:"+uuid); 31 CorrelationData correlationData = new CorrelationData(); 32 correlationData.setId(uuid); 33 34 //並不存在名為“exchange-dog”的exchange 35 rabbitTemplate.convertAndSend("exchange-dog","dog",map,correlationData); 36 37 }
測試結果
1 c.d.amqp.SpringBootAmqpApplicationTests : 消息唯一ID:e6601e83-fad7-4b53-9968-c74828e62b23 2 o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.22.130:5672] 3 o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#f7cdf8:0/SimpleConnection@397ef2 [delegate=amqp://guest@192.168.22.130:5672/, localPort= 8055] 4 o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exchange-dog' in vhost '/', class-id=60, method-id=40) 5 c.d.amqp.SpringBootAmqpApplicationTests : correlationData:CorrelationData [id=e6601e83-fad7-4b53-9968-c74828e62b23] 6 c.d.amqp.SpringBootAmqpApplicationTests : ack:false 7 c.d.amqp.SpringBootAmqpApplicationTests : cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exchange-dog' in vhost '/', class-id=60, method-id=40)
注意:Confirm模式只管有無投遞到exchange,而不管有無發送到隊列當中。
二、返回模式
1.什么是返回模式?
當消息未投遞到queue時的反饋。
2.測試
配置:
1 #開啟返回模式
2spring.rabbitmq.publisher-returns=true
測試方法:
1 @Test 2 void messageSendTestWithReturn(){ 3 /* 4 * 設置消息返回回調方法; 5 * 該方法執行時則表示消息投遞失敗 6 * @message 為反饋信息; 7 * @replyCode 一個反饋代碼,表示不同投遞失敗原因; 8 * @replyText 反饋信息 9 */ 10 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { 11 @Override 12 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { 13 logger.error("返回消息配置:"+message.getMessageProperties().toString()); 14 logger.error("反饋代碼:"+replyCode); 15 logger.error("反饋內容:"+replyText); 16 logger.error("exchange:"+exchange); 17 logger.error("routingKey:"+routingKey); 18 } 19 }); 20 21 //消息內容 22 Map<String,String> map = new HashMap<>(); 23 map.put("message","testing return function"); 24 25 //並不存在名為“dog”的routingKey,即投不到現有的queue里 26 rabbitTemplate.convertAndSend("exchange-direct","dog",map); 27 }
測試結果:
1 c.d.amqp.SpringBootAmqpApplicationTests : 返回消息配置:MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0] 2 c.d.amqp.SpringBootAmqpApplicationTests : 反饋代碼:312 3 c.d.amqp.SpringBootAmqpApplicationTests : 反饋內容:NO_ROUTE 4 c.d.amqp.SpringBootAmqpApplicationTests : exchange:exchange-direct 5 c.d.amqp.SpringBootAmqpApplicationTests : routingKey:dog
三、限流策略(手動應答消息)
1.為什么要限流?
若隊列中消息積壓過多,突然開啟監聽,會導致消費端崩潰。
2.如何限流?
使用RabbitMQ提供的Qos(服務質量保證)功能,如果一定數目消息的未被應答前,不再接受新消息。
3.測試
配置
1 #手動消息應答
2 spring.rabbitmq.listener.simple.acknowledge-mode=manual
測試
1 /* 2 * 消息手動應答 3 * @RabbitListener注解監聽來自指定隊列的消息 4 */ 5 6 @RabbitListener(queues = "springboot-queue") 7 public void revice(Message message,Channel channel) throws IOException { 8 try{ 9 logger.info("消息ID:"+message.getMessageProperties().getHeader("spring_returned_message_correlation")); 10 logger.info("消息標簽:"+String.valueOf(message.getMessageProperties().getDeliveryTag())); 11 /* 設置Qos機制 12 * 第一個參數:單條消息的大小(0表示即無限制) 13 * 第二個參數:每次處理消息的數量 14 * 第三個參數:是否為consumer級別(false表示僅當前channel有效) 15 */ 16 channel.basicQos(0,1,false); 17 //手動應答消息 第一個參數是所確認消息的標識,第二參數是是否批量確認 18 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 19 20 }catch (Exception e){ 21 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); 22 logger.error("消息ID:"+message.getMessageProperties().getHeader("spring_returned_message_correlation")); 23 logger.error("接收消息發送錯誤:"+e.getMessage()); 24 } 25 }
四、死信隊列
1.什么是死信?
未被正常處理的消息。
2.出現死信的情況
(1)消息被拒絕(reject、nack),並且不重新放回原隊列(requeue=false);
(2)消息過期(設置了Expiration);
(3)隊列已滿。
3.配置
以下代碼配置了一個正常的exchange、queue和 一個專門接收死信的exchange、queue
在隊列中配置x-dead-letter-exchange參數,表示本隊列出現死信,則轉發到配置指定的交換器中去
1 /* 2 * 創建消息隊列的對象:exchange、queue、綁定規則 3 */ 4 @Test 5 void createObjectOfMQ(){ 6 7 /* 8 * 在普通隊列上配置參數。表示若本隊列有死信,則轉發到配置指定的轉發器中去 9 * @參數鍵:x-dead-letter-exchange 10 * @參數名:接收死信的交換器 11 */ 12 Map<String,Object> arguments = new HashMap<>(); 13 arguments.put("x-dead-letter-exchange","springboot-dlx-exchange"); 14 15 amqpAdmin.declareExchange(new DirectExchange("springboot-direct")); 16 amqpAdmin.declareQueue(new Queue("springboot.queue",true,false,false,arguments)); 17 amqpAdmin.declareBinding(new Binding("springboot.queue", Binding.DestinationType.QUEUE,"springboot-direct","springboot",null)); 18 19 //接收死信的交換器 20 amqpAdmin.declareExchange(new TopicExchange("springboot-dlx-exchange")); 21 //交換器收到的死信都轉發到該隊列,#表示接收所有消息 22 amqpAdmin.declareQueue(new Queue("springboot-dlx.queue",true)); 23 amqpAdmin.declareBinding(new Binding("springboot-dlx.queue", Binding.DestinationType.QUEUE,"springboot-dlx-exchange","#",null)); 24 }
運行以上方法后,可在RabbitMQ管理界面看到兩個exchange、兩個queue成功創建

4.發送消息
1 @Test 2 void sendMessageWithTTL(){ 3 String str = "dlx test"; 4 5 //setExpiration表示設置該消息存活時間 6 //5秒后該消息未被消息,則轉發到死信交換器 7 Message message = MessageBuilder.withBody(str.getBytes()) 8 .setExpiration("5000") 9 .setContentEncoding("UTF-8") 10 .setMessageId("dlx-001") 11 .build(); 12 13 //將消息發送到配置了"x-dead-letter-exchange"參數的隊列 14 rabbitTemplate.convertAndSend("springboot-direct","springboot",message); 15 }
發送完消息,過5秒之后可以看到信息已被投遞到死信隊列中去了

setExpiration
