原文鏈接:https://blog.csdn.net/Timeguys/article/details/107949660
一、使用:
一、引入依賴:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>二、舉例:生產者創建訂單---->生產者發送消息----->MQ服務接受消息----->消費者監聽消息並減庫存
【生產者】:
application.yml
rocketmq: name-server: 192.168.85.128:9876 # rocketMQ地址 producer: group: producer-group-test # 生產者的組名需要和消費者監聽consumerGroup一致業務代碼:
public class OrderServiceImpl extends ServiceImpl<OrderMapper, TbOrder> implements OrderService { private RocketMQTemplate rocketMQTemplate; public void create() { //創建訂單--> 發送消息 --> 消息發送成功后調用本地事務提交 --> TbOrder order = new TbOrder(); order.setCount( 10); order.setMoney(BigDecimal.valueOf( 10)); order.setProductId( 1L); order.setStatus( 1); order.setUserId( 1L); sendMsg(order); } public void sendMsg(TbOrder order){ /** * String txProducerGroup, 生產者分組 * String destination, topic * Message<?> message, 消息 * Object arg 消息參數 */ Message<String> build = MessageBuilder.withPayload(JSONObject.toJSONString(order)).build(); rocketMQTemplate.sendMessageInTransaction( "tx-producer-group","txmsg-topic",build , null); } }創建 ProducerTxmsgListener 並實現 RocketMQLocalTransactionListener:
// txProducerGroup 的值和發送事務消息指定的 txProducerGroup 相同 public class ProducerTxmsgListener implements RocketMQLocalTransactionListener { private OrderService orderService; /** * @Description: 執行本地事務提交 */ public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { try { TbOrder tbOrder = JSONObject.parseObject(message.getPayload().toString(), TbOrder.class); System.out.println(tbOrder); orderService.save(tbOrder); return RocketMQLocalTransactionState.COMMIT; //變更消息狀態為:可消費 } catch (Exception e){ return RocketMQLocalTransactionState.ROLLBACK; //本地事務執行異常,將消息遺棄 } } /** * @Description: 檢查本地事務是否執行成功 */ public RocketMQLocalTransactionState checkLocalTransaction(Message message) { TbOrder tbOrder = JSONObject.parseObject(message.getPayload().toString(), TbOrder.class); TbOrder order = orderService.getById(tbOrder.getId()); // 不為 null 則表示執行成功 if (order != null){ return RocketMQLocalTransactionState.COMMIT; //變更消息狀態為:可消費 } // 執行本地事務發生問題或還沒執行完成, UNKNOWN 表示會繼續回查 return RocketMQLocalTransactionState.UNKNOWN; } }【消費者】:
application.yml
rocketmq: name-server: 127.0.0.1:9876 # rocketMQ地址 producer: group: producer-test-group # 生產者的組名需要和消費者監聽consumerGroup一致創建MyListener 並實現 RocketMQListener 接口:
// topic 對應生產者發消息是的topic public class MyListener implements RocketMQListener<String> { public void onMessage(String message) { //執行 減庫存業務 如果發生異常,則消息會隔段時間再次消費 System.out.println(message); } }
原理圖: