在Spring中,我們要實現事務,一般通過@Transactional注解實現。這在引入RocketMQ之前沒有問題,但是在引入了RocketMQ之后,如果消息發送之后的業務邏輯處理發生了異常的話,這時候消息已經發送出去了,就會導致業務的問題。
為了解決這一問題,RocketMQ引入了Transactional Message【事務消息】。
- 生產者向MQServer發送半消息【半消息:會存儲進MQ Server,但是被標記為不能投遞狀態】
- 發送半消息成功,生產者實行本地事務
- 根據本地事務結果向MQ Server發送二次確認請求
- MQ Server根據接受到的消息投遞或者丟棄消息
- 若在本地事務執行過程中缺少二次確認消息或生產者處於等待狀態,MQ服務器將向同一組中的每個生產者發送檢查消息,然后繼續3,4的操作
PS:消息三態
- Commit:提交事務信息,消費者可以消費此消息
- Rollback:回滾事務消息,broker會刪除這條消息,消費者不能消費
- UNKNOWN:broker需要回查確認消息狀態
代碼實現
發送半消息
利用rocketMQTemplate類的sendMessageInTransaction實現半消息發送
第一個參數為txProducerGroup:就是group名稱,根據業務自定義
第二次信息為destination:topic名稱
第三個信息為message:消息體,利用MessageBuilder.withPayload構建
第四個信息為arg:業務對象,用於處理本地業務
代碼如下:
// 發送半消息
rocketMQTemplate.sendMessageInTransaction(
"test-transactional",
"test-topic",
MessageBuilder.withPayload(
Demo.builder().demoId(1).remark("哈哈哈").build()
).setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString()).build(),
forObject
);
實現RocketMQLocalTransactionListener接口
新建demoTransactionalListener類,繼承RocketMQLocalTransactionListener接口,實現了兩個方法
executeLocalTransaction:處理本地業務
checkLocalTransaction:MQ Server發送檢查信息相應
添加@RocketMQTransactionListener注解,txProducerGroup屬性值與半消息的txProducerGroup參數值相同
代碼如下:
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@RocketMQTransactionListener(txProducerGroup = "test-transactional")
public class demoTransactionalListener implements RocketMQLocalTransactionListener {
/**
* 處理本地事務
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
// 消息頭
MessageHeaders headers = message.getHeaders();
String transactionalId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
// arg:sendMessageInTransaction方法的第四個參數,用於處於本地業務
try {
// 本地業務
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 若在本地事務執行過程中缺少二次確認消息或生產者處於等待狀態
* MQ Server將向同一組中的每個生產者發送檢查消息
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
try {
// 檢查
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
PS:MQ Server發送檢查:可以通過新建一個表,記錄本地業務成功或者失敗,然后檢查相應只需要查一下數據就可以了