Spring Cloud Alibaba學習筆記(11) - RocketMQ事務消息


在Spring中,我們要實現事務,一般通過@Transactional注解實現。這在引入RocketMQ之前沒有問題,但是在引入了RocketMQ之后,如果消息發送之后的業務邏輯處理發生了異常的話,這時候消息已經發送出去了,就會導致業務的問題。
為了解決這一問題,RocketMQ引入了Transactional Message【事務消息】。
RocketMQ事務消息流程圖

  1. 生產者向MQServer發送半消息【半消息:會存儲進MQ Server,但是被標記為不能投遞狀態】
  2. 發送半消息成功,生產者實行本地事務
  3. 根據本地事務結果向MQ Server發送二次確認請求
  4. MQ Server根據接受到的消息投遞或者丟棄消息
  5. 若在本地事務執行過程中缺少二次確認消息或生產者處於等待狀態,MQ服務器將向同一組中的每個生產者發送檢查消息,然后繼續3,4的操作

PS:消息三態

  • Commit:提交事務信息,消費者可以消費此消息
  • Rollback:回滾事務消息,broker會刪除這條消息,消費者不能消費
  • UNKNOWN:broker需要回查確認消息狀態

代碼實現

發送半消息

利用rocketMQTemplate類的sendMessageInTransaction實現半消息發送
第一個參數為txProducerGroup:就是group名稱,根據業務自定義
第二次信息為destination:topic名稱
第三個信息為message:消息體,利用MessageBuilder.withPayload構建
第四個信息為arg:業務對象,用於處理本地業務
代碼如下:

// 發送半消息
rocketMQTemplate.sendMessageInTransaction(
        "test-transactional",
        "test-topic",
        MessageBuilder.withPayload(
                Demo.builder().demoId(1).remark("哈哈哈").build()
        ).setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString()).build(),
        forObject
);

實現RocketMQLocalTransactionListener接口

新建demoTransactionalListener類,繼承RocketMQLocalTransactionListener接口,實現了兩個方法
executeLocalTransaction:處理本地業務
checkLocalTransaction:MQ Server發送檢查信息相應
添加@RocketMQTransactionListener注解,txProducerGroup屬性值與半消息的txProducerGroup參數值相同
代碼如下:

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

@RocketMQTransactionListener(txProducerGroup = "test-transactional")
public class demoTransactionalListener implements RocketMQLocalTransactionListener {
    /**
     * 處理本地事務
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        // 消息頭
        MessageHeaders headers = message.getHeaders();
        String transactionalId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        // arg:sendMessageInTransaction方法的第四個參數,用於處於本地業務

        try {
            // 本地業務
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 若在本地事務執行過程中缺少二次確認消息或生產者處於等待狀態
     * MQ Server將向同一組中的每個生產者發送檢查消息
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

        try {
            // 檢查
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

PS:MQ Server發送檢查:可以通過新建一個表,記錄本地業務成功或者失敗,然后檢查相應只需要查一下數據就可以了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM