JMS(Java Message Service),即 Java 消息服務,是 JavaEE 中規范標准之一;用於在兩個應用程序之間進行異步通信。本文主要介紹下 JMS 1.1 規范的基本內容並簡要說明下 JMS 2.0。
1、基本概念
JMS Provider(提供者) 實現 JMS 接口規范的消息中間件,也就是 MQ 服務器
JMS Producer(生產者) 創建和發送 JMS 消息的客戶端應用
JMS Consumer(消費者) 接收和處理 JMS 消息的客戶端應用
JMS Message(消息) 消息由消息頭、消息屬性和消息體組成
JMS Queue(消息隊列) 消息保存的地方,用於點對點的消息模型
JMS Topic(消息主題) 消息保存的地方,用於發布訂閱的消息模型
2、JMS 消息模型
2.1、點對點消息模型(Point-to-Point Messaging Domain)
該消息模型的特點:
a、每個消息只有一個消費者,消息一旦被消費,就不在消息隊列中了。
b、提供者和消費者之間在時間上沒有依賴性,也就是說當提供者發送了消息之后,不管消費者有沒有正在運行,它不會影響到消息被發送到隊列。
c、每條消息僅會傳送給一個消費者。可能會有多個消費者在一個隊列中偵聽,但是每個隊列中的消息只能被隊列中的一個消費者所消費。
d、消息存在先后順序。一個隊列會按照消息服務器將消息放入隊列中的順序,把它們傳送給消費者。當已被消費時,就會從隊列頭部將它們刪除(除非使用了消息優先級)。
e、消費者在成功接收消息之后需向隊列應答成功。
2.2、發布/訂閱消息模型(Publish/Subscribe Messaging Domain)
該消息模型的特點:
a、每個消息可以有多個消費者
b、發布者和訂閱者之間有時間上的依賴性。針對某個主題的訂閱者,它必須創建一個訂閱者之后,才能消費發布者的消息,而且只能消費訂閱時間之后的消息;JMS允許訂閱者創建一個可持久化的訂閱。這樣,即使訂閱者宕機恢復后,也能接收宕機時生產者發布的消息。
d、每條消息都會傳送給稱為訂閱者的多個消息消費者。
f、消息是被推送給消費者的。
3、JMS API 接口
ConnectionFactory 客戶端用來創建連接的受管對象;可以通過 JNDI 來查找 ConnectionFactory 對象。
Connection 客戶端到 JMS 提供者之間的活動連接。
Session 發送和接收消息的一個單線程上下文
Destination 由 Session 創建的 Queue 或 Topic 對象。
MessageProducer 由 Session創建的對象,用於發送消息到 Queue 或 Topic。
MessageCosumer 由 Session 創建的對象,用於接收 Queue 或 Topic 中的消息。
Message 消費者和生產者之間傳送的數據。
MessageListener 消息監聽器,消費者注冊消息監聽器,有消息到達,將調用該接口的 onMessage 方法。
4、JMS Message
JMS 消息由 消息頭、消息屬性、消息體 三部分組成。
4.1、消息頭
JMSDestination 消息發送的目的地,主要有 Queue 和 Topic,它們都是 Destination 的實現。
JMSDeliveryMode:消息傳輸模式,有兩種模式:持久化模式和非持久化模式;持久化的消息在消息服務器宕機后再重啟不會丟失,而非持久化的消息則會丟失,可通過將消息設置為持久化來保證消息的可靠性,Queue 中的消息默認是持久化的,Topic 中的消息默認是非持久化的。
JMSExpiration:消息的過期時間,默認永不過期。若給 MessageProducer 對象設置了 timeToLive 屬性值或者在調用 MessageProducer.send() 時指定了 timeToLive 的值,則消息將在 timeToLive 之后過期;如果設置 timeToLive 的值為 0,則永不過期,也可以給消息設置 JMSExpiration 屬性值指定該消息的過期時間。消息發送后,在消息過期后若還沒有被消費則會被清除。
JMSPriority 消息的優先級,有 0-9 十個級別,0-4 是普通消息,5-9 是加急消息。JMS 不要求 MQ 嚴格按照這十個優先等級發送消息,但必須保證加急消息先於普通消息到達目的地,默認的消息優先級是 4 級。
JMSMessageID 每條消息的唯一標識,默認由 MQ 產生,也可以自定義。
JMSTimestamp 消息發送時的時間。
JMSCorrelationID 關聯的消息 ID,通常用在需要回傳消息的時候。
JMSReplyTo 消息回復的目的地,其值為一個 Topic 或 Queue, 這個由發送者設置,但是接收者可以決定是否響應。
JMSRedelivered 消息是否重復發送過,如果該消息之前發送過,那么這個屬性的值需要被設置為 true;客戶端可以根據這個屬性的值來確認這個消息是否重復發送過,以避免重復處理。
JMSType 消息類型,包括 TextMessage、BytesMessage、MapMessage、StreamMessage 和 ObjectMessage。
4.2、消息體
消息傳輸的內容
4.3、消息屬性
消息屬性可看作消息頭的補充,消息屬性按類型可以分為標准屬性(JMSX 作為前綴),消息組件自定義的屬性(JMS_ 作為前綴),以及應用自定義的屬性。自定義的屬性不要以前面兩種為前綴。標准的JMSX屬性如下:
5、消息的可靠性
消息的可靠性通過三個方面保證:消息的持久化、事務、消息的簽收。
5.1、消息的持久化
消息的持久化是通過設置DeliveryMode實現的,DeliveryMode 有兩種模式:
DeliveryMode.PERSISTENT:持久化,服務器宕機重啟后消息依然存在
DeliveryMode.NON_PERSISTENT:非持久化,服務器宕機再重啟消息將不存在
Queue 中的消息默認是持久化的,Topic中的消息默認是非持久化的。
對於 Topic,消費者采用 MessageConsumer 和采用 TopicSubscriber 消費消息是不同的,JMS 不會將 MessageConsumer 對象持久化(也就無法記錄時間節點),但是會將 TopicSubscriber 對象持久化,這樣就可以記錄每個訂閱者的訂閱時間點,即使消費者掉線,也能在恢復后消費掉線時產生的消息;采用 TopicSubscriber 方式消費消息時需要消息持久化。
消息的持久化和消息的訂閱模式是完全不同的兩個概念,它們之間沒有任何關系,只不過消息的持久化是否有意義需要參考消息的消費方式。
消息的持久化可以在兩個地方設置:
a、調用生產者的 MessageProducer.setDeliveryMode() 方法,設置該生產者生產的所有消息的持久化模式,除非單獨為某個消息設置了持久化模式
b、調用消息的 Message.setJMSDeliveryMode(),只設置這一條消息的持久化模式
5.2、事務
在通過 Connection 創建 Session 的時候可以指明這個 Session 下的消息生產者和消息消費者是否以事務的方式發送和消費消息:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
事務中的操作作為一個原子的整體,要么一次性全部提交,要么全部回滾。以事務的方式發送和消費消息,需要顯示的提交和回滾事務,在事務未提交之前是不會生效的。
try { ... session.commit(); } catch (JMSException e) { session.rollback(); e.printStackTrace(); }
5.3、消息的簽收
消息的簽收是消息被消費的標志,消息的簽收機制是為了避免消息的重復消費,因此消息的簽收是相對於消費者的,對於生產者幾乎沒有意義。
對於Queue中的消息,一旦消息被簽收則這條消息的狀態就會從待消費狀態(Pending Messages)變為已消費狀態(Messages Dequeued),從而從待消費隊列中移除。
對於Topic中的消息,若采用 MessageConsumer 消費消息則簽收機制是沒有意義的,因為 MessageConsumer 只能消費 Topic 中自消費者在 MQ 服務器注冊之后推送到 Topic 中的消息,至於這之前的消息簽收與否 MessageConsumer 不關心(因為看不到之前的消息),也就是說使用 MessageConsumer 消費 Topic 中的消息時是不會存在重復消費的問題的;若采用 TopicSubscriber 消費消息,簽收機制避免重復消費消息的作用就凸顯出來了,此時消息的簽收將會作為某個訂閱者(以 Connection 的 ClientID 作為標識)已消費過 Topic 中的某個消息的標志,也就是說消費者每次上線后都只會消費訂閱的Topic中未被簽收的消息,已簽收的消息則不會被重復消費。
消息的簽收機制有四種:
1、Session.AUTO_ACKNOWLEDGE:值為 1,自動簽收,消費一條簽收一條
2、Session.CLIENT_ACKNOWLEDGE:值為 2,客戶端手動簽收,需顯示調用 Message.acknowledge() 方法完成簽收
3、Session.DUPS_OK_ACKNOWLEDGE:值為 3,不必必須簽收,消息可能會重復發送。在第二次重新傳遞消息的時候,消息頭的 JmsDelivered 會被置為 true 標示當前消息已經傳送過一次,客戶端需要進行消息的重復處理控制。
4、Session.SESSION_TRANSACTED:值為 0,以事務的方式簽收,該種方式創建 Session 時事務必須設置為 true。
事務對消息簽收的影響:消息簽收是事務控制的一部分
1、若創建 Session 時是以事務的方式創建的,此時只要事務提交就會將所有消息的簽收狀態置為已簽收,只要事務不提交則消息的簽收狀態就不起作用
2、若創建 Session 時是以非事務的方式創建的,則對消息的簽收有沒有影響
6、JMS 2.0 新特性
6.1、新增特性
a、延遲投遞:消息生產者現在可以指定一個消息不立即投遞而是在特定的時間間隔以后投遞
b、可以異步發送消息
c、JMS提供者必須設置JMSXDeliveryCount消息屬性
6.2、改進擴展性的變更
持久訂閱或非持久訂閱現在可以是“共享的”,共享的訂閱可以有多個消費者,多個消費者共同消費消息;該功能類型 Kafka 的消費者組的概念。
6.3、簡化 JMS API 使用的變更
a、Connection,Session 和其他帶有 close() 方法的對象現在實現了 java.lang. AutoCloseable 接口,這允許它們被用於 Java SE7 的 try-with-resources 語句中
b、添加了新的方法用於創建 session,而無需提供過多的參數
c、雖然創建非共享持久訂閱時 Client ID 還是必須的,但是現在創建共享持久訂閱時 Client ID 是可選的
d、Message 增加 getBody 方法,可以直接從消息中抽取內容而不需要預先將其轉換到一個子類型
e、增加了一個新的簡化 API,相對於標准 API 更加簡單,特別是用於 Java EE 應用的時候
6.3.1、簡化 API
簡化 API 與傳統 API 提供的消息功能是一樣的,但是它需要的接口更少、使用更方便。 簡化 API 提供的主要接口如下:
ConnectionFactory:客戶端用來創建連接的受管對象,傳統API也會使用此接口。
JMSContext:客戶端到 JMS 提供者之間的活動連接,以及發送和接收消息的一個單線程上下文。
JMSProducer:由 JMSContext 創建的對象,用於發送消息到 Queue 或 Topic。
JMSConsumer:由 JMSContext 創建的對象,用於接收 Queue 或 Topic 中的消息
在簡化 API 中,一個 JMSContext 對象封裝了傳統 API 中 Connection 和 Session 兩個對象的行為。