一般的,我们在RocketMQ处理消息的时候,可能会在消费者中使用类似下面的代码。
1 consumer.registerMessageListener(new MessageListenerConcurrently() { 2 @Override 3 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 4 ConsumeConcurrentlyContext context) { 5 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); 6 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 7 } 8 });
如果消息被成功消费的话,会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态,但是如果消息消费失败的话,又会怎么处理呢?
其实我们只要找到ConsumeConcurrentlyStatus这个枚举就能知道RocketMQ是如何处理了,代码如下:
1 public enum ConsumeConcurrentlyStatus { 2 /** 3 * Success consumption 4 */ 5 CONSUME_SUCCESS, 6 /** 7 * Failure consumption,later try to consume 8 */ 9 RECONSUME_LATER; 10 }
rocketmq保持的是分布式数据最终一致性的,而不是强一致性的。如果消费者确实重试了15次之后还是没有消费成功,还是最终交给由人工处理。