概述
本文介紹JMS中可能發生消息故障的3個隱患階段,以及確保消息安全的3種保障機制。
故障分析
在介紹可靠傳送的確保機制之前,先分析消息在傳送的過程中可能在哪個階段出現問題。
1.兩個躍點
躍點的含義在於消息的持有者發生變化,如發送使消息由Producer持有變成JMS Provider持有。在消息傳送的過程中,共有2個躍點:
- 發送躍點
Producer將消息發送到JMS Provider的目的地 - 接收躍點
Consumer從JMS Provider的目的地獲取消息

2.三個隱患階段
在消息傳送過程中,可能在三個不同的階段出現問題:
- 發送階段
Producer發送消息到目的地,可能會失敗。 - 消息緩存階段
JMS Provider異常導致目的地中緩存的數據丟失。 - 接收階段
Consumer從目的地接收消息,可能會失敗。

可靠傳送保障機制
為了解決在隱患階段可能出現的問題,JMS體系定義了幾個保障機制:確認、事務、持久化。
1.Acknowledge - 確認
確認是客戶端(Producer或Consumer)和JMS Provider之間,為了確保Message的可靠傳送而發送消息進行確認的機制。
這個機制可以用於解決發送階段、接收階段的安全隱患。
這個機制可以用於解決發送階段、接收階段的安全隱患。
1.1.對於Producer
確認的機制是:生成方
調用的send()方法會被阻塞,直到JMS Provider收到了Message、存儲到目的地,並發送確認給生成方,阻塞才被解除。
所以,確認機制對於生成方來說是透明的。
1.2.對於Consumer
確認的機制是:消費者從目的地接收Message之后,發送接收確認給JMS Provider,JMS Provider收到確認后,將從目的地中刪除該消息,發送處理確認給消費者。
對於消費者而言,有幾種確認機制:
- AUTO_ACKNOWLEDGE
自動確認:會話對收到的每條Message,自動發送接收確認;然后會話線程阻塞,直到收到了JMS Provider發來的處理確認。
接收確認的發送時機:同步接收中,調用receive()方法成功返回;異步接收中,MessageListener的onMessage(Message)方法被調用,並成功返回。 - CLIENT_ACKNOWLEDGE
客戶端確認:客戶端調用Message#acknowledge()發送接收確認[1];然后會話線程阻塞,直到收到了JMS Provider發來的處理確認。
接收確認的發送時機:顯示調用Message#acknowledge()方法。
注:
[1] 每次確認不是只對當前的Message進行確認,而是對自上次確認以來的所有Message進行確認. - DUPS_OK_ACKNOWLEDGE
消息可重復確認:這個機制行為上表現的和AUTO_ACKNOWLEDGE一樣,具有延遲確認的特點。這個機制可能會導致收到重復的消息。
接收確認的發送時機:測試中,PTP Mode每一條消息都會即時確認;Pub/Sub Mode在接收的消息數,每超過prefetch size閥值[1]一半的時候確認一次。
注:
[1] 在brokerURL中可以指定參數jms.prefetchPolicy.topicPrefetch作為prefetch size閥值。你可能會猜測:應該也有參數jms.prefetchPolicy.queuePrefetch,可以讓DUPS_OK_ACKNOWLEDGE在PTP Mode中也批量確認。但是實驗的結果是每一條消息都會即時確認,就好像DUPS_OK_ACKNOWLEDGE只是針對Pub/Sub設定的。
1.3.確認機制的設置
javax.jms.Session針對三種確認機制,分別定義了整型常量:
- AUTO_ACKNOWLEDGE - 自動確認
- CLIENT_ACKNOWLEDGE - 客戶端確認
- DUPS_OK_ACKNOWLEDGE - 可重復確認
javax.jms.Connection提供了設置的方法:
- createSession(boolean transacted, int acknowledgeMode):Session
transacted - 設置事務,后面才會談到事務,這里設置false就好了
acknowledgeMode - 設置確認機制
確認機制的設置是對於有接收需要的Session來講的,如果客戶端是純粹的Producer,確認機制的設置應該是被忽略的。鑒於JMS API只提供了上面一個方法來獲取Session實例,建議在Producer-Client端獲取Session時,使用Connection#
createSession(false,
Session.
AUTO_ACKNOWLEDGE
)。對於純粹的消費者,或者同時作為生產者和消費者,要針對消費的需要來考慮選擇哪一個確認機制。
1.4.確認機制的獲取
javax.jms.Session提供獲取的方法:
- getAcknowledgeMode():int
返回結果有4種,除了上述的3種確認機制之外,還有一個Session.SESSION_TRANSACTED表示Session啟用了事務。
1.5.測試
關於確認機制的測試,參考
確認機制的測試。
2.Transaction - 事務
事務用於將多個操作組成一個原子操作,要么全部成功,要么全部失敗。與事務相關的兩個指令:提交、回滾。當確認所有的操作都是正確的時候,執行提交,這些操作就會生效;當其中有一個操作失敗的時候,執行回滾,所有已執行的操作就會撤銷。
事務的原理:類比於數據庫中的事務,原理相同。事務都是用於對於多個修改性質的操作進行綁定,以這些個修改操作要么全部成功,要么全部失敗的保障,來確保數據的一致性。數據庫中的事務是對修改數據的行為進行綁定,JMS中的事務是對修改目的地中消息的行為
(發送消息、接收消息)
進行綁定。
如果你對JDBC中的事務有所了解,對比其和JMS中的事務,興許能幫助你記憶JMS中的事務:
對比內容\事務 | JDBC事務 | JMS事務 |
范圍 | java.sql.Connection實例的生命周期之內 | javax.jms.Session實例的生命周期之內 |
設置和獲取 | java.sql.Connection:
|
javax.jms.Connection:
|
操作 | java.sql.Connection:
|
javax.jms.Session:
|
事務的設置和確認機制的設置使用同一個方法,那么有必要把確認機制和事務進行對比:
對比內容\機制 | 確認機制 | 事務機制 |
本質 | 確保消費者能夠獲取到消息,在收到消費者的確認之后,才會將消息從目的地移除。 | 將多個操作綁在一起,成為原子操作,從而有相同的操作結果(成功或失敗)。 |
客戶端編程 | 在Producer端是透明的,只能在Consumer端進行控制。 | 可以在Producer和Consumer端實施控制。 |
功能性 | 確認消息已收到。 | 事務的commit,提供類似於批量確認的功能。 不要單純為了批量確認而使用事務,要從業務的需要上考慮使用事務。 |
因為事務機制提供了確認的功能,所以確認機制和事務機制是互斥的。Connection#
createSession(boolean transacted, int acknowledgeMode)方法用於設置事務或確認:
- 如果第一個參數為true,表示啟用事務,第二個參數就會被忽略(建議傳入Session.SESSION_TRANSACTED)
- 如果第一個參數為false,表示不啟用事務,第二個參數用於設置確認模式,就有效了。
2.1.使用模型
1 try { 2 // ... 3 javax.jms.Connection connection = ...; 4 javax.jms.Session session = connection.createSession(true, Session.SESSION_TRANSACTED); 5 6 // send or receive operates 7 8 session.commit(); // commit at lase 9 } catch(JMSException e) { 10 session.rollback(); // rollback when exception happens 11 } finally { 12 // close resources 13 }
2.2.測試
關於事務的測試,參考
事務的測試。
3.持久化模式
確認模式和事務模式足以
[1]處理發送階段、接收階段的安全隱患,持久化的機制則用來處理消息在目的地階段的安全隱患。
如果Producer選擇不使用持久化,則消息緩存在內存中,雖然可以獲得高吞吐率,但是一旦JMS Provider宕掉,就會導致消息的丟失。非持久化具有高吞吐量和低可靠性的特點。
如果Producer選擇持久化,則JMS Provider會將消息存到物理媒介上(文件、數據庫),Consumer獲取消息,也是從物理媒介讀取,吞吐量受到影響,但是即使JMS Provider宕掉,消息也不會丟失。持久化具有低吞吐量和高可靠性的特點。
針對持久化,JMS Provider可以提供多種持久化方案,比如持久化到本地文件、不同的數據庫等,不過這些不是Producer需要關心的,Producer只需要告訴JMS Provider要不要持久化消息就好了。
持久化方案的選擇,在
%ActiveMQ_HOME%\conf\activemq.xml文件,
<persistenceAdapter />元素下配置。可以參考
ActiveMQ持久化,這里不介紹這塊內容。
補充:
[1] 實際上,在發送階段需要我們自己提供容錯方案 - 比如對發送失敗的消息,緩存到本地文件等。
3.1.持久化設置
javax.jms.MessageProducer:
- setDeliveryMode(int):void
- 設置此producer實例的默認遞送模式 - send(Message message):void
- 以默認的deliveryMode發送消息 - send(Message message, int deliveryMode, int priority, long timeToLive):void
- 以指定的deliveryMode發送消息
javax.jms.DeliveryMode
- static int NON_PERSISTENT
非持久化模式 - static int PERSISTENT
持久化模式
設置遞送模式的代碼:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久化 producer.setDelicertMode(DeliveryMode.PERSISTENT); // 持久化
3.2.測試
參考
持久化的測試。
參考
- 第 2 章 客戶端編程模型
本文的骨架是參考這篇文章整理的 - ActiveMQ訊息傳送機制以及ACK機制
這篇文章對ACK的機制講的比較細,我嘗試了optimizeACK + prefetch + AUTO_ACKNOWLEDGE的預取機制,以及DUPS_OK_ACKNOWLEDGE在Pub/Sub模式下的延遲確認機制。 - ActiveMQ持久化
介紹了幾種常見的持久化方案。