RocketMQ學習筆記(13)----RocketMQ的Consumer消息重試


1. 概念

  Producer端重試:

  生產者端的消息失敗,也就是Producer往MQ上發消息沒有發送成功,比如網絡抖動導致生產者發送消息到MQ失敗。 
這種消息失敗重試我們可以手動設置發送失敗重試的次數。

  Consumer端重試:

  Consumer消費消息失敗后,要提供一種重試機制,令消息再消費一次,Consumer消費消息失敗通常可以認為有以下幾種情況

  1. 由於消息本身的原因,例如反序列化失敗,消息數據本身無法處理(例如話費充值,當前消息的手機被注銷,無法充值)等。這種錯誤通常需要跳過這條消息,再消費其他消息,而且這條失敗消息即使立刻重試消費,99%也不成功,所以最后提供一種定時的重試機制,即過10s再重試。

  2. 由於依賴下游應用服務不可用,例如db連接不可用,外系統網絡不可達等。

  遇到這種錯誤,即使跳過當前失敗的消息,消費其他消息也會報錯,這種情況下建議應用sleep 30s,再消費下一條消息,這樣可以減輕Broker重試消息的壓力。

2. Broker消息重試策略

  查看broker.log文件,可以看到啟動有很多的啟動參數,其中有一條如下:

  這里就表示的是消息重試的時間,1s,5s....的間隔時間后再進行消息的重試,這里是消息消費的消息重試。

3. Producer端消息重試實現

package com.wangx.rocketmq.quickstart;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("myGroup");

        producer.setNamesrvAddr("localhost:9876");

        producer.start();
     //重試三秒 producer.setRetryTimesWhenSendFailed(
3); for (int i = 0; i < 10; i++) { Message message = new Message("MyTopic", "tabA", ("Hello World" + i).getBytes());
       //超時時間 SendResult result
= producer.send(message,100); System.out.println(result); } producer.shutdown(); } }

如果消息在100ms之內發送失敗,就重試三次

4. Consumer使用方式

  在Consumer中,當消費消息的處理過程中,出現異常時,我們通常返回的是RECONSUME_LATER,表示一會兒之后再重試,當返回了這個狀態之后,broker就會按照2中的時間間隔來重試消息。當然,最大也只能重試到2h.

  在實際的運用場景中,我們並不想要消息無止境的一直重試下去,可能我們回想要消息重試幾次之后,還是不能成功的情況下就將這條消息存儲到db或log文件中,所以此時我們可以這樣實現:

package com.wangx.rocketmq.quickstart;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
public class Consumer {

    public static void main(String[] args) throws MQClientException {
        final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyConsumerGroup");

        consumer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe("MyTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    MessageExt ext = msgs.get(0);
                    int x = 0;
                    String topic = ext.getTopic();
                    String body = new String(ext.getBody(),"utf-8");
                    if (Integer.parseInt(body) % 2 == 0) {
                        //產生異常
                        x = Integer.parseInt(body) / 0;
                    }
                    System.out.println("收到來自topic: " + topic + ",的消息:" + body);
                } catch (Exception e) {
                    try {
                        MessageExt ext = msgs.get(0);
                        String topic = ext.getTopic();
                        String body = new String(ext.getBody(),"utf-8");
                        if (ext.getReconsumeTimes() == 3) {

                            //模擬將消息保存到db或日志文件中,返回成功狀態,使消息不再重試
                            System.out.println("保存成功消息:" + body + "成功!!!");
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        System.err.println("err:收到來自topic: " + topic + ",的消息:" + body);
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        });
        consumer.start();
    }
}

  控制台打印結果如下:

收到來自topic: MyTopic,的消息:1
err: 收到來自topic: MyTopic,的消息:2
err: 收到來自topic: MyTopic,的消息:6
err: 收到來自topic: MyTopic,的消息:2
err: 收到來自topic: MyTopic,的消息:6
收到來自topic: MyTopic,的消息:7
err: 收到來自topic: MyTopic,的消息:0
err: 收到來自topic: MyTopic,的消息:8
err: 收到來自topic: MyTopic,的消息:4
收到來自topic: MyTopic,的消息:3
收到來自topic: MyTopic,的消息:5
收到來自topic: MyTopic,的消息:9
err: 收到來自topic: MyTopic,的消息:0
err: 收到來自topic: MyTopic,的消息:8
err: 收到來自topic: MyTopic,的消息:4
err: 收到來自topic: MyTopic,的消息:2
err: 收到來自topic: MyTopic,的消息:6
err: 收到來自topic: MyTopic,的消息:0
err: 收到來自topic: MyTopic,的消息:8
err: 收到來自topic: MyTopic,的消息:4
保存成功消息:2成功!!!
保存成功消息:6成功!!!
保存成功消息:0成功!!!
保存成功消息:8成功!!!
保存成功消息:4成功!!

  可以看到奇數的時候,正常消費,當消息為偶數時,會拋出異常,此時返回的是RECONSUME_LATER,所以消息將會重試消費,在MessageExt中保存了一個屬性叫reconsumeTimes,表示消息重試次數,我們這里使用當消息重試三次之后,模擬將消息保存到db或日志文件中的操作,然后返回CONSUME_SUCCESS,結束消息的重試。這樣就可以保證消息出現異常時我們可以做適當的操作避免消息一直重試或對於消息無法消費情況做一些補償操作。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM