rabbitmq消息ACK確認機制及發送失敗處理


rabbitmq為確保消息發送和接收成功,采用ack機制。
(1)生產者producter發送消息到mq時,mq會發送ack給producter告知消息是否投遞成功;
(2)消費者consumer接收處理消息后,consumer會發送ack給mq告知消息是否處理成功;
通過ack機制,確保消息能夠被producter成功發送和consumer成功接收處理,保證消息不丟失。

1、消息發送
rabbitmq消息發送分為兩個階段:
(1)producter將消息發送到broker,即發送到exchage交換機;
(2)消息通過交換機exchange被路由到隊列queue;
消息只有被正確投遞到隊列queue中,才算發送成功。

消息發送代碼:


    public boolean send(String queueName, String json, String msgId){
        Message message = MessageBuilder.withBody(json.getBytes()).setCorrelationId(msgId).build();
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//設置消息持久化
        CorrelationDataExt correlationData = new CorrelationDataExt();
        correlationData.setId(msgId);
        correlationData.setData(json);
        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(true);//設置手工ack確認
        rabbitTemplate.setConfirmCallback(this);//ack回調
        rabbitTemplate.setReturnCallback(this);//回退回調
        rabbitTemplate.convertAndSend(queueName, message, correlationData);
        return true;
    }



在消息發送之前,我們要設置ack機制相關參數:
setMandatory:設置手工確認ack;
setConfirmCallback:設置消息發送到exchange結果回調;
setReturnCallback:設置消息投遞到queue失敗回退時回調;

通過上述兩個回調方法,我們能夠對發送失敗的消息進行重發處理,確保消息不丟失。

2、消息發送失敗
根據rabbitmq發送過程,消息發送失敗的有三種情況會出現:
(1)producter連接mq失敗,消息沒有發送到mq
(2)producter連接mq成功,但是發送到exchange失敗
(3)消息發送到exchange成功,但是路由到queue失敗;

3、發送失敗處理
(1)producter連接mq失敗,消息沒有發送到mq
這種情況下,在發送消息時可以通過捕捉AmqpException異常,將消息保存db中后續進行重發處理。

        try{
            rabbitTemplate.convertAndSend(queueName, message, correlationData);
        }catch (Exception e){
            logger.error("連接MQ失敗", e);
            //todo 存儲到db中進行重發
        }



(2)producter連接mq成功,但是發送到exchange失敗
通過實現ConfirmCallback接口,對發送結果進行處理。

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String msgId = correlationData.getId();
        if(ack){
            //發送成功
            logger.debug("ack,消息投遞到exchange成功,msgId:{}",msgId);
        }else{
            //發送失敗,重試
            logger.error("ack,消息投遞exchange失敗,msgId:{},原因{}" ,msgId, cause);
           
        }
    }

confirm方法有3個參數,correlationData是消息發送時攜帶的數據對象,ack消息是否成功發送到exchange,cause是發送失敗時的原因。
通過ack我們可以判斷發送到exchange是否成功,如果ack=false,則我們進行失敗處理。
但是這里存在一個問題,correlationData里面只有一個id屬性,沒有關於消息內容的屬性,對於數據失敗處理非常不方便。
為解決此問題,我們可以自定義一個CorrelationData擴展對象,繼承CorrelationData,並添加自己想要保存數據的屬性,在消息發送時,攜帶相關數據在該對象上即可。

自定義CorrelationData對象:

/**
 * CorrelationData的自定義實現,用於拿到消息內容
 */
public class CorrelationDataExt extends CorrelationData {
    //數據
    private volatile Object data;
    //隊列
    private String queueName;

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }
}


重寫發送方法,使用CorrelationDataExt對象攜帶數據:

    public boolean send(String queueName, String json, String msgId){
        Message message = MessageBuilder.withBody(json.getBytes()).setCorrelationId(msgId).build();
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//設置消息持久化

        //使用自定義的數據對象
        CorrelationDataExt correlationData = new CorrelationDataExt();
        correlationData.setId(msgId);
        correlationData.setData(json);
        correlationData.setQueueName(queueName);

        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(true);//設置手工ack確認
        rabbitTemplate.setConfirmCallback(this);//設置發送成功回調
        rabbitTemplate.setReturnCallback(this);//設置消息回退回調
        try{
            rabbitTemplate.convertAndSend(queueName, message, correlationData);//使用amqp default exchange direct
        }catch (Exception e){
            logger.error("MQ連接失敗,請聯系管理員處理!!!!");
            //保存到db重發
            saveToDB(msgId, json, queueName, "90");
        }
        return true;
    }

重寫confirm方法,對CorrelationData進行處理:


    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String msgId = correlationData.getId();
        if(ack){
            //發送成功
            logger.debug("ack,消息投遞到exchange成功,msgId:{}",msgId);
        }else{
            //發送失敗,重試
            logger.error("ack,消息投遞exchange失敗,msgId:{},原因{}" ,msgId, cause);
            if(correlationData instanceof CorrelationDataExt){
                CorrelationDataExt correlationDataExt = (CorrelationDataExt) correlationData;
                String message = (String) correlationDataExt.getData();
                String queueName = ((CorrelationDataExt) correlationData).getQueueName();
                saveToDB(msgId, message, queueName, "91");
            }else{
                logger.info("correlationData對象不包含數據");
            }
        }
    }

 

(3)消息發送到exchange成功,但是路由到queue失敗
通過實現ReturnCallback接口,對回退消息進行重發處理。

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.error("消息發送失敗-消息回退,應答碼:{},原因:{},交換機:{},路由鍵:{}", replyCode, replyText, exchange, routingKey);
        String msgId = message.getMessageProperties().getCorrelationId();
        String data = new String(message.getBody());
        saveToDB(msgId, data, routingKey, "92");
    }



關於對失敗消息的處理,我這里是統一保存到DB中,后續通過定時任務進行重發處理的。

通過以上3個方面對失敗消息的處理,可以確保消息能夠成功發送到mq,確保不丟失。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM