ActiveMQ入門之四--ActiveMQ持久化方式


  消息持久性對於可靠消息傳遞來說應該是一種比較好的方法,有了消息持久化,即使發送者和接受者不是同時在線或者消息中心在發送者發送消息后宕機了,在消息中心重新啟動后仍然可以將消息發送出去,如果把這種持久化和ReliableMessaging結合起來應該是很好的保證了消息的可靠傳送。

  消息持久性的原理很簡單,就是在發送者將消息發送出去后,消息中心首先將消息存儲到本地數據文件、內存數據庫或者遠程數據庫等,然后試圖將消息發送給接收者,發送成功則將消息從存儲中刪除,失敗則繼續嘗試。消息中心啟動以后首先要檢查制定的存儲位置,如果有未發送成功的消息,則需要把消息發送出去。

ActiveMQ持久化方式:AMQ、KahaDB、JDBC、LevelDB。

1、AMQ

AMQ是一種文件存儲形式,它具有寫入速度快和容易恢復的特點。消息存儲在一個個文件中,文件的默認大小為32M,如果一條消息的大小超過了 32M,那么這個值必須設置大一點。當一個存儲文件中的消息已經全部被消費,那么這個文件將被標識為可刪除,在下一個清除階段,這個文件被刪除。AMQ適用於ActiveMQ5.3之前的版本。默認配置如下:

<persistenceAdapter>
   <amqPersistenceAdapter directory="activemq-data" maxFileLength="32mb"/>
</persistenceAdapter>

屬性如下:

屬性名稱

默認值

描述

directory

activemq-data

消息文件和日志的存儲目錄

useNIO

true

使用NIO協議存儲消息

syncOnWrite

false

同步寫到磁盤,這個選項對性能影響非常大

maxFileLength

32Mb

一個消息文件的大小

persistentIndex

true

消息索引的持久化,如果為false,那么索引保存在內存中

maxCheckpointMessageAddSize

4kb

一個事務允許的最大消息量

cleanupInterval

30000

清除操作周期,單位ms

indexBinSize

1024

索引文件緩存頁面數,缺省為1024,當amq擴充或者縮減存儲時,會鎖定整個broker,導致一定時間的阻塞,所以這個值應該調整到比較大,但是代碼中實現會動態伸縮,調整效果並不理想。

indexKeySize

96

索引key的大小,key是消息ID

indexPageSize

16kb

索引的頁大小

directoryArchive

archive

存儲被歸檔的消息文件目錄

archiveDataLogs

false

當為true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除                    

2、KahaDB

KahaDB是基於文件的本地數據庫儲存形式,雖然沒有AMQ的速度快,但是它具有強擴展性,恢復的時間比AMQ短,從5.4版本之后KahaDB做為默認的持久化方式。默認配置如下:

KahaDB的屬性如下:

屬性名稱

默認值

描述

directory

activemq-data

消息文件和日志的存儲目錄

indexWriteBatchSize

1000

一批索引的大小,當要更新的索引量到達這個值時,更新到消息文件中

indexCacheSize

10000

內存中,索引的頁大小

enableIndexWriteAsync

false

索引是否異步寫到消息文件中

journalMaxFileLength

32mb

一個消息文件的大小

enableJournalDiskSyncs

true

是否講非事務的消息同步寫入到磁盤

cleanupInterval

30000

清除操作周期,單位ms

checkpointInterval

5000

索引寫入到消息文件的周期,單位ms

ignoreMissingJournalfiles

false

忽略丟失的消息文件,false,當丟失了消息文件,啟動異常

checkForCorruptJournalFiles

false

檢查消息文件是否損壞,true,檢查發現損壞會嘗試修復

checksumJournalFiles

false

產生一個checksum,以便能夠檢測journal文件是否損壞。

5.4版本之后有效的屬性:

   

archiveDataLogs

false

當為true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除

directoryArchive

null

存儲被歸檔的消息文件目錄

databaseLockedWaitDelay

10000

在使用負載時,等待獲得文件鎖的延遲時間,單位ms

maxAsyncJobs

10000

同個生產者產生等待寫入的異步消息最大量

concurrentStoreAndDispatchTopics

false

當寫入消息的時候,是否轉發主題消息

concurrentStoreAndDispatchQueues

true

當寫入消息的時候,是否轉發隊列消息

5.6版本之后有效的屬性:

   

archiveCorruptedIndex

false

是否歸檔錯誤的索引

每個KahaDB的實例都可以配置單獨的適配器,如果沒有目標隊列提交給filteredKahaDB,那么意味着對所有的隊列有效。如果一個隊列沒有對應的適配器,那么將會拋出一個異常。配置如下:

<persistenceAdapter>
  <mKahaDBdirectory="${activemq.base}/data/kahadb">
    <filteredPersistenceAdapters>
      <!-- match all queues -->
      <filteredKahaDBqueue=">">
        <persistenceAdapter>
          <kahaDBjournalMaxFileLength="32mb"/>
        </persistenceAdapter>
      </filteredKahaDB>
       
      <!-- match all destinations -->
      <filteredKahaDB>
        <persistenceAdapter>
          <kahaDBenableJournalDiskSyncs="false"/>
        </persistenceAdapter>
      </filteredKahaDB>
    </filteredPersistenceAdapters>
  </mKahaDB>
</persistenceAdapter>

如果filteredKahaDB的perDestination屬性設置為true,那么匹配的目標隊列將會得到自己對應的KahaDB實例。配置如下:

<persistenceAdapter>
  <mKahaDBdirectory="${activemq.base}/data/kahadb">
    <filteredPersistenceAdapters>
      <!-- kahaDB per destinations -->
      <filteredKahaDB perDestination="true">
        <persistenceAdapter>
          <kahaDBjournalMaxFileLength="32mb" />
        </persistenceAdapter>
      </filteredKahaDB>
    </filteredPersistenceAdapters>
  </mKahaDB>
</persistenceAdapter>

3、JDBC

可以將消息存儲到數據庫中,例如:Mysql、SQL Server、Oracle、DB2。

配置JDBC適配器:

<persistenceAdapter>
    <jdbcPersistenceAdapterdataSource="#mysql-ds" createTablesOnStartup="false" />
</persistenceAdapter>

dataSource指定持久化數據庫的bean,createTablesOnStartup是否在啟動的時候創建數據表,默認值是true,這樣每次啟動都會去創建數據表了,一般是第一次啟動的時候設置為true,之后改成false。

Mysql持久化bean:
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>
SQL Server持久化bean:
<bean id="mssql-ds" class="net.sourceforge.jtds.jdbcx.JtdsDataSource" destroy-method="close">
   <property name="serverName" value="SERVERNAME"/>
   <property name="portNumber" value="PORTNUMBER"/>
   <property name="databaseName" value="DATABASENAME"/>
   <property name="user" value="USER"/>
   <property name="password" value="PASSWORD"/>
</bean>
Oracle持久化bean:
<bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
    <property name="url" value="jdbc:oracle:thin:@10.53.132.47:1521:activemq"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>
DB2持久化bean:
<bean id="db2-ds" class="org.apache.commons.dbcp.BasicDataSource"  destroy-method="close">
      <property name="driverClassName" value="com.ibm.db2.jcc.DB2Driver"/>
      <property name="url" value="jdbc:db2://hndb02.bf.ctc.com:50002/activemq"/>
      <property name="username" value="activemq"/>
      <property name="password" value="activemq"/>
      <property name="maxActive" value="200"/>
      <property name="poolPreparedStatements" value="true"/>
  </bean>

4、LevelDB

這種文件系統是從ActiveMQ5.8之后引進的,它和KahaDB非常相似,也是基於文件的本地數據庫儲存形式,但是它提供比KahaDB更快的持久性。與KahaDB不同的是,它不是使用傳統的B-樹來實現對日志數據的提前寫,而是使用基於索引的LevelDB。

默認配置如下:

< persistenceAdapter >
       < levelDBdirectory = "activemq-data" />
</ persistenceAdapter >

 

屬性名稱

默認值

描述

directory

"LevelDB"

數據文件的存儲目錄

readThreads

10

系統允許的並發讀線程數量

sync

true

同步寫到磁盤

logSize

104857600 (100 MB)

日志文件大小的最大值

logWriteBufferSize

4194304 (4 MB)

日志數據寫入文件系統的最大緩存值

verifyChecksums

false

是否對從文件系統中讀取的數據進行校驗

paranoidChecks

false

盡快對系統內部發生的存儲錯誤進行標記

indexFactory

org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory

在創建LevelDB索引時使用

indexMaxOpenFiles

1000

可供索引使用的打開文件的數量

indexBlockRestartInterval

16

Number keys between restart points for delta encoding of keys.

indexWriteBufferSize

6291456 (6 MB)

內存中索引數據的最大值

indexBlockSize

4096 (4 K)

每個數據塊的索引數據大小

indexCacheSize

268435456 (256 MB)

使用緩存索引塊允許的最大內存

indexCompression

snappy

適用於索引塊的壓縮類型

logCompression

none

適用於日志記錄的壓縮類型

屬性如下:

5、  下面詳細介紹一下如何將消息持久化到Mysql數據庫中

Ø        需要將mysql的驅動包放置到ActiveMQ的lib目錄下

Ø        修改activeMQ的配置文件:

<persistenceAdapter>
            <!--<kahaDB directory="${activemq.base}/data/kahadb"/>-->
            <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds" useDatabaseLock="false" createTablesOnStartup="false"/>
        </persistenceAdapter>

在配置文件中的broker節點外增加:

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method ="close">
       <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
       <property name="url" value="jdbc:mysql://192.168.9.65:3306/activemq?relaxAutoCommit=true"/>
       <property name="username" value="root"/>
       <property name="password" value="12345678"/>
       <property name="maxActive" value="20"/>
       <property name="poolPreparedStatements" value="true"/>
    </bean>

從配置中可以看出數據庫的名稱是activemq,你需要手動在MySql中增加這個庫。然后重新啟動activeMQ,會發現activemq多了三張表:從配置中可以看出數據庫的名稱是activemq,需要手動在MySql中建立這個數據庫。

1:activemq_acks

2:activemq_lock

3:activemq_msgs

Ø        點到點類型

Sender類:

package com.dxz.activemq.ex2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
    private static final int SEND_NUMBER = 2000;

    public static void main(String[] args) {
        // ConnectionFactory :連接工廠,JMS用它創建連接
        ConnectionFactory connectionFactory;
        // Connection :JMS客戶端到JMS Provider的連接
        Connection connection = null;
        // Session:一個發送或接收消息的線程
        Session session;
        // Destination :消息的目的地;消息發送給誰.
        Destination destination;
        // MessageProducer:消息發送者
        MessageProducer producer;
        // TextMessage message;
        // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現
        connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            // 構造從工廠得到連接對象
            connection = connectionFactory.createConnection();
            // 啟動
            connection.start();
            // 獲取操作連接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 獲取session,FirstQueue是一個服務器的queue
            destination = session.createQueue("FirstQueue");
            // 得到消息生成者【發送者】
            producer = session.createProducer(destination);
            // 設置不持久化
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 構造消息
            sendMessage(session, producer);
            // session.commit();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != connection) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    // TODO Auto-generatedcatch block
                    e.printStackTrace();
                }
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ發送消息" + i);
            System.out.println("發送消息:ActiveMQ發送的消息" + i);
            producer.send(message);
        }
    }
}
View Code

Receiver類:

package com.dxz.activemq.ex2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    public static void main(String[] args) {
        // ConnectionFactory :連接工廠,JMS用它創建連接
        ConnectionFactory connectionFactory;
        // Connection :JMS客戶端到JMS Provider的連接
        Connection connection = null;
        // Session:一個發送或接收消息的線程
        Session session;
        // Destination :消息的目的地;消息發送給誰.
        Destination destination;
        // 消費者,消息接收者
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            // 得到連接對象
            connection = connectionFactory.createConnection();
            // 啟動
            connection.start();
            // 獲取操作連接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 創建Queue
            destination = session.createQueue("FirstQueue");
            consumer = session.createConsumer(destination);
            while (true) {
                // 設置接收者接收消息的時間,為了便於測試,這里定為100s
                TextMessage message = (TextMessage) consumer.receive(100000);
                if (null != message) {
                    System.out.println("收到消息" + message.getText());
                } else
                    break;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}
View Code

測試一:測試:

A、 先運行Sender類,待運行完畢后,運行Receiver類

B、 在此過程中activemq數據庫的activemq_msgs表中沒有數據

C、 再次運行Receiver,消費不到任何信息

測試二:

A、  先運行Sender類

B、 重啟電腦

C、 運行Receiver類,無任何信息被消費

測試三:

A、   把Sender類中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改為producer.setDeliveryMode(DeliveryMode.PERSISTENT);

B、   先運行Sender類,待運行完畢后,運行Receiver類

C、   在此過程中activemq數據庫的activemq_msgs表中有數據生成,運行完Receiver類后,數據清除

測試四:

A、    把Sender類中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改為producer.setDeliveryMode(DeliveryMode.PERSISTENT);

B、    運行Sender類

C、    重啟電腦

D、    運行Receiver類,有消息被消費

結論:   

通過以上測試,可以發現,在P2P類型中當DeliveryMode設置為NON_PERSISTENCE時,消息被保存在內存中,而當 DeliveryMode設置為PERSISTENCE時,消息保存在broker的相應的文件或者數據庫中。而且P2P中消息一旦被Consumer消 費就從broker中刪除。

Ø        發布/訂閱類型

Sender類:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import  javax.jms.Connection;
import  javax.jms.ConnectionFactory;
import  javax.jms.DeliveryMode;
import  javax.jms.Destination;
import  javax.jms.JMSException;
import  javax.jms.MessageProducer;
import  javax.jms.Session;
import  javax.jms.TextMessage;
import  javax.jms.Topic;
import  org.apache.activemq.ActiveMQConnection;
import  org.apache.activemq.ActiveMQConnectionFactory;
public  class  Sender {
      private  static  final  int  SEND_NUMBER =  100  ;
      public  static  void  main(String[] args) {
         // ConnectionFactory :連接工廠,JMS用它創建連接
         ConnectionFactory connectionFactory;
         // Connection :JMS客戶端到JMS Provider的連接
         Connection connection =  null  ;
          // Session:一個發送或接收消息的線程
         Session session;
         // MessageProducer:消息發送者
         MessageProducer producer;
          // TextMessage message;
          // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現
         connectionFactory =  new  ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                 ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616"  );
         try  {
             //得到連接對象
             connection = connectionFactory.createConnection();
             //啟動
             connection.start();
             //獲取操作連接
             session = connection.createSession(  false  , Session.AUTO_ACKNOWLEDGE);       
             Topic topic = session.createTopic(  "MQ_test"  );      
             // 得到消息生成者【發送者】
             producer = session.createProducer(topic);
             //設置持久化
             producer.setDeliveryMode(DeliveryMode.PERSISTENT);
             //構造消息
             sendMessage(session, producer);
             //session.commit();
             connection.close();
         }
         catch  (Exception e){
             e.printStackTrace();
         }  finally  {
             if  (  null  != connection){
                try  {
                    connection.close();
                catch  (JMSException e) {
                    // TODO Auto-generatedcatch block
                    e.printStackTrace();
                }
             }   
         }
      }
      public  static  void  sendMessage(Session session, MessageProducer producer)  throws  Exception{
         for  (  int  i=  1  ; i<=SEND_NUMBER; i++){
             TextMessage message = session.createTextMessage(  "ActiveMQ發送消息"  +i);
             System.out.println(  "發送消息:ActiveMQ發送的消息"  +i);
             producer.send(message);
         }
      }
}

Receiver類:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import  javax.jms.Connection;
import  javax.jms.ConnectionFactory;
import  javax.jms.Destination;
import  javax.jms.MessageConsumer;
import  javax.jms.Session;
import  javax.jms.TextMessage;
import  javax.jms.Topic;
  
import  org.apache.activemq.ActiveMQConnection;
import  org.apache.activemq.ActiveMQConnectionFactory;
public  class  Receiver {
      public  static  void  main(String[] args) {
         // ConnectionFactory :連接工廠,JMS用它創建連接
          ConnectionFactory connectionFactory;
          // Connection :JMS客戶端到JMS Provider的連接
          Connection connection =  null  ;
          // Session:一個發送或接收消息的線程
          Session session;
          // 消費者,消息接收者
          MessageConsumer consumer;
          connectionFactory = newActiveMQConnectionFactory(
                 ActiveMQConnection.DEFAULT_USER,
                  ActiveMQConnection.DEFAULT_PASSWORD,
                  "tcp://localhost:61616"  );
          try  {
              // 構造從工廠得到連接對象
              connection =connectionFactory.createConnection();
             
              connection.setClientID(  "clientID001"  );
              // 啟動
              connection.start();
              // 獲取操作連接
              session = connection.createSession(  false  ,
                      Session.AUTO_ACKNOWLEDGE);
              // 獲取session
             Topic topic = session.createTopic(  "MQ_test"  );      
             // 得到消息生成者【發送者】
             consumer = session.createDurableSubscriber(topic,  "MQ_sub"  );
            
              while  (  true  ){
                //設置接收者接收消息的時間,為了便於測試,這里誰定為100s
                TextMessagemessage = (TextMessage)consumer.receive(  100000  );
                if  (  null  != message){
                   System.out.println(  "收到消息"  +message.getText());
                }  else  break  ;
              }
          }  catch  (Exception e){
          e.printStackTrace();
          }  finally  {
              try  {
                  if  (  null  != connection)
                      connection.close();
              catch  (Throwable ignore) {
              }
          }
      }
  
}

測試:

測試一:

A、先啟動Sender類

B、再啟動Receiver類

C、結果無任何記錄被訂閱

測試二:

A、先啟動Receiver類,讓Receiver在相關主題上進行訂閱

B、停止Receiver類,再啟動Sender類

C、待Sender類運行完成后,再啟動Receiver類

D、結果發現相應主題的信息被訂閱


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM