所謂的消費方消息確認就是簽收模式ack,Rabbitmq默認開啟的是自動簽收模式,也就是消費者監聽到消息到達,就會自動發送ack給隊列,告訴隊列這條消息可以刪除了,這種自動簽收的模式存在消息丟失的可能,出現異常的話這條消息就丟了,要保證消息不會丟失,還是建議開啟手動簽收的模式。
一、三種簽收模式
public enum AcknowledgeMode {
//自動確認
NONE,
//手動確認
MANUAL,
//根據情況確認
AUTO;
private AcknowledgeMode() {
}
public boolean isTransactionAllowed() {
return this == AUTO || this == MANUAL;
}
public boolean isAutoAck() {
return this == NONE;
}
public boolean isManual() {
return this == MANUAL;
}
}
二、配置文件開啟手動簽收模式
spring:
rabbitmq:
host: 192.168.31.70
port: 5672
username: guest
password: guest
# 發送確認
publisher-confirms: true
# 路由失敗回調
publisher-returns: true
template:
# 必須設置成true 消息路由失敗通知監聽者,false 將消息丟棄
mandatory: true
#消費端
listener:
simple:
# 每次從RabbitMQ獲取的消息數量
prefetch: 1
default-requeue-rejected: false
# 每個隊列啟動的消費者數量
concurrency: 1
# 每個隊列最大的消費者數量
max-concurrency: 1
# 簽收模式為手動簽收-那么需要在代碼中手動ACK
acknowledge-mode: manual
三、消費方手動簽收
@Component
@Slf4j
public class MessageHandler {
/**
* 郵件發送
* @param message
* @param channel
* @param headers
* @throws IOException
*/
@RabbitListener(queues ="demo.email")
@RabbitHandler
public void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
try {
String msg=new String(message.getBody(), CharEncoding.UTF_8);
JSONObject jsonObject = JSON.parseObject(msg);
jsonObject.put("messageId",headers.get("spring_returned_message_correlation"));
log.info("---接受到消息---{}",jsonObject);
//主動異常
int m=1/0;
//手動簽收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
catch (Exception e) {
//異常,ture 重新入隊,或者false,進入死信隊列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
}
四、channel接口的實現類
里面有三個手動簽收的方法
public class PublisherCallbackChannelImpl implements PublisherCallbackChannel, ConfirmListener, ReturnListener, ShutdownListener {
private static final MessagePropertiesConverter converter = new DefaultMessagePropertiesConverter();
private final Log logger;
private final Channel delegate;
private final ConcurrentMap<String, Listener> listeners;
private final Map<Listener, SortedMap<Long, PendingConfirm>> pendingConfirms;
private final Map<String, PendingConfirm> pendingReturns;
private final SortedMap<Long, Listener> listenerForSeq;
private final ExecutorService executor;
private volatile Consumer<Channel> afterAckCallback;
//......省略
public void basicAck(long deliveryTag, boolean multiple) throws IOException {
this.delegate.basicAck(deliveryTag, multiple);
}
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
this.delegate.basicNack(deliveryTag, multiple, requeue);
}
public void basicReject(long deliveryTag, boolean requeue) throws IOException {
this.delegate.basicReject(deliveryTag, requeue);
}
//.......省略
}
4.1、三個方法區別
- basicAck 同意簽收 支持批量,設置入參mutiple為true
- basicReject 拒絕簽收,不支持批量,支持是否重新入隊,設置入參requeue為true
- basicNack 拒絕簽收,支持批量,支持是否重新入隊,設置入參requeue為true