AMQ學習筆記 - 13. Spring-jms的配置


概述


如何使用spring-jms來簡化jms客戶端的開發?
這篇文章主要記錄如何配置以便以后復用,而非原理的講解,有些內容我 沒有掌握原理。

producer端


producer端負責發送,這里使用JmsTemplate。

spring配置

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xsi:schemaLocation="http://www.springframework.org/schema/beans
 5         http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">
 6         
 7     <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 8         <property name="brokerURL" value="tcp://localhost:61616" />
 9     </bean>
10     
11     <!-- create template for send message -->
12     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
13         <!-- bind the connection factory -->
14         <property name="connectionFactory" ref="connectionFactory" />
15         <property name="defaultDestinationName" value="jms-config" />
16     </bean>
17 </beans>

 

JmsTemplate默認將jms-config解析為Queue類型的Destination。如果需要將其解析為Topic類型,需要為jmsTemplate指定屬性 pubSubDomain=true,配置如下:
1 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
2     <!-- bind the connection factory -->
3     <property name="connectionFactory" ref="connectionFactory" />
4     <property name="pubSubDomain" value="true" />
5     <property name="defaultDestinationName" value="jms-config" />
6 </bean>

測試類

 1 package cn.sinobest.asj.producer.springsupport.jt;
 2 import javax.annotation.Resource;
 3 
 4 import org.junit.Test;
 5 import org.junit.runner.RunWith;
 6 import org.springframework.jms.core.JmsTemplate;
 7 import org.springframework.test.context.ContextConfiguration;
 8 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 9 
10 @RunWith(SpringJUnit4ClassRunner.class) // 配置spring組件運行時
11 @ContextConfiguration("/spring-jms-demo.xml") // 配置文件
12 public class JmsTemplateSendWithContextTest {
13     @Resource(name = "jmsTemplate")
14     private JmsTemplate jt;
15     
16     @Test
17     public void testSendToDefaultDestination() {
18         String message = "you can config JmsTemplate in xml, then use it for send.";
19         jt.convertAndSend(message);
20     }
21 }

 

展示這個測試類,是想告訴大家使用Spring+JUnit4的注解編寫單元測試,可以非常方便的加載Spring配置並初始化bean資源;使用Resource注解可以獲取bean資源。如何使用JmsTemplate來發送消息,反而不是重點,因為在08. Spring-JmsTemplate之發送中,已經詳細介紹了相關的API。

consumer端


consumer端負責接收,接收有同步、異步兩種方式, 03. 消息的接收方式 中有所介紹。

同步接收

這里使用JmsTemplate進行同步接收。上面已經給過了使用JmsTemplate發送的配置, 接收和發送的配置能有什么區別呢?
如果我們不希望客戶端一直阻塞等待消息,那么需要關心receiveTimeout屬性,單位毫秒。如果超過了這個時間,還沒有接收到消息,就返回null。
1 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
2     <!-- bind the connection factory -->
3     <property name="connectionFactory" ref="connectionFactory" />
4     <property name="defaultDestinationName" value="jms-config" />
5     <property name="receiveTimeout" value="3000" />
6 </bean>

異步接收

異步接收是基於監聽器的接收,傳統的配置方式是配置一個ListenerContainer的bean,在這個bean里維護listener。還有一種精簡的配置方案,可以在一個container中放置多個listener。

傳統的配置

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xsi:schemaLocation="http://www.springframework.org/schema/beans
 5         http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">
 6         
 7     <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 8         <property name="brokerURL" value="tcp://localhost:61616" />
 9     </bean>
10     
11     <bean id="messageListener" class="cn.sinobest.asj.consumer.springsupport.async.SimpleMessageListener" />
12     <bean id="messageContainer"
13         class="org.springframework.jms.listener.DefaultMessageListenerContainer">
14         <property name="connectionFactory" ref="connectionFactory" />
15         <property name="destinationName" value="jms-config" />
16         <property name="messageListener" ref="messageListener" />
17     </bean>
18 </beans>

 

其中,SimpleMessageListener是接口javax.jms.MessageListener的實現類。

DefaultMessageListenerContainer負責將messageListener注冊到connectionFactory的destination,一旦destination中有消息,就會將消息推送給 messageListener。
DefaultMessageListenerContainer有很多特性的配置,下面擇要介紹:

1.Destination及類型

使用下面的API,可以設置Destination
  • public void setPubSubDomain(boolean pubSubDomain)
    設置destination的類型,true-Topics,false-Queues;默認為false。
  • public void setDestinationName(String destinationName)
    設置destination的name,結合pubSubDomain使用,根據destinationName解析為具體的Destination。
  • public void setDestination(Destination destination)
    設置destination。

2.多線程

一個DMLC 的實例,只能管理一個 MessageListener實例,但是可以使用下面的方法設置多線程:
  • public void setConcurrency(String concurrency)
    通過"lower-upper"格式的字符串,設置線程數的下限和上限,如"5-10";或者僅設置上限,下限默認為1,如"10"。
如果沒有使用事務,多線程可以顯著提高監聽器的接收速度。

3.確認模式

下面的API用來設置確認模式:
  • public void setSessionAcknowledgeMode(int sessionAcknowledgeMode)
    設置確認模式,sessionAcknowledgeMode可以取javax.jms.Session.AUTO_ACKNOWLEDGE(默認),javax.jms.Session.CLIENT_ACKNOWLEDGE,javax.jms.Session.DUPS_OK_ACKNOWLEDGE。
  • public void setSessionAcknowledgeModeName(String constantName)
    通過名字來設置確認模式,默認為"AUTO_ACKNOWLEDGE"。

4.事務

下面的API,用來設置事務:
  • public void setSessionTransacted(boolean sessionTransacted)
    設置是否使用事務,默認為false。
 
更多DefaultMessageListenerContainer的相關配置,參考其API。

精簡的配置

稱之為精簡版的配置,因為一個容器可以配置多個監聽器。
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xsi:schemaLocation="http://www.springframework.org/schema/beans
 5         http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">
 6         
 7     <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 8         <property name="brokerURL" value="tcp://localhost:61616" />
 9     </bean>
10     
11     <bean id="messageListener" class="cn.sinobest.asj.consumer.springsupport.async.SimpleMessageListener" />
12     <bean id="messageContainer"
13         class="org.springframework.jms.listener.DefaultMessageListenerContainer">
14         <property name="connectionFactory" ref="connectionFactory" />
15         <property name="destinationName" value="jms-config" />
16         <property name="messageListener" ref="messageListener" />
17     </bean>
18 </beans> 
listener-container作為容器,可以有多個listener子元素,每個 listener代表一個監聽器。

1.Destination類型

listener-container有 destination-type屬性,可以取值["queue", "topic"],默認為"queue",它決定了 listener將destination解析為Queue還是Topic類型。

2.pojo監聽器

listener會對消息進行轉換,所以ref的目標bean是一個pojo,method是這個pojo的方法的名字——這個方法的參數要和Message中的數據類型兼容。下面給出我們使用的 PojoListener:
 1 package cn.sinobest.asj.consumer.springsupport.async;
 2 
 3 /**
 4  * 一個pojo作為listener,接收經過轉換后的消息.
 5  * @author lijinlong
 6  *
 7  */
 8 public class PojoListener {
 9     public void passMeMessage(String message) {
10         System.out.println("從queue收到消息:" + message);
11     }
12 } 

3.多線程

為listener-container元素設置concurrency屬性,可以指定 線程數的下限和上限。這一點和 DefaultMessageListenerContainer的相同

4.確認模式和事務4

listener-container的acknowledge屬性,可以指定確認模式或者事務,取值范圍["auto", "client", "dups-ok", "transacted"];默認為"auto",事務使用" transacted "。
 
更多精簡配置相關的參數,參考 spring-jms-4.2.xsd

可信任的包

ObjectMessage的使用機制是不安全的,ActiveMQ自5.12.2和5.13.0之后,強制consumer端聲明一份可信任的包列表,只有當ObjectMessage中的Object在可信任包內,才能被提取出來。
你可以這樣配置可信任包:
 1 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 2     <property name="brokerURL" value="tcp://localhost:61616" />
 3     <property name="trustedPackages">
 4         <list>
 5             <!-- you must add the package or parent-package of object which is put in ObjectMessage -->
 6             <value>java.lang</value>
 7             <value>java.sql</value>
 8             <value>cn.sinobest</value>
 9         </list>
10     </property>
11 </bean>
關於ObjectMessage安全性的說明,參考 http://activemq.apache.org/objectmessage.html

ConnectionFactory的bean配置


我在閱讀網友的博文的過程中,見過幾種配置的方式,但我並不了解它們之間的優劣區別。

1.ActiveMQConnectionFactory

在前面的配置中,我們已經接觸了這個配置:
1 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
2     <property name="brokerURL" value="tcp://localhost:61616" />
3 </bean>

2.SingleConnectionFactory

 
使用org.springframework.jms.connection.SingleConnectionFactory對ActiveMQConnectionFactory進行包裝,建議用於測試或者單機的環境。
1 <bean id="connectionFactory"
2     class="org.springframework.jms.connection.SingleConnectionFactory">
3     <property name="targetConnectionFactory">
4         <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
5             <property name="brokerURL" value="tcp://localhost:61616" />
6         </bean>
7     </property>
8 </bean>

3.PooledConnectionFactory

使用org.apache.activemq.pool.PooledConnectionFactory對ActiveMQConnectionFactory進行包裝,暫不知有什么優化。
1 <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
2     <property name="connectionFactory">
3         <bean class="org.apache.activemq.ActiveMQConnectionFactory">
4             <property name="brokerURL" value="tcp://localhost:61616" />
5         </bean>
6     </property>
7 </bean>

分布式事務


首先引入數據源的概念,數據源能提供數據或者能存儲數據,一般是兩個特點都具備。如數據庫、消息隊列,都是數據源。
如果你要使用多個數據源,那就要考慮分布式事務。比如從broker-A中接收消息,發送到broker-B。如果在發送的過程中發生錯誤,接收的消息就不應該確認,否則會被 broker-A移除,造成消息的丟失。或者你想從 broker-A中接收消息,寫入數據庫,也有同樣的問題。而分布式事務要處理的問題,就是涉及到多個數據源的事務問題,保證涉及多個數據源的操作要么同時成功,要么同時失敗。
關於分布式事務的討論,可選的參考 XA事務處理,下面針對兩種情景,講一下配置。

1.broker-A到broker-B

從broker-A中接收消息,發送到broker-B中。在這樣的情景中可能會出現:應用系統將數據發送到broker-B,發送失敗的數據先存儲到broker-A,然后由定時任務從broker-A中獲取數據,發送到broker-B。
在這里我們使用同步接收的方式,接收和發送都是基於JmsTemplate的。
Spring提供了org.springframework.transaction.jta.JtaTransactionManager,但是依賴於底層的應用服務器支持JTA全局事務。在這里我們沒有使用這樣的服務器,而是用Atomikos框架。
事務的配置:
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xmlns:tx="http://www.springframework.org/schema/tx"
 5     xsi:schemaLocation="http://www.springframework.org/schema/beans
 6         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 7         http://www.springframework.org/schema/tx
 8         http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
 9     <!-- part-1 -->
10     <bean id="jtaTransactionManager"
11         class="org.springframework.transaction.jta.JtaTransactionManager">
12         <property name="transactionManager">
13             <bean class="com.atomikos.icatch.jta.UserTransactionManager"
14                 init-method="init" destroy-method="close">
15                 <property name="forceShutdown" value="false" />
16             </bean>
17         </property>
18         <property name="userTransaction">
19             <bean class="com.atomikos.icatch.jta.UserTransactionImp">
20                 <property name="transactionTimeout" value="300" />
21             </bean>
22         </property>
23     </bean>
24     
25     <tx:annotation-driven transaction-manager="jtaTransactionManager" />
26     
27     <!-- part-2 -->
28     <bean id="sourceConnFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean">
29         <property name="uniqueResourceName" value="broker/source" />
30         <property name="xaConnectionFactory">
31             <bean class="org.apache.activemq.ActiveMQXAConnectionFactory">
32                 <property name="brokerURL" value="tcp://localhost:57015" />
33             </bean>
34         </property>
35         <property name="maxPoolSize" value="10" />
36     </bean>
37     
38     <bean id="targetConnFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean">
39         <property name="uniqueResourceName" value="broker/target" />
40         <property name="xaConnectionFactory">
41             <bean class="org.apache.activemq.ActiveMQXAConnectionFactory">
42                 <property name="brokerURL" value="tcp://localhost:61616" />
43             </bean>
44         </property>
45         <property name="maxPoolSize" value="10" />
46     </bean>
47     
48     <bean id="sourceJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
49         <property name="connectionFactory" ref="sourceConnFactory" />
50         <property name="defaultDestinationName" value="asj.log" />
51         <property name="receiveTimeout" value="3000" />
52         <property name="sessionTransacted" value="true" />
53     </bean>
54     
55     <bean id="targetJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
56         <property name="connectionFactory" ref="targetConnFactory" />
57         <property name="defaultDestinationName" value="asj.log" />
58         <property name="sessionTransacted" value="true" />
59     </bean>
60 </beans>

 

服務類獲取sourceJmsTemplate、targetJmsTemplate,在業務方法中使用前者接收消息,使用后者發送消息。業務代碼要注解為org.springframework.transaction.annotation.Transactional:

 1 @Transactional
 2 public boolean transport() throws JmsException{
 3     Message remsg = souJT.receive();
 4     if (remsg != null) {
 5         final ObjectMessage omsg = (ObjectMessage) remsg;
 6         
 7         tarJT.send(new MessageCreator() {
 8             public Message createMessage(Session session)
 9                     throws JMSException {
10                 Message result = session.createObjectMessage(omsg
11                         .getObject());
12                 return result;
13             }
14         });
15     }
16     return remsg != null;
17 }

 

實際上,我雖然在項目里用了上面的配置,但是對其原理卻不了解。另外有2個問題值得一說:

  1. 定時任務和事務的沖突
    服務類是事務的,如果它同時作為定時任務,會有問題;后來把定時任務獨立了出來,定時調用服務類的業務方法。
  2. 事務和循環的沖突
    本來計划在事務中使用循環,以souJT接收到的消息為null作為結束條件,每接收一條就發送一條。結果即使有更多的消息,也只執行一次,是不是由Atomikos造成的也無從知曉。后來就把循環放在了定時任務里,在循環體調用業務方法,根據返回的結果來判斷是否結束循環。

2.broker-B到數據庫

從broker-B中接收消息,寫入數據庫。這一次沒有使用Atomikos,也實現了當寫庫失敗時回滾消息的事務效果;而前一個小節broker-A到broker-B,如果不使用Atomikos就無法達到事務效果。或許因為它使用的是同步接收,而接下來我們使用的是異步接收。接收的方式是否有對分布式事務有關系,結論還不能確定。
在這里,我們甚至沒有再使用spring的JtaTransactionManager。
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xmlns:jms="http://www.springframework.org/schema/jms"
 5     xsi:schemaLocation="http://www.springframework.org/schema/beans
 6         http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
 7         http://www.springframework.org/schema/jms
 8         http://www.springframework.org/schema/jms/spring-jms-4.2.xsd">
 9     <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
10         <property name="brokerURL" value="${java.naming.provider.url}" />
11     </bean>
12     
13     <bean id="converteredLogListener" class="cn.sinobest.asj.log.listener.ConverteredLogListener" />
14     
15     <jms:listener-container connection-factory="connectionFactory"
16         concurrency="1" acknowledge="transacted">
17         <jms:listener destination="asj.log" ref="converteredLogListener"
18             method="onLog" />
19     </jms:listener-container>
20 </beans>

 

當然,我們必須聲明listener-container的acknowledge屬性為"transacted",以開啟事務。在ConverteredLogListener的onLog,調用服務組件將數據入庫,只要寫入失敗時拋出的異常能抵達onLog方法,事務就會回滾,所以請確保異常能拋出來。






免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM