RocketMq的事務消息發送方法,消息零丟失的實現方式,代碼流程講解,干貨分享


1.消息發送mq不丟失實現方式
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @description RocketMQ事務消息發送
 */
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
        //用來接收RocketMQ回調的監聽接口
        //這里是我們自己定義的實現執行本地事務,commit.rollback,回調查詢等邏輯
        TransactionListener transactionListener = new TransactionListenerImpl();
        //下面就是創建一個支持事務消息的Producer
        //對這個Producer指定一個生產者分組
        TransactionMQProducer producer = new TransactionMQProducer("test");

        //下面指定了一個線程池,里面包含一些線程
        //這個線程就是用來處理RocketMQ的回調函數
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                1,//核心線程數
                2,//線程池最大線程數
                1000,//超時時間
                TimeUnit.SECONDS,//時間的單位
                new ArrayBlockingQueue<Runnable>(2000),//存放阻塞線程的列表
                new ThreadFactory() {//線程創建工廠
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("TestThread");
                        return thread;
                    }
                }
        );
        //給事務消息生產者設置對應的線程池,負責執行RocketMQ的回調請求
        producer.setExecutorService(poolExecutor);
        //給生產者設置對應的回調函數
        producer.setTransactionListener(transactionListener);
        //啟動消息生產者
        producer.start();
        //虛擬一條成功的消息
        Message message = new Message(
                "successTopic"
                , "tag"
                , "key",
                ("成功的消息").getBytes(RemotingHelper.DEFAULT_CHARSET));
        //這里存放發送的half消息到內存或者磁盤文件中,后台開啟一個線程,掃描這個文件,如果超過一定時間沒有收到響應,就回滾業務
        //save(message)
        //將消息作為half消息的模式發送出去,如果發送失敗,則會收到一個異常,我們捕獲異常進行對應的異常處理即可
        try {
            TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);
            //這里可以更新或者刪除存放在本地內存或者磁盤文件的消息記錄
            //delete(message)
        }catch (Exception e){
            //half消息發送失敗
            //本地系統執行業務回滾,更新數據庫信息等操作
        }
    }
}

上面是發送RocketMq的事務消息發送方法

下面是RocketMq的事務消息發送方法的回調函數的實現類

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @description 事務監聽實現類
 */
public class TransactionListenerImpl implements TransactionListener {

    //如果half消息發送成功了就會回調這個函數,就可以執行本地事務
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        //執行本地事務
        //根據本地一連串的事務執行結果,去選擇commit或者rollback
        try{
            //如果本地事務都執行成功了,返回commit
            return LocalTransactionState.COMMIT_MESSAGE;
        }catch (Exception e){
            //本地事務都失敗了,回滾所有的執行過的操作
            //返回rollback,標記half消息無效
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    //如果因為各種原因生產者沒有返回commit或者rollback給Broker,
    //broker會定時掃描沒有回應的half消息,然后回調這個函數
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        //查詢本地事務,是否都執行成功
        Integer status = 0;
        //Integer status = localTrans.get(messageExt.getTransactionId());
        //根據本地事務情況選擇執行commit或者rollback
        if (null != status){
            switch(status){
                case 0:return LocalTransactionState.UNKNOW;
                case 1:return LocalTransactionState.COMMIT_MESSAGE;
                case 2:return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

 以上就是RocketMq的事務消息發送方法,可以實現發送消息的零丟失,以事務的方式確保消息一定可以發送到RocketMQ

2.mq消息不丟失實現方式

  以上僅僅只能保證消息發送到mq成功,但是一定能保證消息不丟失嗎?顯然是不行的;

  假設1:消息發送到了mq,就一定進入到了磁盤文件了嗎?rocketmq是先存入os cache中,也就是內存,如果這個時候機器宕機,內存上的數據也就全部丟失了,顯然消息也會丟失

  解決方案:rocketmq默認是異步刷盤的模式,保證了數據的高吞吐量,但是有可能出現消息丟失的情況,可以改為同步刷盤策略,通過修改配置文件broker.config中flushDiskType參數為SYNC_FLUSH即可,這樣,只要mq告訴我們half消息響應成功了,就代表成功寫入了磁盤中了;

  假設2:消息寫入磁盤就一定不會丟失嗎?顯然也不能;如果磁盤損壞,那么消息也會丟失

  解決方案:基於Dledger和Raft協議的rocketmq主從架構,只要消息寫入master成功了,那么就一定會基於raft協議同步給其他的broker,就算master機器磁盤損壞,那么一定有broker存儲了同樣的消息,可以確保消息在mq上不會丟失;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM