最后部分的XML懶得寫了,因為個人更傾向於JavaConfig形式。
為知筆記版本見這里,帶格式~
做了一個小demo,放到碼雲上了,有興趣的點我。
說明:需要先了解下JMS的基礎知識。
1、介紹
Spring 提供了一個JMS集成框架,簡化了JMS API的使用,類似於Spring提供的JDBC API集成。
JMS可以粗略地划分成兩大功能區域,就是消息的生產(production)和消費(consumption)。JmsTemplate 用於消息的生產和同步的消息接收。對於異步消息接收,類似於Java EE的消息驅動,Spring提供了大量了message listener container,可以用於創建Message-Driven POJOs (MDPs)。Spring還提供了一種聲明式創建消息監聽器的方式。
- package org.springframework.jms.core 提供了使用JMS的核心功能。它包含了JMS template類,通過處理資源的創建和釋放 -- 這類似於JdbcTemplate為JDBC所做的。common設計准則,就是提供幫助方法 來執行common操作,同時為更復雜的使用 將處理任務代理到用戶實現的回調接口。該JMS template遵循了同樣的設計。該類提供了不同的便捷方法,用於發送消息、同步消費消息、以及將JMS session和消息的producer暴露給用戶。
- package org.springframework.jms.support 提供了JMSException翻譯功能。(Spring的一貫行為 -- by larry)
- package org.springframework.jms.support.converter 提供了一個MessageConverter abstraction,用於在Java objects和JMS messages之間轉換。
- package org.springframework.jms.support.destination 提供了不同的策略來管理JMS destinations,例如,為存儲在JNDI中的destinations提供了一個service locator。
- package org.springframework.jms.annotation 提供了必要的infrastructure 以支持注解驅動的監聽器端點 -- @JmsListener。
- package org.springframework.jms.config 為jms namespace提供了解析器實現;提供了java config支持以配置監聽器容器和創建監聽器端點。
- package org.springframework.jms.connection 提供了在獨立應用中使用的ConnectionFactory實現。它同樣包含了一個Spring PlatformTransactionManager 用於JMS的實現 -- JmsTransactionManager。這允許將JMS作為一個事務性的資源無縫地集成到Spring的事務管理機制中。
2、使用Spring JMS
2.1、JmsTemplate
JmsTemplate是JMS core package的中心類。它簡化了JMS的使用,因為它負責處理資源的創建和釋放 -- 當發送、同步接收消息時。
使用JmsTemplate的代碼,只需要實現回調接口。MessageCreator回調接口負責創建一個消息,使用的是JmsTemplate提供的Session。為了允許更復雜的JMS API使用,回調SessionCallback 提供了JMS session,而ProducerCallback則暴露了Session、MessageProducer pair。
JMS API 暴露了兩種類型的send方法,一種使用投遞模式、優先級、TTL作為Quality of Service (QOS)參數,一種不攜帶QOS參數 -- 使用默認值。由於JmsTemplate中有很多send方法,QOS參數的設置已經被暴露為bean properties,以避免讓send方法的數量翻倍。類似地,用於同步接收調用的timeout值也是通過setReceiveTimeout設置的。
一些JMS providers允許通過ConnectionFactory的配置來設置QOS的默認值。這會有影響,當調用 MessageProducer’s send method `send(Destination destination, Message message) 時,會使用不同的QOS默認值,而非在JMS spec中指定的值。為了提供一致的QOS管理,JmsTemplate 必須特別地啟用它自己的QOS值 -- 通過將 isExplicitQosEnabled設為 true。
為了方便,JmsTemplate還暴露了一個基礎的 請求-應答 操作,允許發送一個消息,然后等待一個響應 -- 在該操作創建的一個臨時的queue上。
注意:一旦配置完成,JmsTemplate的實例都是線程安全的。這非常重要,因為這意味着你可以配置一個唯一的JmsTemplate實例,然后將其安全地注入到多個bean中。要清楚,JmsTemplate是帶狀態的,它維持了一個對ConnectionFactory的引用,但該狀態不是一個對話狀態(雙向狀態)。
自Spring Framework 4.1起,JmsMessagingTemplate 構建於JmsTemplate之上,提供了消息抽象的集成,例如 org.springframework.messaging.Message。 這允許你以泛泛的方式創建消息來發送。
2.2、連接
JmsTemplate 要求引用一個ConnectionFactory。ConnectionFactory 是JMS spec的一部分,作為JMS的入口。客戶端應用會使用它來創建連接 -- 其實現由JMS provider提供,並封裝不同的配置參數,許多參數都是供應商特定的,例如SSL配置選項。
當在EJB中使用JMS時,供應商提供了JMS接口的實現,因此,它們可以參與聲明式事務管理,可以執行連接和session的池化。為了使用這種實現,Java EE containers 通常要求用戶聲明一個JMS連接工廠作為EJB或servlet部署描述符中的 resource-ref。為了確保EJB中JmsTemplate對這些功能的使用,客戶端應用應該確保它引用了被管理的ConnectionFactory實現。
緩存的消息資源
標准API 涉及到了很多中間件對象的創建。想要發送一條消息,會執行下面的API步驟:
ConnectionFactory->Connection->Session->MessageProducer->send
這ConnectionFactory和Send操作之間,創建和銷毀了三個中間件對象。我們提供了兩個ConnectionFactory的實現,以優化資源的使用和釋放。
SingleConnectionFactory
該實現,會在所有的createConnection()調用時返回相同的Connection,並忽略close()的調用。這在測試環境和獨立運行環境中很有用,因為多次JmsTemplate的調用都會使用同一個連接,方便事務管理。
該實現,需要引用標准的ConnectionFactory(通常來自JNDI)。
CachingConnectionFactory
該實現繼承了SingleConnectionFactory,並添加了Session、MessageProducer、MessageConsumer的緩存。默認的緩存數量是1,可以使用property sessionCacheSize修改。
注意:實際緩存的sessions數量會多於那個值,這是因為session的緩存是基於它們的應答模式(acknowledgment mode),所以,最多可能四倍的數量。
MessageProducer和MessageConsumer會緩存在各自的session中。
MessageProducer是基於它們的destination而緩存的。MessageConsumer是基於一個key而緩存的,該key由destination、selector、noLocal delivery flag、以及durable subscription name(如果創建了durable consumers的話)組成。
2.3、Destination管理
類似於ConnectionFactories,Destinations也是JMS管理的對象,也可以被存儲,可以通過JNDI獲取。
在配置一個Spring的application context的時候,你可以使用JNDI工廠類 JndiObjectFactoryBean / <jee:jndi-lookup> 來執行依賴注入,注入到對JMS destinations引用的對象中。然而,經常,這種策略是冗長的 -- 如果有大量的destinations 或者有不同於JMS provider的高級destination管理特性。這種高級destination管理的例子,可能是動態destinations的創建 或者 對於destinations層級命名空間的支持。
JmsTemplate使用DestinationResolver接口的實現來代理了destination名字到JMS destination對象的解析。
JmsTemplate默認使用的實現是DynamicDestinationResolver,負責解析動態的destinations。
還提供了JndiDestinationResolver,作為包含在JNDI中的destinations的一個service locator,並能可選地會滾到DynamicDestinationResolver的行為。
很常見的一種情況是,JMS應用中的destinations僅在運行時才知道,因此,無法在應用部署時創建。這很常見是因為 在交互式系統組件之間有一種共享的應用邏輯,就是在運行時按照一種眾所周知的命名慣例來創建destinations。即使動態destinations的創建不是JMS spec的一部分,大多數提供商仍然提供了這種功能。動態destinations是使用用戶定義的名字創建的,這使得它們與臨時的destinations不同,且經常不在JNDI中注冊。創建動態destinations的API隨着提供商的不同而不同,因為關鍵到destination的properties通常都是提供商特有的。However, a simple implementation choice that is sometimes made by vendors is to disregard the warnings in the JMS specification and to use the TopicSession method createTopic(String topicName) or the QueueSession method createQueue(String queueName) to create a new destination with default destination properties. Depending on the vendor implementation, DynamicDestinationResolver may then also create a physical destination instead of only resolving one.
JmsTemplate的boolean property pubSubDomain 被用於配置使用什么樣的JMS domain。其默認值是false,表明是一個p2p domain,會使用Queues。JmsTemplate使用該property來決定動態destination解析的行為 -- 通過DestinationResolver接口的實現。
你還可以為JmsTemplate配置一個默認的destination -- 通過property defaultDestination。當不指定destination時,就會使用這個默認的destination來發送和接收。
2.4、Message Listener Containers 消息監聽器容器
在EJB世界中 JMS消息的最常見用法的一種是,驅動 消息驅動的beans(drive message-driven beans MDBs)。Spring提供了一種解決方案來創建message-driven POJOs (MDPs),不需要將用戶綁定到EJB容器。(見4.2、異步接收 - MDPs)。自Spring Framework 4.1起,可以使用@JmsListener來注解端點方法,詳見6、注解驅動的監聽器端點。
消息監聽器容器是被用於接收來自JMS消息隊列的消息,並驅動注入其中的MessageListener。該監聽器容器負責所有的消息接收線程,並分發到監聽器處理。消息監聽器容器,是MDP和消息提供者之間的中間件,負責注冊接收消息、參與事務、資源獲取和釋放、異常轉換等等。這可以讓你作為一個應用開發者專注業務邏輯。
Spring提供了兩種標准JMS消息監聽器容器,每種都有其獨特的特色集。
SimpleMessageListenerContainer
該消息監聽器容器是兩種標准flavors的簡化版。它會在啟動時創建固定數量的JMS sessions和consumers,使用標准的JMS MessageConsumer.setMessageListener()方法注冊監聽器,並由JMS提供者來執行監聽器回調。該變體不允許動態適配運行時需求,不允許參與外部被管理的事務。
在兼容性方面,非常類似於獨立的JMS spec,但一般不兼容Java EE的JMS限制。
注意:雖然不允許參與外部被管理的事務,但支持native JMS事務:將sessionTransacted設為true即可,或者在namespace中將acknowledge設為transacted,此時,你的監聽器拋出的異常都會導致回滾,消息會重新投遞。
或者,考慮使用'CLIENT_ACKNOWLEDGE' mode,也會在異常時重新投遞,但不使用transacted Sessions,因此,沒有包含任何其他Session操作(例如發出響應消息) -- 以事務協議。
DefaultMessageListenerContainer
該消息監聽器容器可以用於大多數場景。不同於SimpleMessageListenerContainer,該容器變體允許動態適配運行時需求,且參與外部被管理的事務。
接收到的每個消息,都會被注冊一個XA事務 -- 當配置了JtaTransactionManager時;因此,其處理可能利用XA事務語義。
該消息監聽器容器試圖在降低對JMS provider的要求、高級功能(如參與外部的被管理的事務)和Java EE環境的兼容性之間做出一個平衡。
該容器的緩存級別可以自定義。注意,當不啟用緩存時,每次消息接收都會創建新的connection和session。當與高負載下的非持久化訂閱結合時,可能會導致消息的丟失。務必使用一個恰當的緩存級別。
當broker掛掉時,該容器也具備恢復能力。默認,一個簡單的BackOff實現會每隔5秒重試一次。也可以指定一個自定義的BackOff實現,例子可以參考ExponentialBackOff。
注意:和SimpleMessageListenerContainer類似,DefaultMessageListenerContainer也支持native JMS事務,還允許自定義應答模式。(略)
2.5、事務管理
Spring提供了JmsTransactionManager,負責管理單一JMS ConnectionFactory的事務。它允許JMS應用利用Spring的事務管理特性。
JmsTransactionManager執行本地資源事務,從指定的ConnectionFactory到線程 綁定一個JMS Connection/Session pair。
JmsTemplate自動偵測該事務性的資源,並操作它們。
在一個JavaEE環境中,ConnectionFactory會池化Connections和Sessions,因此,資源會在事務之間被復用。在獨立環境中,使用Spring的SingleConnectionFactory 會導致一個共享的JMS Connection,每個事務都有其獨立的Session。或者,考慮使用provider-specific pooling adapter,如ActiveMQ的PooledConnectionFactory。
JmsTemplate也可以使用JtaTransactionManager 和 XA-capable JMS ConnectionFactory 來執行分布式事務。注意,這要求使用JTA事務管理器 以及 合適的XA-configured ConnectionFactory!(詳見Java EE server / JMS provider 的文檔。)
當使用JMS API來從一個Connection創建一個Session時,在被管理和未被管理的事務性環境中 復用代碼 可能混淆。 這是因為JMS API只有一個工廠方法來創建Session,它要求設置事務和應答模式。在一個被管理的環境中,這些值的設置是環境的事務性設施來負責的,因此 這些值會被供應商的JMS Connection封裝所忽略。當在未被管理的環境中使用JmsTemplate時,你可以通過使用properties sessionTransacted和sessionAcknowledgeMod來指定這些值。當配合PlatformTransactionManager時,JmsTemplate會總是使用一個事務性的JMS Session。
3、發送消息
JmsTemplate 包含了很多便利方法 來發送消息。有需要使用Destination對象來指定destination的方法,也有使用String來指定destination的方法。那些沒有指定destination的方法,會使用默認的destination。
import javax.jms.ConnectionFactory;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Queue;import javax.jms.Session;import org.springframework.jms.core.MessageCreator;import org.springframework.jms.core.JmsTemplate;public class JmsQueueSender {private JmsTemplate jmsTemplate;private Queue queue;public void setConnectionFactory(ConnectionFactory cf) {this.jmsTemplate = new JmsTemplate(cf);}public void setQueue(Queue queue) {this.queue = queue;}public void simpleSend() {this.jmsTemplate.send(this.queue, new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage("hello queue world");}});}}
該例子使用 MessageCreator 回調來創建一個text message -- 使用提供的Session對象。JmsTemplate的構造需要傳入一個ConnectionFactory的引用。或者,無參構造,然后設置connectionFactory。或者,考慮從Spring的JmsGatewaySupport 便捷基類中中獲取,會提供一個預先構建好JMS配置的bean properties。
方法 send(String destinationName, MessageCreator creator) 可以讓你發送一條消息。其destination是字符串形式的,如果是在JNDI中注冊的, 那你應該將template的destinationResolver property設為JndiDestinationResolver。
如果你創建了JmsTemplate,並指定了默認的destination,方法send(MessageCreator c) 會直接發送到那個destination。
3.1、使用消息轉換器 Message Converters
為了能夠發送domain model objects,JmsTemplate有不同的發送方法 接受一個Java對象作為參數 -- 作為消息的內容。
重載方法convertAndSend()和receiveAndConvert() 將這種轉換處理代理到MessageConverter接口的實例。該接口定義了一個簡單的約定來轉換Java對象和JMS消息。
默認的實現是SimpleMessageConverter,支持String和TextMessage、byte[]和BytesMessage、java.util.Map和MapMessage之間的轉換。
使用該轉換器,你和你的代碼可以專注於業務對象。
sandbox 目前包含一個MapMessageConverter,它使用反射來進行JavaBean和MapMessage之間的轉換。其他流行的實現包括JAXB、Castor、XMLBeans、XStream,來創建一個TextMessage。
為了容納消息的properties、headers以及body的設置 (不能通用地封裝進一個轉換器類中),MessagePostProcessor接口允許在轉換之后、發送之前訪問該消息。下面的例子演示了如何修改一個消息的header和一個property -- 在java.util.Map被轉成消息之后。
public void sendWithConversion() {Map map = new HashMap();map.put("Name", "Mark");map.put("Age", new Integer(47));jmsTemplate.convertAndSend("testQueue", map, new MessagePostProcessor() {public Message postProcessMessage(Message message) throws JMSException {message.setIntProperty("AccountID", 1234);message.setJMSCorrelationID("123-00001");return message;}});}
其結果是這樣的:
MapMessage={Header={... standard headers ...CorrelationID={123-00001}}Properties={AccountID={Integer:1234}}Fields={Name={String:Mark}Age={Integer:47}}}
3.2、SessionCallback 和 ProducerCallback
有一些情況下,你可能想要在一個JMS Session或MessageProducer上執行多個操作。SessionCallback 和 ProducerCallback 暴露了JMS Session 和 Session / MessageProducer pair。JmsTemplate的execute() 方法會執行這些回調。
4、接收消息
4.1、同步接收
JMS通常關聯到異步處理,但也可以進行同步處理。
重載方法receive(..) 提供這種功能。在一個同步接收中,調用的線程會阻塞住,直到獲取了消息。這是一種很危險的操作,因為調用的線程可能永久卡住。property receiveTimeout 可以指定接收器等待多久,而不必一直卡住。
4.2、異步接收 - Message-Driven POJOs
Spring 還提供了注解式監聽器端點 -- 使用@JmsListener。目前為止,這是最便捷的方式來設置一個異步接收器,詳見6.2、啟用監聽器端點注解。
類似於EJB世界里的Message-Driven Bean,Message-Driven POJO扮演了JMS消息的接收器。MDP的一個限制是,它必須實現javax.jms.MessageListener接口 (看一下下面MessageListenerAdapter的討論)。請注意,POJO在多線程上接收消息時,必須確保你的實現是線程安全的。
下面是MDP的一個簡單實現:
import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;public class ExampleListener implements MessageListener {public void onMessage(Message message) {if (message instanceof TextMessage) {try {System.out.println(((TextMessage) message).getText());}catch (JMSException ex) {throw new RuntimeException(ex);}}else {throw new IllegalArgumentException("Message must be of type TextMessage");}}}
當你實現了你的MessageListener,就可以創建一個消息監聽器容器了。
下面的例子演示了如何定義和配置一個消息監聽器容器(Spring提供的,該例子用的是DefaultMessageListenerContainer)。
<!-- this is the Message Driven POJO (MDP) --><bean id="messageListener" class="jmsexample.ExampleListener" /><!-- and this is the message listener container --><bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destination" ref="destination"/><property name="messageListener" ref="messageListener" /></bean>
詳情請查閱不同的消息監聽器容器的javadocs。
4.3、SessionAwareMessageListener接口
該接口是Spring專有的接口,提供了類似於JMS MessageListener接口的約定,還提供了消息處理方法 -- 通過訪問接收Message的JMS Session。
package org.springframework.jms.listener;public interface SessionAwareMessageListener {void onMessage(Message message, Session session) throws JMSException;}
你可以選擇讓你的MDPs實現該接口(也可以偏好JMS MessageListener) -- 如果你想你的MDPs能夠響應任何接收到的消息(使用這里提供的Session)。
請注意:這里的onMessage(..)方法拋出了JMSException。而標准的JMS MessageListener則需要用戶自己處理異常。
4.4、MessageListenerAdapter
該類是Spring的異步消息支持的最終組件:簡言之,它允許你將幾乎任意類暴露為MDP(當然,有一些限制)。
看看下面的接口定義。注意,雖然該接口既沒有繼承MessageListener,也沒有繼承SessionAwareMessageListener,它仍然可以被用作一個MDP -- 通過使用MessageListenerAdapter類。
public interface MessageDelegate {void handleMessage(String message);void handleMessage(Map message);void handleMessage(byte[] message);void handleMessage(Serializable message);}public class DefaultMessageDelegate implements MessageDelegate {// implementation elided for clarity...}
特別的,注意上面的實現,沒有任何JMS依賴!通過下面的配置,我們可以將其制作成一個MDP!!!
<!-- this is the Message Driven POJO (MDP) --><bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"><constructor-arg><bean class="jmsexample.DefaultMessageDelegate"/></constructor-arg></bean><!-- and this is the message listener container... --><bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destination" ref="destination"/><property name="messageListener" ref="messageListener" /></bean>
下面是另一個MDP例子,只能處理JMS TextMessage。注意,實際調用的消息處理方法是 receive (而MessageListenerAdapter的默認方法名字是handleMessage),但可以配置(下面有說)。注意,receive(..)方法僅能響應JMS TextMessage消息。
public interface TextMessageDelegate {void receive(TextMessage message);}public class DefaultTextMessageDelegate implements TextMessageDelegate {// implementation elided for clarity...}
相應的MessageListenerAdapter的配置如下:
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"><constructor-arg><bean class="jmsexample.DefaultTextMessageDelegate"/></constructor-arg><property name="defaultListenerMethod" value="receive"/><!-- we don't want automatic message context extraction --><property name="messageConverter"><null/></property></bean>
注意,上面的messageListener 如果接收了一個JMS類型,而非TextMessage類型,會拋出IllegalStateException。該異常會被消化!!
MessageListenerAdapter的另一個能力就是能夠自動發回一個響應Message -- 如果一個處理器方法返回了non-void值。看看下面的接口和類:
public interface ResponsiveTextMessageDelegate {// notice the return type...String receive(TextMessage message);}public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {// implementation elided for clarity...}
如果上面的 DefaultResponsiveTextMessageDelegate 配合 MessageListenerAdapter 使用,然后receive(..)返回的所有的non-null值 都會(默認配置)被轉成TextMessage。該TextMessage會被發送至 在源Message中定義的JMS Reply-To property Destination(如果存在),或者發送至 MessageListenerAdapter中設置的默認Destination;如果沒有找到Destination,那么會拋出InvalidDestinationException (請注意,該異常不會被消化,而是會傳播到上一層調用棧)。
4.5、在事務中處理消息
在一個事務中調用消息監聽器,只需要重新配置下監聽器容器。
通過設置監聽器容器的sessionTransacted,可以很輕松地激活本地資源事務。每個消息監聽器調用都會在一個活動的JMS事務中操作,監聽器執行失敗時 會回滾消息異常。
通過SessionAwareMessageListener發送一個響應消息,也是同一個本地事務的一部分,但任何其他資源操作(如數據庫訪問)則會獨立地操作。這一般要求在監聽器實現中進行重復的消息偵測,需要涵蓋 數據庫處理已提交 但消息處理提交失敗的情況。
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destination" ref="destination"/><property name="messageListener" ref="messageListener"/><property name="sessionTransacted" value="true"/></bean>
為了參與進外部被管理的事務中,你需要配置一個事務管理器,並使用支持外部被管理的事務的監聽器容器,通常是 DefaultMessageListenerContainer。
為了配置一個消息監聽器容器 使用XA 事務參與,需要配置一個JtaTransactionManager (默認,其會代理到Java EE server的事務子系統中)。注意,底層的JMS ConnectionFactory需要是XA-capable的,並正確地注冊到你的JTA事務坐標系中。(詳見你的Java EE server的JNDI資源配置)。這允許消息接收 以及數據庫訪問 成為同一個事務的一部分(統一的提交語義,耗費的是XA事務日志overhead)。
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后,你只需要將其添加到我們之前的容器配置中即可。該容器會負責剩下的部分:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destination" ref="destination"/><property name="messageListener" ref="messageListener"/><property name="transactionManager" ref="transactionManager"/></bean>
5、對JCA 消息端點的支持 (囧,又一個未知領域)
自版本2.5開始,Spring還提供了對於基於JCA的MessageListener容器的支持。
>>>>>>>>>>>>>>>>>>略<<<<<<<<<<<<<<<<<<<
6、注解驅動的監聽器端點
異步接收消息 的最簡單的方式 是使用注解式監聽器端點。簡言之,它允許你將一個被管理的bean的方法暴露成一個JMS監聽器端點。
@Componentpublic class MyService {@JmsListener(destination = "myDestination")public void processOrder(String data) { ... }}
上面的例子就是,只要在 javax.jms.Destination "destination"上有可用的消息時,就會調用該processOrder 方法(這種情況下,JMS消息的內容類似於MessageListenerAdapter提供的)。
該注解式端點設施,在幕后 為每一個被注解的方法 創建了一個消息監聽器容器 -- 通過使用JmsListenerContainerFactory。該容器沒有注冊在application context中,但是,通過使用JmsListenerEndpointRegistry bean,可以容易地定位該容器 以便管理。
注意:在Java 8中,@JmsListener 是一個可重復的注解,就是說,可以關聯多個JMS destinations到同一個方法中。而在Java 6和7中,你可以使用@JmsListeners 注解。
6.1、啟用 監聽器端點注解
為了啟用注解@JmsListener,你需要在你的一個@Configuration類上添加注解@EnableJms。
@Configuration@EnableJmspublic class AppConfig {@Beanpublic DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory());factory.setDestinationResolver(destinationResolver());factory.setConcurrency("3-10");return factory;}}
默認,該infrastructure 會查找名字為jmsListenerContainerFactory 的bean,將其作為factory的源 來創建消息監聽器容器。 在上面的例子中,忽略掉JMS infrastructure 的設置,processOrder方法會調用3個初始線程、最多10個池線程。
也可以自定義監聽器容器工廠,按照每個注解來使用,或者配置一個顯式的默認--通過實現JmsListenerConfigurer 接口。如果至少一個端點沒有被配置特定的容器工廠,那就會需要該默認。
詳見javadoc和下面的例子。
如果你傾向於使用XML配置,請使用<jms:annotation-driven> 元素。
<jms:annotation-driven/><bean id="jmsListenerContainerFactory"class="org.springframework.jms.config.DefaultJmsListenerContainerFactory"><property name="connectionFactory" ref="connectionFactory"/><property name="destinationResolver" ref="destinationResolver"/><property name="concurrency" value="3-10"/></bean>
6.2、編碼式端點注冊
JmsListenerEndpoint 提供一個JMS端點的model,並負責為該model配置容器。除了通過注解JmsListener配置之外,infrastructure 還允許你編碼式配置端點。
@Configuration@EnableJmspublic class AppConfig implements JmsListenerConfigurer {@Overridepublic void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();endpoint.setId("myJmsEndpoint");endpoint.setDestination("anotherQueue");endpoint.setMessageListener(message -> {// processing});registrar.registerEndpoint(endpoint);}}
在上面的例子中,我們使用了SimpleJmsListenerEndpoint,它負責提供實際的MessageListener,你也可以構建你自己的端點變體。
注意,你還可以不使用@JmsListener,完全使用JmsListenerConfigurer 編碼式注冊你的端點。
6.3、注解式端點方法簽名
到目前為止,我們已經在我們的端點中注入了一個簡單的String,但它實際上還可以有更靈活的方法簽名。讓我們來重寫一下,注入Order和自定義header:
@Componentpublic class MyService {@JmsListener(destination = "myDestination")public void processOrder(Order order, @Header("order_type") String orderType) {...}}
你可以注入到JMS監聽器端點的主要元素包括:
原生的javax.jms.Message 或其任意子類(當然需要匹配來信類型)。
javax.jms.Session 以可選的訪問native JMS API,如發生一個自定義的應答。
org.springframework.messaging.Message 代表了incoming JMS消息。注意,該消息持有自定義的和標准的headers(如JmsHeaders所定義的)。
@Header 注解的參數,必須能夠賦給 java.util.Map以訪問所有headers。
一個無注解的元素,且不是被支持的類型(如Message 和 Session),被認為是payload(內容?)。你可以使用注解@Payload來顯式地聲明。你還可以打開校驗,通過再添加一個@Valid。
注入Spring的Message abstraction的能力特別重要 -- 可以利用所有存儲在特定傳輸中的消息的所有信息,而不需要使用特定傳輸的API來應答。
@JmsListener(destination = "myDestination")public void processOrder(Message<Order> order) { ... }
方法簽名的處理,是由DefaultMessageHandlerMethodFactory 提供的,DefaultMessageHandlerMethodFactory還可以進一步定制以支持更多的方法參數。還可以同時定制轉換和校驗。
例如,如果我們想確認我們的Order 是有效的(在處理它之前),我們可以使用@Valid 注解,並配置必要的校驗器,如下:
@Configuration@EnableJmspublic class AppConfig implements JmsListenerConfigurer {@Overridepublic void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());}@Beanpublic DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();factory.setValidator(myValidator());return factory;}}
6.4、響應管理
在MessageListenerAdapter中已有的支持 允許你的方法擁有一個non-void返回類型。這種情況下,該返回值會被封裝進一個javax.jms.Message 中,發送到源message的JMSReplyTo header中指定的destination,或者發送到監聽器配置的默認destination。該默認destination 現在也可以在消息抽象上使用@SendTo 注解來設置。
假定我們的方法processOrder 應該返回一個OrderStatus,我們可以用下面的方式讓其自動發送一個響應:
@JmsListener(destination = "myDestination")@SendTo("status")public OrderStatus processOrder(Order order) {// order processingreturn status;}
注意:如果你有幾個@JmsListener 注解的方法,你還可以將@SendTo 放在類級別上,以共用一個默認的應答destination。
如果你需要獨立於傳輸之外設置額外的headers,那你應該返回Message,如下:
@JmsListener(destination = "myDestination")@SendTo("status")public Message<OrderStatus> processOrder(Order order) {// order processingreturn MessageBuilder.withPayload(status).setHeader("code", 1234).build();}
如果你需要在運行時計算響應destination,你可以把你的響應封裝進JmsResponse,它提供了運行時可用的destination。前面的例子可以重寫如下:
@JmsListener(destination = "myDestination")public JmsResponse<Message<OrderStatus>> processOrder(Order order) {// order processingMessage<OrderStatus> response = MessageBuilder.withPayload(status).setHeader("code", 1234).build();return JmsResponse.forQueue(response, "status");}
7、JMS namespace支持
Spring提供了XML namespace以簡化JMS的配置。想要使用該JMS namespace elements,你需要引用JMS schema:
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:jms="http://www.springframework.org/schema/jms"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"><!-- bean definitions here --></beans>
該namespace由三個頂級的elements構成:<annotation-driven/>、 <listener-container/> 和<jca-listener-container/>。第一個,啟用了注解驅動的監聽器端點。后兩個定義了共享的監聽器容器配置,可能包含<listener/> 子元素。下面是一個簡單的例子:
<jms:listener-container><jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/><jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/></jms:listener-container>
上面的例子,等效於創建兩個不同的監聽器容器bean定義 和 兩個不同的MessageListenerAdapter bean定義 -- 如4.4章節所述。除了以上attributes,listener元素還可以包含幾個可選項:
Table 30.1. Attributes of the JMS <listener> element
>>>>>>>>>>>>>>>>后面的部分略,因為個人更喜歡JavaConfig形式<<<<<<<<<<<<<<<<<
官方文檔地址:
http://docs.spring.io/spring/docs/current/spring-framework-reference/html/jms.html
