ActiveMQ使用JDBC持久化


    步驟一:創建一個數據庫
            步驟二:配置activemq.xml配置文件
                1.在persistenceAdapter加入如下配置
                 

<!--createTablesOnStartup    啟動是否創建表  第一次為true 后續為false-->
<jdbcPersistenceAdapter dataSource="#activemq-db" createTablesOnStartup="true" />

      第一次為true是為了創建表,之后的每次都不創建,使用第一次創建的表保存數據


    
                2.配置數據源
                    在beans節點中
                  

 <bean id="activemq-db" class="org.apache.commons.dbcp.BasicDataSource">
                      <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
                      <property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq"/>
                      <property name="username" value="root"/>
                      <property name="password" value="root"/>
                      <property name="maxActive" value="200"/>
                      <property name="poolPreparedStatements" value="true"/>
                    </bean>

 

 


                    
                3.將數據庫連接Jar放到activemq解壓的lib文件夾下

 

 


                
            步驟三:重新啟動activemq

    重啟后刷新數據庫,會生成三張表

 

 

activemq_msgs用於存儲消息,Queue和Topic都存儲在這個表中:

ID:自增的數據庫主鍵

CONTAINER:消息的Destination

MSGID_PROD:消息發送者客戶端的主鍵

MSG_SEQ:是發送消息的順序,MSGID_PROD+MSG_SEQ可以組成JMS的MessageID

EXPIRATION:消息的過期時間,存儲的是從1970-01-01到現在的毫秒數

MSG:消息本體的Java序列化對象的二進制數據

PRIORITY:優先級,從0-9,數值越大優先級越高

 

activemq_acks用於存儲訂閱關系。如果是持久化Topic,訂閱者和服務器的訂閱關系在這個表保存:

主要的數據庫字段如下:

CONTAINER:消息的Destination

SUB_DEST:如果是使用Static集群,這個字段會有集群其他系統的信息

CLIENT_ID:每個訂閱者都必須有一個唯一的客戶端ID用以區分

SUB_NAME:訂閱者名稱

SELECTOR:選擇器,可以選擇只消費滿足條件的消息。條件可以用自定義屬性實現,可支持多屬性AND和OR操作

LAST_ACKED_ID:記錄消費過的消息的ID。

 

表activemq_lock在集群環境中才有用,只有一個Broker可以獲得消息,稱為Master Broker,

其他的只能作為備份等待Master Broker不可用,才可能成為下一個Master Broker。
這個表用於記錄哪個Broker是當前的Master Broker。

啟動生產者生產一條消息

public static void main(String[] args) throws JMSException {
        //創建MQ連接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
        //創建連接
        Connection connection = activeMQConnectionFactory.createConnection();
        //啟動連接
        connection.start();
        //創建會話工廠
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        //創建隊列
        Queue queue = session.createQueue("wdksoft_queue");
        //創建消息生產者
        MessageProducer producer = session.createProducer(queue);
        //消息持久化
        producer.setDeliveryMode(2);
        //模擬消息
        TextMessage message = session.createTextMessage("hello activeMQ");
        //發送消息
        producer.send(message);

        System.out.println("生產者發送消息完畢~");

        session.close();
        connection.close();
    }

刷新msgs數據表

 

 數據表中就多了一條剛才的信息

  當啟動消費者消費了這條信息時,數據庫便不再保存這條記錄

public static void main(String[] args) throws JMSException {
        // ConnectionFactory :連接工廠,JMS 用它創建連接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
        // JMS 客戶端到JMS Provider 的連接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Session: 一個發送或接收消息的線程
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Destination :消息的目的地;消息發送給誰.
        Destination destination = session.createQueue("wdksoft_queue");
        // 消費者,消息接收者
        MessageConsumer consumer = session.createConsumer(destination);
        while (true) {
            //監聽消息
            TextMessage message = (TextMessage) consumer.receive();
            if (null != message) {
                System.out.println("收到消息:" + message.getText());
            } else
                break;
        }
        session.close();
        connection.close();
    }

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM