JmsTransactionManager事務:Spring JMS事務類型
- Session管理的事務-原生事務
- 外部管理的事務-JmsTransactionManager、JTA
Srping JMS事務機制過程
session原生事務
代碼測試
pom.xml:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
Receiver:
@Component public class Receiver { @Autowired private JmsTemplate jmsTemplate; @JmsListener(destination = "test.queue.first") public void receive(String msg){ System.out.println("收到的消息:" + msg); jmsTemplate.convertAndSend("test.queue.reply", "reply:" + msg); //發送消息包含error時拋出異常 if(msg.contains("error")){ throw new RuntimeException("data error!!!!!!!!!!!!!!!!!1"); } } }
測試消息發送接口:
@RestController public class DemoController { @Autowired private JmsTemplate jmsTemplate; @Autowired private Receiver receiver; /** * 通過@JmsListener監聽消息並發送出去 */ @RequestMapping("/send") public void send(String msg) { jmsTemplate.convertAndSend("test.queue.first", msg); } /** * 直接發送 */ @RequestMapping("/direct") public void direct(String msg) { receiver.receive(msg); } }
測試結果:
- 瀏覽器輸入http://localhost:8080/send?msg=data_error,通過@JmsListener監聽消息並發送出去,拋出異常時,test.queue.reply沒有收到消息,消息進入ActiveMQ.DLQ,查看控制台,發現消息發送了7次,這是activemq默認的重發次數
- 瀏覽器輸入http://localhost:8080/direct?msg=data_error,直接向test.queue.reply中發送消息,消息發送成功,查看控制台報了一次錯,消息並未回滾
結論:通過@JmsListener監聽的方法受session事務控制,拋出異常時,無論是消息的消費還是生產均能夠回滾,默認嘗試7次后進入死信隊列
如果直接調用該方法,不存在事務。
問題:為什么沒有配置事務,會存在事務?
查看官方文檔
查看 DefaultJmsListenerContainerFactoryConfigurer類源碼,在configure方法中有這么一段,有外部事務的時候使用外部事務,沒有的話使用session本地事務
如果使用自己配置的JmsListenerContainerFactory ,需添加factory.setSessionTransacted(true)這一行代碼
// 這個用於設置 @JmsListener使用的containerFactory @Bean public JmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory (); factory.setSessionTransacted(true); factory.setConnectionFactory(connectionFactory); factory.setReceiveTimeout(1000L); //重連間隔時間 return factory; }
JmsTransactionManager事務
修改上例代碼:
添加JmsConfig
@EnableJms @Configuration public class JmsConfig { //這個用於設置 @JmsListener使用的containerFactory @Bean public JmsListenerContainerFactory<?> msgFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer, PlatformTransactionManager transactionManager) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setTransactionManager(transactionManager); factory.setReceiveTimeout(10000L); configurer.configure(factory, connectionFactory); return factory; } @Bean public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory){ JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setConnectionFactory(connectionFactory); // jmsTemplate.setSessionTransacted(true); return jmsTemplate; } /** * JmsTransactionManager事務管理 */ @Bean public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) { return new JmsTransactionManager(connectionFactory); } }
Receiver:
@Component public class Receiver { @Autowired private JmsTemplate jmsTemplate; @Transactional @JmsListener(destination = "test.queue.first") public void receive(String msg){ System.out.println("收到的消息:" + msg); jmsTemplate.convertAndSend("test.queue.reply", "reply:" + msg); //發送消息包含error時拋出異常 if(msg.contains("error")){ throw new RuntimeException("data error!!!!!!!!!!!!!!!!!1"); } } }
同上測試,過程略:
結論:兩種調用方法都受到事務控制。
通過@JmsListener監聽消息進行調用會rollback,進入死信隊列
直接調用不會進入隊列(受到@Transactional的事務控制)
參考:https://blog.csdn.net/songhaifengshuaige/article/details/54177339
https://blog.csdn.net/songhaifengshuaige/article/details/54177242
http://elim.iteye.com/blog/1983532