本文章的完整代碼可從我的github中下載:https://github.com/huangbowen521/SpringJMSSample.git
上一篇文章中介紹了如何安裝和運行ActiveMQ。這一章主要講述如何使用Spring JMS向ActiveMQ的Message Queue中發消息和讀消息。
首先需要在項目中引入依賴庫。
spring-core: 用於啟動Spring容器,加載bean。
spring-jms:使用Spring JMS提供的API。
activemq-all:使用ActiveMQ提供的API。
在本示例中我使用maven來導入相應的依賴庫。
pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.0.2.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>4.0.2.RELEASE</version> </dependency> </dependencies>
|
接下來配置與ActiveMQ的連接,以及一個自定義的MessageSender。
springJMSConfiguration.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="location"> <value>application.properties</value> </property> </bean> <!-- Activemq connection factory --> <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <constructor-arg index="0" value="${jms.broker.url}"/> </bean> <!-- ConnectionFactory Definition --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="amqConnectionFactory"/> </bean> <!-- Default Destination Queue Definition--> <bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="${jms.queue.name}"/> </bean> <!-- JmsTemplate Definition --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <property name="defaultDestination" ref="defaultDestination"/> </bean> <!-- Message Sender Definition --> <bean id="messageSender" class="huangbowen.net.jms.MessageSender"> <constructor-arg index="0" ref="jmsTemplate"/> </bean> </beans>
|
在此配置文件中,我們配置了一個ActiveMQ的connection factory,使用的是ActiveMQ提供的ActiveMQConnectionFactory類。然后又配置了一個Spring JMS提供的CachingConnectionFactory。我們定義了一個ActiveMQQueue作為消息的接收Queue。並創建了一個JmsTemplate,使用了之前創建的ConnectionFactory和Message Queue作為參數。最后自定義了一個MessageSender,使用該JmsTemplate進行消息發送。
以下MessageSender的實現。
MessageSender.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
package huangbowen.net.jms; import org.springframework.jms.core.JmsTemplate; public class MessageSender { private final JmsTemplate jmsTemplate; public MessageSender(final JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public void send(final String text) { jmsTemplate.convertAndSend(text); } }
|
這個MessageSender很簡單,就是通過jmsTemplate發送一個字符串信息。
我們還需要配置一個Listener來監聽和處理當前的Message Queue。
springJMSReceiver.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <!-- Message Receiver Definition --> <bean id="messageReceiver" class="huangbowen.net.jms.MessageReceiver"> </bean> <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destinationName" value="${jms.queue.name}"/> <property name="messageListener" ref="messageReceiver"/> </bean> </beans>
|
在上述xml文件中,我們自定義了一個MessageListener,並且使用Spring提供的SimpleMessageListenerContainer作為Container。
以下是MessageLinser的具體實現。
MessageReceiver.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
package huangbowen.net.jms; import javax.jms.*; public class MessageReceiver implements MessageListener { public void onMessage(Message message) { if(message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { String text = textMessage.getText(); System.out.println(String.format("Received: %s",text)); } catch (JMSException e) { e.printStackTrace(); } } } }
|
這個MessageListener也相當的簡單,就是從Queue中讀取出消息以后輸出到當前控制台中。
另外有關ActiveMQ的url和所使用的Message Queue的配置在application.properties文件中。
application.properties
1
2
|
jms.broker.url=tcp://localhost:61616 jms.queue.name=bar
|
好了,配置大功告成。如何演示那?我創建了兩個Main方法,一個用於發送消息到ActiveMQ的MessageQueue中,一個用於從MessageQueue中讀取消息。
SenderApp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
package huangbowen.net; import huangbowen.net.jms.MessageSender; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.util.StringUtils; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; public class SenderApp { public static void main( String[] args ) throws IOException { MessageSender sender = getMessageSender(); BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); String text = br.readLine(); while (!StringUtils.isEmpty(text)) { System.out.println(String.format("send message: %s", text)); sender.send(text); text = br.readLine(); } } public static MessageSender getMessageSender() { ApplicationContext context = new ClassPathXmlApplicationContext("springJMSConfiguration.xml"); return (MessageSender) context.getBean("messageSender"); } }
|
ReceiverApp.java
1
2
3
4
5
6
7
8
9
10
|
package huangbowen.net; import org.springframework.context.support.ClassPathXmlApplicationContext; public class ReceiverApp { public static void main( String[] args ) { new ClassPathXmlApplicationContext("springJMSConfiguration.xml", "springJMSReceiver.xml"); } }
|
OK,如果運行的話要先將ActiveMQ服務啟動起來(更多啟動方式參見我上篇文章)。
1
|
$:/usr/local/Cellar/activemq/5.8.0/libexec$ activemq start xbean:./conf/activemq-demo.xml
|
然后運行SenderApp中的Main方法,就可以在控制台中輸入消息發送到ActiveMQ的Message Queue中了。運行ReceiverApp中的Main方法,則會從Queue中將消息讀出來,打印到控制台。
這就是使用Spring JMS與ActiveMQ交互的一個簡單例子了。完整代碼可從https://github.com/huangbowen521/SpringJMSSample下載。