概述
如何使用spring-jms來簡化jms客戶端的開發?
這篇文章主要記錄如何配置以便以后復用,而非原理的講解,有些內容我 沒有掌握原理。
producer端
producer端負責發送,這里使用JmsTemplate。
spring配置
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://www.springframework.org/schema/beans 5 http://www.springframework.org/schema/beans/spring-beans-4.2.xsd"> 6 7 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 8 <property name="brokerURL" value="tcp://localhost:61616" /> 9 </bean> 10 11 <!-- create template for send message --> 12 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 13 <!-- bind the connection factory --> 14 <property name="connectionFactory" ref="connectionFactory" /> 15 <property name="defaultDestinationName" value="jms-config" /> 16 </bean> 17 </beans>
JmsTemplate默認將jms-config解析為Queue類型的Destination。如果需要將其解析為Topic類型,需要為jmsTemplate指定屬性
pubSubDomain=true,配置如下:
1 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 2 <!-- bind the connection factory --> 3 <property name="connectionFactory" ref="connectionFactory" /> 4 <property name="pubSubDomain" value="true" /> 5 <property name="defaultDestinationName" value="jms-config" /> 6 </bean>
測試類
1 package cn.sinobest.asj.producer.springsupport.jt; 2 import javax.annotation.Resource; 3 4 import org.junit.Test; 5 import org.junit.runner.RunWith; 6 import org.springframework.jms.core.JmsTemplate; 7 import org.springframework.test.context.ContextConfiguration; 8 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 9 10 @RunWith(SpringJUnit4ClassRunner.class) // 配置spring組件運行時 11 @ContextConfiguration("/spring-jms-demo.xml") // 配置文件 12 public class JmsTemplateSendWithContextTest { 13 @Resource(name = "jmsTemplate") 14 private JmsTemplate jt; 15 16 @Test 17 public void testSendToDefaultDestination() { 18 String message = "you can config JmsTemplate in xml, then use it for send."; 19 jt.convertAndSend(message); 20 } 21 }
展示這個測試類,是想告訴大家使用Spring+JUnit4的注解編寫單元測試,可以非常方便的加載Spring配置並初始化bean資源;使用Resource注解可以獲取bean資源。如何使用JmsTemplate來發送消息,反而不是重點,因為在08. Spring-JmsTemplate之發送中,已經詳細介紹了相關的API。
consumer端
同步接收
這里使用JmsTemplate進行同步接收。上面已經給過了使用JmsTemplate發送的配置,
接收和發送的配置能有什么區別呢?
如果我們不希望客戶端一直阻塞等待消息,那么需要關心receiveTimeout屬性,單位毫秒。如果超過了這個時間,還沒有接收到消息,就返回null。
1 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 2 <!-- bind the connection factory --> 3 <property name="connectionFactory" ref="connectionFactory" /> 4 <property name="defaultDestinationName" value="jms-config" /> 5 <property name="receiveTimeout" value="3000" /> 6 </bean>
異步接收
異步接收是基於監聽器的接收,傳統的配置方式是配置一個ListenerContainer的bean,在這個bean里維護listener。還有一種精簡的配置方案,可以在一個container中放置多個listener。
傳統的配置
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://www.springframework.org/schema/beans 5 http://www.springframework.org/schema/beans/spring-beans-4.2.xsd"> 6 7 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 8 <property name="brokerURL" value="tcp://localhost:61616" /> 9 </bean> 10 11 <bean id="messageListener" class="cn.sinobest.asj.consumer.springsupport.async.SimpleMessageListener" /> 12 <bean id="messageContainer" 13 class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 14 <property name="connectionFactory" ref="connectionFactory" /> 15 <property name="destinationName" value="jms-config" /> 16 <property name="messageListener" ref="messageListener" /> 17 </bean> 18 </beans>
其中,SimpleMessageListener是接口javax.jms.MessageListener的實現類。
DefaultMessageListenerContainer負責將messageListener注冊到connectionFactory的destination,一旦destination中有消息,就會將消息推送給
messageListener。
DefaultMessageListenerContainer有很多特性的配置,下面擇要介紹:
1.Destination及類型
使用下面的API,可以設置Destination
|
2.多線程
一個DMLC
的實例,只能管理一個
MessageListener實例,但是可以使用下面的方法設置多線程:
|
如果沒有使用事務,多線程可以顯著提高監聽器的接收速度。
3.確認模式
下面的API用來設置確認模式:
|
4.事務
下面的API,用來設置事務:
|
更多DefaultMessageListenerContainer的相關配置,參考其API。
精簡的配置
稱之為精簡版的配置,因為一個容器可以配置多個監聽器。
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://www.springframework.org/schema/beans 5 http://www.springframework.org/schema/beans/spring-beans-4.2.xsd"> 6 7 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 8 <property name="brokerURL" value="tcp://localhost:61616" /> 9 </bean> 10 11 <bean id="messageListener" class="cn.sinobest.asj.consumer.springsupport.async.SimpleMessageListener" /> 12 <bean id="messageContainer" 13 class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 14 <property name="connectionFactory" ref="connectionFactory" /> 15 <property name="destinationName" value="jms-config" /> 16 <property name="messageListener" ref="messageListener" /> 17 </bean> 18 </beans>
listener-container作為容器,可以有多個listener子元素,每個
listener代表一個監聽器。
1.Destination類型
listener-container有
destination-type屬性,可以取值["queue", "topic"],默認為"queue",它決定了
listener將destination解析為Queue還是Topic類型。
2.pojo監聽器
listener會對消息進行轉換,所以ref的目標bean是一個pojo,method是這個pojo的方法的名字——這個方法的參數要和Message中的數據類型兼容。下面給出我們使用的
PojoListener:
1 package cn.sinobest.asj.consumer.springsupport.async; 2 3 /** 4 * 一個pojo作為listener,接收經過轉換后的消息. 5 * @author lijinlong 6 * 7 */ 8 public class PojoListener { 9 public void passMeMessage(String message) { 10 System.out.println("從queue收到消息:" + message); 11 } 12 }
3.多線程
為listener-container元素設置concurrency屬性,可以指定
線程數的下限和上限。這一點和
DefaultMessageListenerContainer的相同
。
4.確認模式和事務4
listener-container的acknowledge屬性,可以指定確認模式或者事務,取值范圍["auto", "client", "dups-ok", "transacted"];默認為"auto",事務使用"
transacted
"。
更多精簡配置相關的參數,參考
spring-jms-4.2.xsd。
可信任的包
ObjectMessage的使用機制是不安全的,ActiveMQ自5.12.2和5.13.0之后,強制consumer端聲明一份可信任的包列表,只有當ObjectMessage中的Object在可信任包內,才能被提取出來。
你可以這樣配置可信任包:
1 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 2 <property name="brokerURL" value="tcp://localhost:61616" /> 3 <property name="trustedPackages"> 4 <list> 5 <!-- you must add the package or parent-package of object which is put in ObjectMessage --> 6 <value>java.lang</value> 7 <value>java.sql</value> 8 <value>cn.sinobest</value> 9 </list> 10 </property> 11 </bean>
關於ObjectMessage安全性的說明,參考
http://activemq.apache.org/objectmessage.html
ConnectionFactory的bean配置
我在閱讀網友的博文的過程中,見過幾種配置的方式,但我並不了解它們之間的優劣區別。
1.ActiveMQConnectionFactory
在前面的配置中,我們已經接觸了這個配置:
1 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 2 <property name="brokerURL" value="tcp://localhost:61616" /> 3 </bean>
2.SingleConnectionFactory
使用org.springframework.jms.connection.SingleConnectionFactory對ActiveMQConnectionFactory進行包裝,建議用於測試或者單機的環境。
1 <bean id="connectionFactory" 2 class="org.springframework.jms.connection.SingleConnectionFactory"> 3 <property name="targetConnectionFactory"> 4 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 5 <property name="brokerURL" value="tcp://localhost:61616" /> 6 </bean> 7 </property> 8 </bean>
3.PooledConnectionFactory
使用org.apache.activemq.pool.PooledConnectionFactory對ActiveMQConnectionFactory進行包裝,暫不知有什么優化。
1 <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> 2 <property name="connectionFactory"> 3 <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 4 <property name="brokerURL" value="tcp://localhost:61616" /> 5 </bean> 6 </property> 7 </bean>
分布式事務
首先引入數據源的概念,數據源能提供數據或者能存儲數據,一般是兩個特點都具備。如數據庫、消息隊列,都是數據源。
如果你要使用多個數據源,那就要考慮分布式事務。比如從broker-A中接收消息,發送到broker-B。如果在發送的過程中發生錯誤,接收的消息就不應該確認,否則會被
broker-A移除,造成消息的丟失。或者你想從
broker-A中接收消息,寫入數據庫,也有同樣的問題。而分布式事務要處理的問題,就是涉及到多個數據源的事務問題,保證涉及多個數據源的操作要么同時成功,要么同時失敗。
關於分布式事務的討論,可選的參考
XA事務處理,下面針對兩種情景,講一下配置。
1.broker-A到broker-B
從broker-A中接收消息,發送到broker-B中。在這樣的情景中可能會出現:應用系統將數據發送到broker-B,發送失敗的數據先存儲到broker-A,然后由定時任務從broker-A中獲取數據,發送到broker-B。
在這里我們使用同步接收的方式,接收和發送都是基於JmsTemplate的。
Spring提供了org.springframework.transaction.jta.JtaTransactionManager,但是依賴於底層的應用服務器支持JTA全局事務。在這里我們沒有使用這樣的服務器,而是用Atomikos框架。
事務的配置:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:tx="http://www.springframework.org/schema/tx" 5 xsi:schemaLocation="http://www.springframework.org/schema/beans 6 http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 7 http://www.springframework.org/schema/tx 8 http://www.springframework.org/schema/tx/spring-tx-3.0.xsd"> 9 <!-- part-1 --> 10 <bean id="jtaTransactionManager" 11 class="org.springframework.transaction.jta.JtaTransactionManager"> 12 <property name="transactionManager"> 13 <bean class="com.atomikos.icatch.jta.UserTransactionManager" 14 init-method="init" destroy-method="close"> 15 <property name="forceShutdown" value="false" /> 16 </bean> 17 </property> 18 <property name="userTransaction"> 19 <bean class="com.atomikos.icatch.jta.UserTransactionImp"> 20 <property name="transactionTimeout" value="300" /> 21 </bean> 22 </property> 23 </bean> 24 25 <tx:annotation-driven transaction-manager="jtaTransactionManager" /> 26 27 <!-- part-2 --> 28 <bean id="sourceConnFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"> 29 <property name="uniqueResourceName" value="broker/source" /> 30 <property name="xaConnectionFactory"> 31 <bean class="org.apache.activemq.ActiveMQXAConnectionFactory"> 32 <property name="brokerURL" value="tcp://localhost:57015" /> 33 </bean> 34 </property> 35 <property name="maxPoolSize" value="10" /> 36 </bean> 37 38 <bean id="targetConnFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"> 39 <property name="uniqueResourceName" value="broker/target" /> 40 <property name="xaConnectionFactory"> 41 <bean class="org.apache.activemq.ActiveMQXAConnectionFactory"> 42 <property name="brokerURL" value="tcp://localhost:61616" /> 43 </bean> 44 </property> 45 <property name="maxPoolSize" value="10" /> 46 </bean> 47 48 <bean id="sourceJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 49 <property name="connectionFactory" ref="sourceConnFactory" /> 50 <property name="defaultDestinationName" value="asj.log" /> 51 <property name="receiveTimeout" value="3000" /> 52 <property name="sessionTransacted" value="true" /> 53 </bean> 54 55 <bean id="targetJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 56 <property name="connectionFactory" ref="targetConnFactory" /> 57 <property name="defaultDestinationName" value="asj.log" /> 58 <property name="sessionTransacted" value="true" /> 59 </bean> 60 </beans>
服務類獲取sourceJmsTemplate、targetJmsTemplate,在業務方法中使用前者接收消息,使用后者發送消息。業務代碼要注解為org.springframework.transaction.annotation.Transactional:
1 @Transactional 2 public boolean transport() throws JmsException{ 3 Message remsg = souJT.receive(); 4 if (remsg != null) { 5 final ObjectMessage omsg = (ObjectMessage) remsg; 6 7 tarJT.send(new MessageCreator() { 8 public Message createMessage(Session session) 9 throws JMSException { 10 Message result = session.createObjectMessage(omsg 11 .getObject()); 12 return result; 13 } 14 }); 15 } 16 return remsg != null; 17 }
實際上,我雖然在項目里用了上面的配置,但是對其原理卻不了解。另外有2個問題值得一說:
- 定時任務和事務的沖突
服務類是事務的,如果它同時作為定時任務,會有問題;后來把定時任務獨立了出來,定時調用服務類的業務方法。 - 事務和循環的沖突
本來計划在事務中使用循環,以souJT接收到的消息為null作為結束條件,每接收一條就發送一條。結果即使有更多的消息,也只執行一次,是不是由Atomikos造成的也無從知曉。后來就把循環放在了定時任務里,在循環體調用業務方法,根據返回的結果來判斷是否結束循環。
2.broker-B到數據庫
從broker-B中接收消息,寫入數據庫。這一次沒有使用Atomikos,也實現了當寫庫失敗時回滾消息的事務效果;而前一個小節broker-A到broker-B,如果不使用Atomikos就無法達到事務效果。或許因為它使用的是同步接收,而接下來我們使用的是異步接收。接收的方式是否有對分布式事務有關系,結論還不能確定。
在這里,我們甚至沒有再使用spring的JtaTransactionManager。
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:jms="http://www.springframework.org/schema/jms" 5 xsi:schemaLocation="http://www.springframework.org/schema/beans 6 http://www.springframework.org/schema/beans/spring-beans-4.2.xsd 7 http://www.springframework.org/schema/jms 8 http://www.springframework.org/schema/jms/spring-jms-4.2.xsd"> 9 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 10 <property name="brokerURL" value="${java.naming.provider.url}" /> 11 </bean> 12 13 <bean id="converteredLogListener" class="cn.sinobest.asj.log.listener.ConverteredLogListener" /> 14 15 <jms:listener-container connection-factory="connectionFactory" 16 concurrency="1" acknowledge="transacted"> 17 <jms:listener destination="asj.log" ref="converteredLogListener" 18 method="onLog" /> 19 </jms:listener-container> 20 </beans>
當然,我們必須聲明listener-container的acknowledge屬性為"transacted",以開啟事務。在ConverteredLogListener的onLog,調用服務組件將數據入庫,只要寫入失敗時拋出的異常能抵達onLog方法,事務就會回滾,所以請確保異常能拋出來。