Spring Boot系列(8)——RabbitMQ確認、退回模式及死信隊列


〇、什么是消息隊列

  參考:新手也能看懂,消息隊列其實很簡單

        RabbitMQ運行模型與名詞解釋

 

一、應答模式

  1.什么是應答?

    消息投遞到交換器(exchange)中,交換器給我們的反饋,是保障消息投遞成功的一種機制。

  2.測試

  配置:

 1 #選擇確認類型為交互
 2 spring.rabbitmq.publisher-confirm-type=correlated

 

  測試方法:

 1     @Test
 2     /**
 3      * the test is testing confirm-function in rabbitmq
 4      */
 5     void messageSendTestWithConfirm(){
 6 
 7         /*
 8          * 設置消息確認回調方法;
 9          * @ack 為true時,表示投遞成功;為false表示投遞失敗;
10          * @CorrelationData 為自定義反饋信息;
11          * @cause 為投遞失敗的原因;
12          */
13         rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
14             @Override
15             public void confirm(CorrelationData correlationData, boolean ack, String cause) {
16                 if(!ack){
17                     logger.error("correlationData:"+ correlationData);
18                     logger.error("ack:"+ack);
19                     logger.error("cause:"+cause);
20                 }
21             }
22         });
23 
24         //消息內容
25         Map<String,String> map = new HashMap<>();
26         map.put("message","testing confire function");
27 
28         //設置自定義反饋消息
29         String uuid = UUID.randomUUID().toString();
30         logger.info("消息唯一ID:"+uuid);
31         CorrelationData correlationData = new CorrelationData();
32         correlationData.setId(uuid);
33 
34         //並不存在名為“exchange-dog”的exchange
35         rabbitTemplate.convertAndSend("exchange-dog","dog",map,correlationData);
36 
37     }

 

  測試結果

1  c.d.amqp.SpringBootAmqpApplicationTests  : 消息唯一ID:e6601e83-fad7-4b53-9968-c74828e62b23 2  o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.22.130:5672]
3  o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#f7cdf8:0/SimpleConnection@397ef2 [delegate=amqp://guest@192.168.22.130:5672/, localPort= 8055]
4  o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exchange-dog' in vhost '/', class-id=60, method-id=40)
5  c.d.amqp.SpringBootAmqpApplicationTests  : correlationData:CorrelationData [id=e6601e83-fad7-4b53-9968-c74828e62b23]
6  c.d.amqp.SpringBootAmqpApplicationTests  : ack:false
7  c.d.amqp.SpringBootAmqpApplicationTests  : cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exchange-dog' in vhost '/', class-id=60, method-id=40)

 

  注意:Confirm模式只管有無投遞到exchange,而不管有無發送到隊列當中。

 

二、返回模式

  1.什么是返回模式?

    當消息未投遞到queue時的反饋。

 

  2.測試

  配置:

1 #開啟返回模式
2spring.rabbitmq.publisher-returns=true

 

  測試方法:

 1     @Test
 2     void messageSendTestWithReturn(){
 3         /*
 4          * 設置消息返回回調方法;
 5          * 該方法執行時則表示消息投遞失敗
 6          * @message 為反饋信息;
 7          * @replyCode 一個反饋代碼,表示不同投遞失敗原因;
 8          * @replyText 反饋信息
 9          */
10         rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
11             @Override
12             public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
13                 logger.error("返回消息配置:"+message.getMessageProperties().toString());
14                 logger.error("反饋代碼:"+replyCode);
15                 logger.error("反饋內容:"+replyText);
16                 logger.error("exchange:"+exchange);
17                 logger.error("routingKey:"+routingKey);
18             }
19         });
20 
21         //消息內容
22         Map<String,String> map = new HashMap<>();
23         map.put("message","testing return function");
24 
25         //並不存在名為“dog”的routingKey,即投不到現有的queue里
26         rabbitTemplate.convertAndSend("exchange-direct","dog",map);
27     }

 

  測試結果:

1 c.d.amqp.SpringBootAmqpApplicationTests  : 返回消息配置:MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]
2 c.d.amqp.SpringBootAmqpApplicationTests  : 反饋代碼:312 3 c.d.amqp.SpringBootAmqpApplicationTests  : 反饋內容:NO_ROUTE 4 c.d.amqp.SpringBootAmqpApplicationTests  : exchange:exchange-direct
5 c.d.amqp.SpringBootAmqpApplicationTests  : routingKey:dog

 

三、限流策略(手動應答消息)

  1.為什么要限流?

    若隊列中消息積壓過多,突然開啟監聽,會導致消費端崩潰。

 

  2.如何限流?

    使用RabbitMQ提供的Qos(服務質量保證)功能,如果一定數目消息的未被應答前,不再接受新消息。

 

  3.測試

  配置

1 #手動消息應答
2 spring.rabbitmq.listener.simple.acknowledge-mode=manual

  

  測試

 1     /*
 2      * 消息手動應答
 3      * @RabbitListener注解監聽來自指定隊列的消息
 4      */
 5 
 6     @RabbitListener(queues = "springboot-queue")
 7     public void revice(Message message,Channel channel) throws IOException {
 8         try{
 9             logger.info("消息ID:"+message.getMessageProperties().getHeader("spring_returned_message_correlation"));
10             logger.info("消息標簽:"+String.valueOf(message.getMessageProperties().getDeliveryTag()));
11             /* 設置Qos機制
12              * 第一個參數:單條消息的大小(0表示即無限制)
13              * 第二個參數:每次處理消息的數量
14              * 第三個參數:是否為consumer級別(false表示僅當前channel有效)
15              */
16             channel.basicQos(0,1,false);
17             //手動應答消息  第一個參數是所確認消息的標識,第二參數是是否批量確認
18             channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
19             
20         }catch (Exception e){
21             channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
22             logger.error("消息ID:"+message.getMessageProperties().getHeader("spring_returned_message_correlation"));
23             logger.error("接收消息發送錯誤:"+e.getMessage());
24         }
25     }

 

四、死信隊列

  1.什么是死信?

    未被正常處理的消息。

 

  2.出現死信的情況

    (1)消息被拒絕(reject、nack),並且不重新放回原隊列(requeue=false);

    (2)消息過期(設置了Expiration);

    (3)隊列已滿。

 

  3.配置

  以下代碼配置了一個正常的exchange、queue和 一個專門接收死信的exchange、queue

  在隊列中配置x-dead-letter-exchange參數,表示本隊列出現死信,則轉發到配置指定的交換器中去

 1     /*
 2      * 創建消息隊列的對象:exchange、queue、綁定規則
 3      */
 4     @Test
 5     void createObjectOfMQ(){
 6 
 7         /*
 8          * 在普通隊列上配置參數。表示若本隊列有死信,則轉發到配置指定的轉發器中去
 9          * @參數鍵:x-dead-letter-exchange
10          * @參數名:接收死信的交換器
11          */
12         Map<String,Object> arguments = new HashMap<>();
13         arguments.put("x-dead-letter-exchange","springboot-dlx-exchange"); 14 
15         amqpAdmin.declareExchange(new DirectExchange("springboot-direct"));
16         amqpAdmin.declareQueue(new Queue("springboot.queue",true,false,false,arguments));
17         amqpAdmin.declareBinding(new Binding("springboot.queue", Binding.DestinationType.QUEUE,"springboot-direct","springboot",null));
18 
19         //接收死信的交換器
20         amqpAdmin.declareExchange(new TopicExchange("springboot-dlx-exchange"));
21         //交換器收到的死信都轉發到該隊列,#表示接收所有消息
22         amqpAdmin.declareQueue(new Queue("springboot-dlx.queue",true));
23         amqpAdmin.declareBinding(new Binding("springboot-dlx.queue", Binding.DestinationType.QUEUE,"springboot-dlx-exchange","#",null));
24     }

 

  運行以上方法后,可在RabbitMQ管理界面看到兩個exchange、兩個queue成功創建

 

  4.發送消息

 1    @Test
 2     void sendMessageWithTTL(){
 3         String str = "dlx test";
 4 
 5         //setExpiration表示設置該消息存活時間
 6         //5秒后該消息未被消息,則轉發到死信交換器
 7         Message message = MessageBuilder.withBody(str.getBytes())
 8                 .setExpiration("5000")  9                 .setContentEncoding("UTF-8")
10                 .setMessageId("dlx-001")
11                 .build();
12 
13         //將消息發送到配置了"x-dead-letter-exchange"參數的隊列
14         rabbitTemplate.convertAndSend("springboot-direct","springboot",message);
15     }

 

   發送完消息,過5秒之后可以看到信息已被投遞到死信隊列中去了

 

 

 

 

setExpiration


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM