RocketMQ(5)---RocketMQ重試機制


RocketMQ重試機制

消息重試分為兩種:Producer發送消息的重試Consumer消息消費的重試

一、Producer端重試

Producer端重試是指: Producer往MQ上發消息沒有發送成功,比如網絡原因導致生產者發送消息到MQ失敗。

看一下代碼:

@Slf4j
public class RocketMQTest {
    /**
     * 生產者組
     */
    private static String PRODUCE_RGROUP = "test_producer";
  
    public static void main(String[] args) throws Exception {
        //1、創建生產者對象
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCE_RGROUP);
        //設置重試次數(默認2次)
        producer.setRetryTimesWhenSendFailed(3000);
        //綁定name server
        producer.setNamesrvAddr("74.49.203.55:9876");
        producer.start();
        //創建消息
        Message message = new Message("topic_family", ("小小今年3歲" ).getBytes());
        //發送 這里填寫超時時間是5毫秒 所以每次都會發送失敗
        SendResult sendResult = producer.send(message,5);
        log.info("輸出生產者信息={}",sendResult);
    }
}

超時重試 針對網上說的超時異常會重試的說法都是錯誤的,想想都覺得可怕,我查的所以文章都說超時異常都會重試,難道這么多人都沒有去測試一下 或者去看個源碼。

我發現這個問題,是因為我上面超時時間設置為5毫秒 ,按照正常肯定會報超時異常,但我設置1次重試和3000次的重試,雖然最終都會報下面異常,但輸出錯誤時間報

顯然不應該是一個級別。但測試發現無論我設置的多少次的重試次數,報異常的時間都差不多。

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

針對這個疑惑,我去看了源碼之后,才恍然大悟。

   /**
     * 說明 抽取部分代碼
     */
    private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) {
        
        //1、獲取當前時間
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev ;
        //2、去服務器看下有沒有主題消息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            //3、通過這里可以很明顯看出 如果不是同步發送消息 那么消息重試只有1次
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            //4、根據設置的重試次數,循環再去獲取服務器主題消息
            for (times = 0; times < timesTotal; times++) {
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                beginTimestampPrev = System.currentTimeMillis();
                long costTime = beginTimestampPrev - beginTimestampFirst;
                //5、前后時間對比 如果前后時間差 大於 設置的等待時間 那么直接跳出for循環了 這就說明連接超時是不進行多次連接重試的
                if (timeout < costTime) {
                    callTimeout = true;
                    break;

                }
                //6、如果超時直接報錯
                if (callTimeout) {
                    throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
                }
        }
    }

通過這段源碼很明顯可以看出以下幾點

  1. 如果是異步發送 那么重試次數只有1次
  2. 對於同步而言,超時異常也是不會再去重試
  3. 如果發生重試是在一個for 循環里去重試,所以它是立即重試而不是隔一段時間去重試。

真是實踐出真知!!!


二、 Consumer端重試

消費端比較有意思,而且在實際開發過程中,我們也更應該考慮的是消費端的重試。

消費者端的失敗主要分為2種情況,ExceptionTimeout

1、Exception

@Slf4j
@Component
public class Consumer {
    /**
     * 消費者實體對象
     */
    private DefaultMQPushConsumer consumer;
    /**
     * 消費者組
     */
    public static final String CONSUMER_GROUP = "test_consumer";
    /**
     * 通過構造函數 實例化對象
     */
    public Consumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr("47.99.203.55:9876;47.99.203.55:9877");
        //訂閱topic和 tags( * 代表所有標簽)下信息
        consumer.subscribe("topic_family", "*");
        //注冊消費的監聽 並在此監聽中消費信息,並返回消費的狀態信息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            //1、獲取消息
            Message msg = msgs.get(0);
            try {
                //2、消費者獲取消息
                String body = new String(msg.getBody(), "utf-8");
                //3、獲取重試次數
                int count = ((MessageExt) msg).getReconsumeTimes();
                log.info("當前消費重試次數為 = {}", count);
                //4、這里設置重試大於3次 那么通過保存數據庫 人工來兜底
                if (count >= 2) {
                    log.info("該消息已經重試3次,保存數據庫。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                //直接拋出異常
                throw new Exception("=======這里出錯了============");
                //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        //啟動監聽
        consumer.start();
    }
}

這里的代碼意思很明顯: 主動拋出一個異常,然后如果超過3次,那么就不繼續重試下去,而是將該條記錄保存到數據庫由人工來兜底。

看下運行結果

注意 消費者和生產者的重試還是有區別的,主要有兩點

1、默認重試次數:Product默認是2次,而Consumer默認是16次

2、重試時間間隔:Product是立刻重試,而Consumer是有一定時間間隔的。它照1S,5S,10S,30S,1M,2M····2H進行重試。
3、Product在異步情況重試失效,而對於Consumer在廣播情況下重試失效。

2、Timeout

說明 這里的超時異常並非真正意義上的超時,它指的是指獲取消息后,因為某種原因沒有給RocketMQ返回消費的狀態,即沒有return ConsumeConcurrentlyStatus.CONSUME_SUCCESSreturn ConsumeConcurrentlyStatus.RECONSUME_LATER

那么 RocketMQ會認為該消息沒有發送,會一直發送。因為它會認為該消息根本就沒有發送給消費者,所以肯定沒消費。

做這個測試很簡單。

        //1、消費者獲得消息
        String body = new String(msg.getBody(), "utf-8");
        //2、獲取重試次數
        int count = ((MessageExt) msg).getReconsumeTimes();
        log.info("當前消費重試次數為 = {}", count);
        //3、這里睡眠60秒
        Thread.sleep(60000);
       log.info("休眠60秒 看還能不能走到這里。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body);
        //返回成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

當獲得 當前消費重試次數為 = 0 后 , 關掉該進程。再重新啟動該進程,那么依然能夠獲取該條消息

consumer消費者  當前消費重試次數為 = 0
休眠60秒 看還能不能走到這里。topic=topic_family,keys=1a2b3c4d5f,msg=小小今年3歲


只要自己變優秀了,其他的事情才會跟着好起來(上將2)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM