1. 消息發送重試機制
1. 簡介
producer對發送失敗的消息進行重新發送的機制,稱為消息發送重試機制,也稱為消息重投機制。
有一些限制:
- 生產者在發送消息時,若采用同步或異步發送方式,發送失敗會重試,但oneway 消息發送方式發送失敗是沒有重試機制的。
- 只有普通消息有重試,順序消息沒有重試
- 消息重投機制會造成消費消息重復消費。一般不會發送消息重復,在出現消息量大、網絡抖動,消息重復就成為大概率事件。producer主動重發、consumer負載變化(發生Rebalance,不會導致消息重復,但可能出現重復消費)也會導致重復消息。消息重復無法避免,需要避免消息的重復消費
- 避免消息重復消費的解決方案:為消息添加唯一標識(例如消息key),使消費者進行判斷
- 消息發送重試有三種策略可以選擇:同步發送失敗策略、異步發送失敗策略、消息刷盤失敗策略
2. 同步發送失敗策略
對於普通消息,消息發送默認采用輪詢策略來選擇發送到的隊列。如果發送失敗,默認重試2次。在重試時如果有其他broker的時候會選擇其他broker;當只有一個broker的時候也只能發送到該broker,會盡量選擇該Broker的其他Queue。同時,broker還具有失敗隔離功能,使producer盡量選擇未失敗的Broker 作為目標Broker。超過重試次數,則拋出異常,由producer 去保證消息不丟失。源代碼如下:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// 默認次數是重試次數2次 + 1次發送次數
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
......
producer也可以修改一些默認參數:
DefaultMQProducer producer = new DefaultMQProducer("syncProducer");
producer.setNamesrvAddr("192.168.13.111:9876");
// 設置發送失敗的重試次數,默認是2次
producer.setRetryTimesWhenSendFailed(4);
// 設置消息發送超時時長是5s,默認是3s
producer.setSendMsgTimeout(5 * 1000);
3. 異步發送失敗策略
異步失敗發送失敗時,異步重試不會選擇其他broker,僅在一個broker 做重試,所以該策略無法保證消息不丟失。
DefaultMQProducer producer = new DefaultMQProducer("asyncProducer");
producer.setNamesrvAddr("192.168.13.111:9876");
// 指定異步發送失敗后不進行消息重試
producer.setRetryTimesWhenSendAsyncFailed(0);
producer.start();
4. 消息刷盤失敗策略
消息刷盤超時(master或者slave)或slave不可用時,默認是不會將消息嘗試發送到其他broker的。可以在配置文件設置 restryAnotherBrokerWhenNotStoreOK=true 來開啟。
2. 消息消費重試機制
1. 順序消息消費重試
為了保證順序消息的順序性,消費失敗后會自動不斷地進行消息重試,直到消費成功。消費重試,默認間隔時間為1000ms。重試期間應該會出現消費被阻塞的情況。
注意: 順序消息沒有發送重試,但是有消費重試。對於順序消息的消費,要注意其一直重復消費,避免永久性阻塞。
2. 無序消息消費重試
(1) 簡介
對於無序消息(普通消息、延時消息、事務消息),可以通過設置返回狀態達到消息重試的效果。不過,需要注意的是,無序消息的重試只對集群消費方式生效。也就是廣播消費模式下,消費失敗的消息沒有重試。
(2) 重試次數與間隔
默認最多重試16次,每次就間隔不同,會逐漸變長。重試完之后仍然失敗,消息會投遞到死信隊列。其消息重試時間如下:
10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
也可以設置消息的最大消費次數:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myTestConsumerGroup");
/**
* 默認時間: 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 修改后時間規則: 如果次數小於十六,按原來時間執行; 超過16 每次都是2小時
* 對於ConsumerGroup, 修改一個會影響當前consumerGroup的所有實例,采用覆蓋的方式以最后一次修改為准(因為規則跑在mq服務端)
*/
consumer.setMaxReconsumeTimes(10);
(3) 簡單理解
對於需要重試消費的消息,是將這些需要重試的消息放入到了一個特殊Topic的隊列中,這個隊列就是重試隊列。
當出現需要進行重試消費時,broker會為每個消費組都設置一個名稱為%RETRY%consumerGroupName的重試隊列。
這個重試隊列是為消費者組設置的,而不是topic(一個topic可以被多個組進行消費)
只有當出現重試消費的消息時,才會為該組創建重試隊列
測試如下:
[root@redisnode01 consumequeue]# pwd
/root/store/consumequeue
[root@redisnode01 consumequeue]# ll | grep myTest
drwxr-xr-x. 3 root root 15 Mar 24 08:18 %DLQ%myTestConsumerGroup
drwxr-xr-x. 3 root root 15 Mar 22 22:39 %RETRY%myTestConsumerGroup
也可以通過偏移量文件進行查看:(/root/store/config/consumerOffset.json)
{
"offsetTable":{
"syncTopic@LitePullConsumer":{0:115,1:118,2:117,3:115
},
"dlqTopic@myTestConsumerGroup2":{0:5,1:7,2:6,3:7
},
"txTopic@myTestConsumerGroup":{0:4,1:3,2:4,3:4
},
"syncTopic@myTestConsumerGroup":{0:145,1:146,2:145,3:144
},
"%RETRY%myTestConsumerGroup@myTestConsumerGroup":{0:335
},
"syncTopic@myTestConsumerGroup2":{0:145,1:146,2:145,3:144
},
"%RETRY%myTestConsumerGroup2@myTestConsumerGroup2":{0:75
},
"RMQ_SYS_TRANS_HALF_TOPIC@CID_RMQ_SYS_TRANS":{0:48
},
"dlqTopic@myTestConsumerGroup":{0:4,1:6,2:5,3:5
},
"RMQ_SYS_TRANS_OP_HALF_TOPIC@CID_RMQ_SYS_TRANS":{0:32
},
"batchTest@myTestConsumerGroup":{0:145567,1:176453,2:169162,3:122128
},
"filterTopic@myTestConsumerGroup":{0:22,1:23,2:17,3:18
}
}
}
(4) 實現原理
從上面可以看出消息重試的時間間隔與延遲消息的延遲等級十分相似(除了沒有延遲消息的前兩個等級)。broker對於重試消息的處理是通過延時消息實現的。先將消息按保存到主題 SCHEDULE_TOPIC_XXXX 的隊列中(根據等級對應選擇隊列),延遲時間到了后會將消息重新投遞到主題為%RETRY%consumerGroupName 的隊列中。可以查看SCHEDULE_TOPIC_XXXX 隊列目錄:
[root@redisnode01 consumequeue]# pwd
/root/store/consumequeue
[root@redisnode01 consumequeue]# ls SCHEDULE_TOPIC_XXXX/
0 10 11 12 13 14 15 16 17 2 3 4 5 6 7 8 9
3. 消息重試返回碼
集群消費模式下監聽器重復消費如下:
- 返回org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus#RECONSUME_LATER(建議這種)
- 拋出異常
- 返回null
集群消費模式下監聽器取消重復消費:
- 自己try...catch 直接返回 org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus#CONSUME_SUCCESS
3. 死信隊列
1. 簡介
一條消息達到指定的重試次數之后,依賴消費失敗,則消息進入一個特殊的隊列。這個隊列就是死信隊列(Dead-Letter Queue),里面的消息稱為死信消息。死信隊列是針對消費者組的,其名稱為%DLQ%consumerGroupName。比如:
[root@redisnode01 consumequeue]# ll | grep myTest
drwxr-xr-x. 3 root root 15 Mar 24 08:18 %DLQ%myTestConsumerGroup
drwxr-xr-x. 3 root root 15 Mar 22 22:39 %RETRY%myTestConsumerGroup
drwxr-xr-x. 3 root root 15 Mar 28 08:44 %RETRY%myTestConsumerGroup2
[root@redisnode01 consumequeue]# ls %DLQ%myTestConsumerGroup/
0
2. 死信隊列特征
- 死信隊列的消息不會被消費者正常消費,即DLQ對於消費者不可見
- 死信存儲有效期與正常消息一個,均為3天,3天后會被自動刪除
- 死信隊列其實就是一個特殊的topic,名稱為%DLQ%consumerGroupName, 也就是每個消費者組都有一個死信隊列
- 如果一個消費者組未產生死信消息,不會為其創建該topic
3. 測試
如果想一個消息進入死信隊列,可以指定消息重復消費次數,然后返回非正常的狀態碼:
package com.zd.bx.rocketmq.dlq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myTestConsumerGroup3");
/**
* 默認時間: 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 修改后時間規則: 如果次數小於十六,按原來時間執行; 超過16 每次都是2小時
* 對於ConsumerGroup, 修改一個會影響當前consumerGroup的所有實例,采用覆蓋的方式以最后一次修改為准(因為規則跑在mq服務端)
*/
consumer.setMaxReconsumeTimes(2);
// 設置線程數量
consumer.setConsumeThreadMax(4);
consumer.setConsumeThreadMin(2);
// 指定nameserver
consumer.setNamesrvAddr("192.168.13.111:9876");
// 指定消費的topic與tag
consumer.subscribe("dlqTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages, body: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
}
return null;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
服務器查看其主題下隊列:
[root@redisnode01 consumequeue]# ls -R | grep myTestConsumerGroup3
%DLQ%myTestConsumerGroup3
%RETRY%myTestConsumerGroup3
./%DLQ%myTestConsumerGroup3:
./%DLQ%myTestConsumerGroup3/0:
./%RETRY%myTestConsumerGroup3:
./%RETRY%myTestConsumerGroup3/0: