深入淺出 JMS(四) - ActiveMQ 消息存儲
一、消息的存儲方式
ActiveMQ 支持 JMS 規范中的持久化消息與非持久化消息
- 持久化消息通常用於不管是否消費者在線,它們都會保證消息會被消費者消費。當消息被確認消費后,會從存儲中刪除
- 非持久化消息通常用於發送通知以及實時數據,通常要求性能優先,消息可靠性並不是必須的情況
- MQ 支持可插拔式的消息存儲,如:內存、文件和關系數據庫等方式
Queue 消息模型在 ActiveMQ 的存儲:
采用存儲采用先進先出(FIFO),一個消息只能被一個消費者消費,當消息被確認消費之后才會被刪除。
Topic消息模型(針對持久訂閱):
每個訂閱者獲取的消息實際是消息的一個副本,只有一個消息副本會被存儲,MQ 提供了一個指針來指向消息存儲並且分發消息副本到訂閱者,消息直到所有的持久化訂閱者都被接收才能被刪除。
持久化存儲方式:
- KahaDB 消息存儲
- AMQ 消息存儲
- JDBC 消息存儲
- 內存消息存儲
二、KahaDB 存儲方式
KahaDB 是從 ActiveMQ 5.4 開始默認的持久化插件。KahaDb 恢復時間遠遠小於其前身 AMQ 並且使用更少的數據文件,所以可以完全代替 AMQ,kahaDB 的持久化機制同樣是基於日志文件,索引和緩存。
(一)KahaDB 主要特性:
- 日志形式存儲消息;
- 消息索引以 B-Tree 結構存儲,可以快速更新;
- 完全支持 JMS 事務;
- 支持多種恢復機制;
(二)適用場景:
高吞吐量的應用程序
存儲大數據量的消息
(三)配置方式 conf/activemq.xml:
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
(四)KahaDB 存儲原理
當有活動消費者時,用於臨時存儲,消息會被發送給消費着,同時被安排將被存儲,如果消息及時被確認,就不需要寫入到磁盤。寫入到磁盤中的數據消息,在后續的消息活動中,如果消息發送成功,變標記為可刪除的。系統會周期性的清除或者歸檔日志文件。
(1) KahaDB 內部結構
- Data logs:消息日志包含了消息日志和一些命令
- Cache:當有活動消費者時,用於臨時存儲,消息會被發送給消費着,同時被安排將被存儲,如果消息及時被確認,這不需要寫入到磁盤
- Btree indexes(消息索引):用於引用消息日志(message id),它存儲在內存中,這樣能快速定位到。MQ會定期將內存中的消息索引保存到 metadata store 中,避免大量消息未發送時,消息索引占用過多內存空間。
- Redo log:用於在非正常關機情況下維護索引完整性。
(2) 目錄結構
- Db log files:用於存儲消息(默認大小32M),當 log 日志滿了,會創建一個新的,當 log 日志中的消息都被刪除,該日志文件會被刪除或者歸檔。
- Archive directory:當 datalog 不在被 kahadb 需要會被歸檔(通過 archiveDataLogs 屬性控制)。
- Db.data:存放 Btree indexs。
- Db.redo:存放 redo file,用於恢復 Btree indexs。
三、AMQ 消息存儲
寫入消息時,會將消息寫入日志文件,由於是順序追加寫,性能很高。為了提升性能,創建消息主鍵索引,並且提供緩存機制,進一步提升性能。每個日志文件的大小都是有限制的(默認32m,可自行配置)。當超過這個大小,系統會重新建立一個文件。當所有的消息都消費完成,系統會刪除這個文件或者歸檔(取決於配置)。主要的缺點是 AMQ Message 會為每一個 Destination 創建一個索引,如果使用了大量的 Queue,索引文件的大小會占用很多磁盤空間。而且由於索引巨大,一旦 Broker 崩潰,重建索引的速度會非常慢。
特點:類似 KahaDB,也包含了事務日志,每個 destination 都包含一個 index 文件,AMQ 適用於高吞吐量的應用場景,但是不適合多個隊列的場景。
配置方式 conf/activemq.xml:
<!--AMQ directory:數據存儲路徑 syncOnWrite:是否同步寫入 maxFileLength:日志文件大小 --> <persistenceAdapter> <amqPersistenceAdapter directory="${activemq.data}/AMQdb" syncOnWrite="true" maxFileLength="10mb" /> </persistenceAdapter>
(1) AMQ內部結構
- Data logs:消息日志包含了消息日志
- Cache:用於消息的快速檢索
- Reference store indexes:用於引用 datalogs 中的消息,通過 message ID 關聯
(2) 目錄結構
- Lock:保證同一時間只有一個 borker 訪問文件目錄
- temp-storag:用於存儲非持久化消息(當不在被存儲在內存中),如等待慢消費者處理消息
- Kr-store:用於存儲引用消息日志數據
- journal directory:包含了消息文件、消息日志和消息控制信息
- Archive:歸檔的數據日志
四、JDBC存儲
支持通過 JDBC 將消息存儲到關系數據庫,性能上不如文件存儲,能通過關系型數據庫查詢到消息的信息。
MQ 支持的數據庫:Apache Derby、MySQL、PostgreSQL、Oracle、SQLServer、Sybase、Informix、MaxDB。
存儲表結構:
表 1:ACTIVEMQ_MSGS:用於存儲消息,Queue 和 Topic 都存儲在這個表中:
字段 | 說明 |
---|---|
ID | 自增的數據庫主鍵 |
CONTAINER | 消息的Destination |
MSGID_PROD | 消息發送者客戶端的主鍵 |
MSG_SEQ | 是發送消息的順序,MSGID_PROD+MSG_SEQ可以組成JMS的MessageID |
EXPIRATION | 消息的過期時間,存儲的是從1970-01-01到現在的毫秒數 |
MSG | 消息本體的Java序列化對象的二進制數據 |
PRIORITY | 優先級,從0-9,數值越大優先級越高 |
表 2:ACTIVEMQ_ACKS:用於存儲訂閱關系。如果是持久化Topic,訂閱者和服務器的訂閱關系在這個表保存:
字段 | 說明 |
---|---|
CONTAINER | 消息的Destination |
SUB_DEST | 如果是使用Static集群,這個字段會有集群其他系統的信息 |
CLIENT_ID | 每個訂閱者都必須有一個唯一的客戶端ID用以區分 |
SUB_NAME | 訂閱者名稱 |
SELECTOR | 選擇器,可以選擇只消費滿足條件的消息。條件可以用自定義屬性實現,可支持多屬性AND和OR操作 |
LAST_ACKED_ID | 記錄消費過的消息的ID |
表 3:ACTIVEMQ_LOCK(消息鎖,保證同一時間只能有一個broker訪問這些表結構):
表 activemq_lock 在集群環境中才有用,只有一個 Broker 可以獲得消息,稱為 Master Broker,其他的只能作為備份等待 Master Broker 不可用,才可能成為下一個 Master Broker。這個表用於記錄哪個 Broker 是當前的 Master Broker。
配置方式:
1、配置數據源 conf/acticvemq.xml 文件:
<!-- 配置數據源--> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="111111"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>
2、配置 broke 中的 persistenceAdapter
dataSource 指定持久化數據庫的 bean,createTablesOnStartup 是否在啟動的時候創建數據表,默認值是 true,這樣每次啟動都會去創建數據表了,一般是第一次啟動的時候設置為 true,之后改成 false。
<!-- JDBC配置 --> <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/> </persistenceAdapter>
ps:數據庫 activemq 需要手動創建。
五、內存消息存儲
內存消息存儲,會將所有的持久化消息存儲在內存中,必須注意JVM使用情況以及內存限制,適用於一些能快速消費的數據量不大的小消息,當MQ關閉或者宕機,未被消費的內存消息會被清空。
配置方式 設置 broker屬性值 persistent="false":
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="false"/>
每天用心記錄一點點。內容也許不重要,但習慣很重要!