一般的,我們在RocketMQ處理消息的時候,可能會在消費者中使用類似下面的代碼。
1 consumer.registerMessageListener(new MessageListenerConcurrently() { 2 @Override 3 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 4 ConsumeConcurrentlyContext context) { 5 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); 6 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 7 } 8 });
如果消息被成功消費的話,會返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態,但是如果消息消費失敗的話,又會怎么處理呢?
其實我們只要找到ConsumeConcurrentlyStatus這個枚舉就能知道RocketMQ是如何處理了,代碼如下:
1 public enum ConsumeConcurrentlyStatus { 2 /** 3 * Success consumption 4 */ 5 CONSUME_SUCCESS, 6 /** 7 * Failure consumption,later try to consume 8 */ 9 RECONSUME_LATER; 10 }
rocketmq保持的是分布式數據最終一致性的,而不是強一致性的。如果消費者確實重試了15次之后還是沒有消費成功,還是最終交給由人工處理。
