持久化消息和非持久化消息的存儲原理:
正常情況下,非持久化消息是存儲在內存中的,持久化消息是存儲在文件中的。能夠存儲的最大消息數據在${ActiveMQ_HOME}/conf/activemq.xml文件中的systemUsage節點SystemUsage配置設置了一些系統內存和硬盤容量。
<systemUsage>
<systemUsage>
<memoryUsage>
//該子標記設置整個ActiveMQ節點的“可用內存限制”。這個值不能超過ActiveMQ本身設置的最大內存大小。其中的
//percentOfJvmHeap屬性表示百分比。占用70%的堆內存
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
//該標記設置整個ActiveMQ節點,用於存儲“持久化消息”的“可用磁盤空間”。該子標記的limit屬性必須要進行設置
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
//一旦ActiveMQ服務節點存儲的消息達到了memoryUsage的限制,非持久化消息就會被轉儲到 temp store區域,雖然
//我們說過非持久化消息不進行持久化存儲,但是ActiveMQ為了防止“數據洪峰”出現時非持久化消息大量堆積致使內存耗
//盡的情況出現,還是會將非持久化消息寫入到磁盤的臨時區域——temp store。這個子標記就是為了設置這個temp
//store區域的“可用磁盤空間限制
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
Ø 從上面的配置我們需要得到一個結論,當非持久化消息堆積到一定程度的時候,也就是內存超過指定的設置閥值時,ActiveMQ會將內存中的非持久化消息寫入到臨時文件,以便騰出內存。但是它和持久化消息的區別是,重啟之后,持久化消息會從文件中恢復,非持久化的臨時文件會直接刪除。
消息的持久化策略:
消息持久性對於可靠消息傳遞來說是一種比較好的方法,即時發送者和接受者不是同時在線或者消息中心在發送者發送消息后宕機了,在消息中心重啟后仍然可以將消息發送出去。消息持久性的原理很簡單,就是在發送消息出去后,消息中心首先將消息存儲在本地文件、內存或者遠程數據庫,然后把消息發送給接受者,發送成功后再把消息從存儲中刪除,失敗則繼續嘗試。接下來我們來了解一下消息在broker上的持久化存儲實現方式。
持久化存儲支持類型:
ActiveMQ支持多種不同的持久化方式,主要有以下幾種,不過,無論使用哪種持久化方式,消息的存儲邏輯都是一致的。
Ø KahaDB存儲(默認存儲方式)。
Ø JDBC存儲。
Ø Memory存儲。
Ø LevelDB存儲。
Ø JDBC With ActiveMQ Journal。
KahaDB存儲:
KahaDB是目前默認的存儲方式,可用於任何場景,提高了性能和恢復能力。消息存儲使用一個事務日志和僅僅用一個索引文件來存儲它所有的地址。KahaDB是一個專門針對消息持久化的解決方案,它對典型的消息使用模式進行了優化。在Kaha中,數據被追加到data logs中。當不再需要log文件中的數據的時候,log文件會被丟棄。
配置方式:在${ActiveMQ_HOME}/conf/activemq.xml文件中:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
KahaDB的存儲原理:
在data/kahadb這個目錄下,會生成四個文件
Ø db.data 它是消息的索引文件,本質上是B-Tree(B樹),使用B-Tree作為索引指向db-*.log里面存儲的消息。
Ø db.redo 用來進行消息恢復。
Ø db-*.log 存儲消息內容。新的數據以APPEND的方式追加到日志文件末尾。屬於順序寫入,因此消息存儲是比較快的。默認是32M,達到閥值會自動遞增。
Ø lock文件 鎖,表示當前獲得kahadb讀寫權限的broker。
JDBC存儲:
使用JDBC持久化方式,數據庫會創建3個表:activemq_msgs,activemq_acks和activemq_lock。
ACTIVEMQ_MSGS :消息表,queue和topic都存在這個表中
ACTIVEMQ_ACKS :存儲持久訂閱的信息和最后一個持久訂閱接收的消息ID
ACTIVEMQ_LOCKS :鎖表,用來確保某一時刻,只能有一個ActiveMQ broker實例來訪問數據庫
JDBC存儲實踐:
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="# MySQL-DS " createTablesOnStartup="true" />
</persistenceAdapter>
dataSource指定持久化數據庫的bean,createTablesOnStartup是否在啟動的時候創建數據表,默認值是true,這樣每次啟動都會去創建數據表了,一般是第一次啟動的時候設置為true,之后改成false
Mysql持久化Bean配置:
<bean id="Mysql-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://192.168.11.156:3306/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="root"/> </bean>
配置完以后需要往 ${ActiveMQ_HOME}/lib 文件夾中添加相應 jar 包:然后重啟就OK了。

LevelDB存儲:
LevelDB持久化性能高於KahaDB,雖然目前默認的持久化方式仍然是KahaDB。並且,在ActiveMQ 5.9版本提供了基於LevelDB和Zookeeper的數據復制方式,用於Master-slave方式的首選數據復制方案。不過,據ActiveMQ官網對LevelDB的表述:LevelDB官方建議使用以及不再支持,推薦使用的是KahaDB。
<persistenceAdapter>
<levelDBdirectory="activemq-data"/>
</persistenceAdapter>
Memory 消息存儲:
基於內存的消息存儲,內存消息存儲主要是存儲所有的持久化的消息在內存中。persistent=”false”,表示不設置持久化存儲,直接存儲到內存中。
<beans> <broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core"> <transportConnectors> <transportConnector uri="tcp://localhost:61635"/> </transportConnectors> </broker> </beans>
JDBC Message store with ActiveMQ Journal:
這種方式克服了JDBC Store的不足,JDBC每次消息過來,都需要去寫庫和讀庫。ActiveMQ Journal,使用高速緩存寫入技術,大大提高了性能。當消費者的消費速度能夠及時跟上生產者消息的生產速度時,journal文件能夠大大減少需要寫入到DB中的消息。舉個例子,生產者生產了1000條消息,這1000條消息會保存到journal文件,如果消費者的消費速度很快的情況下,在journal文件還沒有同步到DB之前,消費者已經消費了90%的以上的消息,那么這個時候只需要同步剩余的10%的消息到DB。如果消費者的消費速度很慢,這個時候journal文件可以使消息以批量方式寫到DB。
Ø 將原來的 persistenceAdapter 標簽注釋掉
Ø 添加如下標簽
<persistenceFactory>
<journalPersistenceAdapterFactory dataSource="#Mysql-DS" dataDirectory="activemqdata"/>
</persistenceFactory>
