Spring Cloud Alibaba學習筆記(14) - Spring Cloud Stream + RocketMQ實現分布式事務


發送消息

Spring消息編程模型下,使用RocketMQ收發消息 一文中,發送消息使用的是RocketMQTemplate類.
在集成了Spring Cloud Stream之后,我們可以使用Source實現消息的發送,代碼如下

private final Source source;
......
source.output().send(
                MessageBuilder
                        .withPayload(Demo.builder().demoId(1).remark("哈哈哈").build())
                        .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
                        .setHeader("comment", JSON.toJSONString(forObject))
                        .build()
        );

在使用rocketMQTemplate類時,sendMessageInTransaction方法的第四個參數可以幫助我們傳遞對象,source接口的send方法沒有多余參數,所以我們利用MessageBuilder將對象信息放在消息頭里面.因為setHeader只能傳遞字符串,所以我們將對象轉換為Json字符串,然后在處理本地事務從消息頭中取出來,轉換回來就可以了.

修改配置

在使用rocketMQTemplate類時,我們使用sendMessageInTransactiontxProducerGroup參數設置txProducerGroup信息,在引入了Spring Cloud Stream之后,我們在配置文件中配置該信息.配置如下

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              transactional: true
              # txProducerGroup
              group: test-stream-rocketmq-transactional
      bindings:
        # 生產者
        output:
          # 指定topic
          destination: test-topic

本地業務處理

import com.alibaba.fastjson.JSON;
import com.example.study01.domain.dto.DemoComment;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

@Slf4j
@RocketMQTransactionListener(txProducerGroup = "test-stream-rocketmq-transactional")
public class demoTransactionalListener implements RocketMQLocalTransactionListener {
    /**
     * 處理本地事務
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        // 消息頭
        MessageHeaders headers = message.getHeaders();
        String transactionalId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        DemoComment comment = JSON.parseObject(headers.get("comment").toString(), DemoComment.class);

        try {
            log.info("1111111");
            // 本地業務
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 若在本地事務執行過程中缺少二次確認消息或生產者處於等待狀態
     * MQ Server將向同一組中的每個生產者發送檢查消息
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("222222");
        try {
            // 檢查業務
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM