RocketMQ 是一個來自阿里巴巴的分布式消息中間件,於 2012 年開源,並在 2017 年正式成為 Apache 頂級項目。據了解,包括阿里雲上的消息產品以及收購的子公司在內,阿里集團的消息產品全線都運行在 RocketMQ 之上,並且最近幾年的雙十一大促中,RocketMQ 都有搶眼表現。Apache RocketMQ 4.3之后的版本正式支持事務消息,為分布式事務實現提供了便利性支持。
RocketMQ 事務消息設計則主要是為了解決 Producer 端的消息發送與本地事務執行的原子性問題,RocketMQ 的設計中 broker 與 producer 端的雙向通信能力,使得 broker 天生可以作為一個事務協調者存在;而 RocketMQ 本身提供的存儲機制為事務消息提供了持久化能力;RocketMQ 的高可用機制以及可靠消息設計則為事務消息在系統發生異常時依然能夠保證達成事務的最終一致性。
在RocketMQ 4.3后實現了完整的事務消息,實際上其實是對本地消息表的一個封裝,將本地消息表移動到了MQ內部,解決 Producer 端的消息發送與本地事務執行的原子性問題。
執行流程如下:
為方便理解我們還以注冊送積分的例子來描述 整個流程。
Producer 即MQ發送方,本例中是用戶服務,負責新增用戶。MQ訂閱方即消息消費方,本例中是積分服務,負責新增積分。
1、Producer 發送事務消息
Producer (MQ發送方)發送事務消息至MQ Server,MQ Server將消息狀態標記為Prepared(預備狀態),注意此時這條消息消費者(MQ訂閱方)是無法消費到的。
本例中,Producer 發送 ”增加積分消息“ 到MQ Server。
2、MQ Server回應消息發送成功
MQ Server接收到Producer 發送給的消息則回應發送成功表示MQ已接收到消息。
3、Producer 執行本地事務
Producer 端執行業務代碼邏輯,通過本地數據庫事務控制。
本例中,Producer 執行添加用戶操作。
4、消息投遞
若Producer 本地事務執行成功則自動向MQServer發送commit消息,MQ Server接收到commit消息后將”增加積分消息“ 狀態標記為可消費,此時MQ訂閱方(積分服務)即正常消費消息;
若Producer 本地事務執行失敗則自動向MQServer發送rollback消息,MQ Server接收到rollback消息后 將刪除”增加積分消息“ 。
MQ訂閱方(積分服務)消費消息,消費成功則向MQ回應ack,否則將重復接收消息。這里ack默認自動回應,即程序執行正常則自動回應ack。
5、事務回查
如果執行Producer端本地事務過程中,執行端掛掉,或者超時,MQ Server將會不停的詢問同組的其他 Producer來獲取事務執行狀態,這個過程叫事務回查。MQ Server會根據事務回查結果來決定是否投遞消息。
以上主干流程已由RocketMQ實現,對用戶側來說,用戶需要分別實現本地事務執行以及本地事務回查方法,因此只需關注本地事務的執行狀態即可。
RoacketMQ提供RocketMQLocalTransactionListener接口:
public interface RocketMQLocalTransactionListener { /** - 發送prepare消息成功此方法被回調,該方法用於執行本地事務 - @param msg 回傳的消息,利用transactionId即可獲取到該消息的唯一Id - @param arg 調用send方法時傳遞的參數,當send時候若有額外的參數可以傳遞到send方法中,這里能獲取到 - @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調 */ RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg); /** - @param msg 通過獲取transactionId來判斷這條消息的本地事務執行狀態 - @return 返回事務狀態,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調 */ RocketMQLocalTransactionState checkLocalTransaction(Message msg); }
發送事務消息:
以下是RocketMQ提供用於發送事務消息的API:
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); //設置TransactionListener實現 producer.setTransactionListener(transactionListener); //發送事務消息 SendResult sendResult = producer.sendMessageInTransaction(msg, null);
下面給出代碼示例
package com.example.rocketmq.common; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import javax.annotation.Resource; @Slf4j @Service public class SyncProducer { @Resource private RocketMQTemplate rocketMQTemplate; public TransactionSendResult sendSyncMessage(String msg, String topic, String tag){ log.info("【發送消息】:{}", msg); //構建消息體 JSONObject jsonObject = new JSONObject(); jsonObject.put("message",msg); Message<String> message = MessageBuilder.withPayload(jsonObject.toJSONString()).build(); TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("tx_group", topic, message, tag); log.info("【發送狀態】:{}", result.getLocalTransactionState()); return result; } }
package com.example.rocketmq.common; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import java.util.concurrent.ConcurrentHashMap; @Slf4j @Component @RocketMQTransactionListener(txProducerGroup = "tx_group") public class SyncProducerListener implements RocketMQLocalTransactionListener { private ConcurrentHashMap<String, Object> localTrans = new ConcurrentHashMap<>(); @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { try { log.info("【本地業務執行完畢】 msg:{}, Object:{}", message, o); localTrans.put(message.getHeaders().getId()+"", message.getPayload()); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { e.printStackTrace(); log.error("【執行本地業務異常】 exception message:{}", e.getMessage()); return RocketMQLocalTransactionState.ROLLBACK; } } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { log.info("【執行檢查任務】"); return RocketMQLocalTransactionState.UNKNOWN; } }
package com.example.rocketmq.common; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "TransactionTopic",consumerGroup = "spring_boot_consumer_group") @Slf4j public class TxmsgConsumer implements RocketMQListener<String> { @Override public void onMessage(String s) { log.info("開始消費消息:{}",s); } }
application.propertis
rocketmq.name-server=localhost:9876
rocketmq.producer.group = spring_boot_producer_group
pom.xml
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
測試:
package com.example.rocketmq.controller; import com.example.rocketmq.common.SyncProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class TestController { @Autowired private SyncProducer syncProducer; @GetMapping("test") String test() { syncProducer.sendSyncMessage("我隨便傳的一條測試消息,內容保密","TransactionTopic","TransactionTAG"); return "ok"; } }
結果: