發送消息
在Spring消息編程模型下,使用RocketMQ收發消息 一文中,發送消息使用的是RocketMQTemplate類.
在集成了Spring Cloud Stream之后,我們可以使用Source實現消息的發送,代碼如下
private final Source source;
......
source.output().send(
MessageBuilder
.withPayload(Demo.builder().demoId(1).remark("哈哈哈").build())
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
.setHeader("comment", JSON.toJSONString(forObject))
.build()
);
在使用rocketMQTemplate類時,sendMessageInTransaction方法的第四個參數可以幫助我們傳遞對象,source接口的send方法沒有多余參數,所以我們利用MessageBuilder將對象信息放在消息頭里面.因為setHeader只能傳遞字符串,所以我們將對象轉換為Json字符串,然后在處理本地事務從消息頭中取出來,轉換回來就可以了.
修改配置
在使用rocketMQTemplate類時,我們使用sendMessageInTransaction的txProducerGroup參數設置txProducerGroup信息,在引入了Spring Cloud Stream之后,我們在配置文件中配置該信息.配置如下
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
producer:
transactional: true
# txProducerGroup
group: test-stream-rocketmq-transactional
bindings:
# 生產者
output:
# 指定topic
destination: test-topic
本地業務處理
import com.alibaba.fastjson.JSON;
import com.example.study01.domain.dto.DemoComment;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "test-stream-rocketmq-transactional")
public class demoTransactionalListener implements RocketMQLocalTransactionListener {
/**
* 處理本地事務
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
// 消息頭
MessageHeaders headers = message.getHeaders();
String transactionalId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
DemoComment comment = JSON.parseObject(headers.get("comment").toString(), DemoComment.class);
try {
log.info("1111111");
// 本地業務
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 若在本地事務執行過程中缺少二次確認消息或生產者處於等待狀態
* MQ Server將向同一組中的每個生產者發送檢查消息
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
log.info("222222");
try {
// 檢查業務
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}