文章目錄
引言
什么是消息中間件?隨着業務的急速增長以及分布式架構的逐漸興起,首先要考慮的就是如何高效的在各節點之間傳遞消息,其次要考慮的是流量洪峰時如何削減系統的壓力以及跨平台消息的傳輸等問題,消息中間件就可以幫我們解決以上問題。而消息隊列產品眾多,我們該如何選擇呢?本系列文章主要針對目前使用最多的ActiveMQ、Kafka、RabbitMQ進行講解說明。
正文
一、ActiveMQ是如何產生的?
產生背景
一開始消息中間件的廠商繁多,且各個廠商之間沒有統一的規范,這就導致了各消息中間件非常難以整合協作,因此,后來陸續出現了如JMS和AMQP這樣的消息隊列規范,提供了統一的標准,而ActiveMQ就是完全遵循JMS規范開發的消息隊列。
JMS規范
基本概念
什么是JMS(Java Message Service)規范?JMS是一個基於Java平台面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。在設計JMS時,設計師就計划能夠結合現有消息隊列的優點,如:
- 不同的消息傳送模式或域,例如點對點消息傳送和發布/訂閱消息傳送
- 支持同步和異步消息
- 支持可靠性消息的傳輸
- 支持常見的消息格式,如:文本、字節、流、對象等
JMS體系結構
上面是從百度找的一個圖片,下面對其中各個對象分別進行說明:
- ConnectionFactory:連接工廠,一般設為單例模式,一旦創建,就一直運行在應用容器內,客戶端使用連接工廠創建一個JMS連接。
- Connection:JMS連接表示JMS客戶端和服務器端之間的一個活動的連接。
- Session:JMS會話表示JMS客戶與JMS服務器之間的會話狀態。JMS會話建立在JMS連接上,表示客戶與服務器之間的一個會話線程。
- Destination:消息管道,從生產端流向客戶端,包括隊列(PTP),主題(Pub/Sub)。
- Message Producer和Message Consumer:生產者和消費者對象由Session對象創建,用於發送和接收消息。
- Message:JMS 消息由以下幾部分組成:消息頭,屬性,消息體。
- 消息頭(header):JMS消息頭包含了許多字段,它們是消息發送后由JMS提供者或消息發送者產生,用來表示消息、設置優先權和失效時間等等,並且為消息確定路由Routing。
- 屬性(property):由消息發送者產生,用來添加刪除消息頭以外的附加信息。
- 消息體(body):由消息發送者產生,JMS中定義了5種消息體:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。
了解了基本概念后,下面就一起來看看如何使用ActiveMQ吧。
二、如何使用?
基本功能
本節主要講解activeMQ的基本功能和使用,詳細API請查閱官方文檔。
消息傳遞
在上文也講了ActiveMq支持P2P(點對點)傳輸和pub/sub模型,這兩種傳遞方式的本質區別就是消息是否可重復消費。比如微信私聊和群聊,私聊就是P2P,除了私聊的雙方其它人無法再獲取消息,而群聊就相當於pub/sub模式,即群成員都訂閱了該群的消息。下面首先我們來看看P2P傳輸。
P2P
先創建一個Producer生產消息:
public static void main(String[] args) {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
Connection connection = null;
try {
// 創建並開啟連接
connection = factory.createConnection();
connection.start();
// 創建會話,設置是否為事務型會話以及消息簽收方式
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 創建發送隊列
Destination destination = session.createQueue("queue");
// 創建消息發送者
MessageProducer producer = session.createProducer(destination);
// 創建消息並設置消息內容
TextMessage textMessage = session.createTextMessage();
textMessage.setText("Hello");
// 發送消息
producer.send(textMessage);
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
上面代碼注釋寫的很清楚了,可以看到是完全符合JMS的體系結構的,首先創建一個連接工廠,並通過連接工廠創建連接,然后通過連接創建會話(在創建會話時可以指定是否為事務型會話以及設置消息的簽收方式,相關概念在后面會詳細講解),之后再為本次會話創建管道,即傳輸隊列(這里可以指定是創建隊列(p2p)還是還是主題(pub/sub)),最后創建消息對象發送到管道提交即完成本次會話的消息生產。接下來看看消費者如何消費消息:
public static void main(String[] args) {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
Connection connection = null;
try {
connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 創建接收隊列
Destination destination = session.createQueue("queue");
// 創建消息消費者
MessageConsumer consumer = session.createConsumer(destination);
// 接收消息
TextMessage message = (TextMessage) consumer.receive();
System.out.println(message.getText());
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
整個流程和生產者流程基本是一樣的,只不過消費者不再需要自己生產消息,而是從消息隊列中獲取,這里是通過receive方法獲取的,該方法相當於是客戶端主動從隊列中“拉”消息,並且在消息隊列為空時會阻塞等待消息傳入;另外還有一種隊列“推”送的方式,通過監聽器實現。
public static void main(String[] args) {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
Connection connection = null;
try {
connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("queue");
MessageConsumer consumer = session.createConsumer(destination);
// 使用監聽器監聽隊列
MessageListener listener = new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println(((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
};
while (true) {
consumer.setMessageListener(listener);
session.commit();
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
需要注意的是listener不會阻塞等待,當消息到達時會主動調用onMessage方法,但它的生命周期和方法的生命周期是相同的,需要像上面一樣死循環監聽,同時receive和listener是互斥的,即同時只能使用其中一種方式來獲取消息。
pub/sub
相對於P2P,發布訂閱模式就是可以有多個消費者監聽同一個隊列,並可重復消費同一個消息,整個代碼實現流程和上面的是一樣的,只是將 Destination destination = session.createQueue(“queue”);改為Destination destination = session.createTopic(“topic”);即可。
這里需要思考一個問題,消費者能夠訂閱到哪個時間段的消息呢?是所有的消息還是自消費者注冊監聽之后的呢?很顯然,肯定是只能獲取到注冊監聽之后的消息。但是,若是消費者中途怠機再恢復,怠機過程中產生的消息能否接收到呢?AcitveMQ是支持獲取怠機過程中的消息的,即持久訂閱工功能。
持久訂閱
什么是持久訂閱?舉個例子,相當於你在微博點擊關注某個博主,無論你是否在線,博主發送的消息你都是可以獲取到的,持久訂閱就類似這樣,在創建好連接后首先設置一個自身的身份標識clientId,這個id是唯一的:
connection.setClientID("lwj");
然后通過下面API創建消費者即可創建持久訂閱:
MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "lwj");
需要注意持久訂閱只有pub/sub模式下才支持。
消息傳遞的可靠性
在學習了基礎的使用后,我們應該考慮一個問題,消息隊列該如何保證消息傳遞的可靠性呢?即如何保證生產的消息正確被消費者簽收或者被生產者銷毀?這就牽涉到事務型會話和非事務型會話,JMS Session接口提供了 commit 和 rollback 方法。事務提交意味着生產的所有消息被發送,消費的所有消息被確認;事務回滾意味着生產的所有消息被銷毀,消費的所有消息被恢復並重新提交,除非它們已經過期。 事務型的會話總是牽涉到事務處理中,commit 或 rollback 方法一旦被調用,一個事務就結束了,而另一個事務被開始;關閉事務性會話將回滾其中的事務。
事務型會話與非事務型會話
JMS在創建session會話時通過第一個參數指定是否為事務型會話:
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
當為事務型會話時,調用commit方法前消息並不會真正的投遞到消息中間件中去,而在調用commit后消息會自動確認,需要保證發送端和接收端都是事務型會話。
當為非事務型會話時,相當於生產者逐個投遞到消息中間件,但是消息的確認取決於消費者如何設置ACK_MODE,即創建會話時的第二個參數,該參數有4個選項:
- SESSION_TRANSACTED:當為事務型會話時的默認選項,若不是事務型會話設置該參數會拋出異常
- AUTO_ACKNOWLEDGE:當消費者成功的從 receive 方法返回的時候,或者從MessageListenner.onMessage 方法成功返回的時候,會話自動確認客戶收到消息。
- CLIENT_ACKNOWLEDGE:消費者通過調用Message的 acknowledge 方法確認消息。需要注意該模式下何時調用acknowledge方法,那么在調用該方法之前收到的消息都會一起被確認,而在此之后收到的消息不會被確認。比如,發送10條消息,消費者在收到第5條消息時調用acknowledge方法,那么前5條都會被確認。
- DUPS_OK_ACKNOWLEDGE:消息延遲批量確認,消息生產者在消費者沒有確認消息時會重新發送消息。該模式可優化消費者確認消息的性能,但可能會導致消費者收到重復消息(這個參數在優化一節中還會詳細講解)。
需要注意第一個是和事務綁定,后面三個都是針對消費端的,即消息中間件需要接收到消費者的ack才會認為消息被正確處理。
持久化與非持久化消息的存儲策略
消息隊列為保證高效,消息首先肯定是存儲在內存中的,那么一旦消息隊列怠機或者消息過多超出內存,消息就會面臨丟失的風險,所以需要有相關的手段來保證。
正常情況下,非持久化消息是存儲在內存中的,能夠存儲的最大消息數據在/conf/activemq.xml文件中的systemUsage節點可配置:
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
- memoryUsage是設置整個ActiveMQ節點的“可用內存限制”。這個值不能超過ActiveMQ本身設置的最大內存大小。其中的percentOfJvmHeap屬性表示百分比。
- storeUsage是設置整個ActiveMQ節點,用於存儲“持久化消息”的“可用磁盤空間”。
- tempUsage是設置臨時文件大小。一旦ActiveMQ服務節點存儲的消息達到了memoryUsage的限制,非持久化消息就會被轉儲到 temp store區域,雖然我們說過非持久化消息不進行持久化存儲,但是ActiveMQ為了防止數據洪峰出現時非持久化消息大量堆積致使內存耗盡的情況出現,還是會將非持久化消息寫入到磁盤的臨時區域——temp store。
從上文我們可以了解到ActiveMQ的存儲策略,但是還有個問題,持久化消息是通過什么介質存儲的呢?主要有以下5種:
- KahaDB:默認的存儲方式。在data/kahadb這個目錄下,會生成四個文件:
- db-*.log 存儲消息內容。新的數據以APPEND的方式追加到日志文件末尾。屬於順序寫入,因此消息存儲是比較快的。默認是32M,達到閥值會自動遞增。
- db.data 它是消息的索引文件,本質上是B-Tree(B樹),使用B-Tree作為索引指向db-*.log里面存儲的消息。
- db.redo 用來進行消息恢復
- lock文件 鎖,表示當前獲得kahadb讀寫權限的broker
- JDBC存儲,需要配置JDBC連接以及引入相應的jar。會在數據庫創建三張表:
- ACTIVEMQ_MSGS:消息表,queue和topic都存在這個表中
- ACTIVEMQ_ACKS:存儲持久訂閱的信息和最后一個持久訂閱接收的消息ID
- ACTIVEMQ_LOCKS:鎖表,用來確保某一時刻,只能有一個ActiveMQ broker實例來訪問數據庫
- Memory存儲:即內存
- LevelDB存儲:性能優於KahaDB,但官方不推薦使用。
- JDBC Message store with ActiveMQ Journal:這種方式克服了JDBC Store的不足,JDBC每次消息過來,都需要去寫庫和讀庫。ActiveMQ Journal,使用高速緩存寫入技術,大大提高了性能。
詳細配置方式參照官方文檔。
消息發送策略
ActiveMQ支持同步、異步兩種發送模式將消息發送到消息中間件上。
同步發送過程中,發送者發送一條消息會阻塞直到消息中間件反饋一個確認消息,表示消息已經被消息中間件處理。這個機制提供了消息的安全性保障,但是由於是阻塞的操作,會影響到客戶端消息發送的性能。異步發送的過程中,發送者不需要等待broker提供反饋,所以性能相對較高。但是可能會出現消息丟失的情況。所以使用異步發送的前提是在某些情況下允許出現數據丟失的情況。
默認情況下,非持久化消息是異步發送的,持久化消息並且是在非事務模式下是同步發送的。但是在開啟事務的情況下,消息都是異步發送。由於異步發送的效率會比同步發送性能更高,所以在發送持久化消息的時候,盡量去開啟事務會話。除了持久化消息和非持久化消息的同步和異步特性以外,我們還可以通過以下幾種方式來設置異步發送:
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616?jms.useAsyncSend=true");
((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
((ActiveMQConnection)connection).setUseAsyncSend(true);
三、原理淺析
ActiveMQ的上手非常簡單,但僅僅只是會用肯定不行,只有了解其原理,才能對特定的場景做出優化和設計,而要了解其原理,只有通過分析其源碼才能完全了解。限於篇幅原因,接下來只是針對發送消息、消費消息和消息重發機制的流程做一個概括性總結。
發送原理
上面就是整個發消息的流程圖,當生產者調用send發送消息時,首先會判斷producerWindowSize(這個稍后會詳細講解)是否還有空間,若沒有了就阻塞等待空間;反之則繼續判斷是否是異步發送消息,如果是同步,則直接通過底層傳輸協議傳輸消息,並阻塞等待response結果;如果是異步發送,同樣通過底層傳輸協議傳輸消息,但不再需要阻塞等待response,同時會去增加producerWindowSize的值。
什么是producerWindowSize?這個配置主要用來約束異步發送時producer端允許積壓(未ack)的消息大小。當發送消息時,首先會判斷producerWindowSize是否還有剩余空間,如果沒有就阻塞等待空間釋放,即等待broker(可以就當作是消息隊列中間件)確認消息;如果有空間,就放入到該空間下,等待broker處理。可以通過以下兩種方式配置:
- 在連接url中設置,對所有producer都有效:tcp://localhost:61616?jms.producerWindowSize=1048576
- 在destination名稱中設置,僅對使用該destination的producer有效,並且優先級更高:test-queue?producer.windowSize=1048576
消費原理
消費消息流程
消費者在通過receive消費消息時,並不是直接去broker上獲取的消息,而是從本地的unconsumerMessage隊列中獲取,而該隊列則是每次批量從broker上拉取消息,每次拉取的數量就是由prefetchSize控制的。當隊列中沒有消息時,就會阻塞等待獲取消息;反之則依次從unconsumerMessage隊列中取出消息消費,並將應答放到delivered隊列返回給broker,消費消息和ack是異步的。那消息是如何添加到unconsumerMessage隊列中的呢?這個過程也是非常復雜的,這里就不詳細分析了,感興趣的讀者可自行分析源碼。下面我們來看看消息的確認過程。
消息確認及消息重發
看到上面這張圖,可能會比較懵,沒關系,我們首先來了解一下ACK_MODE和ACK_TYPE,ACK_MODE在上文已經講過了,但僅僅是消費端確認了還不夠,還需要讓broker知道消息是否正常消費,因此在確認消息后消費者還會根據處理結果返回不同的ACK_TYPE給broker,ACK_TYPE一共有以下6種:
- DELIVERED_ACK_TYPE = 0 消息"已接收",但尚未處理結束。
- POSION_ACK_TYPE = 1 消息"錯誤",通常表示"拋棄"此消息,比如消息重發多次后,都無法正確處理時,消息將會被刪除或者 加入DLQ(死信隊列)
- STANDARD_ACK_TYPE = 2 “標准"類型,通常表示為消息"處理成功”,broker 端可以刪除消息了。
- REDELIVERED_ACK_TYPE = 3 消息需"重發",比如 consumer 處理消息時拋出了異常,broker 稍后會重新發送此消息。
- INDIVIDUAL_ACK_TYPE = 4 表示無論在任何 ACK_MODE 下只確認"單條消息"。
- UNMATCHED_ACK_TYPE = 5 在 Topic 中,如果一條消息在轉發給“訂閱者”時,發現此消息不符合 Selector 過濾條件,那么此消息將 不會轉發給訂閱者,消息將會被存儲引擎刪除(相當於在 Broker 上確認了消息)。
- EXPIRED_ACK_TYPE = 6 消息已過期。
清楚了ACK_TYPE所對應的意思后,再看這張圖就很明了了。首先從unconsumerMessage隊列中取出消息並處理,若消費消息出現異常失敗,消費者就會返回REDELIVERED_ACK_TYPE給broker,broker就會重發該條消息,當超過次數限制消費者就會返回POSION_ACK_TYPE告訴broker該條消息是有毒的,broker根據配置將該條消息拋棄或是加入死信隊列中(該隊列可以被重新消費);若消費消息成功未出現異常,就會將ack message添加到delivered隊列中,消費該隊列的消息時,會進行一系列判斷並根據結果返回不同的ACK_TYPE。
剛剛我們提到消息消費失敗會導致消息重發,那究竟在哪些情況下會被重發呢?主要有以下幾種情況:
- 在事務型會話中,若是沒有調用session.commit提交確認消息或者調用session.rollback方法。
- 在非事務性會話中,ACK 模式為 CLIENT_ACKNOWLEDGE 的情況下,沒有調用 acknowledge 或者調用了 recover 方法。
- 處理消息時發生異常。
這就是整個消息的確認和重發原理。
四、基本優化
使用任何一個中間件並出現性能問題時,我們都會考慮如何去優化,本節只是簡單講講消費端的優化。
在上文我們提到過prefetchSize配置,該配置表示消費者每次從隊列中獲取消息的條數,該配置為0時表示消費者通過pull方式從broker獲取消息,另外不同類型的隊列具有不同的默認值:
- 持久化隊列和非持久化隊列的默認值為1000
- 持久化 topic 默認值為 100
- 非持久化topic的默認值為 Short.MAX_VALUE-1
但是僅僅只有批量獲取肯定是不夠的,因為從上文我們知道,消息還有一個確認過程,如果還是單個單個的確認,那這個批量獲取就沒有什么意義了(除了第一次是批量獲取消息,后面都是單個單個的獲取消息),所以ActiveMQ還提供了optimizeAcknowledge配置,該參數為true時,消費者會延遲確認(默認是ack了0.65*prefetchSize個消息后才確認)。該配置可以直接在連接url中配置(其中optimizeAcknowledgeTimeOut是表示超過該時間也會自動確認):
ConnectionFactory connectionFactory= new w ActiveMQConnectionFactory( "tcp://192.168.0.106:61616?jms.optimizeAcknowledge=true&jms.optimizeAcknowledgeTimeOut=10000" ");
因此,這兩者協同配合才能起到優化的作用。另外,需要注意的是,如果消費端的消費速度比較高,通過這兩者組合是能大大提升消費者的性能。如果消費者的消費性能本身就比較慢,設置比較大的 prefetchSize 反而不能有效的達到提升消費性能的目的,因為過大的prefetchSize 會導致某一消費端積壓消息,而其它的消費端卻“無所事事”。同時,該方案需要消費端能夠容忍重復消息,因為當消息還未確認時消費者就怠機了,那么broker就會將該消息重發給其它消費者,導致消息重復。
總結
通過以上學習,我們能看出ActiveMQ是非常簡單易上手的,但它有以下缺點:
- 持久化消息存儲需要建立索引,因此吞吐量低,不適合TPS要求高的業務。
- 不支持消息分片功能,只能自己實現。
由於消息隊列產品眾多,本文只是從基本概念和使用、核心機制原理以及優化等幾方面對ActiveMQ做了一個概括性的引導和總結,並未涉及詳細的源碼分析,另具體的配置也請參照官方文檔。