ActiveMQ持久化到mysql


ActiveMQ持久化到mysql

配置

1.找到apache-activemq-5.15.2/examples/conf下面的activemq-jdbc-performance.xml

2.打開activemq-jdbc-performance.xml,在persistenceAdapter節點后面添加dataSource="#mysql-ds"

並配置你的數據庫

其實可以直接更改apache-activemq-5.15.2/conf/activemq.xml的persistenceAdapter節點.配置下數據庫也是可以的

用activemq-jdbc-performance.xml 我的理解應該是高性能模式,連 都沒有(這句是添加localhost:8161的管理頁面,),並且只能用openwire傳輸協議,默認的配置文件傳輸協議是全開的,如果需要用到其他的傳輸協議可以自己在transportConnectors節點上添加

3.把activemq-jdbc-performance.xml復制到apache-activemq-5.15.2/conf目錄下,從命名為activemq.xml,覆蓋原來的activemq.xml

4.在對應的數據庫創建activemq庫,然后重啟ActiveMQ

我們這里用debug模式啟動,提示沒有mysql的jar包

5.我們在apache-activemq-5.15.2/lib下面添加mysql的jar包,再次啟動,就不會報錯了

6.這時可以看到剛才創建的activemq庫多了三張表,說明配置成功了

點對點測試

生產者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
         public static void main(String[] args) {
//                   String user = ActiveMQConnection.DEFAULT_USER;
//                   String password = ActiveMQConnection.DEFAULT_PASSWORD;
//                   String url = ActiveMQConnection.DEFAULT_BROKER_URL;
                   String subject = "test.queue";
                   ConnectionFactory contectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.109:61616");
                //   ConnectionFactory contectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
                   try{
                            Connection connection = contectionFactory.createConnection();
                            connection.start();
                            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                            Destination destination = session.createQueue(subject);
                            MessageProducer producer = session.createProducer(destination);
                         //   producer.setDeliveryMode(DeliveryMode.PERSISTENT);//設置為持久化
                            for(int i = 0; i < 20;) {
                                     TextMessage createTextMessage = session.createTextMessage("這是要發送的第"+ ++i +"條消息消息");
                                     producer.send(createTextMessage);
                                     System.out.println("第"+ i +"條消息已發送");
                            }
                            Thread.sleep(2000);
                            session.commit();
                            session.close();
                            connection.close();
                   }catch (JMSException e) {
                      //      e.printStackTrace();
                   }catch (InterruptedException e) {
                       //     e.printStackTrace();
                   }

         }

}

消費者

import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;


public class Customer {
	
	
    public static void main(String[] args) {

//        String user = ActiveMQConnection.DEFAULT_USER;
//
//        String password = ActiveMQConnection.DEFAULT_PASSWORD;
//
//        String url = ActiveMQConnection.DEFAULT_BROKER_URL;

        String subject = "test.queue";

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.109:61616");
      //  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        Connection connection;

        try {
            connection= connectionFactory.createConnection();

            connection.start();

            final Session session =connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

            Destination destination = session.createQueue(subject);

            MessageConsumer message = session.createConsumer(destination);

            message.setMessageListener(new MessageListener() {
                public void onMessage(Message msg){
                	TextMessage message = (TextMessage) msg;
                    try {
                        System.out.println("--收到消息:" +new Date()+message.getText());
                        session.commit();
                    }catch(JMSException e) {
                 //       e.printStackTrace();
                    }

                }

            });
//            Thread.sleep(30000);
//
//            session.close();
//
//            Thread.sleep(30000);
//
//            connection.close();
//
//            Thread.sleep(30000);

        }catch(Exception e) {
        //    e.printStackTrace();
        }

    }

}

這時生產者生產數據,消費者一直不在線,數據就會持久化到數據庫的activemq_msgs表,就算ActiveMQ的服務掛了,再次啟動后,等消費者在線了就可以再次獲取生產者生產的數據(消費之后數據庫的數據會自動刪除)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM