1.SpringJMS核心接口介紹
1.JmsTemplate
JmsTemplate: 是Spring自身提供,只需向Spring容器內注冊這個類即可,就可以使用
JmsTemplate類對象方便的操作JMS,下面介紹他常用的方法。
注意:
JmsTemplate類是線程安全的,可以在整個應用范圍使用。但並不是說整個引用只能使用一個JmsTemplate實例,可以根據需要注入多個JmsTemplate實例。
// send - 發送一個消息,使用消息創建接口MessageCreator
publicvoid send(MessageCreator messageCreator)
publicvoid send(finalDestination destination,finalMessageCreator messageCreator)
publicvoid send(finalString destinationName,finalMessageCreator messageCreator)
// sendAndReceive - 發送並接收消息
publicMessage sendAndReceive(MessageCreator messageCreator)
publicMessage sendAndReceive(finalDestination destination,finalMessageCreator messageCreator)
publicMessage sendAndReceive(finalString destinationName,finalMessageCreator messageCreator)
// convertAndSend - 使用MessageConverter接口轉換消息,先將對象轉換成消息再發送消息。與之對應的是receiveAndConvert方法
publicvoid convertAndSend(Object message)throwsJmsException
publicvoid convertAndSend(Destination destination,finalObject message)
publicvoid convertAndSend(String destinationName,finalObject message)
publicvoid convertAndSend(Object message,MessagePostProcessor postProcessor)
publicvoid convertAndSend(Destination destination,finalObject message,finalMessagePostProcessor postProcessor)
publicvoid convertAndSend(String destinationName,finalObject message,finalMessagePostProcessor postProcessor)
// receive - 接收消息
publicMessage receive()
publicMessage receive(Destination destination)
publicMessage receive(String destinationName)
// receiveSelected - 接收消息,並過濾消息
publicMessage receiveSelected(String messageSelector)
publicMessage receiveSelected(finalDestination destination,finalString messageSelector)
publicMessage receiveSelected(finalString destinationName,finalString messageSelector)
// receiveAndConvert - 接收消息,並使用MessageConverter接口轉換消息,把一個消息轉換成一個對象
publicObject receiveAndConvert()
publicObject receiveAndConvert(Destination destination)
publicObject receiveAndConvert(String destinationName)
// receiveSelectedAndConvert - 接收消息,並使用消息過濾和消息轉換
publicObject receiveSelectedAndConvert(String messageSelector)
publicObject receiveSelectedAndConvert(Destination destination,String messageSelector)
publicObject receiveSelectedAndConvert(String destinationName,String messageSelector)
// browse - 瀏覽消息
public<T> T browse(BrowserCallback<T> action)
public<T> T browse(Queue queue,BrowserCallback<T> action)
public<T> T browse(String queueName,BrowserCallback<T> action)
// browseSelected - 瀏覽消息,並使用過濾
public<T> T browseSelected(String messageSelector,BrowserCallback<T> action)
public<T> T browseSelected(finalQueue queue,finalString messageSelector,finalBrowserCallback<T> action)
public<T> T browseSelected(finalString queueName,finalString messageSelector,finalBrowserCallback<T> action)
// execute - 執行SessionCallback、ProducerCallback、BrowserCallback回調接口,並得到回調接口返回值
public<T> T execute(SessionCallback<T> action)
public<T> T execute(SessionCallback<T> action,boolean startConnection)
public<T> T execute(ProducerCallback<T> action)
public<T> T execute(finalDestination destination,finalProducerCallback<T> action)
public<T> T execute(finalDestination destination,finalProducerCallback<T> action)
2.連接工廠(連接池)
JmsTemplate需要引用一個ConnectionFactory,
JmsTemlate每次發送消息時都會重新創建連接,創建connection,session,創建productor。這是一個非常耗性能的地方,特別是大數據量的情況下。所以出現了
PooledConnectionFactory,
PooledConnectionFactory是
ActiveMQ中的類,它實現了ConnectionFactory。
而
SpringJMS的實現有
SingleConnectionFactory、
CachingConnectionFactory。
- PooledConnectionFactory:會緩存connection,session和productor,不會緩存consumer。因此只適合於生產者發送消息。
- SingleConnectionFactory:對於建立JMS服務器鏈接的請求會一直返回同一個Connection,並且會忽略Connection的close方法調用(包括調用createConnection()得到的Connection)
- CachingConnectionFactory:繼承了SingleConnectionFactory,所以它擁有SingleConnectionFactory的所有功能,同時它還新增了緩存功能,它可以緩存Session、MessageProducer和MessageConsumer。
SpringJMS提供了CachingConnectionFactory,這才是首選的方案。然而CachingConnectionFactory有一個問題必須指出,默認情況下,CachingConnectionFactory只緩存一個session,在它的JavaDoc中,它聲明對於低並發情況下這是足夠的。與之相反,PooledConnectionFactory的默認值是500。這些設置,在很多情況下,需要親自去測試並驗證。我將其設置為100,對我來說還是很不錯。
3.接口介紹
以下是SpringJMS的常用接口或類:
- MessageCreator -- 消息創建接口,發送消息時需要使用此接口創建消息
- SessionCallback -- 使用JMS Session時的回調接口
- ProducerCallback -- 使用JMS Session和MessageProducer時的回調接口
- BrowserCallback -- 使用JMS Session和QueueBrowser時的回調接口
- MessageListener -- 消息監聽器接口,注解@JmsListener與其功能相似
- ListenerContainer -- 消息偵聽器容器接口,實現有SimpleMessageListenerContainer、DefaultMessageListenerContainer
- MessageConverter -- 消息轉換接口,用於JMS消息到Java對象之間的轉換
2.消息監聽器
在Spring整合JMS的應用中我們在定義消息監聽器的時候一共可以定義三種類型的消息監聽器,分別是MessageListener、SessionAwareMessageListener和MessageListenerAdapter。下面就分別來介紹一下這幾種類型的區別。
1.MessageListener
MessageListener是最原始的消息監聽器,它是JMS規范中定義的一個接口。其中定義了一個用於處理接收到的消息的onMessage方法,該方法只接收一個Message參數。我們前面在講配置消費者的時候用的消息監聽器就是MessageListener,代碼如下:
publicclassQueueMessageListenerimplementsMessageListener
{
@Override
publicvoid onMessage(Message message)
{
try
{
if(message instanceofTextMessage)
{
TextMessage tm =(TextMessage) message;
System.out.println("監聽消息:"+ tm.getText());
}
else
{
System.out.println("消息類型:"+ message.getClass());
}
}
catch(JMSException e)
{
e.printStackTrace();
}
}
}
在上面代碼中我們只是簡單的打印了一些消息相關的信息。
2.SessionAwareMessageListener
SessionAwareMessageListener是Spring為我們提供的,它不是標准的JMS MessageListener。MessageListener的設計只是純粹用來接收消息的,假如我們在使用MessageListener處理接收到的消息時我們需要發送一個消息通知對方我們已經收到這個消息了,那么這個時候我們就需要在代碼里面去重新獲取一個Connection或Session。SessionAwareMessageListener的設計就是為了方便我們在接收到消息后發送一個回復的消息,它同樣為我們提供了一個處理接收到的消息的onMessage方法,但是這個方法可以同時接收兩個參數,一個是表示當前接收到的消息Message,另一個就是可以用來發送消息的Session對象。先來看一段代碼:
publicclassQueueSessionAwareMessageListenerimplementsSessionAwareMessageListener<TextMessage>
{
/** 回復消息的目的地 */
privateDestination destination;
@Override
publicvoid onMessage(TextMessage message,Session session)throwsJMSException
{
System.out.println("監聽消息內容:"+ message.getText());
MessageProducer messageProducer = session.createProducer(destination);
TextMessage replyMessage = session.createTextMessage("已收到消息:"+ message.getJMSMessageID());
messageProducer.send(replyMessage);
}
publicDestination getDestination()
{
return destination;
}
publicvoid setDestination(Destination destination)
{
this.destination = destination;
}
}
在上面代碼中我們定義了一個SessionAwareMessageListener,在這個Listener中我們在接收到了一個消息之后,利用對應的Session創建了一個到destination的生產者和對應的消息,然后利用創建好的生產者發送對應的消息。
3.MessageListenerAdapter
MessageListenerAdapter類實現了MessageListener接口和SessionAwareMessageListener接口,它的主要作用是將接收到的消息進行類型轉換,然后通過反射的形式把它交給一個普通的Java類進行處理。
MessageListenerAdapter會把接收到的消息做如下轉換:
- TextMessage轉換為String對象
- BytesMessage轉換為byte數組
- MapMessage轉換為Map對象
- ObjectMessage轉換為對應的Serializable對象
既然前面說了MessageListenerAdapter會把接收到的消息做一個類型轉換,然后利用反射把它交給真正的目標處理器——一個普通的Java類進行處理,如果真正的目標處理器是一個MessageListener或者是一個SessionAwareMessageListener,那么Spring將直接使用接收到的Message對象作為參數調用它們的onMessage方法,而不會再利用反射去進行調用。那么我們在定義一個MessageListenerAdapter的時候就需要為它指定這樣一個目標類。這個目標類我們可以通過MessageListenerAdapter的構造方法參數指定,也可以通過屬性
delegate
指定,如:
<!--消息監聽適配器-->
<bean id="messageListenerAdapter"class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<!--通過構造函數指定消息處理器-->
<constructor-arg>
<bean id="readerMessage"class="com.test.ReaderMessage"/>
</constructor-arg>
<!--指定消息處理器類處理消息的方法-->
<property name="defaultListenerMethod" value="receiveMessage"/>
<!--通過delegate屬性指定消息處理器-->
<property name="delegate">
<bean id="readerMessage"class="com.test.ReaderMessage"/>
</property>
</bean>
前面說了如果我們指定的這個目標處理器是一個MessageListener或者是一個SessionAwareMessageListener的時候Spring將直接利用接收到的Message對象作為方法參數調用它們的onMessage方法。但是如果指定的目標處理器是一個普通的Java類時Spring將利用Message進行了類型轉換之后的對象作為參數通過反射去調用真正的目標處理器的處理方法,那么Spring是如何知道該調用哪個方法呢?這是通過MessageListenerAdapter的defaultListenerMethod屬性來決定的,當我們沒有指定該屬性時,Spring會默認調用目標處理器的handleMessage方法。
若使用以下
MessageListenerByAdapter
類處理消息,可以配置
MessageListenerAdapter
的
delegate
屬性來設置處理消息的方法。
publicclassMessageListenerByAdapter
{
publicvoid handleMessage(String message)
{
System.out.println("handleMessage方法處理消息,消息內容是:"+ message);
}
publicvoid receiveMessage(String message)
{
System.out.println("receiveMessage方法處理消息,消息內容是:"+ message);
}
}
MessageListenerAdapter除了會自動的把一個普通Java類當做MessageListener來處理接收到的消息之外,
其另外一個主要的功能是可以自動的發送返回消息。
當我們設置的消息處理方法的返回值不為空的時候,SpringJMS會自動將它封裝為一個JMS Message,然后自動進行回復。那么這個時候這個回復消息將發送到哪里呢?這主要有兩種方式可以指定。
- 第一,可以通過發送的Message的setJMSReplyTo方法指定該消息對應的回復消息的目的地。這需要生產者發送消息之前調用setJMSReplyTo方法。
- 第二,通過MessageListenerAdapter的defaultResponseDestination屬性來指定。
注意:
當兩種方式都指定了消息的回復目的地的時候使用發送消息的setJMSReplyTo方法指定的目的地將具有較高的優先級。
4.配置消息監聽器
配置以上三種消息監聽器,需要使用消息偵聽容器(MessageListenerContainer
),
在配置一個MessageListenerContainer的時候有三個屬性必須指定,一個是表示從哪里監聽的ConnectionFactory;一個是表示監聽什么的Destination;一個是接收到消息以后進行消息處理的MessageListener。示例如下:
- <bean id="jmsContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<!--設置連接工廠-->
<property name="connectionFactory" ref="connectionFactory"/>
<!--設置監聽地址-->
<property name="destination" ref="queueDestination"/>
<!--設置消息監聽處理器,可以是以上三種-->
<property name="messageListener" ref="queueMessageListener"/>
</bean>
Spring提供了三種AbstractMessageListenerContainer的子類,每種各有其特點。
- SimpleMessageListenerContainer:這個消息偵聽容器是三種中最簡單的。它在啟動時創建一個會話session和消費者Consumer,並且會使用標准的JMS MessageConsumer.setMessageListener()方法注冊監聽器讓JMS提供者調用監聽器的回調函數。它不會動態的適應運行時需要和參與外部的事務管理。兼容性方面,它非常接近於獨立的JMS規范,但一般不兼容Java EE的JMS限制。
- DefaultMessageListenerContainer:這個消息偵聽器使用的最多。跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer會動態的適應運行時需要,並且能夠參與外部的事務管理。它很好的平衡了對JMS提供者要求低、先進功能如事務參與和兼容Java EE環境。
- ServerSessionMessageListenerContainer:這個監聽器容器利用JMS ServerSessionPool SPI動態管理JMS Session。使用者各種消息監聽器可以獲得運行時動態調優功能,但是這也要求JMS提供者支持ServerSessionPool SPI。如果不需要運行時性能調整,請使用 DefaultMessageListenerContainer 或 SimpleMessageListenerContainer。
3.ActiveMQ與SpringJMS整合示例
本節提供了一個SpringJMS與ActiveMQ
整合的示例
,本示例可以作為參考。
1.配置spring-context-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beansxmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd"
default-lazy-init="false">
<!-- 配置JMS連接工廠 -->
<beanid="connectionFactory"class="org.apache.activemq.ActiveMQConnectionFactory">
<propertyname="brokerURL"value="failover:(tcp://localhost:61616)"/>
</bean>
<!-- ActiveMQ連接池配置,ActiveMQ實現 -->
<!--
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory" ref="connectionFactory" />
</bean>
-->
<!-- ActiveMQ連接池配置,SpingJMS實現 -->
<beanid="cachingConnectionFactory"class="org.springframework.jms.connection.CachingConnectionFactory">
<propertyname="targetConnectionFactory"ref="connectionFactory"/>
<!-- Session緩存數量,這里屬性也可以直接在這里配置 -->
<propertyname="sessionCacheSize"value="100"/>
</bean>
<!-- 消息隊列01 -->
<beanid="queueDestination01"class="org.apache.activemq.command.ActiveMQQueue">
<!-- 設置消息隊列的名字 -->
<constructor-arg>
<value>spring-jms-queue01</value>
</constructor-arg>
</bean>
<!-- 消息隊列02 -->
<beanid="queueDestination02"class="org.apache.activemq.command.ActiveMQQueue">
<!-- 設置消息隊列的名字 -->
<constructor-arg>
<value>spring-jms-queue02</value>
</constructor-arg>
</bean>
<!-- 配置JMS模板,Spring提供的JMS工具類,它發送、接收消息。 -->
<beanid="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">
<propertyname="connectionFactory"ref="cachingConnectionFactory"/>
<propertyname="defaultDestination"ref="queueDestination01"/>
<propertyname="receiveTimeout"value="10000"/>
</bean>
<!-- 消息監聽容器(Queue),配置連接工廠,監聽的隊列是spring-jms-queue02,監聽器是上面定義的監聽器 -->
<beanid="jmsContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<propertyname="connectionFactory"ref="cachingConnectionFactory"/>
<propertyname="destination"ref="queueDestination02"/>
<propertyname="messageListener">
<beanid="queueMessageListener"class="spring.QueueMessageListener"/>
</property>
</bean>
</beans>
注意:在配置文件中使用了
SpringJMS的命名空間,就可以使用配置:
<jms:listener-container>、
<jms:jca-listener-container>
2.代碼實現
配置文件中的消息監聽器
spring.QueueMessageListener
代碼如下:
publicclassQueueMessageListenerimplementsMessageListener
{
@Override
publicvoid onMessage(Message message)
{
try
{
if(message instanceofTextMessage)
{
TextMessage tm =(TextMessage) message;
System.out.println("監聽消息:"+ tm.getText());
}
else
{
System.out.println("消息類型:"+ message.getClass());
}
}
catch(JMSException e)
{
e.printStackTrace();
}
}
}
實現消息創建接口:
/** 自定義消息創建器 */
publicclassMyMessageCreatorimplementsMessageCreator
{
privateString messageStr;
publicMyMessageCreator(String messageStr)
{
this.messageStr = messageStr;
}
@Override
publicMessage createMessage(Session session)throwsJMSException
{
return session.createTextMessage(messageStr);
}
}
根據配置編寫測試代碼:
publicstaticvoid main(String[] args)throwsException
{
ClassPathXmlApplicationContext applicationContext =newClassPathXmlApplicationContext("spring-context-jms.xml");
// 得到消息隊列
Destination queueDestination01 = applicationContext.getBean("queueDestination01",Destination.class);
Destination queueDestination02 = applicationContext.getBean("queueDestination02",Destination.class);
// Spring JMS操作模版
JmsTemplate jmsTemplate = applicationContext.getBean("jmsTemplate",JmsTemplate.class);
// 發送消息
jmsTemplate.send(queueDestination01,newMyMessageCreator("測試消息001"));
jmsTemplate.send(queueDestination01,newMyMessageCreator("測試消息002"));
jmsTemplate.send(queueDestination02,newMyMessageCreator("測試消息003"));
jmsTemplate.send(queueDestination02,newMyMessageCreator("測試消息004"));
//接收消息
TextMessage textMessage =(TextMessage) jmsTemplate.receive(queueDestination01);
System.out.println(textMessage.getText());
textMessage =(TextMessage) jmsTemplate.receive(queueDestination01);
System.out.println(textMessage.getText());
applicationContext.close();
System.out.println("OK!");
}
本示例在配置文件中配置了兩個隊列:
queueDestination01
、
queueDestination02
,並使用監聽器
queueMessageListener
監聽隊列
queueDestination02
。在測試代碼中分別向兩個隊列發送消息,然后接收隊列
queueDestination01
的消息,而隊列
queueDestination02
的消息會被監聽器
queueMessageListener
接收處理。
3.運行結果
監聽消息:測試消息003
監聽消息:測試消息004
測試消息001
測試消息002
OK!
4.消息轉換MessageConverter
MessageConverter的作用主要有兩方面,一方面它可以把我們的非標准化Message對象轉換成我們的目標Message對象,這主要是用在發送消息的時候;另一方面它又可以把我們的Message對象轉換成對應的目標對象,這主要是用在接收消息的時候。
例如:我們需要把一個用戶信息
User
對象當JMS消息一樣發送和接收,而不需要每次發送與接收之前都需要寫轉換代碼(序列化),此時我們就可以使用MessageConverter
接口。如用戶
User
類定義如下:
publicclassUserimplementsSerializable
{
privatestaticfinallong serialVersionUID =1L;
privateString name;
privateint age;
privateboolean sex;
......
}
1.實現轉換接口MessageConverter
MessageConverter接口的實現如下,這里我們簡單的使用Json序列化和反序列化實現:
publicclassUserMessageConverterimplementsMessageConverter
{
privateObjectMapper mapper =newObjectMapper();
@Override
publicMessage toMessage(Object object,Session session)throwsJMSException,MessageConversionException
{
String json =null;
try
{
json = mapper.writeValueAsString(object);
}
catch(JsonProcessingException e)
{
e.printStackTrace();
}
System.out.println("toMessage : "+ json);
return session.createTextMessage(json);
}
@Override
publicObject fromMessage(Message message)throwsJMSException,MessageConversionException
{
String json =((TextMessage) message).getText();
Object object =null;
try
{
object = mapper.readValue(json,User.class);
}
catch(Exception e)
{
e.printStackTrace();
}
System.out.println("fromMessage : "+ object);
return object;
}
}
Json的序列化與反序列化使用了
jackson庫。其實Spring也實現了幾種
MessageConverter,如下:
- org.springframework.jms.support.converter.SimpleMessageConverter
- org.springframework.jms.support.converter.MessagingMessageConverter
- org.springframework.jms.support.converter.MarshallingMessageConverter
- org.springframework.jms.support.converter.MappingJackson2MessageConverter
2.配置
MessageConverter
<!-- 配置JMS模板,Spring提供的JMS工具類,它發送、接收消息。 -->
<beanid="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">
<propertyname="connectionFactory"ref="cachingConnectionFactory"/>
<propertyname="defaultDestination"ref="queueDestination01"/>
<propertyname="receiveTimeout"value="10000"/>
<propertyname="messageConverter">
<beanid="userMessageConverter"class="spring.UserMessageConverter"/>
</property>
</bean>
配置文件仍然基於之前的整合示例!
3.測試示例
基於之前的整合示例編寫如下測試代碼:
User user=newUser("危常煥",20,true);
// 發送消息
jmsTemplate.convertAndSend(user);
//接收消息
Object object = jmsTemplate.receiveAndConvert();
System.out.println(object);
注意:需要調用
convertAndSend
和
receiveAndConvert
方法發送和接收消息,運行結果如下:
toMessage :{"name":"危常煥","age":20,"sex":true}
fromMessage :User[name=危常煥, age=20, sex=true]
User[name=危常煥, age=20, sex=true]
OK!
5.事務管理JmsTransactionManager
-------------------------------------------------------------------------------------------------------------------------------