RocketMq發送事務消息


一、RocketMq事務消息流程:

       1、首先會向broker發送一個預請求消息,消費者不可見

         2、回調執行本地事務(比如操作數據庫)

         3、事務執行成功后,再次發送消息給broker,告訴broker事務執行成功這個消息要提交,讓消費者可見。如果本地事務執行超時,會返回一個unknowbroker會發送一個消息回查,檢查消息是否執行成功。

二、RocketMq事務消息實例:

  1、引入rocketMq相關的依賴:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

 

     2、創建一個TransactionProducer類:

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
        //創建生產者並制定組名
        TransactionMQProducer producer = new TransactionMQProducer("rocketMQ_transaction_producer_group");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("192.168.***.***:9876");
        //3、指定消息監聽對象用於執行本地事務和消息回查
        TransactionListener listener = new TransactionListenerImol();
        producer.setTransactionListener(listener);
        //4、線程池
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = newThread(r);
                thread.setName("client-tanscation-msg-check-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        //5、啟動producer
        producer.start();

        //6.創建消息對象,指定主題Topic、Tag和消息體 String topic, String tags, String keys, byte[] body
        Message message = new Message("Topic_transaction_demo", //主題
                "Tags", //主要用於消息過濾
                "Key_1", //消息唯一值
                ("hello-transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));

        //7、發送事務消息
        TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");

        producer.shutdown();
    }
}

  3、發送事務消息還需要一個事務監聽對象,它實現TransactionListener 接口,其中有兩個方法作用分別是執行本地事務和消息回查:

public class TransactionListenerImol implements TransactionListener {
    //存儲事務狀態信息  key:事務id  value:當前事務執行的狀態
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    //執行本地事務
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        //事務id
        String transactionId = message.getTransactionId();
        //0:執行中,狀態未知 1:執行成功 2:執行失敗
        localTrans.put(transactionId, 0);
        //業務執行,本地事務,service
        System.out.println("hello-demo-transaction");
        try {
            System.out.println("正在執行本地事務---");
            Thread.sleep(60000*2);
            System.out.println("本地事務執行成功---");
            localTrans.put(transactionId, 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
            localTrans.put(transactionId, 2);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    //消息回查
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        //獲取對應事務的狀態信息
        String transactionId = messageExt.getTransactionId();
        //獲取對應事務id執行狀態
        Integer status = localTrans.get(transactionId);
        //消息回查
        System.out.println("消息回查---transactionId:" + transactionId + "狀態:" + status);
        switch (status) {
            case 0:
                return LocalTransactionState.UNKNOW;
            case 1:
                return LocalTransactionState.COMMIT_MESSAGE;
            case 2:
                return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.UNKNOW;
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM