RocketMQ學習筆記(10)----RocketMQ的Producer 事務消息使用


1. 事務消息原理圖

 RocketMQ除了支持普通消息,順序消息之外,還支持了事務消息。

1. 什么是分布式事務?

  分布式事務就是指事務的參與者、支持事務的服務器、資源服務器以及事務管理器分別位於不同的分布式系統的不同節點之上。以上是百度百科的解釋,簡單的說,就是一次大的操作由不同的小操作組成,這些小的操作分布在不同的服務器上,且屬於不同的應用,分布式事務需要保證這些小操作要么全部成功,要么全部失敗。本質上來說,分布式事務就是為了保證不同數據庫的數據一致性。

2. RocketMQ中分布式事務的使用

  在RocketMQ中分布式事務執行過程分為三個階段,RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事物,第三階段通過第一階段拿到的地址去訪問消息,並修改消息的狀態。當RocketMQ確認消息發送失敗時,RocketMQ會定期掃描消息集群中的事物消息,如果發現了Prepared消息,它會向消息發送端(生產者)確認,RocketMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。

  實現方式:

  Producer實現:

package com.wangx.rocketmq.transaction;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        //初始化TransactionListenerImpl
        TransactionListener transactionListener = new TransactionListenerImpl();

        //創建事物transactionProducer
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        //由於本地回調監聽跟消息的發送會並發進行,所以可以使用線程池來執行操作
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setNamesrvAddr("47.105.149.61:9876;47.105.145.123:9876");
        //設置線程池
        producer.setExecutorService(executorService);
        //設置事物監聽
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

  事務監聽實現方式:

package com.wangx.rocketmq.transaction;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * send()
     * @param msg send中的message對象,
     * @param arg send方法中回調函數之后的傳入的參數
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        //提交本地事物
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    //更新本地事物的最終狀態
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

RocketMQ事物的三種狀態:

  ROLLBACK_MESSAGE:回滾事務

  COMMIT_MESSAGE: 提交事務

  UNKNOW: broker會定時的回查Producer消息狀態,直到徹底成功或失敗。

     當executeLocalTransaction方法返回ROLLBACK_MESSAGE時,表示直接回滾事務,當返回COMMIT_MESSAGE提交事務

  當返回UNKNOW時,Broker會在一段時間之后回查checkLocalTransaction,根據checkLocalTransaction返回狀態執行事務的操作(回滾或提交),

  如示例中,當返回ROLLBACK_MESSAGE時消費者不會收到消息,且不會調用回查函數,當返回COMMIT_MESSAGE時事務提交,消費者收到消息,當返回UNKNOW時,在一段時間之后調用回查函數,並根據status判斷返回提交或回滾狀態,返回提交狀態的消息將會被消費者消費,所以此時消費者可以消費部分消息。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM