重復消費的問題的一個可能的問題:消費者消費消息時產生了異常,並沒有返回CONSUME_SUCCESS標志。
重復消費的消息和第一次消費的消息不同,多了一些重復消費的信息:
reconsumeTimes=1,2,…10
REAL_TOPIC也會是:%RETRY%XXXXX
這就是因為消息處理異常導致的消息重新消費,無路時重啟服務端,還是通過mqadmin刪除都沒用,RocketMQ可以很好的保持消息,一定要消費成功才可以!
官方對comsumerMessage方法的實現建議是:
It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure
無論如何,都不要拋出異常,如果需要重新消費,可以返回RECONSUME_LATER主動要求重新消費。
要加入catch Exception根據異常來捕獲業務處理的異常。
1 consumer.registerMessageListener(new MessageListenerConcurrently() { 2 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 3 ConsumeConcurrentlyContext context) { 4 logger.debug(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); 5 MessagePack msgpack = new MessagePack(); 6 for (MessageExt msg : msgs){ 7 byte[] data = msg.getBody(); 8 try { 9 RTMsgPack rtmsg = msgpack.read(data, RTMsgPack.class); 10 logger.debug("Receive a message:" + rtmsg); 11 anlysisRTMsgPack(rtmsg, engine); 12 } catch (IOException e) { 13 logger.error("Unpack RTMsg:", e); 14 } catch (Exception e1){ 15 logger.warn("Unexcepted exception.", e1); 16 } 17 } 18 logger.debug("RETURN CONSUME SUCCESS."); 19 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 20 } 21 });
