RocketMQ消息零丟失解決方案


發送丟失

  我們發送消息時,broker寫入到cache后就返回成功了,而producer只要獲取到ACK就說明消息發送成功了,反之肯定會收到一個異常,比如網絡錯誤、請求超時之內的。而當我們發送失敗后一直重試發送,能保證消息一定到達MQ嗎?比如這樣:

  

   本地事務執行完之后、數據庫已經更新了,還沒來的及發送消息,producer就掛了。那現在就數據不一致了。於是現在又改造,將本地業務和消息發送放入同一個事務中,只要有異常那就回滾。

  

   這樣看起來好像是沒有問題了,不過業務邏輯中如果涉及到redis、es的操作呢?拋出異常他們也不會回滾的。所以剛才的方式只適合簡單的業務。

  

 解決方案

   先發送half消息到broker中去,然后執行本地業務邏輯,最后再確定狀態commit/rollbact。即使確定狀態失敗了,broker也會主動回查消息狀態。從而保證了消息一定會發送到broker中去。

    /**
     * 事務消息
     *
     * @throws Exception
     */
    public static void tranProducer()throws Exception{
        //1.創建消息生產者producer,並制定生產者組名
        TransactionMQProducer producer = new TransactionMQProducer("test-group-1");
        //2. 設置NameServer的地址
        producer.setNamesrvAddr("192.168.200.100:9876");
        // 添加事務監聽器
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 在該方法中執行本地事務
             * @param msg
             * @param arg
             * @return
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                if (StringUtils.equals("TAGA", msg.getTags())) {
                    // 確認
                    System.out.println("A 提交。。。。。。可以被消費");
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.equals("TAGB", msg.getTags())) {
                    // 回滾
                    System.out.println("B 回滾。。。。。。無法被消費");
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else{
                    // 無狀態
                    System.out.println("C 忽略。。。。。。無法被消費");
                    return LocalTransactionState.UNKNOW;
                }
            }
            /**
             * 該方法時MQ進行消息事務狀態回查
             * @param msg
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("回查的消息的Tag:" + msg.getTags()+" 。。。。。。可以被消費");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        //3.啟動producer
        producer.start();

        String[] tags = {"TAGA", "TAGB", "TAGC"};

        for (int i = 0; i < 3; i++) {
            //4.創建消息對象,指定主題Topic、Tag和消息體
            /**
             * 參數一:消息主題Topic
             * 參數二:消息Tag
             * 參數三:消息內容
             */
            Message msg = new Message("TransactionTopic", tags[i], ("Hello World" + i).getBytes());
            //5.發送消息
            SendResult result = producer.sendMessageInTransaction(msg, null);
            //發送狀態
            SendStatus status = result.getSendStatus();

            System.out.println("發送結果:" + result);

            //線程睡1秒
            TimeUnit.SECONDS.sleep(2);
        }

        //6.關閉生產者producer(不能關閉,不然拿不到回傳結果了)
        //producer.shutdown();
    }

  

Broker消息丟失

   現在我們的事務消息commit之后,那這條消息就發送成功可以消費了,但是如果這條消息僅僅只是剛寫入pageCache,還沒有寫入到磁盤文件文件呢?還是會丟失的。是的,可以用同步發送,直接寫入CommitLog不經過PageCache,那這樣就一定安全了么?如果寫入成功后恰好機器磁盤壞了,那照樣數據找不回來了。所以一定要明確一點:消息寫入MQ成功不代表消息一定不丟失。

解決方案

   此時可以采用DLedger技術實現主從同步。commit后,消息必須寫入broker group中的大多數節點才算成功。這樣同步發送后可以保證有多分數據,並且即使master宕機,DLedger也會自動選舉出一個master來。這樣就做到了broker不丟失數據。

消費丟失

  Consumer拿到消息后,先進行業務處理,然后再手動提交offset,告訴broker我處理完了。可能還沒來得及提交消息狀態consume就掛了,所以我們可以將consume做集群部署,只要沒有提交offset消息成功,broker就不認為你處理了這條消息,它會把消息交給consumer group內的其他機器去執行。但是千萬要做好冪等處理,不然重復消費的結果也是很嚴重的。

推薦架構解決方案

   魚與熊掌不可兼得,非要100%的消息不丟失,那吞吐量肯定是很低的。因為同步刷盤沒有經過os cache那么性能肯定是很低的,。一般都是  同步復制+異步刷盤 。nameserver、broker、producer、consumer 全部集群部署,nameserver保存broker節點信息;broker主動同步復制消息,但是master寫入能力有限,可以采用多台master提升寫入能力。producer、consumer作為上下游服務肯定也是要保證高可用的。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM