RocketMQ消費者-重復消費的問題解決


重復消費的問題的一個可能的問題:消費者消費消息時產生了異常,並沒有返回CONSUME_SUCCESS標志。

重復消費的消息和第一次消費的消息不同,多了一些重復消費的信息:
reconsumeTimes=1,2,…10
REAL_TOPIC也會是:%RETRY%XXXXX
這就是因為消息處理異常導致的消息重新消費,無路時重啟服務端,還是通過mqadmin刪除都沒用,RocketMQ可以很好的保持消息,一定要消費成功才可以!


官方對comsumerMessage方法的實現建議是:

It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure

無論如何,都不要拋出異常,如果需要重新消費,可以返回RECONSUME_LATER主動要求重新消費。

要加入catch Exception根據異常來捕獲業務處理的異常。

 1 consumer.registerMessageListener(new MessageListenerConcurrently() {
 2                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
 3                     ConsumeConcurrentlyContext context) {
 4                     logger.debug(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
 5                     MessagePack msgpack = new MessagePack();
 6                     for (MessageExt msg : msgs){
 7                         byte[] data = msg.getBody();
 8                         try {
 9                             RTMsgPack rtmsg = msgpack.read(data, RTMsgPack.class);
10                             logger.debug("Receive a message:" + rtmsg);
11                             anlysisRTMsgPack(rtmsg, engine);
12                         } catch (IOException e) {
13                             logger.error("Unpack RTMsg:", e);
14                         } catch (Exception e1){
15                             logger.warn("Unexcepted exception.", e1);
16                         }
17                     }
18                     logger.debug("RETURN CONSUME SUCCESS.");
19                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
20                 }
21             });

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM