RocketMQ重試機制


重試機制

  由於MQ經常處於復雜的分布式系統中,考慮網絡波動,服務宕機,程序異常因素,很有可能出現消息發送或者消費失敗的問題。因此,消息的重試就是所有MQ中間件必須考慮到的一個關鍵點。如果沒有消息重試,就可能產生消息丟失的問題,可能對系統產生很大的影響。所以,秉承寧可多發消息,也不可丟失消息的原則,大部分MQ都對消息重試提供了很好的支持。

  RocketMQ為了使用者封裝了消息重試的處理流程,無需開發人員手動處理。RocketMQ支持了生產端和消費端兩類重試機制。

模擬生產端重試

  Consumer端消息消費兩種狀態:

復制代碼
package com.alibaba.rocketmq.client.consumer.listener;

public enum ConsumeConcurrentlyStatus {
    CONSUME_SUCCESS,
    RECONSUME_LATER;

    private ConsumeConcurrentlyStatus() {
    }
}
復制代碼

  一個是成功(CONSUME_SUCCESS),一個是失敗&重試(RECONSUME_LATER);

  Consumer為了保證消息消費成功,只有使用方明確表示消費成功,返回CONSUME_SUCCESS,RocketMQ才會認為消息消費成功。

  如果消息消費失敗,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會認為消息消費失敗了,需要重新投遞。

  1.出現異常

復制代碼
package com.wn.consumer;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

public class MQConsumer {
    public static void main(String[] args) throws MQClientException {
        //創建消費者
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
        //設置NameServer地址
        consumer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
        //設置消費者實例名稱
        consumer.setInstanceName("consumer");
        //訂閱topic
        consumer.subscribe("wn02","TagA");
        //監聽消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //獲取消息
                for (MessageExt msg:list){
                    System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
                }

                try {
                    int i=1/0;
                }catch (Exception e){
                    e.printStackTrace();
                    //需要重試
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
//消息成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started..."); } }
復制代碼

  2.網絡延遲

復制代碼
package com.wn.consumer;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

public class MQConsumer {
    public static void main(String[] args) throws MQClientException {
        //創建消費者
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
        //設置NameServer地址
        consumer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
        //設置消費者實例名稱
        consumer.setInstanceName("consumer");
        //訂閱topic
        consumer.subscribe("wn03","TagA");
        //監聽消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //獲取消息
                for (MessageExt msg:list){
                    System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
                }
//網絡延遲 try { Thread.sleep(600000); } catch (InterruptedException e) { e.printStackTrace(); } //消息成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started..."); } }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM