activemq的事務消息


消息中間件大多支持事務消息,activemq也不例外。

關於事務的定義及ACID特性這里不贅述。


對比Mysql數據庫來說,

Mysql有事務的概念,

Activemq也有事務的概念

這里說的都是本地事務,rocketMq還支持分布式事務


java制定了jdbc來規范對數據庫的訪問

同樣

java也有jms(java message services)來規范對於消息中間件的訪問,activemq是完全支持jms1.1規范的


那java通過mysql來實現事務,

無外乎大概這樣:

 public static void main(String[] args) {
        Connection conn =getConnection();
        try {
            conn.setAutoCommit(false);
            insertUser(conn);
            insertAddress(conn);
            conn.commit();
        } catch (SQLException e) {
            System.out.println("************事務處理出現異常***********");
            e.printStackTrace();
            try {
                conn.rollback();
                System.out.println("*********事務回滾成功***********");
            } catch (Exception e2) {
                // TODO: handle exception
                e2.printStackTrace();
            }finally {
                try {
                    conn.close();
                } catch (SQLException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
            }
        }
    }

那我們接上文:Spring整合Activemq

來看看activemq如何實現事務消息


    @Test
    public void p2pSender() {
        //獲取連接工廠
        ConnectionFactory connectionFactory = jmsQueueTemplate.getConnectionFactory();
        Session session = null;
        try {
            Connection connection = connectionFactory.createConnection();
            // 參數一:是否開啟消息事務
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

            MessageProducer producer = session.createProducer(session.createQueue("test.trsaction"));
            //發送10條消息,開啟事務后,要么一起成功,要么一起失敗
            for (int i = 0; i < 10; i++) {
                if (i == 4) {
                    //出現異常
//                    throw new RuntimeException("i cannot equals 4");
                }
                TextMessage textMessage = session.createTextMessage("消息-----" + i);
                producer.send(textMessage);
            }
            //開啟事務后,需要手動提交
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
            //消息事務回滾
            try {
                session.rollback();
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    }

上圖我們手動制造了一個異常。

沒有異常時:可以成功發送10條消息

image-20200130211242542

我們把異常那行代碼打開

,再次執行:

從控制台可以發現,信息並沒有發出去。

image-20200130211348932

即現在有事務效果。


另外需要一提的是:

整合入spring后,應該委托spring進行事務管理。

對於Mysql,我們配置了DatasourceTransactionManager

使用@transactional注解即可。

對於activemq也是一樣,不過使用的 就是JmsTransactionManager了。


還需要注意的是,分布式事務這塊,mysql和activemq同為資源管理器(RM),都實現了XA協議,activemq這個用的不多, 支持的也不完全,不贅述。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM