分布式事務(4)---RocketMQ實現分布式事務項目


RocketMQ實現分布式事務

有關RocketMQ實現分布式事務前面寫了一篇博客

1、RocketMQ實現分布式事務原理

下面就這個項目做個整體簡單介紹,並在文字最下方附上項目Github地址。

一、項目概述

1、技術架構

項目總體技術選型

SpringCloud(Finchley.RELEASE) + SpringBoot2.0.4 + Maven3.5.4 + RocketMQ4.3 +MySQL + lombok(插件)

有關SpringCloud主要用到以下四個組建

Eureka Server +config-server(配置中心)+ Eureka Client + Feign(服務間調用) 

配置中心是用MySQL存儲數據。

2、項目整體結構

config-service  # 配置中心
eureka          # 注冊中心
service-order   #訂單微服務
service-produce #商品微服務

各服務的啟動順序就安裝上面的順序啟動。

大致流程

啟動后,配置中心、訂單微服務、商品微服務都會將信息注冊到注冊中心。

如果訪問:localhost:7001(注冊中心地址),以上服務都出現說明啟動成功。

3、分布式服務流程

用戶在訂單微服務下單后,會去回調商品微服務去減庫存。這個過程需要事務的一致性。

4、測試流程

頁面輸入:

http://localhost:9001/api/v1/order/save?userId=1&productId=1&total=4	

訂單微服務執行情況(訂單服務事務執行成功)

商品微服務執行情況(商品服務事務執行成功)

當然你也可以通過修改參數來模擬分布式事務出現的各種情況。


二、MQ中生產者核心代碼

這里展示下,生產者發送消息核心代碼。

@Slf4j
@Component
public class TransactionProducer {

    /**
     * 需要自定義事務監聽器 用於 事務的二次確認 和 事務回查
     */
    private TransactionListener transactionListener ;
    /**
     * 這里的生產者和之前的不一樣
     */
    private TransactionMQProducer producer = null;
    /**
     * 官方建議自定義線程 給線程取自定義名稱 發現問題更好排查
     */
    private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }
    });

    public TransactionProducer(@Autowired Jms jms, @Autowired ProduceOrderService produceOrderService) {
        transactionListener = new TransactionListenerImpl(produceOrderService);
        // 初始化 事務生產者
        producer = new TransactionMQProducer(jms.getOrderTopic());
        // 添加服務器地址
        producer.setNamesrvAddr(jms.getNameServer());
        // 添加事務監聽器
        producer.setTransactionListener(transactionListener);
        // 添加自定義線程池
        producer.setExecutorService(executorService);

        start();
    }

    public TransactionMQProducer getProducer() {
        return this.producer;
    }

    /**
     * 對象在使用之前必須要調用一次,只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 一般在應用上下文,使用上下文監聽器,進行關閉
     */
    public void shutdown() {
        this.producer.shutdown();
    }
}

/**
 * @author xub
 * @Description: 自定義事務監聽器
 * @date 2019/7/15 下午12:20
 */
@Slf4j
class TransactionListenerImpl implements TransactionListener {

    @Autowired
    private ProduceOrderService produceOrderService ;

    public TransactionListenerImpl( ProduceOrderService produceOrderService) {
        this.produceOrderService = produceOrderService;
    }

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info("=========本地事務開始執行=============");
        String message = new String(msg.getBody());
        JSONObject jsonObject = JSONObject.parseObject(message);
        Integer productId = jsonObject.getInteger("productId");
        Integer total = jsonObject.getInteger("total");
        int userId = Integer.parseInt(arg.toString());
        //模擬執行本地事務begin=======
        /**
         * 本地事務執行會有三種可能
         * 1、commit 成功
         * 2、Rollback 失敗
         * 3、網絡等原因服務宕機收不到返回結果
         */
        log.info("本地事務執行參數,用戶id={},商品ID={},銷售庫存={}",userId,productId,total);
        int result = produceOrderService.save(userId, productId, total);
        //模擬執行本地事務end========
        //TODO 實際開發下面不需要我們手動返回,而是根據本地事務執行結果自動返回
        //1、二次確認消息,然后消費者可以消費
        if (result == 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        //2、回滾消息,Broker端會刪除半消息
        if (result == 1) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        //3、Broker端會進行回查消息
        if (result == 2) {
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    /**
     * 只有上面接口返回 LocalTransactionState.UNKNOW 才會調用查接口被調用
     *
     * @param msg 消息
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        log.info("==========回查接口=========");
        String key = msg.getKeys();
        //TODO 1、必須根據key先去檢查本地事務消息是否完成。
        /**
         * 因為有種情況就是:上面本地事務執行成功了,但是return LocalTransactionState.COMMIT_MESSAG的時候
         * 服務掛了,那么最終 Brock還未收到消息的二次確定,還是個半消息 ,所以當重新啟動的時候還是回調這個回調接口。
         * 如果不先查詢上面本地事務的執行情況 直接在執行本地事務,那么就相當於成功執行了兩次本地事務了。
         */
        // TODO 2、這里返回要么commit 要么rollback。沒有必要在返回 UNKNOW
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

三、MQ消費端核心代碼

這里展示下,消費端消費消息核心代碼。消費端和普通消費一樣。

@Slf4j
@Component
public class OrderConsumer {
    private DefaultMQPushConsumer consumer;
    private String consumerGroup = "produce_consumer_group";

    public OrderConsumer(@Autowired Jms jms,@Autowired ProduceService produceService) throws MQClientException {
        //設置消費組
        consumer = new DefaultMQPushConsumer(consumerGroup);
        // 添加服務器地址
        consumer.setNamesrvAddr(jms.getNameServer());
        // 添加訂閱號
        consumer.subscribe(jms.getOrderTopic(), "*");
        // 監聽消息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            MessageExt msg = msgs.get(0);
            String message = new String(msgs.get(0).getBody());
            JSONObject jsonObject = JSONObject.parseObject(message);
            Integer productId = jsonObject.getInteger("productId");
            Integer total = jsonObject.getInteger("total");
            String key = msg.getKeys();
            log.info("消費端消費消息,商品ID={},銷售數量={}",productId,total);
            try {
                produceService.updateStore(productId, total, key);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                log.info("消費失敗,進行重試,重試到一定次數 那么將該條記錄記錄到數據庫中,進行如果處理");
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();
        System.out.println("consumer start ...");
    }
}

至於完整的項目地址見GitHub。

如果對您能有幫助,就給個星星吧,哈哈!

GitHubd地址 https://github.com/yudiandemingzi/spring-cloud-rocketmq-transaction.git



晚安!

只要自己變優秀了,其他的事情才會跟着好起來(上將8)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM