RabbitMQ - 怎么避免消息丟失?


一、數據丟失的三個場景

一條消息從生產者發送到消費者消費的過程:

可以看出,一條消息整個過程要經歷兩次的網絡傳輸:

  • 從生產者發送到RabbitMQ服務器,從RabbitMQ服務器發送到消費者
  • 在消費者未消費前存儲在隊列(Queue)中

所以可以知道,有三個場景下是會發生消息丟失的

  1. 生產者發送消息到RabbitMQ服務器過程中,RabbitMQ服務器如果宕機停止服務,消息會丟失。
  2. 存儲在隊列中,如果隊列沒有對消息持久化,RabbitMQ服務器宕機重啟會丟失數據。
  3. 消費者從RabbitMQ服務器獲取隊列中存儲的數據消費,但是消費者程序出錯或者宕機而沒有正確消費,導致數據丟失。

針對以上三種場景,RabbitMQ提供了三種解決的方式,分別是confirm機制,消息持久化,ACK事務機制。

 

二、消息發送端:confirm機制(生產者-->MQ server)

生產者發送到RabbitMQ Server時,有可能因為網絡問題導致投遞失敗,從而丟失數據。我們可以使用confirm模式防止數據丟失。工作流程是怎么樣的呢,看以下圖解:

  • 第一步,一條消息從生產者發送到RabbitMQ,首先會發送到Exchange,對應回調函數confirm()
  • 第二步,從Exchange路由分配到Queue中,對應回調函數則是returnedMessage()

注意:

  • exchange無論是否收到消息,都會回調confirm()函數
  • 只有當exchange無法路由到queue,才會調用returnedMessage()

 

代碼實現

代碼怎么實現呢,請看演示:

首先在application.yml配置文件中加上如下配置:

spring:
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true
    template:
      mandatory: true

//publisher-confirms:設置為true時。當消息投遞到Exchange后,會回調confirm()方法進行通知生產者
//publisher-returns:設置為true時。當消息匹配到Queue並且失敗時,會通過回調returnedMessage()方法返回消息
//spring.rabbitmq.template.mandatory: 設置為true時。指定消息在沒有被隊列接收時會通過回調returnedMessage()方法退回。

有個小細節,publisher-returns和mandatory如果都設置的話,優先級是以mandatory優先。可以看源碼:

 

接着我們需要定義回調方法:

@Component
public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class);

    /**
     * 監聽消息是否到達Exchange
     *
     * @param correlationData 包含消息的唯一標識的對象
     * @param ack             true 標識 ack,false 標識 nack
     * @param cause           nack 投遞失敗的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("消息投遞成功~消息Id:{}", correlationData.getId());
        } else {
            logger.error("消息投遞失敗,Id:{},錯誤提示:{}", correlationData.getId(), cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.info("消息沒有路由到隊列,獲得返回的消息");
        Map map = byteToObject(message.getBody(), Map.class);
        logger.info("message body: {}", map == null ? "" : map.toString());
        logger.info("replyCode: {}", replyCode);
        logger.info("replyText: {}", replyText);
        logger.info("exchange: {}", exchange);
        logger.info("routingKey: {}", exchange);
        logger.info("------------> end <------------");
    }

    @SuppressWarnings("unchecked")
    private <T> T byteToObject(byte[] bytes, Class<T> clazz) {
        T t;
        try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
             ObjectInputStream ois = new ObjectInputStream(bis)) {
            t = (T) ois.readObject();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
        return t;
    }
}

 

我這里就簡單地打印回調方法返回的消息,在實際項目中,可以把返回的消息存儲到日志表中,使用定時任務進行進一步的處理。

我這里是使用RabbitTemplate進行發送,所以在Service層的RabbitTemplate需要設置一下:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
 @Resource
    private RabbitmqConfirmCallback rabbitmqConfirmCallback;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        //指定 ConfirmCallback
        rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback);
        //指定 ReturnCallback
        rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback);
    }
    
    @Override
    public String sendMsg(String msg) throws Exception {
        Map<String, Object> message = getMessage(msg);
        try {
            CorrelationData correlationData = (CorrelationData) message.remove("correlationData");
            rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
    
 private Map<String, Object> getMessage(String msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        CorrelationData correlationData = new CorrelationData(msgId);
        String sendTime = sdf.format(new Date());
        Map<String, Object> map = new HashMap<>();
        map.put("msgId", msgId);
        map.put("sendTime", sendTime);
        map.put("msg", msg);
        map.put("correlationData", correlationData);
        return map;
    }
}

 

大功告成!接下來我們進行測試,發送一條消息,我們可以控制台:

假設發送一條信息沒有路由匹配到隊列,可以看到如下信息:

這就是confirm模式。它的作用是為了保障生產者投遞消息到RabbitMQ不會出現消息丟失。

 

 

三、消息中間件端:消息持久化(exchange 和 queue 的持久化)

RabbitMQ是支持消息持久化的,消息持久化需要設置:Exchange為持久化和Queue持久化,這樣當消息發送到RabbitMQ服務器時,消息就會持久化。

3.1 exchange的持久化

首先看Exchange交換機的類圖:

看這個類圖其實是要說明上一篇文章介紹的四種交換機都是AbstractExchange抽象類的子類,所以根據java的特性,創建子類的實例會先調用父類的構造器,父類也就是AbstractExchange的構造器是怎么樣的呢?

從上面的注釋可以看到durable參數表示是否持久化。默認是持久化(true)。創建持久化的Exchange可以這樣寫:

@Bean
    public DirectExchange rabbitmqDemoDirectExchange() {
        //Direct交換機
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
    }

 

3.2 queue的持久化

接着是Queue隊列,我們先看看Queue的構造器是怎么樣的:

也是通過durable參數設置是否持久化,默認是true。所以創建時可以不指定:

 @Bean
    public Queue fanoutExchangeQueueA() {
     //只需要指定名稱,默認是持久化的
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A);
    }

 

3.3 怎么證明持久化成功

怎么證明是已經持久化了呢,實際上可以找到對應的文件:

找到對應磁盤中的目錄:

消息持久化可以防止消息在RabbitMQ Server中不會因為宕機重啟而丟失。

 

 

四、消息消費端:ACK事務機制(隊列-->消費者)

原本:消費者從隊列中獲取到消息后,會直接確認簽收,假設消費者宕機或者程序出現異常,數據沒有正常消費,這種情況就會出現數據丟失。

修改:所以關鍵在於把自動簽收改成手動簽收,正常消費則返回確認簽收,如果出現異常,則返回拒絕簽收重回隊列。

 

代碼實現

首先在消費者的application.yml文件中設置事務提交為manual手動模式:

spring:
  rabbitmq:
    listener:
      simple:
  acknowledge-mode: manual # 手動ack模式
        concurrency: 1 # 最少消費者數量
        max-concurrency: 10 # 最大消費者數量

然后編寫消費者的監聽器:

@Component
public class RabbitDemoConsumer {

    enum Action {
        //處理成功
        SUCCESS,
        //可以重試的錯誤,消息重回隊列
        RETRY,
        //無需重試的錯誤,拒絕消息,並從隊列中刪除
        REJECT
    }

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
    public void process(String msg, Message message, Channel channel) {
        long tag = message.getMessageProperties().getDeliveryTag();
        Action action = Action.SUCCESS;
        try {
            System.out.println("消費者RabbitDemoConsumer從RabbitMQ服務端消費消息:" + msg);
            if ("bad".equals(msg)) {
                throw new IllegalArgumentException("測試:拋出可重回隊列的異常");
            }
            if ("error".equals(msg)) {
                throw new Exception("測試:拋出無需重回隊列的異常");
            }
        } catch (IllegalArgumentException e1) {
            e1.printStackTrace();
            //根據異常的類型判斷,設置action是可重試的,還是無需重試的
            action = Action.RETRY;
        } catch (Exception e2) {
            //打印異常
            e2.printStackTrace();
            //根據異常的類型判斷,設置action是可重試的,還是無需重試的
            action = Action.REJECT;
        } finally {
            try {
                if (action == Action.SUCCESS) {
                    //multiple 表示是否批量處理。true表示批量ack處理小於tag的所有消息。false則處理當前消息
                    channel.basicAck(tag, false);
                } else if (action == Action.RETRY) {
                    //Nack,拒絕策略,消息重回隊列
                    channel.basicNack(tag, false, true);
                } else {
                    //Nack,拒絕策略,並且從隊列中刪除
                    channel.basicNack(tag, false, false);
                }
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

解釋一下上面的代碼:

  • 如果沒有異常,則手動確認回復RabbitMQ服務端basicAck(消費成功)。
  • 如果拋出某些可以重回隊列的異常,我們就回復basicNack並且設置重回隊列。
  • 如果是拋出不可重回隊列的異常,就回復basicNack並且設置從RabbitMQ的隊列中刪除。

接下來進行測試,發送一條普通的消息"hello":

 

解釋一下ack返回的三個方法的意思。

①成功確認

void basicAck(long deliveryTag, boolean multiple) throws IOException;

消費者成功處理后調用此方法對消息進行確認。

  • deliveryTag:該消息的index
  • multiple:是否批量.。true:將一次性ack所有小於deliveryTag的消息。

②失敗確認

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  • deliveryTag:該消息的index。
  • multiple:是否批量。true:將一次性拒絕所有小於deliveryTag的消息。
  • requeue:被拒絕的是否重新入隊列。

③失敗確認

void basicReject(long deliveryTag, boolean requeue) throws IOException;
  • deliveryTag:該消息的index。
  • requeue:被拒絕的是否重新入隊列。

basicNack()和basicReject()的區別在於:basicNack()可以批量拒絕,basicReject()一次只能拒接一條消息。

 

五、補充方案1:設置集群鏡像模式

我們先來介紹下RabbitMQ三種部署模式:

1)單節點模式:最簡單的情況,非集群模式,節點掛了,消息就不能用了。業務可能癱瘓,只能等待。
2)普通模式:默認的集群模式,某個節點掛了,該節點上的消息不能用,有影響的業務癱瘓,只能等待節點恢復重啟可用(必須持久化消息情況下)。
3)鏡像模式:把需要的隊列做成鏡像隊列(互為鏡像的是隊列,並非節點),一個消息會存在於多個節點的隊列中,屬於RabbitMQ的HA方案(高可用方案)

下面介紹下鏡像模式的三種高可用策略模式:

1)同步至所有的broker節點
2)同步最多N個機器
3)只同步至符合指定名稱的nodes

命令處理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

1)為每個以“rock.wechat”開頭的隊列設置所有節點的鏡像,並且設置為自動同步模式
rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

2)為每個以“rock.wechat.”開頭的隊列設置兩個節點的鏡像,並且設置為自動同步模式
rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

3)為每個以“node.”開頭的隊列分配指定的節點做鏡像
rabbitmqctl set_policy ha-nodes "^nodes\." \
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

但是:HA 鏡像隊列有一個很大的缺點就是:系統的吞吐量會有所下降

 

六、補充方案2:消息補償機制

我們通過之前的方案 , 基本上已經能夠保證消息投遞成功了 ! 為什么還要消息補償機制呢? 難道消息還會丟失,沒錯,系統是在一個復雜的環境,不要想的太簡單了,雖然以上的三種方案,基本可以保證消息的高可用不丟失的問題。但是事無完全:

比如:持久化的消息,保存到硬盤過程中,當前隊列節點掛了,存儲節點硬盤又壞了,消息丟了,怎么辦?

產線網絡環境太復雜,所以不知數太多,所以要做消息補償機制 !

消息補償機制需要建立在業務數據庫和MQ數據庫的基礎之上 , 當我們發送消息時 , 需要同時將消息數據保存在數據庫中, 兩者的狀態必須記錄。 然后通過業務數據庫和MQ數據庫的對比檢查消費是否成功,不成功,進行消息補償措施,重新發送消息處理。


 

參考文獻

https://zhuanlan.zhihu.com/p/166426241 

https://www.cnblogs.com/flyrock/p/8859203.html

https://juejin.cn/post/6866647684682350600


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM