
一、什么是消息中間件
兩個系統或兩個客戶端之間進行消息傳送,利用高效可靠的消息傳遞機制進行平台無關的數據交流,並基於數據通信來進行分布式系統的集成。通過提供消息傳遞和消息排隊模型,它可以在分布式環境下擴展進程間的通信。
消息中間件,總結起來作用有三個:異步化提升性能、降低耦合度、流量削峰。

系統A發送消息給中間件后,自己的工作已經完成了,不用再去管系統B什么時候完成操作。而系統B拉去消息后,執行自己的操作也不用告訴系統A執行結果,所以整個的通信過程是異步調用的。
二、消息中間件的應用場景
2.1 異步通信
有些業務不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。

2.2 緩沖
在任何重要的系統中,都會有需要不同的處理時間的元素。消息隊列通過一個緩沖層來幫助任務最高效率的執行,該緩沖有助於控制和優化數據流經過系統的速度。以調節系統響應時間。
2.3 解耦
降低工程間的強依賴程度,針對異構系統進行適配。在項目啟動之初來預測將來項目會碰到什么需求,是極其困難的。通過消息系統在處理過程中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口,當應用發生變化時,可以獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2.4 冗余
有些情況下,處理數據的過程會失敗。除非數據被持久化,否則將造成丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的”插入-獲取-刪除”范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
2.5 擴展性
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。便於分布式擴容。
2.6 可恢復性
系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。
2.7 順序保證
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。
2.8 過載保護
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量無法提取預知;如果以為了能處理這類瞬間峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
2.9 數據流處理
分布式系統產生的海量數據流,如:業務日志、監控數據、用戶行為等,針對這些數據流進行實時或批量采集匯總,然后進行大數據分析是當前互聯網的必備技術,通過消息隊列完成此類數據收集是最好的選擇。
三、常用消息隊列(ActiveMQ、RabbitMQ、RocketMQ、Kafka)比較
| 特性MQ | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
|---|---|---|---|---|
| 生產者消費者模式 | 支持 | 支持 | 支持 | 支持 |
| 發布訂閱模式 | 支持 | 支持 | 支持 | 支持 |
| 請求回應模式 | 支持 | 支持 | 不支持 | 不支持 |
| Api完備性 | 高 | 高 | 高 | 高 |
| 多語言支持 | 支持 | 支持 | java | 支持 |
| 單機吞吐量 | 萬級 | 萬級 | 萬級 | 十萬級 |
| 消息延遲 | 無 | 微秒級 | 毫秒級 | 毫秒級 |
| 可用性 | 高(主從) | 高(主從) | 非常高(分布式) | 非常高(分布式) |
| 消息丟失 | 低 | 低 | 理論上不會丟失 | 理論上不會丟失 |
| 文檔的完備性 | 高 | 高 | 高 | 高 |
| 提供快速入門 | 有 | 有 | 有 | 有 |
| 社區活躍度 | 高 | 高 | 有 | 高 |
| 商業支持 | 無 | 無 | 商業雲 | 商業雲 |
四、消息中間件的角色
Queue: 隊列存儲,常用與點對點消息模型 ,默認只能由唯一的一個消費者處理。一旦處理消息刪除。
Topic: 主題存儲,用於訂閱/發布消息模型,主題中的消息,會發送給所有的消費者同時處理。只有在消息可以重復處 理的業務場景中可使用,Queue/Topic都是 Destination 的子接口
ConnectionFactory: 連接工廠,客戶用來創建連接的對象,例如ActiveMQ提供的ActiveMQConnectionFactory
Connection: JMS Connection封裝了客戶與JMS提供者之間的一個虛擬的連接。
Destination: 消息的目的地,目的地是客戶用來指定它生產的消息的目標和它消費的消息的來源的對象。JMS1.0.2規范中定義了兩種消息傳遞域:點對點(PTP)消息傳遞域和發布/訂閱消息傳遞域。
點對點消息傳遞域的特點如下:
- 每個消息只能有一個消費者。
- 消息的生產者和消費者之間沒有時間上的相關性。無論消費者在生產者發送消息的時候是否處於運行狀態,它都可以提取消息。
發布/訂閱消息傳遞域的特點如下:
- 每個消息可以有多個消費者。
- 生產者和消費者之間有時間上的相關性。
- 訂閱一個主題的消費者只能消費自它訂閱之后發布的消息。JMS規范允許客戶創建持久訂閱,這在一定程度上放松了時間上的相關性要求 。持久訂閱允許消費者消費它在未處於激活狀態時發送的消息。
在點對點消息傳遞域中,目的地被成為隊列(queue);在發布/訂閱消息傳遞域中,目的地被成為主題(topic)。
五、JMS的消息格式
JMS消息由以下三部分組成的:
-
消息頭:
每個消息頭字段都有相應的getter和setter方法。
-
消息屬性:
如果需要除消息頭字段以外的值,那么可以使用消息屬性。
-
消息體:
JMS定義的消息類型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。
消息類型:
| 屬性 | 類型 |
|---|---|
| TextMessage | 文本消息 |
| MapMessage | k/v |
| BytesMessage | 字節流 |
| StreamMessage | java原始的數據流 |
| ObjectMessage | 序列化的java對象 |
六、消息可靠性機制
只有在被確認之后,才認為已經被成功地消費了,消息的成功消費通常包含三個階段 :客戶接收消息、客戶處理消息和消息被確認。在事務性會話中,當一個事務被提交的時候,確認自動發生。在非事務性會話中,消息何時被確認取決於創建會話時的應答模式(acknowledgement mode)。該參數有以下三個可選值:
Session.AUTO_ACKNOWLEDGE:當客戶成功的從receive方法返回的時候,或者從MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的消息。Session.CLIENT_ACKNOWLEDGE:客戶通過消息的acknowledge方法確認消息。需要注意的是,在這種模式中,確認是在會話層上進行:確認一個被消費的消息將自動確認所有已被會話消費的消息。例如,如果一個消息消費者消費了10個消息,然后確認第5個消息,那么所有10個消息都被確認。Session.DUPS_ACKNOWLEDGE:該選擇只是會話遲鈍的確認消息的提交。如果JMS Provider失敗,那么可能會導致一些重復的消息。如果是重復的消息,那么JMS Provider必須把消息頭的JMSRedelivered字段設置為true。
6.1 優先級
可以使用消息優先級來指示JMS Provider首先提交緊急的消息。優先級分10個級別,從0(最低)到9(最高)。如果不指定優先級,默認級別是4。需要注意的是,JMS Provider並不一定保證按照優先級的順序提交消息。
6.2 消息過期
可以設置消息在一定時間后過期,默認是永不過期。
6.3 臨時目的地
可以通過會話上的createTemporaryQueue方法和createTemporaryTopic方法來創建臨時目的地。它們的存在時間只限於創建它們的連接所保持的時間。只有創建該臨時目的地的連接上的消息消費者才能夠從臨時目的地中提取消息。
七、什么是ActiveMQ
ActiveMQ是一種開源的基於JMS(Java Message Servie)規范的一種消息中間件的實現,ActiveMQ的設計目標是提供標准的,面向消息的,能夠跨越多語言和多系統的應用集成消息通信中間件。
官網地址:http://activemq.apache.org/
7.1 存儲方式
1. KahaDB存儲: KahaDB是默認的持久化策略,所有消息順序添加到一個日志文件中,同時另外有一個索引文件記錄指向這些日志的存儲地址,還有一個事務日志用於消息回復操作。是一個專門針對消息持久化的解決方案,它對典型的消息使用模式進行了優化
特性:
1、日志形式存儲消息;
2、消息索引以 B-Tree 結構存儲,可以快速更新;
3、 完全支持 JMS 事務;
4、支持多種恢復機制kahadb 可以限制每個數據文件的大小。不代表總計數據容量。
2. AMQ 方式: 只適用於 5.3 版本之前。 AMQ 也是一個文件型數據庫,消息信息最終是存儲在文件中。內存中也會有緩存數據。
3. JDBC存儲 : 使用JDBC持久化方式,數據庫默認會創建3個表,每個表的作用如下:
activemq_msgs:queue和topic的消息都存在這個表中
activemq_acks:存儲持久訂閱的信息和最后一個持久訂閱接收的消息ID
activemq_lock:跟kahadb的lock文件類似,確保數據庫在某一時刻只有一個broker在訪問
4. LevelDB存儲 : LevelDB持久化性能高於KahaDB,但是在ActiveMQ官網對LevelDB的表述:LevelDB官方建議使用以及不再支持,推薦使用的是KahaDB
5.Memory 消息存儲: 顧名思義,基於內存的消息存儲,就是消息存儲在內存中。persistent=”false”,表示不設置持 久化存儲,直接存儲到內存中,在broker標簽處設置。
7.2 協議
協議官網API:http://activemq.apache.org/configuring-version-5-transports.html
-
Transmission Control Protocol (TCP):
- 這是默認的Broker配置,TCP的Client監聽端口是61616。
- 在網絡傳輸數據前,必須要序列化數據,消息是通過一個叫wire protocol的來序列化成字節流。默認情況下,ActiveMQ把wire protocol叫做OpenWire,它的目的是促使網絡上的效率和數據快速交互。
- TCP連接的URI形式:tcp://hostname:port?key=value&key=value
- TCP傳輸的優點:
(1)TCP協議傳輸可靠性高,穩定性強
(2)高效性:字節流方式傳遞,效率很高
(3)有效性、可用性:應用廣泛,支持任何平台
-
New I/O API Protocol(NIO)
-
NIO協議和TCP協議類似,但NIO更側重於底層的訪問操作。它允許開發人員對同一資源可有更多的client調用和服務端有更多的負載。
-
適合使用NIO協議的場景:
(1)可能有大量的Client去鏈接到Broker上一般情況下,大量的Client去鏈接Broker是被操作系統的線程數所限制的。因此,NIO的實現比TCP需要更少的線程去運行,所以建議使用NIO協議
(2)可能對於Broker有一個很遲鈍的網絡傳輸NIO比TCP提供更好的性能 -
NIO連接的URI形式:nio://hostname:port?key=value
-
Transport Connector配置示例:
-
<transportConnectors>
<transportConnector
name="tcp"
uri="tcp://localhost:61616?trace=true" />
<transportConnector
name="nio"
uri="nio://localhost:61618?trace=true" />
</transportConnectors>
- User Datagram Protocol(UDP)
1:UDP和TCP的區別
(1)TCP是一個原始流的傳遞協議,意味着數據包是有保證的,換句話說,數據包是不會被復制和丟失的。UDP,另一方面,它是不會保證數據包的傳遞的
(2)TCP也是一個穩定可靠的數據包傳遞協議,意味着數據在傳遞的過程中不會被丟失。這樣確保了在發送和接收之間能夠可靠的傳遞。相反,UDP僅僅是一個鏈接協議,所以它沒有可靠性之說
2:從上面可以得出:TCP是被用在穩定可靠的場景中使用的;UDP通常用在快速數據傳遞和不怕數據丟失的場景中,還有ActiveMQ通過防火牆時,只能用UDP
3:UDP連接的URI形式:udp://hostname:port?key=value
4:Transport Connector配置示例:
<transportConnectors>
<transportConnector
name="udp"
uri="udp://localhost:61618?trace=true" />
</transportConnectors>
- Secure Sockets Layer Protocol (SSL)
1:連接的URI形式:ssl://hostname:port?key=value
2:Transport Connector配置示例:
<transportConnectors>
<transportConnector name="ssl" uri="ssl://localhost:61617?trace=true"/>
</transportConnectors>
八、案例(Hello World)
這里以windows為案例演示
下載地址:http://activemq.apache.org/components/classic/download/
8.1 安裝啟動
解壓后直接執行
bin/win64/activemq.bat

8.2 web控制台
http://localhost:8161/
賬號密碼:admin/admin


8.3 web控制台
修改 ActiveMQ 配置文件 activemq/conf/jetty.xml
jettyport節點: 配置文件修改完畢,保存並重新啟動 ActiveMQ 服務
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="127.0.0.1"/>
<property name="port" value="8161"/>
</bean>
8.4 開發
1. jar引入:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2. Sender :
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @program: activemq_01
* @ClassName Sender
* @description: 消息發送
* @author: muxiaonong
* @create: 2020-10-02 13:01
* @Version 1.0
**/
public class Sender {
public static void main(String[] args) throws Exception{
// 1. 獲取連接工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
// 2. 獲取一個向activeMq的連接
Connection connection = factory.createConnection();
// 3. 獲取session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.找目的地,獲取destination,消費端,也會從這個目的地取消息
Queue queue = session.createQueue("user");
// 5.1 消息創建者
MessageProducer producer = session.createProducer(queue);
// consumer --> 消費者
// producer --> 創建者
// 5.2. 創建消息
for (int i = 0; i < 100; i++) {
TextMessage textMessage = session.createTextMessage("hi:"+i);
// 5.3 向目的地寫入消息
producer.send(textMessage);
Thread.sleep(1000);
}
// 6.關閉連接
connection.close();
System.out.println("結束。。。。。");
}
}
3. Receiver :
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @program: activemq_01
* @ClassName Receiver
* @description: 消息接收
* @author: muxiaonong
* @create: 2020-10-02 13:01
* @Version 1.0
**/
public class Receiver {
public static void main(String[] args) throws Exception{
// 1. 獲取連接工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
// 2. 獲取一個向activeMq的連接
Connection connection = factory.createConnection();
connection.start();
// 3. 獲取session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.找目的地,獲取destination,消費端,也會從這個目的地取消息
Destination queue = session.createQueue("user");
// 5 獲取消息
MessageConsumer consumer = session.createConsumer(queue);
while(true){
TextMessage message = (TextMessage)consumer.receive();
System.out.println("message:"+message.getText());
}
}
}
測試結果:
message:hi:38
message:hi:39
message:hi:40
message:hi:41
message:hi:42
message:hi:43
message:hi:44
message:hi:45
web后台顯示有一個消費者處於連接狀態,且已消費了68個message,而該條隊列已沒有message待消費了

九、總結
今天的MQ入門教程系列就這里了,感興趣的小伙伴可以試試,遇到了什么問題,或者有疑問的,都可以在下方留言,小農看見了會第一時間回復大家,MQ作為一個消息中間件,不管是面試還是工作中都會經常用到,所以是很有必要去了解和學習的一個技術點,今天的分享就到這里了,謝謝各位小伙伴的觀看,我們下篇文章見,大家加油!

