rabbit mq 手動重試機制


消息手動確認模式的幾點說明

  • 監聽的方法內部必須使用channel進行消息確認,包括消費成功或消費失敗

  • 如果不手動確認,也不拋出異常,消息不會自動重新推送(包括其他消費者),因為對於rabbitmq來說始終沒有接收到消息消費是否成功的確認,並且Channel是在消費端有緩存的,沒有斷開連接

  • 如果rabbitmq斷開,連接后會自動重新推送(不管是網絡問題還是宕機)

  • 如果消費端應用重啟,消息會自動重新推送

  • 如果消費端處理消息的時候宕機,消息會自動推給其他的消費者

  • 如果監聽消息的方法拋出異常,消息會按照listener.retry的配置進行重發,但是重發次數完了之后還拋出異常的話,消息不會重發(也不會重發到其他消費者),只有應用重啟后會重新推送。因為retry是消費端內部處理的,包括異常也是內部處理,對於rabbitmq是不知道的(此場景解決方案后面有)

  • spring.rabbitmq.listener.retry配置的重發是在消費端應用內處理的,不是rabbitqq重發

  • 可以配置MessageRecoverer對異常消息進行處理,此處理會在listener.retry次數嘗試完並還是拋出異常的情況下才會調用,默認有兩個實現:

    • RepublishMessageRecoverer:將消息重新發送到指定隊列,需手動配置,如:
    1. @Bean
    2. public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
    3. return new RepublishMessageRecoverer(rabbitTemplate, "exchangemsxferror", "routingkeymsxferror");
    4. }
    • RejectAndDontRequeueRecoverer:如果不手動配置MessageRecoverer,會默認使用這個,實現僅僅是將異常打印拋出,源碼如下:
  1. public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
  2.  
  3. protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class);
  4.  
  5. @Override
  6. public void recover(Message message, Throwable cause) {
  7. if (this.logger.isWarnEnabled()) {
  8. this.logger.warn("Retries exhausted for message " + message, cause);
  9. }
  10. throw new ListenerExecutionFailedException("Retry Policy Exhausted", new AmqpRejectAndDontRequeueException(cause), message);
  11. }
  12.  
  13. }
  • 可以通過給隊列(Queue)綁定死信隊列,使用nack反饋給mq,會將消息轉發到死信隊列里面,此種方式需要自己在消費消息的方法內部將異常處理掉

  1. //聲明隊列,並給隊列增加x-dead-letter-exchange和x-dead-letter-routing-key參數,用於指定死信隊列的路由和routingKey
  2. @Bean
  3. public Queue queue(){
  4. Map<String, Object> args = new HashMap<String, Object>();
  5. args.put( "x-dead-letter-exchange",IntegralConstant.DEAD_EXCHANGE_NAME);
  6. args.put( "x-dead-letter-routing-key",IntegralConstant.DEAD_ROUTING_KEY);
  7. return new Queue(IntegralConstant.QUEUE_NAME, true, false, false, args);
  8. }
  9.  
  10. //消息確認時使用nack,並且requeue參數傳false
  11. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  • 消息變成死信有以下幾種情況:
    • 消息被拒絕(basic.reject/ basic.nack)並且requeue=false
    • 消息TTL過期(參考:RabbitMQ之TTL(Time-To-Live 過期時間))
    • 隊列達到最大長度

總結

  • 消息監聽內必須使用channel對消息進行確認,不管是確認消費成功還是確認消費失敗
  • 消息監聽內的異常處理有兩種方式:
    • 內部catch后直接處理,然后使用channel對消息進行確認
    • 配置RepublishMessageRecoverer將處理異常的消息發送到指定隊列專門處理或記錄
  • 監聽的方法內拋出異常貌似沒有太大用處。因為拋出異常就算是重試也非常有可能會繼續出現異常,當重試次數完了之后消息就只有重啟應用才能接收到了,很有可能導致消息消費不及時。當然可以配置RepublishMessageRecoverer來解決,但是萬一RepublishMessageRecoverer發送失敗了呢。。那就可能造成消息消費不及時了。所以即使需要將處理出現異常的消息統一放到另外隊列去處理,個人建議兩種方式:
    • catch異常后,手動發送到指定隊列,然后使用channel給rabbitmq確認消息已消費
    • 給Queue綁定死信隊列,使用nack(requque為false)確認消息消費失敗


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM