一、 消息隊列
1. 分布式應用與集群的區別:
如果是一個業務被拆分成多個子業務部署在不同的服務器上,那就是分布式應用;如果是同一個業務部署在多台服務器上,那就是集群。
2. 系統間通信方式:
一種是基於遠程過程調用的方式(如RPC調用);另一種是基於消息隊列的方式。
二、RabbitMQ
1.RabbitMQ特點:
RabbitMQ是一個由Erlang語言開發的基於AMQP標准的開源框架。RabbitMQ最初起源於金融系統,用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。其具體特點包括:
可靠性 靈活的路由 支持消息集群 高可用性
支持多種協議 (除支持AMQP協議之外,還通過插件的方式支持其他消息隊列協議,如STOMP、MQTT)
支持多語言客戶端 提供跟蹤機制 提供管理界面
提供插件機制 (RabbitMQ提供了許多插件,也可以編寫自己的插件)
2. 總結:
RabbitMQ最大的優勢在於提供了比較靈活的消息路由策略、高可用性、可靠性以及豐富的插件、多種平台支持和完善的文檔。不過,由於AMQP協議本身導致它的實現比較重量,從而使得與其他MQ (比如Kafka) 對比其吞吐量處於下風。
三、ActiveMQ
1.ActiveMQ 特點:
ActiveMQ是由Apache出品的一款開源消息中間件,旨在為應用程序提供高效、可擴展、穩定、安全的企業級消息通信。ActiveMQ實現了JMS 1.1 並提供了很多附加的特性,比如JMX管理、主從管理、消息組通信、消息優先級、延遲接收消息、虛擬接收者、消息持久化、消息隊列監控等。主要特性如下:
支持Java、C、C++、C#、Ruby、Perl、Python、PHP等多種語言的客戶端和協議,如OpenWire、STOMP、AMQP、MQTT協議。
提供了像消息組通信、消息優先級、延遲接收消息、虛擬接收者、消息持久化之類的高級特性。
完全支持JMS 1.1 和 J2EE 1.4 規范 (包括持久化、分布式事務消息、事務)
支持Spring框架,ActiveMQ 可以通過Spring 的配置文件方式很容易嵌入Spring應用中。
通過了常見的J2EE服務器測試,比如TomEE、Geronimo、JBoss、GlassFish、WebLogic。
連接方式多樣化,ActiveMQ 提供了多種連接方式,例如 in-VM、TCP、SSL、NIO、UDP、多播、JGroups、JXTA。
支持通過使用JDBC 和 Journal 實現消息的快速持久化。
為高性能集群、客戶端-服務器、點對點通信等場景而設計。
提供了技術和語言中立的REST API 接口。
支持以AJAX 方式調用 ActiveMQ。
可以作為內存中的JMS 提供者,非常適合 JMS 單元測試。
四、Kafka
1.Kafka 特點:
Kafka 最早是由LinkedIn 公司開發的一種分布式的基於 發布/訂閱 的消息系統,后來成為 Apache 的頂級項目。其主要特點如下:
同時為發布和訂閱提供高吞吐量。 (Kafka 的設計目標是以時間復雜度為 O(1) 的方式提供消息持久化能力的,即使對TB級別以上數據也能保證常數時間的訪問性能,即使在非常廉價的商用機器上也能做到單機支持每秒 100K 條消息的傳輸)
消息持久化。 (將消息持久化到磁盤,因此可用於批量消費,例如 ETL 以及實時應用程序。通過將數據持久化到硬盤以及復制可以防止數據丟失。)
分布式。 (支持服務器間的消息分區及分布式消費,同時保證每個Partition 內的消息順序傳輸。其內部的Producer、Broker 和 Consumer 都是分布式架構,這更易於向外擴展。)
消費消息采用 Pull 模式。(消息被處理的狀態是在 Consumer 端維護的,而不是由服務器端維護,Broker 無狀態,Consumer 自己保存offet。)
支持Online 和 Offline 場景,同時支持離線數據處理和實時數據處理。
五、RocketMQ
1.RocketMQ 特點:
RocketMQ是阿里巴巴於2012年開源的分布式消息中間件,后來捐贈給 Apache軟件基金會,並於2017年9月25日成為Apache的頂級項目。作為經歷過多次阿里巴巴“雙11” 這種“超級工程”的洗禮並有穩定出色表現的國產中間件,以其高性能、低延遲和高可靠等特性近年來被越來越多的國內企業所使用。其主要特點如下:
具有靈活的可擴展性。 (RocketMQ 天然支持集群,其核心四大組件(NameServer、Broker、Producer、Consumer)的每一個都可以在沒有單點故障的情況下進行水平擴展。)
具有海量消息堆積能力。 (RocketMQ 采用零拷貝原理實現了超大量消息的堆積能力,據說單機已經可以支持億級消息堆積,而且在堆積了這么多消息后依然保持寫入低延遲)
支持順序消息。 (RocketMQ 可以保證消息消費者按照消息發送的順序對消息進行消費。順序消息分為全局有序消息和局部有序消息,一般推薦使用局部有序消息,即生產者通過將某一類消息按順序發送至同一個隊列中來實現。)
支持多種消息過濾方式。 (消息過濾分為在服務器端過濾和在消費端過濾。在服務器端過濾時可以按照消息消費者的要求進行過濾,優點是減少了不必要的消息傳輸,缺點是增加了消息服務器的負擔,實現相對復雜。消費端過濾則完全由具體應用自定義實現,這種方式更加靈活,缺點是很多無用的消息會被傳輸給消息消費者。)
支持事務消息。 (RocketMQ 除支持普通消息、順序消息之外,還支持事務消息,這個特性對於分布式事務來說提供了另一種解決思路。)
支持回溯消費。 (回溯消費是指對於消費者已經消費成功的消息,由於業務需求需要重新消費。RocketMQ 支持按照時間回溯消費,時間維度精確到毫秒,可以向前回溯,也可以向后回溯。)
六、怎么保證數據不丟失
queue 點對點模式 不存在數據丟失問題
“負載均衡”模式,如果當前沒有消費者,消息也不會丟棄;如果有多個消費者,那么一條消息也只會發送給其中一個消費者,並且要求消費者ack信息ack確認機制。
Topics 發布訂閱模式 消息可能會丟失
ActiveMQ——消息持久化
有了持久化機制,消息在發送后會首先持久化到對應的文件或數據表中,在消息被消費后,再從這些文件或表中刪除(對於持久化的主體消息不會刪除)。
“訂閱-發布”模式,如果當前沒有訂閱者,消息將會被丟棄。如果有多個訂閱者,那么這些訂閱者都會收到消息
在發布訂閱的時候 為每個訂閱者設置一個唯一的訂閱標識
在消息服務器啟動時會先從持久化的媒介中讀取之前未消費的消息,並將讀取到的消息發送給消息的訂閱者或者消費者去消費,這樣就不會出現消息的丟失,保證了消息的可靠性。
消息的可靠性通過三個方面保證——持久化、事務和簽收。這里說一下ActiveMQ中消息持久化的方式。
七、消息持久化的方式
ActiveMQ中主要有以下幾種持久化的方式:具體可參考官網:消息的持久化方式和方案選擇
1️⃣AMQ Message Stroe:基於文件的存儲方式,具有寫入速度快和容易恢復的特點,消息存儲在一個個文件中,文件的默認大小是32M,是之前的默認持久化方式,現在幾乎不用了
2️⃣KahaDB:基於日志文件的持久化方式,從Active5.4版本后作為默認的持久化方式,在activemq.xml文件中可以看到如下配置
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
即默認情況下會將消息持久化到activemq安裝目錄下的/data/kahadb目錄下,在該目錄下有4個或5個文件,用來保存持久化的消息數據(db-n.log)和索引(db.data)等:數據會被追加到db-n.log文件中,當不再需要某一個db-n.log文件中的數據的時候,該log文件會被丟棄
文件的作用介紹:
db-<n>.log:KahaDB存儲消息到預定義大小的數據記錄文件中,文件命名為db-<n>.log,當數據文件已滿時,會創建一個新的數據記錄文件,n的值也會隨之遞增,文件名按照數字進行編號,如db-1.log、db-2.log、db-3.log、…當不再有引用道數據文件中的任何 消息時,文件會被刪除或歸檔
db.data:該文件包含了持久化的BTree索引,索引了消息數據記錄中的消息,他是消息的索引文件,本質上是B-Tree,使用B-Tree作為索引到db-n.log中找消息
db.free:當前db.data文件中有哪些頁是空閑的,文件的具體內容是所有空閑頁的ID
db.redo:用來進行消息恢復
lock:文件鎖,表示當前獲得KahaDB讀寫權限的broker
3️⃣JDBC消息存儲:基於JDBC的消息存儲,可借助於這種方式將消息持久化到Mysql等數據庫中
4️⃣LevelDB消息存儲:這種文件系統是從ActiveMQ5.8之后引進的,和KahaDB非常相似,也是基於文件的本地數據庫存儲形式,但是它提供了比KahaDB更快的持久性,它不是使用B-Tree索引日志,而是使用基於LevelDB的索引
5️⃣JDBC + ActiveMQ高速緩存:即JDBC Message store with ActiveMQ Journal,和JDBC的方式很相似,只不過是在持久化到的數據庫和ActiveMQ服務器之間加了一層高速緩存
JDBC方式的消息持久化步驟 由於ActiveMQ是基於Java語言開發的,因此我們可以直接在其項目的lib包下加入數據庫驅動的jar包或者數據庫連接池的jar包等。
1、將Mysql的驅動jar包加入到ActiveMQ的lib目錄下
2、在activemq.xml文件中配置數據源:activemq.xml類似於Spring的容器配置文件
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.2.143:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
3、修改消息的持久化方式:由默認的KahaDB改為現在的JDBC的方式
改之前:
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
改之后:注意數據源的名稱要一致,createTablesOnStartup是說重啟MQ服務后是否自動創建ActiveMQ相關的表,一般設置為true
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/> </persistenceAdapter>
4、在連接的數據庫服務器上創建使用的庫
5、配置好后,重啟MQ服務,會看到數據庫中多了3張表,這些表是ActiveMQ啟動服務時創建的根ActiveMQ相關的表
activemq_acks:該表記錄的是主題訂閱關系(消息簽收者)信息
activemq_msgs:該表記錄的是待消費的消息,該表中的消息一旦被消費則會被刪除(主題類的消息不會被清除)
activemq_lock:在集群環境中才有用,用於記錄哪個Broker是當前的Master Broker
這樣配置好后,一旦有消息產生就會在相應的表中看到記錄:但隊列中的消息一旦被消費就又會被刪除
配置JDBC持久化的高速緩存:僅有上面的配置,則消息生產者每次有消息需要持久化都會通過JDBC去調用數據庫服務以持久化數據,但像隊列中的消息,大部分在消費后又需要清除,進行及其短暫的持久化意義不大,但卻依然使JDBC頻繁的和數據庫進行交互,效率低下。因此可以在ActiveMQ和數據庫服務器之間加一層高速緩存,用來暫時緩存消息,若隊列中的消息長時間沒有被消費時才進行持久化,這樣就會大大減少ActiveMQ和數據庫服務交互的次數,提高效率。舉例來說:生產者生產了1000條消息,這1000條消息會保存到緩存文件journal文件中,如果消費者的消費速度很快,在journal文件還沒有同步到數據庫之前,消費者已經消費了900條,那么這時就只需要將剩余的100條同步即可,如果消費者消費的很慢,journal文件可以批量的將消息同步到數據庫,大大減少了ActiveMQ和數據庫服務器的交互次數。將持久化的方式改為如下方式:
<persistenceFactory> <journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768" useJournal="true" useQuickJournal="true" dataSource="#mysql-ds" dataDirectory="activemq-data" /> </persistenceFactory>
來源於:傳送門