rocketmq消費者消費事務消息失敗總結


一般的,我們在RocketMQ處理消息的時候,可能會在消費者中使用類似下面的代碼。

1 consumer.registerMessageListener(new MessageListenerConcurrently() { 2  @Override 3     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 4  ConsumeConcurrentlyContext context) { 5         System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); 6         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 7  } 8 });

如果消息被成功消費的話,會返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態,但是如果消息消費失敗的話,又會怎么處理呢?
其實我們只要找到ConsumeConcurrentlyStatus這個枚舉就能知道RocketMQ是如何處理了,代碼如下:

 1 public enum ConsumeConcurrentlyStatus {
 2     /**
 3      * Success consumption
 4      */
 5     CONSUME_SUCCESS,
 6     /**
 7      * Failure consumption,later try to consume
 8      */
 9     RECONSUME_LATER;
10 }

 rocketmq保持的是分布式數據最終一致性的,而不是強一致性的。如果消費者確實重試了15次之后還是沒有消費成功,還是最終交給由人工處理。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM