消息手動確認模式的幾點說明
-
監聽的方法內部必須使用channel進行消息確認,包括消費成功或消費失敗
-
如果不手動確認,也不拋出異常,消息不會自動重新推送(包括其他消費者),因為對於rabbitmq來說始終沒有接收到消息消費是否成功的確認,並且Channel是在消費端有緩存的,沒有斷開連接
-
如果rabbitmq斷開,連接后會自動重新推送(不管是網絡問題還是宕機)
-
如果消費端應用重啟,消息會自動重新推送
-
如果消費端處理消息的時候宕機,消息會自動推給其他的消費者
-
如果監聽消息的方法拋出異常,消息會按照listener.retry的配置進行重發,但是重發次數完了之后還拋出異常的話,消息不會重發(也不會重發到其他消費者),只有應用重啟后會重新推送。因為retry是消費端內部處理的,包括異常也是內部處理,對於rabbitmq是不知道的(此場景解決方案后面有)
-
spring.rabbitmq.listener.retry配置的重發是在消費端應用內處理的,不是rabbitqq重發
-
可以配置MessageRecoverer對異常消息進行處理,此處理會在listener.retry次數嘗試完並還是拋出異常的情況下才會調用,默認有兩個實現:
- RepublishMessageRecoverer:將消息重新發送到指定隊列,需手動配置,如:
-
-
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
-
return new RepublishMessageRecoverer(rabbitTemplate, "exchangemsxferror", "routingkeymsxferror");
-
}
- RejectAndDontRequeueRecoverer:如果不手動配置MessageRecoverer,會默認使用這個,實現僅僅是將異常打印拋出,源碼如下:
-
public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
-
-
protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class);
-
-
-
public void recover(Message message, Throwable cause) {
-
if (this.logger.isWarnEnabled()) {
-
this.logger.warn("Retries exhausted for message " + message, cause);
-
}
-
throw new ListenerExecutionFailedException("Retry Policy Exhausted", new AmqpRejectAndDontRequeueException(cause), message);
-
}
-
-
}
-
可以通過給隊列(Queue)綁定死信隊列,使用nack反饋給mq,會將消息轉發到死信隊列里面,此種方式需要自己在消費消息的方法內部將異常處理掉
-
//聲明隊列,並給隊列增加x-dead-letter-exchange和x-dead-letter-routing-key參數,用於指定死信隊列的路由和routingKey
-
@Bean
-
public Queue queue(){
-
Map<String, Object> args = new HashMap<String, Object>();
-
args.put( "x-dead-letter-exchange",IntegralConstant.DEAD_EXCHANGE_NAME);
-
args.put( "x-dead-letter-routing-key",IntegralConstant.DEAD_ROUTING_KEY);
-
return new Queue(IntegralConstant.QUEUE_NAME, true, false, false, args);
-
}
-
-
//消息確認時使用nack,並且requeue參數傳false
-
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
- 消息變成死信有以下幾種情況:
- 消息被拒絕(basic.reject/ basic.nack)並且requeue=false
- 消息TTL過期(參考:RabbitMQ之TTL(Time-To-Live 過期時間))
- 隊列達到最大長度
總結
- 消息監聽內必須使用channel對消息進行確認,不管是確認消費成功還是確認消費失敗
- 消息監聽內的異常處理有兩種方式:
- 內部catch后直接處理,然后使用channel對消息進行確認
- 配置RepublishMessageRecoverer將處理異常的消息發送到指定隊列專門處理或記錄
- 監聽的方法內拋出異常貌似沒有太大用處。因為拋出異常就算是重試也非常有可能會繼續出現異常,當重試次數完了之后消息就只有重啟應用才能接收到了,很有可能導致消息消費不及時。當然可以配置RepublishMessageRecoverer來解決,但是萬一RepublishMessageRecoverer發送失敗了呢。。那就可能造成消息消費不及時了。所以即使需要將處理出現異常的消息統一放到另外隊列去處理,個人建議兩種方式:
- catch異常后,手動發送到指定隊列,然后使用channel給rabbitmq確認消息已消費
- 給Queue綁定死信隊列,使用nack(requque為false)確認消息消費失敗