AMQ學習筆記 - 06. 可靠消息傳送


概述


本文介紹JMS中可能發生消息故障的3個隱患階段,以及確保消息安全的3種保障機制。

故障分析


在介紹可靠傳送的確保機制之前,先分析消息在傳送的過程中可能在哪個階段出現問題。

1.兩個躍點

躍點的含義在於消息的持有者發生變化,如發送使消息由Producer持有變成JMS Provider持有。在消息傳送的過程中,共有2個躍點:
  1. 發送躍點
    Producer將消息發送到JMS Provider的目的地
  2. 接收躍點
    Consumer從JMS Provider的目的地獲取消息

2.三個隱患階段

在消息傳送過程中,可能在三個不同的階段出現問題:
  1. 發送階段
    Producer發送消息到目的地,可能會失敗。
  2. 消息緩存階段
    JMS Provider異常導致目的地中緩存的數據丟失。
  3. 接收階段
    Consumer從目的地接收消息,可能會失敗。

可靠傳送保障機制


為了解決在隱患階段可能出現的問題,JMS體系定義了幾個保障機制:確認、事務、持久化。

1.Acknowledge - 確認

確認是客戶端(Producer或Consumer)和JMS Provider之間,為了確保Message的可靠傳送而發送消息進行確認的機制。
這個機制可以用於解決發送階段、接收階段的安全隱患。

1.1.對於Producer

確認的機制是:生成方 調用的send()方法會被阻塞,直到JMS Provider收到了Message、存儲到目的地,並發送確認給生成方,阻塞才被解除。
所以,確認機制對於生成方來說是透明的。

1.2.對於Consumer

確認的機制是:消費者從目的地接收Message之后,發送接收確認給JMS Provider,JMS Provider收到確認后,將從目的地中刪除該消息,發送處理確認給消費者。
對於消費者而言,有幾種確認機制:
  • AUTO_ACKNOWLEDGE
    自動確認:會話對收到的每條Message,自動發送接收確認;然后會話線程阻塞,直到收到了JMS Provider發來的處理確認。
    接收確認的發送時機:同步接收中,調用receive()方法成功返回;異步接收中,MessageListener的onMessage(Message)方法被調用,並成功返回。
  • CLIENT_ACKNOWLEDGE
    客戶端確認:客戶端調用Message#acknowledge()發送接收確認[1];然后會話線程阻塞,直到收到了JMS Provider發來的處理確認。
    接收確認的發送時機:顯示調用Message#acknowledge()方法。
    注:
    [1] 每次確認不是只對當前的Message進行確認,而是對自上次確認以來的所有Message進行確認.
  • DUPS_OK_ACKNOWLEDGE
    消息可重復確認:這個機制行為上表現的和AUTO_ACKNOWLEDGE一樣,具有延遲確認的特點。這個機制可能會導致收到重復的消息。
    接收確認的發送時機:測試中,PTP Mode每一條消息都會即時確認;Pub/Sub Mode在接收的消息數,每超過prefetch size閥值[1]一半的時候確認一次。
    注:
    [1] 在brokerURL中可以指定參數jms.prefetchPolicy.topicPrefetch作為prefetch size閥值。你可能會猜測:應該也有參數jms.prefetchPolicy.queuePrefetch,可以讓DUPS_OK_ACKNOWLEDGE在PTP Mode中也批量確認。但是實驗的結果是每一條消息都會即時確認,就好像DUPS_OK_ACKNOWLEDGE只是針對Pub/Sub設定的。

1.3.確認機制的設置

javax.jms.Session針對三種確認機制,分別定義了整型常量:
  • AUTO_ACKNOWLEDGE - 自動確認
  • CLIENT_ACKNOWLEDGE - 客戶端確認
  • DUPS_OK_ACKNOWLEDGE - 可重復確認
 
javax.jms.Connection提供了設置的方法:
  • createSession(boolean transacted, int acknowledgeMode):Session
    transacted - 設置事務,后面才會談到事務,這里設置false就好了
    acknowledgeMode - 設置確認機制
 
確認機制的設置是對於有接收需要的Session來講的,如果客戶端是純粹的Producer,確認機制的設置應該是被忽略的。鑒於JMS API只提供了上面一個方法來獲取Session實例,建議在Producer-Client端獲取Session時,使用Connection# createSession(false,  Session. AUTO_ACKNOWLEDGE )。對於純粹的消費者,或者同時作為生產者和消費者,要針對消費的需要來考慮選擇哪一個確認機制。

1.4.確認機制的獲取

javax.jms.Session提供獲取的方法:
  • getAcknowledgeMode():int
    返回結果有4種,除了上述的3種確認機制之外,還有一個Session.SESSION_TRANSACTED表示Session啟用了事務。

1.5.測試

關於確認機制的測試,參考 確認機制的測試

2.Transaction - 事務

事務用於將多個操作組成一個原子操作,要么全部成功,要么全部失敗。與事務相關的兩個指令:提交、回滾。當確認所有的操作都是正確的時候,執行提交,這些操作就會生效;當其中有一個操作失敗的時候,執行回滾,所有已執行的操作就會撤銷。
事務的原理:類比於數據庫中的事務,原理相同。事務都是用於對於多個修改性質的操作進行綁定,以這些個修改操作要么全部成功,要么全部失敗的保障,來確保數據的一致性。數據庫中的事務是對修改數據的行為進行綁定,JMS中的事務是對修改目的地中消息的行為 (發送消息、接收消息) 進行綁定。
 
如果你對JDBC中的事務有所了解,對比其和JMS中的事務,興許能幫助你記憶JMS中的事務:
對比內容\事務 JDBC事務 JMS事務
范圍 java.sql.Connection實例的生命周期之內 javax.jms.Session實例的生命周期之內
設置和獲取 java.sql.Connection:
  • setAutoCommit(false)
    設置事務
  • getAutoCommit():boolean
    獲取事務
javax.jms.Connection:
  • createSession(true, *)
    設置事務。
    第二個參數可以設置確認模式,可以是任意值:啟用了事務就會忽略確認模式。
    不過建議使用Session.SESSION_TRANSACTED
javax.jms.Session:
  • getTransacted():boolean
    獲取事務
操作 java.sql.Connection:
  • commit():void
    提交
  • rollback():void
    回滾
javax.jms.Session:
  • commit():void
    提交
  • rollback():void
    回滾
 
事務的設置和確認機制的設置使用同一個方法,那么有必要把確認機制和事務進行對比:
對比內容\機制 確認機制 事務機制
本質 確保消費者能夠獲取到消息,在收到消費者的確認之后,才會將消息從目的地移除。 將多個操作綁在一起,成為原子操作,從而有相同的操作結果(成功或失敗)。
客戶端編程 在Producer端是透明的,只能在Consumer端進行控制。 可以在Producer和Consumer端實施控制。
功能性 確認消息已收到。 事務的commit,提供類似於批量確認的功能。
不要單純為了批量確認而使用事務,要從業務的需要上考慮使用事務。
因為事務機制提供了確認的功能,所以確認機制和事務機制是互斥的。Connection# createSession(boolean transacted, int acknowledgeMode)方法用於設置事務或確認:
  • 如果第一個參數為true,表示啟用事務,第二個參數就會被忽略(建議傳入Session.SESSION_TRANSACTED
  • 如果第一個參數為false,表示不啟用事務,第二個參數用於設置確認模式,就有效了。

2.1.使用模型

 1 try {
 2     // ...
 3     javax.jms.Connection connection = ...;
 4     javax.jms.Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
 5     
 6     // send or receive operates
 7     
 8     session.commit(); // commit at lase
 9 } catch(JMSException e) {
10     session.rollback(); // rollback when exception happens
11 } finally {
12     // close resources
13 }

 

2.2.測試

關於事務的測試,參考 事務的測試

3.持久化模式

確認模式和事務模式足以 [1]處理發送階段、接收階段的安全隱患,持久化的機制則用來處理消息在目的地階段的安全隱患。
如果Producer選擇不使用持久化,則消息緩存在內存中,雖然可以獲得高吞吐率,但是一旦JMS Provider宕掉,就會導致消息的丟失。非持久化具有高吞吐量和低可靠性的特點。
如果Producer選擇持久化,則JMS Provider會將消息存到物理媒介上(文件、數據庫),Consumer獲取消息,也是從物理媒介讀取,吞吐量受到影響,但是即使JMS Provider宕掉,消息也不會丟失。持久化具有低吞吐量和高可靠性的特點。
針對持久化,JMS Provider可以提供多種持久化方案,比如持久化到本地文件、不同的數據庫等,不過這些不是Producer需要關心的,Producer只需要告訴JMS Provider要不要持久化消息就好了。
持久化方案的選擇,在 %ActiveMQ_HOME%\conf\activemq.xml文件, <persistenceAdapter />元素下配置。可以參考 ActiveMQ持久化,這里不介紹這塊內容。
 
補充:
[1] 實際上,在發送階段需要我們自己提供容錯方案 - 比如對發送失敗的消息,緩存到本地文件等。

3.1.持久化設置

javax.jms.MessageProducer:
  • setDeliveryMode(int):void
    - 設置此producer實例的默認遞送模式
  • send(Message message):void
    - 以默認的deliveryMode發送消息
  • send(Message message, int deliveryMode, int priority, long timeToLive):void
    - 以指定的deliveryMode發送消息
 
javax.jms.DeliveryMode
  • static int NON_PERSISTENT
    非持久化模式
  • static int PERSISTENT
    持久化模式
 
設置遞送模式的代碼:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久化
producer.setDelicertMode(DeliveryMode.PERSISTENT); // 持久化

3.2.測試

參考 持久化的測試

參考


  1. 第 2 章 客戶端編程模型
    本文的骨架是參考這篇文章整理的
  2. ActiveMQ訊息傳送機制以及ACK機制
    這篇文章對ACK的機制講的比較細,我嘗試了optimizeACK + prefetch + AUTO_ACKNOWLEDGE的預取機制,以及DUPS_OK_ACKNOWLEDGE在Pub/Sub模式下的延遲確認機制。
  3. ActiveMQ持久化
    介紹了幾種常見的持久化方案。






免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM