Rabbitmq接收方消息確認


​ 所謂的消費方消息確認就是簽收模式ack,Rabbitmq默認開啟的是自動簽收模式,也就是消費者監聽到消息到達,就會自動發送ack給隊列,告訴隊列這條消息可以刪除了,這種自動簽收的模式存在消息丟失的可能,出現異常的話這條消息就丟了,要保證消息不會丟失,還是建議開啟手動簽收的模式。

一、三種簽收模式

public enum AcknowledgeMode {
    //自動確認
    NONE,
    //手動確認
    MANUAL,
    //根據情況確認
    AUTO;

    private AcknowledgeMode() {
    }

    public boolean isTransactionAllowed() {
        return this == AUTO || this == MANUAL;
    }

    public boolean isAutoAck() {
        return this == NONE;
    }

    public boolean isManual() {
        return this == MANUAL;
    }
}

二、配置文件開啟手動簽收模式

spring:
    rabbitmq:
        host: 192.168.31.70
        port: 5672
        username: guest
        password: guest
        # 發送確認
        publisher-confirms: true
        # 路由失敗回調
        publisher-returns: true
        template:
            # 必須設置成true 消息路由失敗通知監聽者,false 將消息丟棄
            mandatory: true
        #消費端
        listener:
            simple:
                # 每次從RabbitMQ獲取的消息數量
                prefetch: 1
                default-requeue-rejected: false
                # 每個隊列啟動的消費者數量
                concurrency: 1
                # 每個隊列最大的消費者數量
                max-concurrency: 1
                # 簽收模式為手動簽收-那么需要在代碼中手動ACK
                acknowledge-mode: manual

三、消費方手動簽收

@Component
@Slf4j
public class MessageHandler {


    /**
     * 郵件發送
     * @param message
     * @param channel
     * @param headers
     * @throws IOException
     */
    @RabbitListener(queues ="demo.email")
    @RabbitHandler
    public void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {

        try {

            String msg=new String(message.getBody(), CharEncoding.UTF_8);
            JSONObject jsonObject = JSON.parseObject(msg);
            jsonObject.put("messageId",headers.get("spring_returned_message_correlation"));
            log.info("---接受到消息---{}",jsonObject);
			//主動異常
			int m=1/0;
            //手動簽收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
        catch (Exception e) {
            //異常,ture 重新入隊,或者false,進入死信隊列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);

        }
    }
   }

四、channel接口的實現類

里面有三個手動簽收的方法

public class PublisherCallbackChannelImpl implements PublisherCallbackChannel, ConfirmListener, ReturnListener, ShutdownListener {

    private static final MessagePropertiesConverter converter = new DefaultMessagePropertiesConverter();
    private final Log logger;
    private final Channel delegate;
    private final ConcurrentMap<String, Listener> listeners;
    private final Map<Listener, SortedMap<Long, PendingConfirm>> pendingConfirms;
    private final Map<String, PendingConfirm> pendingReturns;
    private final SortedMap<Long, Listener> listenerForSeq;
    private final ExecutorService executor;
    private volatile Consumer<Channel> afterAckCallback;
    
    //......省略
    
    public void basicAck(long deliveryTag, boolean multiple) throws IOException {
        this.delegate.basicAck(deliveryTag, multiple);
    }

    public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
        this.delegate.basicNack(deliveryTag, multiple, requeue);
    }

    public void basicReject(long deliveryTag, boolean requeue) throws IOException {
        this.delegate.basicReject(deliveryTag, requeue);
    }
    
    //.......省略
    
    }

4.1、三個方法區別

  • basicAck 同意簽收 支持批量,設置入參mutiple為true
  • basicReject 拒絕簽收,不支持批量,支持是否重新入隊,設置入參requeue為true
  • basicNack 拒絕簽收,支持批量,支持是否重新入隊,設置入參requeue為true


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM