JMS(Java消息服務)


Spring提供了一個JMS集成框架,簡化了JMS API的使用,其方式與Spring對JDBC API的集成基本相同。

JMS可以大致分為兩個功能領域,即消息的生成和消費。JmsTemplate類用於消息生成和同步消息接收。

Spring提供了許多消息偵聽器容器,你可以使用它們來創建消息驅動POJO(MDPs)。Spring還提供了一種聲明性的方法來創建消息偵聽器。

這個org.springframework.jms.core包提供使用JMS的核心功能。它包含JMS模板類,這些類通過處理資源的創建和釋放來簡化JMS的使用,就像JdbcTemplate為JDBC做的那樣。Spring模板類的共同設計原則是提供helper方法來執行常見操作,並且為了更復雜的使用,將處理任務的本質委托給用戶實現的回調接口。JMS模板遵循相同的設計。這些類為發送消息、同步消費消息以及向用戶公開JMS會話和消息生產者提供了各種方便的方法。

org.springframework.jms.support包提供JMSException轉換功能。將選中的JMSException層次結構轉換為未檢查異常的鏡像層次結構。如果javax.jms.JMSException存在時,此異常被包裝在未檢查的UncategorizedJmsException中。

org.springframework.jms.support.converter提供了一個MessageConverter抽象,用於在Java對象和JMS消息之間進行轉換。

org.springframework.jms.support.destination提供管理JMS目的地的各種策略,例如為存儲在JNDI中的目的地提供服務定位器。

org.springframework.jms.annotation包通過使用@JmsListener提供必要的基礎設施來支持注釋驅動的偵聽器端點。

org.springframework.jms.config提供了jms命名空間的解析器實現,以及配置偵聽器容器和創建偵聽器端點的java配置支持。

最后org.springframework.jms.connection包提供了適合在獨立應用程序中使用的ConnectionFactory的實現。它還包含Spring的PlatformTransactionManager for JMS(巧妙地命名為JmsTransactionManager)的實現。這允許將JMS作為事務資源無縫集成到Spring的事務管理機制中。

從Spring Framework 5開始,Spring的JMS包完全支持JMS 2.0,並要求在運行時提供JMS 2.0 API。我們建議使用JMS 2.0兼容地提供程序。如果你碰巧在你的系統中使用了舊的消息代理,你可以嘗試升級到JMS 2.0兼容的驅動程序來生成現有的代理。或者,你也可以嘗試針對基於JMS1.1的驅動程序運行,只需將JMS 2.0 API放在類路徑上,但只對驅動程序使用JMS1.1兼容的API。默認情況下,Spring的JMS支持遵循JMS1.1約定,因此通過相應的配置,它確實支持這樣的場景。但是,請僅考慮過渡場景。

一、使用JMS

JmsTemplate

JmsTemplate類是JMS核心包中的中心類。它簡化了JMS的使用,它在發送或同步接收消息時處理資源的創建和釋放。

使用JmsTemplate的代碼只需要實現回調接口,從而為它們提供一個明確定義的高級契約。當給定由JmsTemplate中的調用代碼提供的會話時,MessageCreator回調接口將創建一條消息。為了允許更復雜地使用JMS API,SessionCallback提供JMS會話,ProducerCallback公開會話和MessageProducer對。

JMS API公開了兩種類型的發送方法,一種將傳遞模式、優先級和生存時間作為服務質量(QOS)參數,另一種不接受QOS參數並使用默認值。由於JmsTemplate有許多發送方法,所以設置QOS參數已經作為bean屬性公開,以避免發送方法數量上的重復。類似地,同步接收調用的超時值是通過使用setReceiveTimeout屬性設置的。

一些JMS提供程序允許通過ConnectionFactory的配置以管理方式設置默認的QOS值。這會導致對MessageProducer實例的send方法(send(Destination Destination,Message Message))的調用使用與JMS規范中指定的不同的QOS默認值。因此,為了提供對QOS值的一致管理,必須通過將布爾屬性isExplicitQosEnabled設置為true來具體啟用JMST模板以使用其自己的QOS值。

為了方便起見,JmsTemplate還公開了一個基本的請求-應答操作,該操作允許發送消息並在作為操作的一部分創建的臨時隊列上等待應答。

JmsTemplate類的實例在配置后是線程安全的。這一點很重要,因為這意味着你可以配置JmsTemplate的單個實例,然后將此共享引用安全地注入到多個協作者中。明確地說,JmsTemplate是有狀態的,因為它維護對ConnectionFactory的引用,但是這個狀態不是會話狀態。

從Spring Framework 4.1開始,JmsMessagingTemplate構建在JmsTemplate之上,並提供與消息傳遞抽象 - 的集成,即org.springframework.messaging.Message,這使你可以創建要以通用方式發送的消息。

連接

JmsTemplate需要對ConnectionFactory的引用。ConnectionFactory是JMS規范的一部分,用作使用JMS的入口點。客戶機應用程序將其用作工廠來創建與JMS提供程序的連接,並封裝各種配置參數,其中許多參數是特定於供應商的,例如SSL配置選項。

在EJB中使用JMS時,供應商提供JMS接口的實現,以便它們能夠參與聲明性事務管理並執行連接和會話的池化。為了使用這個實現,Java EE容器通常要求你在EJB或servlet部署描述符中聲明JMS連接工廠作為資源引用。為了確保這些特性與EJB內的JmsTemplate一起使用,客戶機應用程序應該確保它引用ConnectionFactory的托管實現。

緩存消息資源

標准API涉及到創建許多中間對象。要發送消息,請執行以下“API”檢查:

ConnectionFactory->Connection->Session->MessageProducer->send

在ConnectionFactory和Send操作之間,將創建和銷毀三個中間對象。為了優化資源使用和提高性能,Spring提供了ConnectionFactory的兩個實現。

SingleConnectionFactory

Spring提供了ConnectionFactory接口的實現SingleConnectionFactory,它在所有createConnection()調用中返回相同的連接,並忽略對close()的調用。這對於測試和獨立環境非常有用,以便同一連接可以用於可能跨越任意數量事務的多個JmsTemplate調用。SingleConnectionFactory引用通常來自JNDI的標准ConnectionFactory。

CachingConnectionFactory

CachingConnectionFactory擴展了SingleConnectionFactory的功能,並添加了Session、MessageProducer和MessageConsumer實例的緩存。初始緩存大小設置為1。可以使用sessionCacheSize屬性來增加緩存的會話數。請注意,實際緩存的會話數大於該數量,因為會話是根據其確認模式進行緩存的,因此當sessionCacheSize設置為1時,最多可以有四個緩存會話實例(每個確認模式一個)。MessageProducer和MessageConsumer實例緩存在它們所屬的會話中,並且在緩存時還考慮生產者和使用者的獨特屬性。MessageProducers根據其目的地進行緩存。MessageConsumers是根據由destination、selector、noLocal delivery標志和持久訂閱名稱(如果創建持久使用者)組成的鍵進行緩存的。

目的地管理

目的地,作為ConnectionFactory實例,是JMS管理的對象,可以在JNDI中存儲和檢索。配置Spring應用程序上下文時,你可以使用JNDI JndiObjectFactoryBean工廠類或<jee:jndi-lookup> 對JMS目的地的引用執行依賴注入。但是,如果應用程序中有大量的目的地,或者如果有JMS提供者特有的高級目的地管理特性,那么這種策略通常很麻煩。這種高級目的地管理的例子包括創建動態目的地或支持目的地的分層名稱空間。JmsTemplate將目標名稱的解析委托給實現DestinationResolver接口的JMS目標對象。DynamicDestinationResolver是JmsTemplate使用的默認實現,可用於解析動態目標。還提供了一個JndiDestinationResolver來充當JNDI中包含的目的地的服務定位器,並且可以選擇返回到DynamicDestinationResolver中包含的行為。

通常,JMS應用程序中使用的目的地只在運行時已知,因此在部署應用程序時無法以管理方式創建。這通常是因為在交互系統組件之間存在共享的應用程序邏輯,這些組件根據眾所周知的命名約定在運行時創建目標。盡管動態目的地的創建不是JMS規范的一部分,但大多數供應商都提供了這種功能。動態目的地是用用戶定義的名稱創建的,這將它們與臨時目的地區分開來,並且通常不在JNDI中注冊。用於創建動態目標的API因提供程序而異,因為與目標關聯的屬性是特定於供應商的。但是,供應商有時會做出一個簡單的實現選擇,即忽略JMS規范中的警告,並使用方法TopicSession createTopic(字符串topicName)或QueueSession createQueue(String queueName)方法創建具有默認目的地屬性的新目標。根據供應商實現的不同,DynamicDestinationResolver還可以創建一個物理目標,而不是只解析一個。

boolean屬性pubSubDomain用於配置JmsTemplate,以了解正在使用的JMS域。默認情況下,此屬性的值為false,表示將使用點到點域Queues。此屬性(由JmsTemplate使用)通過DestinationResolver接口的實現確定動態目標解析的行為。還可以通過屬性defaultDestination配置帶有默認目的地的JmsTemplate。默認目的地是發送和接收操作不引用特定目標。

消息監聽容器

JMS消息在EJB世界中最常見的用途之一是驅動消息驅動bean(MDBs)。Spring提供了一種創建消息驅動POJO(MDPs)的解決方案,這種方法不會將用戶綁定到EJB容器。(請參閱Asynchronous Receiving:Message Driven POJO了解Spring的MDP支持的詳細內容。)自Spring Framework 4.1以來,端點方法可以用@JmsListener 。

消息偵聽器容器用於從JMS消息隊列接收消息,並驅動注入其中的MessageListener。偵聽器容器負責消息接收的所有線程化,並將其分派到偵聽器中進行處理。消息偵聽器容器是MDP和消息傳遞提供者之間的中介,負責注冊以接收消息、參與事務、資源獲取和釋放、異常轉換等。這使你可以編寫與接收消息相關聯的(可能復雜的)業務邏輯(並可能對其作出響應),並將JMS基礎設施的樣板問題委托給框架。

Spring提供了兩個標准的JMS消息偵聽器容器,每個容器都有其專門的特性集。

  • SimpleMessageListenerContainer
  • DefaultMessageListenerContainer
使用SimpleMessageListenerContainer

它在啟動時創建固定數量的JMS會話和使用者,使用標准JMS注冊偵聽器MessageConsumer.setMessageListener()方法,並將其留給JMS提供程序來執行偵聽器回調。此變量不允許動態適應運行時要求或參與外部管理的事務。兼容性方面,它非常接近獨立JMS規范的精神,但通常不兼容Java EE的JMS限制。

雖然SimpleMessageListenerContainer不允許參與外部托管事務,但它支持本機JMS事務。要啟用此功能,可以將sessionTransactivated標志切換為true,或者在XML名稱空間中將acknowledge屬性設置為transactied。從偵聽器拋出的異常將導致回滾,並重新傳遞消息。或者,考慮使用客戶端確認模式,該模式在異常情況下也提供重新傳遞,但不使用事務化會話實例,因此在事務協議中不包括任何其他會話操作(如發送響應消息)。

默認的AUTO_ACKNOWLEDGE(自動確認)模式不能提供正確的可靠性保證。當偵聽器執行失敗時(因為提供程序在偵聽器調用后自動確認每個消息,沒有要傳播到提供程序的異常)或偵聽器容器關閉時(你可以通過設置acceptMessagesWhileStopping標志來配置這一點),消息可能會丟失。確保在可靠性需要時使用事務性會話(例如,用於可靠的隊列處理和持久的主題訂閱)。

使用DefaultMessageListenerContainer

在大多數情況下都使用此消息偵聽器容器。與SimpleMessageListenerContainer不同,此容器變量允許動態適應運行時需求,並能夠參與外部管理的事務。使用JtaTransactionManager配置時,每個接收到的消息都注冊到XA事務中。因此,處理可以利用XA事務語義。這個偵聽器容器在JMS提供程序的低要求、高級功能(例如參與外部管理的事務)和與javaee環境的兼容性之間取得了很好的平衡。

你可以自定義容器的緩存級別。請注意,當沒有啟用緩存時,將為每個消息接收創建一個新連接和一個新會話。將其與高負載的非持久訂閱結合使用可能會導致消息丟失。在這種情況下,一定要使用適當的緩存級別。

當代理關閉時,這個容器還具有可恢復的功能。默認情況下,簡單的退避實現每五秒鍾重試一次。你可以為更細粒度的恢復選項指定自定義退避實現。

與它的同級(SimpleMessageListenerContainer)一樣,DefaultMessageListenerContainer支持本機JMS事務,並允許自定義確認模式。如果對你的場景可行,那么強烈建議在外部管理事務中使用這種方法,也就是說,如果你可以在JVM死亡的情況下偶爾處理重復的消息,那么就強烈建議這樣做。業務邏輯中的自定義重復消息檢測步驟可以覆蓋這樣的情況 - ,例如,以業務實體存在性檢查或協議表檢查的形式。任何這樣的安排都比其他方法更有效:用XA事務包裝整個處理過程(通過使用JtaTransactionManager配置DefaultMessageListenerContainer)來覆蓋JMS消息的接收以及消息偵聽器中業務邏輯的執行(包括數據庫操作等)。

默認的AUTO_ACKNOWLEDGE(自動確認)模式不能提供正確的可靠性保證。當偵聽器執行失敗時(因為提供程序在偵聽器調用后自動確認每個消息,沒有要傳播到提供程序的異常)或偵聽器容器關閉時(你可以通過設置acceptMessagesWhileStopping標志來配置這一點),消息可能會丟失。確保在可靠性需要時使用事務性會話(例如,用於可靠的隊列處理和持久的主題訂閱)。

事務管理器

Spring提供了一個JmsTransactionManager來管理單個JMS ConnectionFactory的事務。這使得JMS應用程序可以利用Spring的托管事務特性。JmsTransactionManager執行本地資源事務,將JMS連接/會話對從指定的ConnectionFactory綁定到線程。JmsTemplate自動檢測此類事務性資源,並相應地對其進行操作。

在Java EE環境中,ConnectionFactory將連接和會話實例集中在一起,因此這些資源可以跨事務高效地重用。在獨立環境中,使用Spring的SingleConnectionFactory將導致共享JMS連接,每個事務都有自己獨立的會話。或者,考慮使用特定於提供者的池適配器,例如ActiveMQ的PooledConnectionFactory類。

還可以將JmsTemplate與JtaTransactionManager和支持XA的JMS ConnectionFactory一起使用來執行分布式事務。注意,這需要使用JTA事務管理器以及正確配置XA的ConnectionFactory。

使用JMS API 從連接創建會話時,跨托管和非托管事務環境重用代碼可能會令人困惑。這是因為JMS API有一個工廠方法來創建會話,並且它需要事務和確認模式的值。在托管環境中,設置這些值是環境的事務性基礎設施的責任,因此這些值被JMS連接的供應商包裝器忽略。在非托管環境中使用JmsTemplate時,可以通過使用SessionTransaction和sessionAcknowledgeMode屬性來指定這些值。當你將PlatformTransactionManager與JmsTemplate一起使用時,該模板始終被賦予事務性JMS會話。

二、發送消息

JmsTemplate包含許多用於發送消息的方便方法。Send方法通過使用javax.jms.Destination對象和其他對象通過在JNDI查找中使用字符串指定目標。不帶destination參數的send方法使用默認目標。

以下示例使用MessageCreator回調從提供的會話對象創建文本消息:

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;

import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.JmsTemplate;

public class JmsQueueSender {

    private JmsTemplate jmsTemplate;
    private Queue queue;

    public void setConnectionFactory(ConnectionFactory cf) {
        this.jmsTemplate = new JmsTemplate(cf);
    }

    public void setQueue(Queue queue) {
        this.queue = queue;
    }

    public void simpleSend() {
        this.jmsTemplate.send(this.queue, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("hello queue world");
            }
        });
    }
}

在前面的示例中,JmsTemplate是通過傳遞對ConnectionFactory的引用來構造的。作為一種替代方案,作為一種替代方案,我們提供了一個零參數構造函數和connectionFactory,並可用於以JavaBean樣式(使用BeanFactory或普通Java代碼)構造實例。或者,考慮從Spring的JmsGatewaySupport便利基類派生,該基類為JMS配置提供了預構建的bean屬性。

send(String destinationName,MessageCreator creator)方法允許你使用目的地的字符串名稱發送消息。如果這些名稱是在JNDI中注冊的,那么應該將模板的destinationResolver屬性設置為jndedistinationresolver的實例。

如果你創建了JmsTemplate並指定了一個默認目的地,那么send(MessageCreator c)會向該目的地發送一條消息。

使用消息轉換器

為了方便域模型對象的發送,JmsTemplate有各種發送方法,這些方法將Java對象作為消息數據內容的參數。JmsTemplate中的重載方法convertAndSend()和receiveAndConvert()方法將轉換過程委托給MessageConverter接口的實例。這個接口定義了一個簡單的契約,用於在Java對象和JMS消息之間進行轉換。默認實現(SimpleMessageConverter)支持字符串和TextMessage、byte[]和BytesMessage之間的轉換,以及java.util.Map還有地圖信息。通過使用轉換器,你和你的應用程序代碼可以關注通過JMS發送或接收的業務對象,而不必關心如何將其表示為JMS消息的細節。

沙盒目前包括一個MapMessageConverter,它使用反射在JavaBean和MapMessage之間進行轉換。你可以自己實現的其他流行實現選擇是使用現有的XML編組包(如JAXB、Castor或XStream)來創建表示對象的TextMessage的轉換器。

為了適應消息的屬性、頭和正文的設置,而這些屬性、頭和正文不能被一般性地封裝在轉換器類中,MessagePostProcessor接口允許你在消息被轉換后發送之前訪問它。下面的示例演示如何修改消息頭和java.util.Map轉換為郵件:

public void sendWithConversion() {
    Map map = new HashMap();
    map.put("Name", "Mark");
    map.put("Age", new Integer(47));
    jmsTemplate.convertAndSend("testQueue", map, new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws JMSException {
            message.setIntProperty("AccountID", 1234);
            message.setJMSCorrelationID("123-00001");
            return message;
        }
    });
}

這將導致以下形式的消息:

MapMessage={
    Header={
        ... standard headers ...
        CorrelationID={123-00001}
    }
    Properties={
        AccountID={Integer:1234}
    }
    Fields={
        Name={String:Mark}
        Age={Integer:47}
    }
}
使用SessionCallback 和 ProducerCallback

雖然send操作涉及許多常見的使用場景,但有時你可能希望對JMS會話或MessageProducer執行多個操作。SessionCallback和ProducerCallback分別公開JMS Session and Session / MessageProducer。JmsTemplate上的execute()方法執行這些回調方法。

三、接收消息

同步接收

雖然JMS通常與異步處理關聯,但你也可以同步使用消息。重載的receive(..)方法提供了此功能。在同步接收期間,調用線程將阻塞,直到消息可用為止。這可能是一個危險的操作,因為調用線程可能被無限期地阻塞。receiveTimeout屬性指定接收方在放棄等待消息之前應等待多長時間。

異步接收——消息驅動的POJO

Spring還通過使用@JmsListener注釋來支持帶注釋的偵聽器端點,並提供了一個開放的基礎設施來以編程方式注冊端點。到目前為止,這是設置異步接收器最方便的方法。

與EJB世界中的消息驅動Bean(MDB)類似,消息驅動POJO(MDP)充當JMS消息的接收器。MDP的一個限制是它必須實現javax.jms.MessageListener接口。注意,如果你的POJO在多個線程上接收消息,那么確保你的實現是線程安全的是很重要的。

以下示例顯示了MDP的簡單實現:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ExampleListener implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                System.out.println(((TextMessage) message).getText());
            }
            catch (JMSException ex) {
                throw new RuntimeException(ex);
            }
        }
        else {
            throw new IllegalArgumentException("Message must be of type TextMessage");
        }
    }
}

一旦實現了MessageListener,就應該創建一個消息偵聽器容器了。

以下示例顯示如何定義和配置Spring附帶的消息偵聽器容器之一(在本例中為DefaultMessageListenerContainer):

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
</bean>
使用SessionAwareMessageListener接口

SessionAwareMessageListener接口是一個特定於Spring的接口,它提供了與JMS MessageListener類似的契約,但也為消息處理方法提供了對從中接收消息的JMS會話的訪問。下表顯示了SessionAwareMessageListener接口的定義:

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

    void onMessage(Message message, Session session) throws JMSException;
}

如果你希望MDPs能夠響應任何接收到的消息(通過使用onMessage(Message,Session)方法中提供的會話),你可以選擇讓MDPs實現此接口(優先於標准JMS MessageListener接口)。

Spring附帶的所有消息偵聽器容器實現都支持實現MessageListener或SessionAwareMessageListener接口的MDPs,實現SessionAwareMessageListener的類附帶一個警告,即它們隨后通過接口綁定到Spring。

注意,SessionAwareMessageListener接口的onMessage(..)方法拋出JMSException。與標准JMS MessageListener接口不同,在使用SessionAwareMessageListener接口時,客戶端代碼負責處理任何拋出的異常。

使用MessageListenerAdapter

MessageListenerAdapter類是Spring異步消息傳遞支持的最后一個組件。簡而言之,它允許你將幾乎任何類公開為MDP(盡管有一些限制)。

考慮以下接口定義:

public interface MessageDelegate {

    void handleMessage(String message);

    void handleMessage(Map message);

    void handleMessage(byte[] message);

    void handleMessage(Serializable message);
}

請注意,盡管接口既不擴展MessageListener也不擴展SessionAwareMessageListener接口,但你仍然可以通過使用MessageListenerAdapter類將其用作MDP。還要注意各種消息處理方法是如何根據它們可以接收和處理的各種消息類型的內容進行強類型化的。

現在考慮MessageDelegate接口的以下實現:

public class DefaultMessageDelegate implements MessageDelegate {
    // implementation elided for clarity...
}

特別要注意的是,前面的MessageDelegate接口實現(DefaultMessageDelegate類)根本沒有JMS依賴關系。它確實是一個POJO,我們可以通過以下配置將其制成MDP:

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultMessageDelegate"/>
    </constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
</bean>

下一個示例顯示了另一個只能處理接收JMS TextMessage消息的MDP。請注意消息處理方法實際上是如何調用receive的(MessageListenerAdapter中的消息處理方法的名稱默認為handleMessage),但它是可配置的(你將在本節后面看到)。還要注意receive(..)方法是如何被強類型化的,以便只接收和響應JMS TextMessage消息。下表顯示了TextMessageDelegate接口的定義:

public interface TextMessageDelegate {

    void receive(TextMessage message);
}

下表顯示了一個實現TextMessageDelegate接口的類:

public class DefaultTextMessageDelegate implements TextMessageDelegate {
    // implementation elided for clarity...
}

MessageListenerAdapter的配置如下:

<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultTextMessageDelegate"/>
    </constructor-arg>
    <property name="defaultListenerMethod" value="receive"/>
    <!-- we don't want automatic message context extraction -->
    <property name="messageConverter">
        <null/>
    </property>
</bean>

請注意,如果messageListener接收到TextMessage以外類型的JMS消息,則會拋出一個IllegalStateException(並隨后吞並)。MessageListenerAdapter類的另一個功能是,如果處理程序方法返回非空值,則可以自動發回響應消息。考慮以下接口和類:

public interface ResponsiveTextMessageDelegate {

    // notice the return type...
    String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
    // implementation elided for clarity...
}

如果將DefaultResponsiveTextMessageDelegate與MessageListenerAdapter一起使用,則從執行“receive(..)”方法返回的任何非空值(在默認配置中)將轉換為TextMessage。然后,生成的TextMessage被發送到原始消息的JMS Reply-to屬性中定義的目標(如果存在)或MessageListenerAdapter上設置的默認目標(如果已配置)。如果找不到目標,則拋出InvalidDestinationException(請注意,此異常不會被吞並並傳播到調用堆棧中)。

處理事務中的消息

在事務中調用消息偵聽器只需要重新配置偵聽器容器。

可以通過偵聽器容器定義上的sessionTransacted標志激活本地資源事務。 然后,每個消息偵聽器調用都在一個活動的JMS事務中運行,如果偵聽器執行失敗,消息接收將回滾。發送響應消息(通過SessionAwareMessageListener)是同一本地事務的一部分,但任何其他資源操作(如數據庫訪問)都是獨立操作的。這通常需要偵聽器實現中的重復消息檢測,以覆蓋數據庫處理已提交但消息處理未能提交的情況。

 慮下面的bean定義:

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="sessionTransacted" value="true"/>
</bean>

要參與外部管理的事務,你需要配置事務管理器並使用支持外部管理事務的偵聽器容器(通常是DefaultMessageListenerContainer)。

要為XA事務參與配置消息偵聽器容器,你需要配置JtaTransactionManager(默認情況下,它委托給javaee服務器的事務子系統)。請注意,底層JMS ConnectionFactory需要具有XA功能,並在JTA事務協調器中正確注冊。(檢查你的Java EE服務器的JNDI資源配置)這使得消息接收和(例如)數據庫訪問成為同一事務的一部分(使用統一的提交語義,以XA事務日志開銷為代價)。

下面的bean定義創建了一個事務管理器:

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然后我們需要將它添加到我們先前的容器配置中。容器負責剩下的。下面的示例演示如何執行此操作:

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="transactionManager" ref="transactionManager"/> 
</bean>

四、支持JCA Message Endpoints

從2.5版開始,Spring還提供了對基於JCA的MessageListener容器的支持。JmsMessageEndpointManager嘗試從提供程序的ResourceAdapter類名自動確定ActivationSpec類名。因此,通常可以提供Spring的通用JmsActivationSpecConfig,如下例所示:

<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecConfig">
        <bean class="org.springframework.jms.listener.endpoint.JmsActivationSpecConfig">
            <property name="destinationName" value="myQueue"/>
        </bean>
    </property>
    <property name="messageListener" ref="myMessageListener"/>
</bean>

或者,可以使用給定的ActivationSpec對象設置JmsMessageEndpointManager。ActivationSpec對象也可以來自JNDI查找(使用<jee:jndi-lookup>). 下面的示例演示如何執行此操作:

<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpec">
        <bean class="org.apache.activemq.ra.ActiveMQActivationSpec">
            <property name="destination" value="myQueue"/>
            <property name="destinationType" value="javax.jms.Queue"/>
        </bean>
    </property>
    <property name="messageListener" ref="myMessageListener"/>
</bean>

使用Spring的ResourceAdapterFactoryBean,可以在本地配置目標ResourceAdapter,如下例所示:

<bean id="resourceAdapter" class="org.springframework.jca.support.ResourceAdapterFactoryBean">
    <property name="resourceAdapter">
        <bean class="org.apache.activemq.ra.ActiveMQResourceAdapter">
            <property name="serverUrl" value="tcp://localhost:61616"/>
        </bean>
    </property>
    <property name="workManager">
        <bean class="org.springframework.jca.work.SimpleTaskWorkManager"/>
    </property>
</bean>

指定的WorkManager還可以指向特定於環境的線程池 - - 通常通過SimpleTaskManager實例的asyncTaskExecutor屬性。如果碰巧使用多個適配器,請考慮為所有ResourceAdapter實例定義一個共享線程池。

在某些環境中(如weblogic9或更高版本),你可以從JNDI獲取整個ResourceAdapter對象(通過使用<jee:jndi查找>). 然后,基於Spring的消息偵聽器可以與服務器托管的ResourceAdapter交互,后者也使用服務器的內置WorkManager。

Spring還提供了一個不綁定到JMS的通用JCA消息端點管理器:org.springframework.jca.endpoint.GenericMessageEndpointManager. 此組件允許使用任何消息偵聽器類型(例如CCI MessageListener)和任何特定於提供者的ActivationSpec對象。

 

五、注解驅動的Listener Endpoints

異步接收消息的最簡單方法是使用帶注釋的偵聽器端點基礎設施。簡而言之,它允許你將托管bean的方法公開為JMS偵聽器端點。下面的示例演示如何使用它:

@Component
public class MyService {

    @JmsListener(destination = "myDestination")
    public void processOrder(String data) { ... }
}

上一個示例的思想是,只要javax.jms.DestinationmyDestination,則相應地調用processOrder方法(在本例中,使用JMS消息的內容,類似於MessageListenerAdapter提供的內容)。

帶注釋的端點基礎設施通過使用JmsListenerContainerFactory在幕后為每個帶注釋的方法創建一個消息偵聽器容器。這樣的容器不是針對應用程序上下文注冊的,但是可以通過使用JmsListenerEndpointRegistry bean輕松定位以便於管理。

@JmsListener是java8上的一個可重復注釋,因此你可以通過添加額外的@JmsListener聲明將多個JMS目的地與同一方法關聯起來。

啟用Listener Endpoint注解

要啟用對@JmsListener注釋的支持,可以將@EnableJms添加到@Configuration類中,如下例所示:

@Configuration
@EnableJms
public class AppConfig {

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setDestinationResolver(destinationResolver());
        factory.setSessionTransacted(true);
        factory.setConcurrency("3-10");
        return factory;
    }
}

默認情況下,查找名為jmsListenerContainerFactory的bean作為工廠用於創建消息偵聽器容器的源。在這種情況下(忽略JMS基礎設施設置),你可以調用processOrder方法,核心輪詢大小為三個線程,最大池大小為十個線程。

你可以自定義偵聽器容器工廠以用於每個注釋,也可以通過實現JmsListenerConfigurer接口來配置顯式默認值。僅當至少有一個端點注冊而沒有特定容器工廠時,才需要默認值

如果你喜歡XML配置,可以使用<jms:annotation-driven>元素,如下例所示:

<jms:annotation-driven/>

<bean id="jmsListenerContainerFactory"
        class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationResolver" ref="destinationResolver"/>
    <property name="sessionTransacted" value="true"/>
    <property name="concurrency" value="3-10"/>
</bean>
編程式 Endpoint Registration

JmsListenerEndpoint提供了JMS端點的模型,並負責為該模型配置容器。除了JmsListener注釋檢測到的端點之外,該基礎結構還允許你以編程方式配置端點。下面的示例演示如何執行此操作:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
        endpoint.setId("myJmsEndpoint");
        endpoint.setDestination("anotherQueue");
        endpoint.setMessageListener(message -> {
            // processing
        });
        registrar.registerEndpoint(endpoint);
    }
}

在前面的示例中,我們使用了SimpleJmsListenerEndpoint,它提供了要調用的實際MessageListener。但是,你也可以構建自己的端點變量來描述自定義調用機制。

請注意,你可以完全跳過@JmsListener的使用,只通過JmsListenerConfigurer以編程方式注冊端點。

帶注釋的Endpoint方法簽名

到目前為止,我們已經在端點中注入了一個簡單的字符串,但是它實際上可以有一個非常靈活的方法簽名。在下面的示例中,我們重寫它以使用自定義標頭注入訂單:

@Component
public class MyService {

    @JmsListener(destination = "myDestination")
    public void processOrder(Order order, @Header("order_type") String orderType) {
        ...
    }
}

可以在JMS偵聽器端點中注入的主要元素如下:

  • 原始的javax.jms.Message或其任何子類(前提是它與傳入消息類型匹配)。
  • 這個javax.jms.Session對本機JMS API的可選訪問(例如,用於發送自定義回復)。
  • 這個org.springframework.messaging.Message,表示傳入的JMS消息。請注意,此消息包含自定義和標准標頭(由JmsHeaders定義)。
  • @Header注解的方法參數,用於提取特定的頭值,包括標准的JMS頭。
  • 一個@Headers帶注釋的參數,它必須可以賦值給java.util.Map用於訪問所有頭。
  • 不受支持的類型(消息或會話)之一的非注釋元素被視為有效負載。你可以通過用@Payload注釋參數來實現這一點。還可以通過添加額外的@Valid來打開驗證。

注入Spring消息抽象的能力對於從特定於傳輸的消息中存儲的所有信息而不依賴於傳輸特定的API尤其有用。下面的示例演示如何執行此操作:

@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }

方法參數的處理由DefaultMessageHandlerMethodFactory提供,你可以進一步自定義它以支持其他方法參數。你也可以在那里自定義轉換和驗證支持。

例如,如果我們希望在處理訂單之前確保訂單有效,可以使用@valid對有效負載進行注釋,並配置必要的驗證器,如下例所示:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
    }

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setValidator(myValidator());
        return factory;
    }
}
響應管理

MessageListenerAdapter中現有的支持已經允許你的方法具有非空返回類型。在這種情況下,調用的結果被封裝在javax.jms.Message,在原始消息的JMSReplyTo頭中指定的目標或偵聽器上配置的默認目標中發送。現在可以通過使用消息傳遞抽象的@SendTo注釋來設置默認目的地。

設processOrder方法現在應該返回OrderStatus,我們可以編寫它來自動發送響應,如下例所示:

@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    // order processing
    return status;
}

如果有幾個@JmsListener注釋的方法,你可以將@SendTo注釋放在類級別,以共享默認的應答目的地。

如果需要以獨立於傳輸的方式設置其他頭,則可以返回一條消息,方法如下:

@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    // order processing
    return MessageBuilder
            .withPayload(status)
            .setHeader("code", 1234)
            .build();
}

如果需要在運行時計算響應目標,可以將響應封裝在JmsResponse實例中,該實例還提供了在運行時使用的目標。我們可以重寫前面的示例,如下所示:

@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
    // order processing
    Message<OrderStatus> response = MessageBuilder
            .withPayload(status)
            .setHeader("code", 1234)
            .build();
    return JmsResponse.forQueue(response, "status");
}

最后,如果需要為響應指定一些QoS值,例如優先級或生存時間,可以相應地配置JmsListenerContainerFactory,如下例所示:

@Configuration
@EnableJms
public class AppConfig {

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        QosSettings replyQosSettings = new QosSettings();
        replyQosSettings.setPriority(2);
        replyQosSettings.setTimeToLive(10000);
        factory.setReplyQosSettings(replyQosSettings);
        return factory;
    }
}

六、JMS Namespace支持

Spring為簡化JMS配置提供了一個XML命名空間。要使用JMS名稱空間元素,需要引用JMS模式,如下例所示:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:jms="http://www.springframework.org/schema/jms" 
        xsi:schemaLocation="
            http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/jms https://www.springframework.org/schema/jms/spring-jms.xsd">

    <!-- bean definitions here -->

</beans>

命名空間由三個頂級元素組成:<annotation-driven/>、<listener-container/>和<jca-listener-container/>。<annotation-driven/>允許使用注釋驅動的偵聽器端點。<listener-container/>和<jca-listener-container/>定義共享偵聽器容器配置,並且可以包含<listener/>子元素。以下示例顯示了兩個偵聽器的基本配置:

<jms:listener-container>

    <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>

    <jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>

</jms:listener-container>

前面的示例相當於創建兩個不同的偵聽器容器bean定義和兩個不同的MessageListenerAdapter bean定義,如使用MessageListenerAdapter中所示。除了上例中顯示的屬性外,listener元素還可以包含幾個可選屬性。下表描述了所有可用屬性:

JMS <listener>元素的屬性

  • id:器容器的bean名稱。如果沒有指定,則會自動生成一個bean名稱。
  • destination:監聽器的目標名稱,必須的,通過DestinationResolver策略解析。
  • ref:處理程序對象的bean名稱。必須的。
  • method:要調用的處理程序方法的名稱。如果ref屬性指向MessageListener或Spring SessionAwareMessageListener,則可以忽略此屬性。
  • response-destination:要向其發送響應消息的默認響應目標的名稱。這適用於不包含JMSReplyTo字段的請求消息。此目標的類型由偵聽器容器的響應目標類型屬性確定。注意,這只適用於具有返回值的偵聽器方法,對於該方法,每個結果對象都轉換為響應消息。
  • subscription:持久訂閱的名稱(如果有)。
  • selector:此偵聽器的可選消息選擇器。
  • concurrency:要為此偵聽器啟動的並發會話或使用者數。該值可以是表示最大值的簡單數字(例如,5),也可以是表示下限和上限的范圍(例如,3-5)。請注意,指定的最小值只是一個提示,可能在運行時被忽略。默認值是容器提供的值。

<listener-container/>元素還接受幾個可選屬性。這允許定制各種策略(例如,taskExecutor和destinationResolver)以及基本的JMS設置和資源引用。通過使用這些屬性,你可以定義高度定制的偵聽器容器,同時仍然可以受益於命名空間的便利性。

通過指定要通過factory id屬性公開的bean的id,可以自動將此類設置公開為JmsListenerContainerFactory,如下例所示:

<jms:listener-container connection-factory="myConnectionFactory"
        task-executor="myTaskExecutor"
        destination-resolver="myDestinationResolver"
        transaction-manager="myTransactionManager"
        concurrency="10">

    <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>

    <jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>

</jms:listener-container>

JMS <listener-container>元素的屬性

  • container-type:此偵聽器容器的類型。可用的選項有default、simple、default102或simple102(默認選項為default)。
  • container-class:作為完全限定類名的自定義偵聽器容器實現類。根據container-type屬性,默認值是Spring的標准DefaultMessageListenerContainer或SimpleMessageListenerContainer。
  • factory-id:公開此元素定義為具有指定id的JmsListenerContainerFactory的設置,以便可以與其他終結點一起重用。
  • connection-factory:對JMS ConnectionFactory bean的引用(缺省bean名稱是ConnectionFactory)。
  • task-executor:對JMS偵聽器調用器的Spring TaskExecutor的引用。
  • destination-resolver:對解析JMS目標實例的DestinationResolver策略的引用。
  • message-converter:對用於將JMS消息轉換為偵聽器方法參數的MessageConverter策略的引用。默認值是SimpleMessageConverter。
  • error-handler:對ErrorHandler策略的引用,用於處理在執行MessageListener期間可能發生的任何未捕獲的異常。
  • destination-type:偵聽器的JMS目標類型:queue、topic、durableTopic、sharedTopic或sharedDurableTopic。這可能會啟用容器的pubSubDomain、subscriptionDurable和subscriptionShared屬性。默認值為queue(這將禁用這三個屬性)。
  • response-destination-type:響應的JMS目標類型:隊列或主題。默認值是destination-type屬性的值。
  • client-id:此偵聽器容器的JMS客戶機標識。使用持久訂閱時必須指定它。
  • cache:JMS資源的緩存級別:none、connection、session、consumer或auto。默認情況下(auto),緩存級別實際上是使用者,除非指定了外部事務管理器 - - ,在這種情況下,有效的默認值將是none(假設Java EE的事務管理,其中給定的ConnectionFactory是一個支持XA的池)。
  • acknowledge:本機JMS確認模式:auto、client、dups-ok或transactied。值transactied激活本地事務處理會話。另一種方法是,您可以指定事務管理器屬性,如表后面所述。默認為自動。
  • transaction-manager:對外部平台TransactionManager(通常是基於XA的事務協調器,如Spring的JtaTransactionManager)的引用。如果未指定,則使用本機確認。
  • concurrency:每個偵聽器啟動的並發會話或使用者數。它可以是表示最大值的簡單數字(例如,5),也可以是表示下限和上限的范圍(例如,3-5)。請注意,指定的最小值只是一個提示,可能在運行時被忽略。默認值為1。對於主題偵聽器或隊列排序很重要,您應該將並發性限制為1。考慮將其提高到一般隊列。
  • prefetch:要加載到單個會話中的最大消息數。請注意,增加這個數字可能會導致並發用戶的飢餓。
  • receive-timeout:用於接收調用的超時(毫秒)。默認值為1000(1秒)。-1表示沒有超時。
  • back-off:指定用於計算恢復嘗試之間的間隔的BackOff實例。如果BackOffExecution實現返回BackOffExecution#STOP,則偵聽器容器不會進一步嘗試恢復。設置此屬性時,將忽略恢復間隔值。默認值是間隔為5000毫秒(即5秒)的FixedBackOff。
  • recovery-interval:指定恢復嘗試之間的間隔(毫秒)。它提供了一種方便的方法來創建具有指定間隔的FixedBackOff。有關更多恢復選項,請考慮指定一個BackOff實例。默認值為5000毫秒(即5秒)。
  • phase:此容器應在其中啟動和停止的生命周期階段。值越低,此容器開始的時間越早,停止的時間越晚。默認值是Integer.MAX_VALUE,這意味着容器開始得越晚,停止得越快越好。

配置具有jms模式支持的基於JCA的偵聽器容器非常相似,如下例所示:

<jms:jca-listener-container resource-adapter="myResourceAdapter"
        destination-resolver="myDestinationResolver"
        transaction-manager="myTransactionManager"
        concurrency="10">

    <jms:listener destination="queue.orders" ref="myMessageListener"/>

</jms:jca-listener-container>

<jca-listener-container/>元素的屬性

  • factory-id:
  • resource-adapter:
  • activation-spec-factory:
  • destination-resolver
  • message-converter
  • destination-type
  • response-destination-type
  • client-id
  • acknowledge
  • transaction-manager
  • concurrency
  • prefetch

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM