重試機制
由於MQ經常處於復雜的分布式系統中,考慮網絡波動,服務宕機,程序異常因素,很有可能出現消息發送或者消費失敗的問題。因此,消息的重試就是所有MQ中間件必須考慮到的一個關鍵點。如果沒有消息重試,就可能產生消息丟失的問題,可能對系統產生很大的影響。所以,秉承寧可多發消息,也不可丟失消息的原則,大部分MQ都對消息重試提供了很好的支持。
RocketMQ為了使用者封裝了消息重試的處理流程,無需開發人員手動處理。RocketMQ支持了生產端和消費端兩類重試機制。
模擬生產端重試
Consumer端消息消費兩種狀態:
package com.alibaba.rocketmq.client.consumer.listener;
public enum ConsumeConcurrentlyStatus {
CONSUME_SUCCESS,
RECONSUME_LATER;
private ConsumeConcurrentlyStatus() {
}
}
一個是成功(CONSUME_SUCCESS),一個是失敗&重試(RECONSUME_LATER);
Consumer為了保證消息消費成功,只有使用方明確表示消費成功,返回CONSUME_SUCCESS,RocketMQ才會認為消息消費成功。
如果消息消費失敗,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會認為消息消費失敗了,需要重新投遞。
1.出現異常
package com.wn.consumer;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.util.List;
public class MQConsumer {
public static void main(String[] args) throws MQClientException {
//創建消費者
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
//設置NameServer地址
consumer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
//設置消費者實例名稱
consumer.setInstanceName("consumer");
//訂閱topic
consumer.subscribe("wn02","TagA");
//監聽消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//獲取消息
for (MessageExt msg:list){
System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
}
try {
int i=1/0;
}catch (Exception e){
e.printStackTrace();
//需要重試
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
//消息成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started...");
}
}
2.網絡延遲
package com.wn.consumer;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.util.List;
public class MQConsumer {
public static void main(String[] args) throws MQClientException {
//創建消費者
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
//設置NameServer地址
consumer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
//設置消費者實例名稱
consumer.setInstanceName("consumer");
//訂閱topic
consumer.subscribe("wn03","TagA");
//監聽消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//獲取消息
for (MessageExt msg:list){
System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
}
//網絡延遲
try {
Thread.sleep(600000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//消息成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started...");
}
}

