本文主要講述ActiveMQ與spring整合的方案。介紹知識點包括spring,jms,activemq基於配置文件模式管理消息,消息監聽器類型,消息轉換類介紹,spring對JMS事物管理。
1.spring整合activemq配置文件說明
1.1配置ConnectionFactory
ConnectionFactory是用於產生到JMS服務器的鏈接的,Spring提供了多個ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory對於建立JMS服務器鏈接的請求會一直返回同一個鏈接,並且會忽略Connection的close方法調用。CachingConnectionFactory繼承了SingleConnectionFactory,所以它擁有SingleConnectionFactory的所有功能,同時它還新增了緩存功能,它可以緩存Session、MessageProducer和MessageConsumer。這里使用SingleConnectionFactory來作為示例。
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"/>
Spring提供的ConnectionFactory只是Spring用於管理ConnectionFactory的,真正產生到JMS服務器鏈接的ConnectionFactory還得是由JMS服務廠商提供,並且需要把它注入到Spring提供的ConnectionFactory中。這里使用的是ActiveMQ實現的JMS,所以在這里真正的可以產生Connection的就應該是由ActiveMQ提供的ConnectionFactory。 ActiveMQ提供了一個PooledConnectionFactory,通過往里面注入一個ActiveMQConnectionFactory可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少資源消耗。當使用PooledConnectionFactory時,在定義一個ConnectionFactory時應該是如下定義:
<!-- ActiveMQ 連接工廠 --> <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> <!-- 如果連接網絡:tcp://ip:61616;未連接網絡:tcp://localhost:61616 以及用戶名,密碼--> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://192.168.3.3:61616" userName="admin" password="admin" /> <!-- Spring Caching連接工廠 --> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- 同上,同理 --> <!-- <constructor-arg ref="amqConnectionFactory" /> --> <!-- Session緩存數量 --> <property name="sessionCacheSize" value="100" /> </bean>
1.2配置生產者
配置好ConnectionFactory之后就需要配置生產者。生產者負責產生消息並發送到JMS服務器,這通常對應的是一個業務邏輯服務實現類。但是服務實現類是怎么進行消息的發送的呢?這通常是利用Spring提供的JmsTemplate類來實現的,所以配置生產者其實最核心的就是配置進行消息發送的JmsTemplate。對於消息發送者而言,它在發送消息的時候要知道自己該往哪里發,為此,在定義JmsTemplate的時候需要往里面注入一個Spring提供的ConnectionFactory對象。
<!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory"/> </bean>
在真正利用JmsTemplate進行消息發送的時候,需要知道消息發送的目的地,即destination。在Jms中有一個用來表示目的地的Destination接口,它里面沒有任何方法定義,只是用來做一個標識而已。當在使用JmsTemplate進行消息發送時沒有指定destination的時候將使用默認的Destination。默認Destination可以通過在定義jmsTemplate bean對象時通過屬性defaultDestination或defaultDestinationName來進行注入,defaultDestinationName對應的就是一個普通字符串。在ActiveMQ中實現了兩種類型的Destination,一個是點對點的ActiveMQQueue,另一個就是支持訂閱/發布模式的ActiveMQTopic。在定義這兩種類型的Destination時我們都可以通過一個name屬性來進行構造,如:
<!--這個是隊列目的地,點對點的--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的--> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic"/> </bean>
1.3配置消費者
生產者往指定目的地Destination發送消息后,接下來就是消費者對指定目的地的消息進行消費了。那么消費者是如何知道有生產者發送消息到指定目的地Destination了呢?這是通過Spring封裝的消息監聽容器MessageListenerContainer實現的,它負責接收信息,並把接收到的信息分發給真正的MessageListener進行處理。每個消費者對應每個目的地都需要有對應的MessageListenerContainer。對於消息監聽容器而言,除了要知道監聽哪個目的地之外,還需要知道到哪里去監聽,也就是說它還需要知道去監聽哪個JMS服務器,這是通過在配置MessageConnectionFactory的時候往里面注入一個ConnectionFactory來實現的。所以在配置一個MessageListenerContainer的時候有三個屬性必須指定:
一個是表示從哪里監聽的ConnectionFactory;一個是表示監聽什么的Destination;一個是接收到消息以后進行消息處理的MessageListener。
Spring一共提供了多種類型MessageListenerContainer,SimpleMessageListenerContainer和DefaultMessageListenerContainer。SimpleMessageListenerContainer會在一開始的時候就創建一個會話session和消費者Consumer,並且會使用標准的JMS MessageConsumer.setMessageListener()方法注冊監聽器讓JMS提供者調用監聽器的回調函數。它不會動態的適應運行時需要和參與外部的事務管理。兼容性方面,它非常接近於獨立的JMS規范,但一般不兼容Java EE的JMS限制。大多數情況下使用的DefaultMessageListenerContainer,跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer會動態的適應運行時需要,並且能夠參與外部的事務管理。它很好的平衡了對JMS提供者要求低、先進功能如事務參與和兼容Java EE環境。
<!--這個是隊列目的地--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>queue</value> </constructor-arg> </bean> <!-- 消息監聽器 --> <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/> <!-- 消息監聽容器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> </bean>
1.4定義處理消息的MessageListener
要定義處理消息的MessageListener只需要實現JMS規范中的MessageListener接口就可以了。MessageListener接口中只有一個方法onMessage方法,當接收到消息的時候會自動調用該方法。
至此生成者和消費者都配置完成了,這也就意味着spring整合ActiveMQ已經完成了。整個ActiveMQ.xml文件配置如下:

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd"> <context:component-scan base-package="com.tiantian" /> <!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!--這個是隊列目的地--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>queue</value> </constructor-arg> </bean> <!-- 消息監聽器 --> <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/> <!-- 消息監聽容器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> </bean> </beans>
1.5實例分析
編寫一個sessionAwareQueue目的隊列,向改隊列發送消息,接受消息成功后,並回復一條消息。監控消息函數為:ConsumerSessionAwareMessageListener。
xml配置文件
<!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory"/> <!-- 消息轉換器 --> <!-- <property name="messageConverter" ref="emailMessageConverter"/> --> </bean> <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.210.128:61616"/> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!--這個是隊列目的地--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>queue</value> </constructor-arg> </bean> <!--這個是sessionAwareQueue目的地--> <bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>sessionAwareQueue</value> </constructor-arg> </bean> <!-- 消息監聽器 --> <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"> <property name="messageConverter" ref="emailMessageConverter"/> </bean> <!-- 可以獲取session的MessageListener --> <bean id="consumerSessionAwareMessageListener" class="com.tiantian.springintejms.listener.ConsumerSessionAwareMessageListener"> <property name="destination" ref="queueDestination"/> </bean> <!-- 消息監聽容器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> </bean> <bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="sessionAwareQueue" /> <property name="messageListener" ref="consumerSessionAwareMessageListener" /> </bean> </beans>
消息生產者定義發送消息方法
@Component public class ProducerServiceImpl implements ProducerService { @Autowired private JmsTemplate jmsTemplate; private Destination responseDestination; public void sendMessage(Destination destination, final String message) { System.out.println("---------------生產者發送消息-----------------"); System.out.println("---------------生產者發了一個消息:" + message); jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } public void sendMessage(final Destination destination, final Serializable obj) { jmsTemplate.convertAndSend(destination, obj); } }
消費者定義接收消息方法
public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener<TextMessage> { private Destination destination; public void onMessage(TextMessage message, Session session) throws JMSException { System.out.println("收到一條消息"); System.out.println("消息內容是:" + message.getText()); MessageProducer producer = session.createProducer(destination); System.out.println("發送一條回復消息"); Message textMessage = session.createTextMessage("回復消息內容:ConsumerSessionAwareMessageListener。。。"); producer.send(textMessage); } public Destination getDestination() { return destination; } public void setDestination(Destination destination) { this.destination = destination; } }
測試發送消息
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("/applicationContext.xml") public class ProducerConsumerTest { @Autowired private ProducerService producerService; @Autowired @Qualifier("queueDestination") private Destination destination; @Autowired @Qualifier("sessionAwareQueue") private Destination sessionAwareQueue; @Test public void testSessionAwareMessageListener() { producerService.sendMessage(sessionAwareQueue, "測試SessionAwareMessageListener"); } }
示例運行截圖:
2.消息監聽器MessageListener介紹
2.1消息監聽器MessageListener
MessageListener是最原始的消息監聽器,它是JMS規范中定義的一個接口。其中定義了一個用於處理接收到的消息的onMessage方法,該方法只接收一個Message參數。示例代碼如下:public class ConsumerMessageListener implements MessageListener { public void onMessage(Message message) { //這里我們知道生產者發送的就是一個純文本消息,所以這里可以直接進行強制轉換,或者直接把onMessage方法的參數改成Message的子類TextMessage TextMessage textMsg = (TextMessage) message; System.out.println("接收到一個純文本消息。"); try { System.out.println("消息內容是:" + textMsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
2.2消息監聽器SessionAwareMessageListener
SessionAwareMessageListener是Spring為提供的,它不是標准的JMS MessageListener。MessageListener的設計只是純粹用來接收消息的,假如在使用MessageListener處理接收到的消息時我們需要發送一個消息通知對方我們已經收到這個消息了,這個時候就需要在代碼里面去重新獲取一個Connection或Session。SessionAwareMessageListener的設計就是為了方便在接收到消息后發送一個回復的消息,它同樣提供了一個處理接收到的消息的onMessage方法,但是這個方法可以同時接收兩個參數,一個是表示當前接收到的消息Message,另一個就是可以用來發送消息的Session對象。2.3MessageListenerAdapter
MessageListenerAdapter類實現了MessageListener接口和SessionAwareMessageListener接口,它的主要作用是將接收到的消息進行類型轉換,然后通過反射的形式把它交給一個普通的Java類進行處理。MessageListenerAdapter會把接收到的消息做如下轉換:
- TextMessage轉換為String對象;
- BytesMessage轉換為byte數組;
- MapMessage轉換為Map對象;
- ObjectMessage轉換為對應的Serializable對象。
在xml配置文件中可定義普通的java處理類,樣例代碼如下:
<!-- 消息監聽適配器 --> <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg> <bean class="com.tiantian.springintejms.listener.ConsumerListener"/> </constructor-arg> </bean> <!-- 消息監聽適配器 方案2 --> <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <property name="delegate"> <bean class="com.tiantian.springintejms.listener.ConsumerListener"/> </property> <property name="defaultListenerMethod" value="receiveMessage"/> </bean>
3.消息轉換器MessageConverter介紹
MessageConverter的作用主要有兩方面,一方面它可以把非標准化Message對象轉換成目標Message對象,這主要是用在發送消息的時候;另一方面它又可以把的Message對象轉換成對應的目標對象,這主要是用在接收消息的時候。MessageConverter可用spring提供的簡單模型或者自己編寫轉換定義類。Spring在初始化JmsTemplate的時候指定了其對應的MessageConverter為一個SimpleMessageConverter,所以如果平常沒有什么特殊要求的時候可以直接使用JmsTemplate的convertAndSend系列方法進行消息發送,而不必繁瑣的在調用send方法時自己new一個MessageCreator進行相應Message的創建。可在源碼中查看SimpleMessageConverter的定義,如果覺得它不能滿足業務的要求,那可以對它里面的部分方法進行重寫,或者是完全實現自定義的MessageConverter。
示例代碼
//方案1:調用對象轉換函數發送對象 TestMqBean bean = new TestMqBean(); bean.setAge(13); for (int i = 0; i < 10; i++) { bean.setName("send to data -" + i); producer.send(session.createObjectMessage(bean)); } //方案2:調用JMSconvertAndSend方法發送對象 public void sendMessage(final Destination destination, final Serializable obj) { jmsTemplate.convertAndSend(destination, obj); } //接收消息時,調用message.getObject()獲取發送的消息對象 public void receiveMessage(ObjectMessage message) throws JMSException { System.out.println(message.getObject()); }
4.spring對JMS的事務管理
Spring提供了一個JmsTransactionManager用於對JMS ConnectionFactory做事務管理。這將允許JMS應用利用Spring的事務管理特性。JmsTransactionManager在執行本地資源事務管理時將從指定的ConnectionFactory綁定一個ConnectionFactory/Session這樣的配對到線程中。JmsTemplate會自動檢測這樣的事務資源,並對它們進行相應操作。在Java EE環境中,ConnectionFactory會池化Connection和Session,這樣這些資源將會在整個事務中被有效地重復利用。在一個獨立的環境中,使用Spring的SingleConnectionFactory時所有的事務將公用一個Connection,但是每個事務將保留自己獨立的Session。JmsTemplate可以利用JtaTransactionManager和能夠進行分布式的 JMS ConnectionFactory處理分布式事務。 在Spring整合JMS的應用中,如果要進行本地的事務管理的話只需要在定義對應的消息監聽容器時指定其sessionTransacted屬性為true,如:<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> <property name="sessionTransacted" value="true"/> </bean>
該屬性值默認為false,這樣JMS在進行消息監聽的時候就會進行事務控制,當在接收消息時監聽器執行失敗時JMS就會對接收到的消息進行回滾,對於SessionAwareMessageListener在接收到消息后發送一個返回消息時也處於同一事務下,但是對於其他操作如數據庫訪問等將不屬於該事務控制。 如果想接收消息和數據庫訪問處於同一事務中,可配置一個外部的事務管理同時配置一個支持外部事務管理的消息監聽容器(如DefaultMessageListenerContainer)。要配置這樣一個參與分布式事務管理的消息監聽容器,可以配置一個JtaTransactionManager,當然底層的JMS ConnectionFactory需要能夠支持分布式事務管理,並正確地注冊JtaTransactionManager。這樣消息監聽器進行消息接收和對應的數據庫訪問就會處於同一數據庫控制下,當消息接收失敗或數據庫訪問失敗都會進行事務回滾操作。配置示例文件如下:
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> <property name="dataSource" ref="dataSource"/> </bean> <jee:jndi-lookup jndi-name="jdbc/mysql" id="dataSource"/> <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/> <tx:annotation-driven transaction-manager="jtaTransactionManager"/> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> <property name="transactionManager" ref="jtaTransactionManager"/> </bean> <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>