RocketMQ事務消息


RocketMQ事務消息(Transactional Message)是指應用本地事務和發送消息操作可以被定義到全局事務中,要么同時成功,要么同時失敗。RocketMQ的事務消息提供類似 X/Open XA 的分布事務功能,通過事務消息能達到分布式事務的最終一致。

Half Message(半消息)

是指暫不能被Consumer消費的消息。Producer 已經把消息成功發送到了 Broker 端,但此消息被標記為暫不能投遞狀態,處於該種狀態下的消息稱為半消息。需要 Producer

對消息的二次確認后,Consumer才能去消費它。

消息回查

由於網絡閃段,生產者應用重啟等原因。導致 Producer 端一直沒有對 Half Message(半消息) 進行 二次確認。這是Brock服務器會定時掃描長期處於半消息的消息,會

主動詢問 Producer端 該消息的最終狀態(Commit或者Rollback),該消息即為 消息回查。

事務消息共有三種狀態,提交狀態、回滾狀態、中間狀態:

  • TransactionStatus.CommitTransaction: 提交事務,它允許消費者消費此消息。
  • TransactionStatus.RollbackTransaction: 回滾事務,它代表該消息將被刪除,不允許被消費。
  • TransactionStatus.Unknown: 中間狀態,它代表需要檢查消息隊列來確定狀態。

事務消息發送及提交:

  1. 發送消息(half消息)
  2. 服務端響應消息寫入結果
  3. 根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行)
  4. 根據本地事務狀態執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)

補償流程:

  1. 對沒有Commit/Rollback的事務消息(pending狀態的消息),從服務端發起一次“回查”
  2. Producer收到回查消息,檢查回查消息對應的本地事務的狀態
  3. 根據本地事務狀態,重新Commit或者Rollback

補償階段用於解決消息Commit或者Rollback發生超時或者失敗的情況。

一階段的消息如何對用戶不可見

事務消息相對普通消息最大的特點就是一階段發送的消息對用戶是不可見的。

如何做到寫入了消息但是對用戶不可見?——寫入消息數據,但是不創建對應的消息的索引信息。

熟悉RocketMQ的同學應該都清楚,消息在服務端的存儲結構如上,每條消息都會有對應的索引信息,Consumer通過索引讀取消息。

那么實現一階段寫入的消息不被用戶消費(需要在Commit后才能消費),只需要寫入Storage Queue,但是不構建Index Queue即可。

RocketMQ中具體實現策略是:寫入的如果事務消息,對消息的Topic和Queue等屬性進行替換,同時將原來的Topic和Queue信息存儲到消息的屬性中。

實例

假如客戶需要在電商購買東西,流程如下

1、購買商品,客戶付款,客戶賬戶金額減少,調用Order API + Pay API;

2、MQ產生Order日志,狀態Ordered;

3、商家減庫存,調用Stock API;

4、MQ產生減庫存Stock日志,狀態Stocked;

5、商家收款,調用Order API + Pay API;

6、MQ產生Order日志,狀態Finished;

現實情況會更復雜,這6個步驟只是簡單處理。

本地事務為:1、3、5

MQ事務為:2、4、6

為了能解決該問題,同時又不和業務耦合,RocketMQ提出了“事務消息”的概念。RocketMQ事務消息(Transactional Message)是指應用本地事務和發送消息操作可以被定義到全局事務中,要么同時成功,要么同時失敗。RocketMQ的事務消息提供類似 X/Open XA 的分布事務功能,通過事務消息能達到分布式事務的最終一致。

具體來說,就是把消息的發送分成了2個階段:Prepare階段和確認階段。

具體來說,上面的2個步驟,被分解成3個步驟: 
(1) 發送Prepared消息 
(2) update DB 
(3) 根據update DB結果成功或失敗,Confirm或者取消Prepared消息。

事務監聽

package com.xin.rocketmq.demo.testrun;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);

        //執行購買商品,客戶付款,客戶賬戶金額減少,調用Order API + Pay API;
        System.out.println("執行客戶付款:" + msg.getTransactionId());
        //商家減庫存,調用Stock API;
        System.out.println("執行庫存:" + msg.getTransactionId());
        //商家收款,調用Order API + Pay API;
        System.out.println("執行商家收款:" + msg.getTransactionId());

        return LocalTransactionState.UNKNOW;
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

事務生產者

package com.xin.rocketmq.demo.testrun;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.*;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("192.168.10.11:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 2; i++) {
            try {
                Message msg =
                        new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

一批次處理2個訂單,結果

SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC2D8C18B4AAC2268193040000, offsetMsgId=null, messageQueue=MessageQueue [topic=TopicTest1234, brokerName=localhost.localdomain, queueId=3], queueOffset=103]
執行客戶付款:A9FEC2CC2D8C18B4AAC2268193A20001
執行庫存:A9FEC2CC2D8C18B4AAC2268193A20001
執行商家收款:A9FEC2CC2D8C18B4AAC2268193A20001
SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC2D8C18B4AAC2268193A20001, offsetMsgId=null, messageQueue=MessageQueue [topic=TopicTest1234, brokerName=localhost.localdomain, queueId=0], queueOffset=104]
執行客戶付款:A9FEC2CC2D8C18B4AAC2268193B10002
執行庫存:A9FEC2CC2D8C18B4AAC2268193B10002
執行商家收款:A9FEC2CC2D8C18B4AAC2268193B10002
......

當發送半消息成功時,我們使用 executeLocalTransaction 方法來執行本地事務。它返回前一節中提到的三個事務狀態之一。checkLocalTranscation 方法用於檢查本地事務狀態,並回應消息隊列的檢查請求。它也是返回前一節中提到的三個事務狀態之一。

模擬兩條記錄全部成功走完事務

package com.xin.rocketmq.demo.testrun;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.System.*;

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);

        //執行購買商品,客戶付款,客戶賬戶金額減少,調用Order API + Pay API;
        out.println("執行客戶付款:" + msg.getTransactionId());
        //商家減庫存,調用Stock API;
        out.println("執行庫存:" + msg.getTransactionId());
        //商家收款,調用Order API + Pay API;
        out.println("執行商家收款:" + msg.getTransactionId());

        return LocalTransactionState.COMMIT_MESSAGE;
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
開啟消費者,發現兩條事務都確實走完了。

執行客戶付款:A9FEC2CC347C18B4AAC2269439D50000
執行庫存:A9FEC2CC347C18B4AAC2269439D50000
執行商家收款:A9FEC2CC347C18B4AAC2269439D50000
SEND_OK -- SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC347C18B4AAC2269439D50000, offsetMsgId=null, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=2], queueOffset=257]
執行客戶付款:A9FEC2CC347C18B4AAC226943A790001
執行庫存:A9FEC2CC347C18B4AAC226943A790001
執行商家收款:A9FEC2CC347C18B4AAC226943A790001
SEND_OK -- SendResult [sendStatus=SEND_OK, msgId=A9FEC2CC347C18B4AAC226943A790001, offsetMsgId=null, messageQueue=MessageQueue [topic=topic_family, brokerName=localhost.localdomain, queueId=3], queueOffset=258]

Receive message[msgId=A9FEC2CC347C18B4AAC2269439D50000] 86728ms later
Receive message[msgId=A9FEC2CC347C18B4AAC226943A790001] 87048ms later

 

模擬1條記錄在本地庫存事務失敗回滾

package com.xin.rocketmq.demo.testrun;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.System.*;

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);

        //執行購買商品,客戶付款,客戶賬戶金額減少,調用Order API + Pay API;
        out.println("執行客戶付款:" + msg.getTransactionId());
        //商家減庫存,調用Stock API;
        out.println("執行庫存:" + msg.getTransactionId());
        //商家收款,調用Order API + Pay API;
        if (status == 1)//第二個訂單庫存異常,模擬庫存失敗,需要回滾
            return LocalTransactionState.ROLLBACK_MESSAGE;
        out.println("執行商家收款:" + msg.getTransactionId());

        return LocalTransactionState.COMMIT_MESSAGE;
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                case 2:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

兩條事務,發現只有一條可以被成功消費。另一條回滾了。

Receive message[msgId=A9FEC2CC355818B4AAC2269681E00000] 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM