ActiveMQ消息存儲持久化


  ActiveMQ不僅支持persistent和non-persistent兩種方式,還支持消息的recovery(恢復)方式。

1.PTP

  Queue的存儲是很簡單的,就是一個FIFO的Queue

  

2.PUB/SUB

  對於持久化訂閱主題,每一個消費者將獲得一個消息的復制。

  

3.有效的消息存儲

  ActiveMQ提供了一個插件式的消息存儲,類似於消息的多點傳播,主要實現了如下幾種:

1.AMQ消息存儲-基於文件的存儲,以前默認的存儲方式

2.KahaDB消息存儲-提供了容量的提升和恢復能力,現在的默認方式

3.JDBC消息存儲-消息基於JDBC存儲

4.Memory消息存儲-基於內存的消息存儲

 

3.1 kahaDB消息存儲--目前默認的存儲方式

(1)kahaDB Message Store概述

  KahaDB是目前默認的存儲方式,可用於任何場景,提高了性能和恢復能力。消息存儲使用一個事務日志和僅僅用一個索引文件來存儲它所有的地址。
  KahaDB是一個專門針對消息持久化的解決方案,它對典型的消息使用模式進行了優化。在kaha中,數據被追加到 data logs中。 當不再需要log文件中的數據的時候,log文件會被丟棄。

(2)基本配置方式:    在activemq的安裝目錄下的:conf/activemq.xml中有如下配置:(默認配置)

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>

   如上面配置了文件的位置,現在我們查看kahadb的文件位置:

(3)可用的屬性有:

 1.   director: KahaDB存放的路徑,默認值activemq-data
2.   indexWriteBatchSize: 批量寫入磁盤的索引page數量,默認值為1000
3.   indexCacheSize: 內存中緩存索引page的數量,默認值10000
4.   enableIndexWriteAsync: 是否異步寫出索引,默認false
5.   journalMaxFileLength: 設置每個消息data log的大小,默認是32MB
6.   enableJournalDiskSyncs: 設置是否保證每個沒有事務的內容,被同步寫入磁盤,JMS持久化的時候需要,默認為true
7.   cleanupInterval: 在檢查到不再使用的消息后,在具體刪除消息前的時間,默認30000
8.   checkpointInterval: checkpoint的間隔時間,默認是5000
9.   ignoreMissingJournalfiles: 是否忽略丟失的消息日志文件,默認false
10.  checkForCorruptJournalFiles: 在啟動的時候,將會驗證消息文件是否損壞,默認false
11.  checksumJournalFiles: 是否為每個消息日志文件提供checksum,默認false
12.  archiveDataLogs: 是否移動文件到特定的路徑,而不是刪除它們,默認false
13.  directoryArchive: 定義消息已經被消費過后,移動data log到的路徑,默認null
14.  databaseLockedWaitDelay: 獲得數據庫鎖的等待時間(used by shared master/slave),默認10000。用於之后主從復制的時候配置
15.  maxAsyncJobs: 設置最大的可以存儲的異步消息隊列,默認值10000,可以和concurrent MessageProducers設置成一樣的值。
16.  concurrentStoreAndDispatchTransactions:是否分發消息到客戶端,同時事務存儲消息,默認true
17.  concurrentStoreAndDispatchTopics: 是否分發Topic消息到客戶端,同時進行存儲,默認true
18.  concurrentStoreAndDispatchQueues: 是否分發queue消息到客戶端,同時進行存儲,默認true

(4)Java內嵌Broker使用kahadb的例子:

package cn.qlq.activemq.broker;

import java.io.File;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore;

public class BrokerUeingKahadb {
    public static void main(String[] args) throws Exception {
        BrokerService brokerService = new BrokerService();
        File dataKahadbFile = new File("data/kahadb");

        KahaDBStore kahaDBStore = new KahaDBStore();
        kahaDBStore.setDirectory(dataKahadbFile);
        kahaDBStore.setJournalDiskSyncInterval(1024 * 1000);
        kahaDBStore.setIndexWriteBatchSize(100);
        kahaDBStore.setEnableIndexWriteAsync(true);

        brokerService.setPersistenceAdapter(kahaDBStore);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }
}

 

3.2  AMQ Message Store消息存儲

(1)概述:

  AMQ Message Store是ActiveMQ5.0缺省的持久化存儲,它是一個基於文件、事務存儲設計為快速消息存儲的一個結構,該結構是以流的形式來進行消息交互的。
  這種方式中,Messages被保存到data logs中,同時被reference store進行索引以提高存取速度。Data logs由一些簡單的data log文件組成,缺省的文件大小是32M,如果某個消息的大小超過了data log文件的大小,那么可以修改配置以增加data log文件的大小。如果某個data log文件中所有的消息都被成功消費了,那么這個data log文件將會被標記,以便在下一輪的清理中被刪除或者歸檔。

(2)配置示例如下:(用下面的配置方式替換掉activemq.xml的 persistenceAdapter 即可,也就是該配置需要作為broker的屬性)

        <persistenceAdapter>
            <amqPersistenceAdapter directory="${activemq.data}/data" maxFileLength="32mb"/>
        </persistenceAdapter>

 

3.3  JDBC持久化存儲

   ActiveMQ支持使用JDBC來持久化消息,我們只需要配置JDBC驅動即可,至於表結構activemq會自動幫我們建好表結構。

我使用的activemq的版本是:5.15.8,我的配置方式如下:

(1)拷貝mysql的驅動包  mysql-connector-java-5.1.37-bin.jar   到     apache-activemq-5.15.8\lib\optional\  目錄下

(2)修改apache-activemq-5.15.8\conf\activemq.xml

首先定義dataSource(該dataSource位於apache-activemq-5.15.8\lib\optional\ commons-dbcp2-2.1.1.jar包內,當然可以換成我們自己的c3p0,durid等連接池)

    <bean id="mysql_ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
        <property name="driverClassName">
            <value>com.mysql.jdbc.Driver</value>
        </property>
        <property name="url">
            <value>jdbc:mysql://localhost:3306/activemq</value>
        </property>
        <property name="username">
            <value>root</value>
        </property>
        <property name="password">
            <value>123456</value>
        </property>
    </bean>

 

修改broker的persistenceAdapter持久化方式:

        <persistenceAdapter>
            <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql_ds"/>
        </persistenceAdapter>

 

(3)啟動項目發現activemq會自動幫我們創建好表結構,如下三張表:

mysql> show tables;
+--------------------+
| Tables_in_activemq |
+--------------------+
| activemq_acks      |
| activemq_lock      |
| activemq_msgs      |
+--------------------+
3 rows in set (0.00 sec)

mysql> desc activemq_msgs;
+------------+--------------+------+-----+---------+-------+
| Field      | Type         | Null | Key | Default | Extra |
+------------+--------------+------+-----+---------+-------+
| ID         | bigint(20)   | NO   | PRI | NULL    |       |
| CONTAINER  | varchar(250) | NO   | MUL | NULL    |       |
| MSGID_PROD | varchar(250) | YES  | MUL | NULL    |       |
| MSGID_SEQ  | bigint(20)   | YES  |     | NULL    |       |
| EXPIRATION | bigint(20)   | YES  | MUL | NULL    |       |
| MSG        | longblob     | YES  |     | NULL    |       |
| PRIORITY   | bigint(20)   | YES  | MUL | NULL    |       |
| XID        | varchar(250) | YES  | MUL | NULL    |       |
+------------+--------------+------+-----+---------+-------+
8 rows in set (0.00 sec)

mysql> desc activemq_acks;
+---------------+--------------+------+-----+---------+-------+
| Field         | Type         | Null | Key | Default | Extra |
+---------------+--------------+------+-----+---------+-------+
| CONTAINER     | varchar(250) | NO   | PRI | NULL    |       |
| SUB_DEST      | varchar(250) | YES  |     | NULL    |       |
| CLIENT_ID     | varchar(250) | NO   | PRI | NULL    |       |
| SUB_NAME      | varchar(250) | NO   | PRI | NULL    |       |
| SELECTOR      | varchar(250) | YES  |     | NULL    |       |
| LAST_ACKED_ID | bigint(20)   | YES  |     | NULL    |       |
| PRIORITY      | bigint(20)   | NO   | PRI | 5       |       |
| XID           | varchar(250) | YES  | MUL | NULL    |       |
+---------------+--------------+------+-----+---------+-------+
8 rows in set (0.00 sec)

mysql> desc activemq_lock;
+-------------+--------------+------+-----+---------+-------+
| Field       | Type         | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+-------+
| ID          | bigint(20)   | NO   | PRI | NULL    |       |
| TIME        | bigint(20)   | YES  |     | NULL    |       |
| BROKER_NAME | varchar(250) | YES  |     | NULL    |       |
+-------------+--------------+------+-----+---------+-------+
3 rows in set (0.00 sec)

 

其建表語句如下:(可以看出acks表是多列做聯合主鍵)

mysql> show create table activemq_acks\G
*************************** 1. row ***************************
       Table: activemq_acks
Create Table: CREATE TABLE `activemq_acks` (
  `CONTAINER` varchar(250) NOT NULL,
  `SUB_DEST` varchar(250) DEFAULT NULL,
  `CLIENT_ID` varchar(250) NOT NULL,
  `SUB_NAME` varchar(250) NOT NULL,
  `SELECTOR` varchar(250) DEFAULT NULL,
  `LAST_ACKED_ID` bigint(20) DEFAULT NULL,
  `PRIORITY` bigint(20) NOT NULL DEFAULT '5',
  `XID` varchar(250) DEFAULT NULL,
  PRIMARY KEY (`CONTAINER`,`CLIENT_ID`,`SUB_NAME`,`PRIORITY`),
  KEY `ACTIVEMQ_ACKS_XIDX` (`XID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
1 row in set (0.00 sec)

mysql> show create table activemq_msgs\G
*************************** 1. row ***************************
       Table: activemq_msgs
Create Table: CREATE TABLE `activemq_msgs` (
  `ID` bigint(20) NOT NULL,
  `CONTAINER` varchar(250) NOT NULL,
  `MSGID_PROD` varchar(250) DEFAULT NULL,
  `MSGID_SEQ` bigint(20) DEFAULT NULL,
  `EXPIRATION` bigint(20) DEFAULT NULL,
  `MSG` longblob,
  `PRIORITY` bigint(20) DEFAULT NULL,
  `XID` varchar(250) DEFAULT NULL,
  PRIMARY KEY (`ID`),
  KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`),
  KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`),
  KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`),
  KEY `ACTIVEMQ_MSGS_PIDX` (`PRIORITY`),
  KEY `ACTIVEMQ_MSGS_XIDX` (`XID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
1 row in set (0.00 sec)

mysql> show create table activemq_lock\G
*************************** 1. row ***************************
       Table: activemq_lock
Create Table: CREATE TABLE `activemq_lock` (
  `ID` bigint(20) NOT NULL,
  `TIME` bigint(20) DEFAULT NULL,
  `BROKER_NAME` varchar(250) DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
1 row in set (0.00 sec)

 

解釋上面三張表:

  1.消息表,缺省表明為ACTIVEMQ_MSGS, queue和topic都存儲在里面

    ID:自增主鍵;  CONTAINER:所屬類型以及所在隊列或者主題;  MSGID_PROD:生產者ID;MSGID_SEQ:在同一批次消息中的序號;  EXPIRATION:過期時間(表示永不過期);  MSG:序列化之后的消息;  PRIORITY優先級;  XID:暫時不知道這個有什么用。

  2.ACTIVEMQ_ACKS表存儲持久訂閱的信息和最后一個持久訂閱接收的消息ID

    CONTAINER:與上面消息表的CONTAINER一樣;  SUB_DEST:子目的地,與CONTAINER一樣; 

    CLIENT_ID: 鏈接的客戶端ID,也就是我們程序:connection.setClientID("cc1"); 產生的ID

    SUB_NAME:持久訂閱者的名稱.也就是我們程序: session.createDurableSubscriber(destination, "C11"); 產生的名稱

    SELECTOR:消息選擇器,consumer可以選擇自己想要的 

    LAST_ACKED_ID:最后一次確認ID,這個字段存的該該訂閱者最后一次收到的消息的ID 

    XID:暫時不知道這個有什么用。

  3.鎖定表,缺省表名為ACTIVEMQ_LOCK,用來確保在某一時刻,只能有一個ActiveMQ broker實例來訪問數據庫

     XID:自增的主鍵

    TIME:日期

    BROKER_NAME:占用數據庫的brokerName

 

(4)測試持久化存儲和訂閱

lock表一直是只有一條數據:

mysql> select * from activemq_lock;
+----+------+-------------+
| ID | TIME | BROKER_NAME |
+----+------+-------------+
|  1 | NULL | NULL        |
+----+------+-------------+
1 row in set (0.00 sec)

 

  我們按照如下順序進行操作:

  • 發送2條 消息到queue,查看數據庫結構:

 

  • 啟動持久訂閱主題消費者

 

  • 發布5條消息都主題中:

消息表如下:

 acks表如下:

 

  •    啟動隊列消費者消費隊列信息(發現隊列的消息被刪除)

 

  •  再次啟動主題生產者生產消息

 

補充:Java內嵌Broker使用JDBC持久化存儲

(1)依賴的jar包

(2)broker啟動的代碼

package cn.qlq.activemq;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.commons.dbcp2.BasicDataSource;

public class BrokerUsingJDBC {
    public static void main(String[] args) throws Exception {
        BrokerService brokerService = new BrokerService();

        // 1.創建數據源
        BasicDataSource dataSource = new BasicDataSource();
        dataSource.setUrl("jdbc:mysql://localhost:3306/activemq");
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        
        // 2.創建JDBCPersistenceAdapter
        JDBCPersistenceAdapter Adapter = new JDBCPersistenceAdapter();
        Adapter.setDataSource(dataSource);

        brokerService.setBrokerName("brokerName");
        brokerService.setUseJmx(true);
        brokerService.setPersistenceAdapter(Adapter);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }
}

 

  測試結果以及代碼同上面。關於整合spring的broker啟動方式也是類似注入正確的bean即可。

 

補充:JDBC Message Store with ActiveMQ Journal====優化版的JDBC存儲

  這種方式克服了JDBC Store的不足,使用快速的緩存寫入技術,大大提高了性能。

  JDBC 配合其自帶的 high performance journal;根據官方說法,它內置的高性能journal的工作類似於在緩存層工作,消息會優先寫入到journal,后台的定時任務會每隔一段時間間隔去。

JDBC Store和JDBC Message Store with ActiveMQ Journal的區別

    1. JDBC with journal的性能優於jdbc
  2. JDBC用於master/slave模式的數據庫分享
  3. JDBC with journal不能用於master/slave模式
  4. 一般情況下,推薦使用jdbc with journal

 

其配置方式如下:注釋掉原來的持久化適配器,並注入持久化工廠

        <!--
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql_ds"/>
        </persistenceAdapter>
        -->
        
        <persistenceFactory>
            <journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768" useJournal="true" useQuickJournal="true" dataSource="#mysql_ds" dataDirectory="activemq-data"/>
        </persistenceFactory>

 

其java啟動broker的使用方式如下:

(1)需要的jar包:

(2)Broker啟動類:

package cn.qlq.activemq;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory;
import org.apache.commons.dbcp2.BasicDataSource;

public class BrokerUsingJDBC {
    public static void main(String[] args) throws Exception {
        BrokerService brokerService = new BrokerService();

        // 1.創建數據源
        BasicDataSource dataSource = new BasicDataSource();
        dataSource.setUrl("jdbc:mysql://localhost:3306/activemq");
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");

        // 2.創建PersistenceAdapterFactory
        JournalPersistenceAdapterFactory persistenceFactory = new JournalPersistenceAdapterFactory();
        persistenceFactory.setDataSource(dataSource);
        persistenceFactory.setJournalLogFiles(4);
        persistenceFactory.setJournalLogFileSize(32768);
        persistenceFactory.setUseJournal(true);
        persistenceFactory.setUseQuickJournal(true);

        // 3.設置持久化工廠並啟動broker
        brokerService.setBrokerName("brokerName");
        brokerService.setUseJmx(true);
        brokerService.setPersistenceFactory(persistenceFactory);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }
}

 

3.4 Memory Message Store

  內存消息存儲主要是存儲所有的持久化的消息在內存中。這里沒有動態的緩存存在,所以你必須注意設置你的broker所在的JVM和內存限制。

  這種方式的持久化消息只在當前JVM內有效,當重啟JVM之后會丟失持久化的消息。

 

配置方式如下:只需要將  persistent  屬性設為false即可。

    <broker xmlns="http://activemq.apache.org/schema/core" persistent="false" brokerName="brokerName" dataDirectory="${activemq.data}">

 

其內嵌Java的broker方式如下:

        BrokerService brokerService = new BrokerService();

        brokerService.setPersistent(false);
        brokerService.setBrokerName("brokerName");
        brokerService.setUseJmx(true);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM