消費端在處理消息過程中可能會報錯,此時該如何重新處理消息呢?解決方案有以下兩種。
在redis或者數據庫中記錄重試次數,達到最大重試次數以后消息進入死信隊列或者其他隊列,再單獨針對這些消息進行處理;
使用spring-rabbit中自帶的retry功能;
第一種方案我們就不再詳細說了,我們主要來看一下第二種方案,老規矩,先上代碼:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 自動ack
retry:
enabled: true
max-attempts: 5
max-interval: 10000 # 重試最大間隔時間
initial-interval: 2000 # 重試初始間隔時間
multiplier: 2 # 間隔時間乘子,間隔時間*乘子=下一次的間隔時間,最大不能超過設置的最大間隔時間
此時我們的消費者代碼如下所示:
@RabbitHandler
@RabbitListener(queues = {"${platform.queue-name}"},concurrency = "1")
public void msgConsumer(String msg, Channel channel, Message message) throws IOException {
log.info("接收到消息>>>{}",msg);
int temp = 10/0;
log.info("消息{}消費成功",msg);
}
此時啟動程序,發送消息后可以看到控制台輸出內容如下:
可以看到重試次數是5次(包含自身消費的一次),重試時間依次是2s,4s,8s,10s(上一次間隔時間*間隔時間乘子),最后一次重試時間理論上是16s,但是由於設置了最大間隔時間是10s,因此最后一次間隔時間只能是10s,和配置相符合。
注意:
重試並不是RabbitMQ重新發送了消息,僅僅是消費者內部進行的重試,換句話說就是重試跟mq沒有任何關系;
因此上述消費者代碼不能添加try{}catch(){},一旦捕獲了異常,在自動ack模式下,就相當於消息正確處理了,消息直接被確認掉了,不會觸發重試的;
MessageReCoverer
上面的例子在測試中我們還發現了一個問題,就是經過5次重試以后,控制台輸出了一個異常的堆棧日志,然后隊列中的數據也被ack掉了(自動ack模式),首先我們看一下這個異常日志是什么。
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted
出現消息被消費掉並且出現上述異常的原因是因為在構建SimpleRabbitListenerContainerFactoryConfigurer類時使用了MessageRecoverer接口,這個接口有一個cover方法,用來實現重試完成之后對消息的處理,源碼如下:
ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) {
RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()
: RetryInterceptorBuilder.stateful();
RetryTemplate retryTemplate = new RetryTemplateFactory(this.retryTemplateCustomizers)
.createRetryTemplate(retryConfig, RabbitRetryTemplateCustomizer.Target.LISTENER);
builder.retryOperations(retryTemplate);
MessageRecoverer recoverer = (this.messageRecoverer != null) ? this.messageRecoverer
: new RejectAndDontRequeueRecoverer(); //<1>
builder.recoverer(recoverer);
factory.setAdviceChain(builder.build());
注意看<1>處的代碼,默認使用的是RejectAndDontRequeueRecoverer實現類,根據實現類的名字我們就可以看出來該實現類的作用就是拒絕並且不會將消息重新發回隊列,我們可以看一下這個實現類的具體內容:
public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class); // NOSONAR protected
@Override
public void recover(Message message, Throwable cause) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("Retries exhausted for message " + message, cause);
}
throw new ListenerExecutionFailedException("Retry Policy Exhausted",
new AmqpRejectAndDontRequeueException(cause), message);
}
}
上述源碼給出了異常的來源,但是未看到拒絕消息的代碼,猜測應該是使用aop的方式實現的,此處不再繼續深究。
MessageRecoverer接口還有另外兩個實現類,分別是RepublishMessageRecoverer和ImmediateRequeueMessageRecoverer,顧名思義就是重新發布消息和立即重新返回隊列,下面我們分別測試一個這兩個實現類:
RepublishMessageRecoverer
先創建一個異常交換機和異常隊列,並將兩者進行綁定:
@Bean
public DirectExchange errorExchange(){
return new DirectExchange("error-exchange",true,false);
}
@Bean
public Queue errorQueue(){
return new Queue("error-queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error-routing-key");
}
創建RepublishMessageRecoverer:
@Bean
public MessageRecoverer messageRecoverer(){
return new RepublishMessageRecoverer(rabbitTemplate,"error-exchange","error-routing-key");
}
此時啟動服務,查看處理結果:
通過控制台可以看到,消息重試5次以后直接以新的routingKey發送到了配置的交換機中,此時再查看監控頁面,可以看原始隊列中已經沒有消息了,但是配置的異常隊列中存在一條消息。
ImmediateRequeueMessageRecoverer
再測試一下ImmediateRequeueMessageRecoverer:
@Bean
public MessageRecoverer messageRecoverer(){
return new ImmediateRequeueMessageRecoverer();
}
重試5次之后,返回隊列,然后再重試5次,周而復始直到不拋出異常為止,這樣還是會影響后續的消息消費。
總結:
通過上面的測試,對於重試之后仍然異常的消息,可以采用RepublishMessageRecoverer,將消息發送到其他的隊列中,再專門針對新的隊列進行處理。
死信隊列
除了可以采用上述RepublishMessageRecoverer,還可以采用死信隊列的方式處理重試失敗的消息。
首先創建死信交換機、死信隊列以及兩者的綁定
/**
* 死信交換機
* @return
*/
@Bean
public DirectExchange dlxExchange(){
return new DirectExchange(dlxExchangeName);
}
/**
* 死信隊列
* @return
*/
@Bean
public Queue dlxQueue(){
return new Queue(dlxQueueName);
}
/**
* 死信隊列綁定死信交換機
* @param dlxQueue
* @param dlxExchange
* @return
*/
@Bean
public Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);
}
業務隊列的創建需要做一些修改,添加死信交換機以及死信路由鍵的配置
/**
* 業務隊列
* @return
*/
@Bean
public Queue queue(){
Map<String,Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",dlxExchangeName);//聲明當前隊列綁定的死信交換機
params.put("x-dead-letter-routing-key",dlxRoutingKey);//聲明當前隊列的死信路由鍵
return QueueBuilder.durable(queueName).withArguments(params).build();
//return new Queue(queueName,true);
}
此時啟動服務,可以看到同時創建了業務隊列以及死信隊列
在業務隊列上出現了DLX以及DLK的標識,標識已經綁定了死信交換機以及死信路由鍵,此時調用生產者發送消息,消費者在重試5次后,由於MessageCover默認的實現類是RejectAndDontRequeueRecoverer,也就是requeue=false,又因為業務隊列綁定了死信隊列,因此消息會從業務隊列中刪除,同時發送到死信隊列中。
注意:
如果ack模式是手動ack,那么需要調用channe.nack方法,同時設置requeue=false才會將異常消息發送到死信隊列中
retry使用場景
上面說了什么是重試,以及如何解決重試造成的數據丟失,那么怎么來選擇重試的使用場景呢?
是否是消費者只要發生異常就要去重試呢?其實不然,假設下面的兩個場景:
- http下載視頻或者圖片或者調用第三方接口
- 空指針異常或者類型轉換異常(其他的受檢查的運行時異常)
很顯然,第一種情況有重試的意義,第二種沒有。
對於第一種情況,由於網絡波動等原因造成請求失敗,重試是有意義的;
對於第二種情況,需要修改代碼才能解決的問題,重試也沒有意義,需要的是記錄日志以及人工處理或者輪詢任務的方式去處理。
retry最佳實踐
對於消費端異常的消息,如果在有限次重試過程中消費成功是最好的,如果有限次重試之后仍然失敗的消息,不管是采用RejectAndDontRequeueRecoverer還是使用私信隊列都是可以的,同時也可以采用折中的方法,先將消息從業務隊列中ack掉,再將消息發送到另外的一個隊列中,后續再單獨處理異常數據的隊列。
另外,看到有人說retry只能在自動ack模式下使用,經過測試在手動ack模式下retry也是生效的,只不過不能使用catch捕獲異常,即使在自動ack模式下使用catch捕獲異常也是會導致不觸發重試的。當然,在手動ackm模式下要記得確認消息,不管是確認消費成功還是確認消費失敗,不然消息會一直處於unack狀態,直到消費者進程重啟或者停止。
如果一定要在手動ack模式下使用retry功能,最好還是確認在有限次重試過程中可以重試成功,否則超過重試次數,又沒辦法執行nack,就會出現消息一直處於unack的問題,我想這也就是所說的retry只能在自動ack模式下使用的原因,測試代碼如下:
@RabbitHandler
@RabbitListener(queues = {"${platform.queue-name}"},concurrency = "1")
public void msgConsumer(String msg, Channel channel, Message message) throws IOException {
log.info("接收到消息>>>{}",msg);
if(msg.indexOf("0")>-1){
throw new RuntimeException("拋出異常");
}
log.info("消息{}消費成功",msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}