ActiveMQ 筆記(六)ActiveMQ的消息存儲和持久化


個人博客網:https://wushaopei.github.io/    (你想要這里多有)

一、持久化機制

1、Activemq持久化

1.1 什么是持久化:

持久化就是高可用的機制,即使服務器宕機了,消息也不會丟失

1.2 持久化的作用

將MQ 收到的消息存儲到文件、硬盤、數據庫 等、 則叫MQ 的持久化,這樣即使服務器宕機,消息在本地還是有,仍就可以訪問到。

詳情——官網 : http://activemq.apache.org/persistence

1.3 ActiveMQ 支持的消息持久化機制

  • 為了避免意外宕機以后丟失信息,需要做到重啟后可以恢復消息隊列,消息系統一半都會采用持久化機制。
  • ActiveMQ的消息持久化機制有JDBC,AMQ,KahaDB和LevelDB,無論使用哪種持久化方式,消息的存儲邏輯都是一致的。
  • 就是在發送者將消息發送出去后,消息中心首先將消息存儲到本地數據文件、內存數據庫或者遠程數據庫等。再試圖將消息發給接收者,成功則將消息從存儲中刪除,失敗則繼續嘗試嘗試發送。
  • 消息中心啟動以后,要先檢查指定的存儲位置是否有未成功發送的消息,如果有,則會先把存儲位置中的消息發出去。

注意:一句話:ActiveMQ宕機了,消息不會丟失的機制。

二、持久化方式

1、Activemq的持久化方式有幾種

1.1 AMQ Mesage Store(了解)

AMQ是一種文件存儲形式,它具有寫入速度快和容易恢復的特點。消息存儲再一個個文件中文件的默認大小為32M,當一個文件中的消息已經
全部被消費,那么這個文件將被標識為可刪除,在下一個清除階段,這個文件被刪除。AMQ適用於ActiveMQ5.3之前的版本

注意:基於文件的存儲方式,是以前的默認消息存儲,現在不用了

1.2 KahaDB消息存儲(默認)

 5.4 之后基於日志文件的持久化插件,默認持久化插件,提高了性能和恢復能力

KahaDB 的屬性配置 : http://activemq.apache.org/kahadb

說明:它使用一個事務日志和 索引文件來存儲所有的地址

db-<數字>.log 存儲數據,一個存滿會再次創建 db-2 db-3 …… ,當不會有引用到數據文件的內容時,文件會被刪除或歸檔

db.data 是一個BTree 索引,索引了消息數據記錄的消息,是消息索引文件,它作為索引指向了 db-<x>.log 里的消息

一點題外話:就像mysql 數據庫,新建一張表,就有這個表對應的 .MYD 文件,作為它的數據文件,就有一個 .MYI 作為索引文件。

db.free 存儲空閑頁 ID 有時會被清除

db.redo 當 KahaDB 消息存儲在強制退出后啟動,用於恢復 BTree 索引

lock 顧名思義就是鎖

四類文件+一把鎖 ==》 KahaDB

1.3 JDBC消息存儲

消息基於JDBC存儲的

1.4 LevelDB消息存儲(了解)

希望作為以后的存儲引擎,5.8 以后引進,也是基於文件的本地數據存儲形式,但是比 KahaDB 更快

它比KahaDB 更快的原因是她不使用BTree 索引,而是使用本身自帶的 LeavelDB 索引

這種文件系統是從ActiveMQ5.8之后引進的,它和KahaDB非常相似,也是基於文件的本地數據庫存儲形式,但是它提供比KahaDB更快的持久性。
但它不使用自定義B-Tree實現來索引獨寫日志,而是使用基於LevelDB的索引

題外話:為什么LeavelDB 更快,並且5.8 以后就支持,為什么還是默認 KahaDB 引擎,因為 activemq 官網本身沒有定論,LeavelDB 之后又出了可復制的LeavelDB 比LeavelDB 更性能更優越,但需要基於 Zookeeper 所以這些官方還沒有定論,任就使用 KahaDB

默認配置如下:

<persistenceAdapter>
      <levelDB directory="activemq-data"/>
</persistenceAdapter>

1.5 JDBC Message Store with ActiveMQ Journal

2、JDBC存儲消息(重點)

JDBC : 有一部分數據會真實的存儲到數據庫中 
       使用JDBC 的持久化,

修改配置文件,默認 kahaDB

修改之前:

<persistenceAdapter>

       <kahaDB directory="${activemq.data}/kahadb"/>  

 </persistenceAdapter>

修改之后:

<persistenceAdapter>

      <jdbcPersistenceAdapter dataSource="#mysql-ds"/>

 </persistenceAdapter>

在activemq 的lib 目錄下添加 jdbc 的jar 包 (connector.jar 我使用5.1.41 版本)

修改配置文件 : activemq.xml 使其連接自己windows 上的數據庫,並在本地創建名為activemq 的數據庫

讓linux 上activemq 可以訪問到 mysql ,之后產生消息。

ActiveMQ 啟動后會自動在 mysql 的activemq 數據庫下創建三張表:activemq_msgs 、activemq_acks、activemq_lock

activemq_acks:用於存儲訂閱關系。如果是持久化Topic,訂閱者和服務器的訂閱關系在這個表保存

activemq_lock:在集群環境中才有用,只有一個Broker可以獲得消息,稱為Master Broker

activemq_msgs:用於存儲消息,Queue和Topic都存儲在這個表中

點對點會在數據庫的數據表 ACTIVEMQ_MSGS 中加入消息的數據,且在點對點時,消息被消費就會從數據庫中刪除

但是對於主題,訂閱方式接受到的消息,會在 ACTIVEMQ_MSGS 存儲消息,即使MQ 服務器下線,並在 ACTIVEMQ_ACKS 中存儲消費者信息 。 並且存儲以 activemq 為主,當activemq 中的消息被刪除后,數據庫中的也會自動被刪除。

如果表沒生成,可能需要自己創建

-- auto-generated definition create table ACTIVEMQ_ACKS ( CONTAINER varchar(250) not null comment '消息的Destination', SUB_DEST varchar(250) null comment '如果使用的是Static集群,這個字段會有集群其他系統的信息', CLIENT_ID varchar(250) not null comment '每個訂閱者都必須有一個唯一的客戶端ID用以區分', SUB_NAME varchar(250) not null comment '訂閱者名稱', SELECTOR varchar(250) null comment '選擇器,可以選擇只消費滿足條件的消息,條件可以用自定義屬性實現,可支持多屬性AND和OR操作', LAST_ACKED_ID bigint null comment '記錄消費過消息的ID', PRIORITY bigint default 5 not null comment '優先級,默認5', XID varchar(250) null, primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY) ) comment '用於存儲訂閱關系。如果是持久化Topic,訂閱者和服務器的訂閱關系在這個表保存'; create index ACTIVEMQ_ACKS_XIDX on ACTIVEMQ_ACKS (XID); -- auto-generated definition create table ACTIVEMQ_LOCK ( ID bigint not null primary key, TIME bigint null, BROKER_NAME varchar(250) null ); -- auto-generated definition create table ACTIVEMQ_MSGS ( ID bigint not null primary key, CONTAINER varchar(250) not null, MSGID_PROD varchar(250) null, MSGID_SEQ bigint null, EXPIRATION bigint null, MSG blob null, PRIORITY bigint null, XID varchar(250) null ); create index ACTIVEMQ_MSGS_CIDX on ACTIVEMQ_MSGS (CONTAINER); create index ACTIVEMQ_MSGS_EIDX on ACTIVEMQ_MSGS (EXPIRATION); create index ACTIVEMQ_MSGS_MIDX on ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ); create index ACTIVEMQ_MSGS_PIDX on ACTIVEMQ_MSGS (PRIORITY); create index ACTIVEMQ_MSGS_XIDX on ACTIVEMQ_MSGS (XID); 

坑:

JDBC 改進: 加入高速緩存機制 Journal

高速緩存在 activemq.xml 中的配置:

⑤代碼運行驗證

一定要開啟持久化 :  messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

(1)隊列Queue:

生產者:

      。。。。。
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME); MessageProducer messageProducer = session.createProducer(queue); messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); for (int i = 0; i < 3; i++) { 。。。。。。。。。。。

運行結果:

在點對點類型中

  • 當DeliveryMode設置為NON_PERSISTENCE時,消息被保存在內存中
  • 當DeliveryMode設置為PERSISTENCE時,消息保存在broker的相應的文件或者數據庫中。

 而且點對點類型中消息一旦被Consumer消費,就從數據中刪除
 
消費前的消息,會被存放到數據庫

上面的消息被消費后被MQ自動刪除

(2)主題Topic

        。。。。
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.setClientID("我是生產者張三"); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(ACTIVEMQ_TOPIC_NAME); MessageProducer messageProducer = session.createProducer(topic); messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); for (int i = 0; i < 3; i++) { 。。。。。。

 在點對點類型中

  • 當DeliveryMode設置為NON_PERSISTENCE時,消息被保存在內存中
  • 當DeliveryMode設置為PERSISTENCE時,消息保存在broker的相應的文件或者數據庫中。

 而且點對點類型中消息一旦被Consumer消費,就從數據中刪除
 
消費前的消息,會被存放到數據庫

上面的消息被消費后被MQ自動刪除

3、JDBC Message store with ActiveMQ Journal(重點)

3.1 定義:

3.2 說明

這種方式克服了JDBC Store的不足,JDBC每次消息過來,都需要去寫庫讀庫。
ActiveMQ Journal,使用高速緩存寫入技術,大大提高了性能。
 
當消費者的速度能夠及時跟上生產者消息的生產速度時,journal文件能夠大大減少需要寫入到DB中的消息。
舉個例子:

生產者生產了1000條消息,這1000條消息會保存到journal文件,如果消費者的消費速度很快的情況下,在journal文件還沒有同步到DB之前,消費者已經消費了90%的以上消息,那么這個時候只需要同步剩余的10%的消息到DB。如果消費者的速度很慢,這個時候journal文件可以使消息以批量方式寫到DB。

3.3 配置方式:

原來的配置:

<persistenceAdapter> 
        <jdbcPersistenceAdapter dataSource="#mysql-ds" /> 
</persistenceAdapter>

修改的結果:

<persistenceFactory>        
         <journalPersistenceAdapterFactory 
                     journalLogFiles="5" 
                     journalLogFileSize="32768" 
                     useJournal="true" 
                     useQuickJournal="true" 
                     dataSource="#mysql-ds" 
                     dataDirectory="../activemq-data" /> 
</persistenceFactory>

以前是實時寫入mysql,在使用了journal后,數據會被journal處理,如果在一定時間內journal處理(消費)完了,就不寫入mysql,如果沒消費完,就寫入mysql,起到一個緩存的作用

小總結:

  1. 持久化消息是指:
  • MQ 所在的服務器down 了消息也不會丟失

     2.持久化機制演化過程:

  • 從最初的AMQ Message Store 方案到 ActiveMQ V4版本推出的High performance journal (高性能事務)附件,並且同步推出了關系型數據庫的存儲方案, ActiveMQ 5.3 版本有推出了KahaDB 的支持,(也是5.4之后的默認持久化方案),后來ActiveMQ 從5.8開始支持LevelDB ,現在5.9 提供了 Zookeeper + LevelDB 的集群化方案。

     3. ActiveMQ 消息持久化機制有:

AMQ 基於日志文件
KahaDB 基於日志文件,5.4 之后的默認持久化
JDBC 基於第三方數據庫
LevelDB 基於文件的本地數據庫存儲,從5.8 之后推出了LevelDB 性能高於 KahaDB
ReplicatedLevelDB Store

從5.8之后提供了基於LevelDB 和Zookeeper 的數據復制方式,用於Master-slave方式的首數據復制選方案

但是無論使用哪種持久化方式,消息的存儲邏輯都一樣


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM