消息中間件大多支持事務消息,activemq也不例外。
關於事務的定義及ACID特性這里不贅述。
對比Mysql數據庫來說,
Mysql有事務的概念,
Activemq也有事務的概念
這里說的都是本地事務,rocketMq還支持分布式事務
java制定了jdbc來規范對數據庫的訪問
同樣
java也有jms(java message services)來規范對於消息中間件的訪問,activemq是完全支持jms1.1規范的
那java通過mysql來實現事務,
無外乎大概這樣:
public static void main(String[] args) {
Connection conn =getConnection();
try {
conn.setAutoCommit(false);
insertUser(conn);
insertAddress(conn);
conn.commit();
} catch (SQLException e) {
System.out.println("************事務處理出現異常***********");
e.printStackTrace();
try {
conn.rollback();
System.out.println("*********事務回滾成功***********");
} catch (Exception e2) {
// TODO: handle exception
e2.printStackTrace();
}finally {
try {
conn.close();
} catch (SQLException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}
}
那我們接上文:Spring整合Activemq
來看看activemq如何實現事務消息
@Test
public void p2pSender() {
//獲取連接工廠
ConnectionFactory connectionFactory = jmsQueueTemplate.getConnectionFactory();
Session session = null;
try {
Connection connection = connectionFactory.createConnection();
// 參數一:是否開啟消息事務
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue("test.trsaction"));
//發送10條消息,開啟事務后,要么一起成功,要么一起失敗
for (int i = 0; i < 10; i++) {
if (i == 4) {
//出現異常
// throw new RuntimeException("i cannot equals 4");
}
TextMessage textMessage = session.createTextMessage("消息-----" + i);
producer.send(textMessage);
}
//開啟事務后,需要手動提交
session.commit();
} catch (JMSException e) {
e.printStackTrace();
//消息事務回滾
try {
session.rollback();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
上圖我們手動制造了一個異常。
沒有異常時:可以成功發送10條消息
我們把異常那行代碼打開
,再次執行:
從控制台可以發現,信息並沒有發出去。
即現在有事務效果。
另外需要一提的是:
整合入spring后,應該委托spring進行事務管理。
對於Mysql,我們配置了DatasourceTransactionManager
,
使用@transactional注解即可。
對於activemq也是一樣,不過使用的 就是JmsTransactionManager了。
還需要注意的是,分布式事務這塊,mysql和activemq同為資源管理器(RM),都實現了XA協議,activemq這個用的不多, 支持的也不完全,不贅述。