RocketMq发送事务消息


一、RocketMq事务消息流程:

       1、首先会向broker发送一个预请求消息,消费者不可见

         2、回调执行本地事务(比如操作数据库)

         3、事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见。如果本地事务执行超时,会返回一个unknowbroker会发送一个消息回查,检查消息是否执行成功。

二、RocketMq事务消息实例:

  1、引入rocketMq相关的依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

 

     2、创建一个TransactionProducer类:

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
        //创建生产者并制定组名
        TransactionMQProducer producer = new TransactionMQProducer("rocketMQ_transaction_producer_group");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("192.168.***.***:9876");
        //3、指定消息监听对象用于执行本地事务和消息回查
        TransactionListener listener = new TransactionListenerImol();
        producer.setTransactionListener(listener);
        //4、线程池
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = newThread(r);
                thread.setName("client-tanscation-msg-check-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        //5、启动producer
        producer.start();

        //6.创建消息对象,指定主题Topic、Tag和消息体 String topic, String tags, String keys, byte[] body
        Message message = new Message("Topic_transaction_demo", //主题
                "Tags", //主要用于消息过滤
                "Key_1", //消息唯一值
                ("hello-transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));

        //7、发送事务消息
        TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");

        producer.shutdown();
    }
}

  3、发送事务消息还需要一个事务监听对象,它实现TransactionListener 接口,其中有两个方法作用分别是执行本地事务和消息回查:

public class TransactionListenerImol implements TransactionListener {
    //存储事务状态信息  key:事务id  value:当前事务执行的状态
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    //执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        //事务id
        String transactionId = message.getTransactionId();
        //0:执行中,状态未知 1:执行成功 2:执行失败
        localTrans.put(transactionId, 0);
        //业务执行,本地事务,service
        System.out.println("hello-demo-transaction");
        try {
            System.out.println("正在执行本地事务---");
            Thread.sleep(60000*2);
            System.out.println("本地事务执行成功---");
            localTrans.put(transactionId, 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
            localTrans.put(transactionId, 2);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    //消息回查
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        //获取对应事务的状态信息
        String transactionId = messageExt.getTransactionId();
        //获取对应事务id执行状态
        Integer status = localTrans.get(transactionId);
        //消息回查
        System.out.println("消息回查---transactionId:" + transactionId + "状态:" + status);
        switch (status) {
            case 0:
                return LocalTransactionState.UNKNOW;
            case 1:
                return LocalTransactionState.COMMIT_MESSAGE;
            case 2:
                return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.UNKNOW;
    }
}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM