ActiveMQ 消息存儲


深入淺出 JMS(四) - ActiveMQ 消息存儲

 一、消息的存儲方式

ActiveMQ 支持 JMS 規范中的持久化消息與非持久化消息

  • 持久化消息通常用於不管是否消費者在線,它們都會保證消息會被消費者消費。當消息被確認消費后,會從存儲中刪除
  • 非持久化消息通常用於發送通知以及實時數據,通常要求性能優先,消息可靠性並不是必須的情況
  • MQ 支持可插拔式的消息存儲,如:內存、文件和關系數據庫等方式

Queue 消息模型在 ActiveMQ 的存儲:

  采用存儲采用先進先出(FIFO),一個消息只能被一個消費者消費,當消息被確認消費之后才會被刪除。

Topic消息模型(針對持久訂閱):

  每個訂閱者獲取的消息實際是消息的一個副本,只有一個消息副本會被存儲,MQ 提供了一個指針來指向消息存儲並且分發消息副本到訂閱者,消息直到所有的持久化訂閱者都被接收才能被刪除。

持久化存儲方式:

  • KahaDB 消息存儲
  • AMQ 消息存儲
  • JDBC 消息存儲
  • 內存消息存儲

二、KahaDB 存儲方式

  KahaDB 是從 ActiveMQ 5.4 開始默認的持久化插件。KahaDb 恢復時間遠遠小於其前身 AMQ 並且使用更少的數據文件,所以可以完全代替 AMQ,kahaDB 的持久化機制同樣是基於日志文件,索引和緩存。

(一)KahaDB 主要特性:

  • 日志形式存儲消息;
  • 消息索引以 B-Tree 結構存儲,可以快速更新;
  • 完全支持 JMS 事務;
  • 支持多種恢復機制;

(二)適用場景:

高吞吐量的應用程序
存儲大數據量的消息

(三)配置方式 conf/activemq.xml:

<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>

圖4.1 KahaDB

(四)KahaDB 存儲原理

當有活動消費者時,用於臨時存儲,消息會被發送給消費着,同時被安排將被存儲,如果消息及時被確認,就不需要寫入到磁盤。寫入到磁盤中的數據消息,在后續的消息活動中,如果消息發送成功,變標記為可刪除的。系統會周期性的清除或者歸檔日志文件。

(1) KahaDB 內部結構

圖4.1 KahaDB內部結構

  • Data logs:消息日志包含了消息日志和一些命令
  • Cache:當有活動消費者時,用於臨時存儲,消息會被發送給消費着,同時被安排將被存儲,如果消息及時被確認,這不需要寫入到磁盤
  • Btree indexes(消息索引):用於引用消息日志(message id),它存儲在內存中,這樣能快速定位到。MQ會定期將內存中的消息索引保存到 metadata store 中,避免大量消息未發送時,消息索引占用過多內存空間。
  • Redo log:用於在非正常關機情況下維護索引完整性。

(2) 目錄結構

圖4.1 KahaDB目錄結構

  • Db log files:用於存儲消息(默認大小32M),當 log 日志滿了,會創建一個新的,當 log 日志中的消息都被刪除,該日志文件會被刪除或者歸檔。
  • Archive directory:當 datalog 不在被 kahadb 需要會被歸檔(通過 archiveDataLogs 屬性控制)。
  • Db.data:存放 Btree indexs。
  • Db.redo:存放 redo file,用於恢復 Btree indexs。

三、AMQ 消息存儲

寫入消息時,會將消息寫入日志文件,由於是順序追加寫,性能很高。為了提升性能,創建消息主鍵索引,並且提供緩存機制,進一步提升性能。每個日志文件的大小都是有限制的(默認32m,可自行配置)。當超過這個大小,系統會重新建立一個文件。當所有的消息都消費完成,系統會刪除這個文件或者歸檔(取決於配置)。主要的缺點是 AMQ Message 會為每一個 Destination 創建一個索引,如果使用了大量的 Queue,索引文件的大小會占用很多磁盤空間。而且由於索引巨大,一旦 Broker 崩潰,重建索引的速度會非常慢。

特點:類似 KahaDB,也包含了事務日志,每個 destination 都包含一個 index 文件,AMQ 適用於高吞吐量的應用場景,但是不適合多個隊列的場景。

配置方式 conf/activemq.xml:

<!--AMQ directory:數據存儲路徑 syncOnWrite:是否同步寫入 maxFileLength:日志文件大小 --> <persistenceAdapter> <amqPersistenceAdapter directory="${activemq.data}/AMQdb" syncOnWrite="true" maxFileLength="10mb" /> </persistenceAdapter>

(1) AMQ內部結構

圖4.3 AMQ內部結構

  • Data logs:消息日志包含了消息日志
  • Cache:用於消息的快速檢索
  • Reference store indexes:用於引用 datalogs 中的消息,通過 message ID 關聯

(2) 目錄結構

圖4.4 AMQ目錄結構

  • Lock:保證同一時間只有一個 borker 訪問文件目錄
  • temp-storag:用於存儲非持久化消息(當不在被存儲在內存中),如等待慢消費者處理消息
  • Kr-store:用於存儲引用消息日志數據
  • journal directory:包含了消息文件、消息日志和消息控制信息
  • Archive:歸檔的數據日志

四、JDBC存儲

支持通過 JDBC 將消息存儲到關系數據庫,性能上不如文件存儲,能通過關系型數據庫查詢到消息的信息。

MQ 支持的數據庫:Apache Derby、MySQL、PostgreSQL、Oracle、SQLServer、Sybase、Informix、MaxDB。

存儲表結構:

表 1:ACTIVEMQ_MSGS:用於存儲消息,Queue 和 Topic 都存儲在這個表中:

字段 說明
ID 自增的數據庫主鍵
CONTAINER 消息的Destination
MSGID_PROD 消息發送者客戶端的主鍵
MSG_SEQ 是發送消息的順序,MSGID_PROD+MSG_SEQ可以組成JMS的MessageID
EXPIRATION 消息的過期時間,存儲的是從1970-01-01到現在的毫秒數
MSG 消息本體的Java序列化對象的二進制數據
PRIORITY 優先級,從0-9,數值越大優先級越高

表 2:ACTIVEMQ_ACKS:用於存儲訂閱關系。如果是持久化Topic,訂閱者和服務器的訂閱關系在這個表保存:

字段 說明
CONTAINER 消息的Destination
SUB_DEST 如果是使用Static集群,這個字段會有集群其他系統的信息
CLIENT_ID 每個訂閱者都必須有一個唯一的客戶端ID用以區分
SUB_NAME 訂閱者名稱
SELECTOR 選擇器,可以選擇只消費滿足條件的消息。條件可以用自定義屬性實現,可支持多屬性AND和OR操作
LAST_ACKED_ID 記錄消費過的消息的ID

表 3:ACTIVEMQ_LOCK(消息鎖,保證同一時間只能有一個broker訪問這些表結構):

表 activemq_lock 在集群環境中才有用,只有一個 Broker 可以獲得消息,稱為 Master Broker,其他的只能作為備份等待 Master Broker 不可用,才可能成為下一個 Master Broker。這個表用於記錄哪個 Broker 是當前的 Master Broker。

配置方式:

1、配置數據源 conf/acticvemq.xml 文件:

<!-- 配置數據源--> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="111111"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>

2、配置 broke 中的 persistenceAdapter

dataSource 指定持久化數據庫的 bean,createTablesOnStartup 是否在啟動的時候創建數據表,默認值是 true,這樣每次啟動都會去創建數據表了,一般是第一次啟動的時候設置為 true,之后改成 false。

<!-- JDBC配置 --> <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/> </persistenceAdapter> 

ps:數據庫 activemq 需要手動創建。

五、內存消息存儲

內存消息存儲,會將所有的持久化消息存儲在內存中,必須注意JVM使用情況以及內存限制,適用於一些能快速消費的數據量不大的小消息,當MQ關閉或者宕機,未被消費的內存消息會被清空。

配置方式 設置 broker屬性值 persistent="false":

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="false"/>

每天用心記錄一點點。內容也許不重要,但習慣很重要!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM