消息隊列(MQ)越來越火,在java開發的項目也屬於比較常見的技術,MQ的相關使用也成java開發人員必備的技能。筆者公司采用的MQ是ActiveMQ,且消息都是用的點對點的模式。本文記錄了實現Spring整合ActivateMQ的全過程及如何使用MQ,便於后續查閱。
一、項目的搭建
采用maven構建項目,免去了copy jar包的麻煩。因此,我們創建了一個java類型的Maven Project
(1)項目結構圖
先把項目結構圖看一下,便於對項目的理解。

(2)pom.xml
我們需要加入以下的依賴:
- spring-jms
- activemq-all
具體見下面代碼
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zxy</groupId>
<artifactId>spring-activemq</artifactId>
<version>1.0-SNAPSHOT</version>
<name>spring-activemq</name>
<dependencies>
<!-- spring-jms 依賴 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.10.RELEASE</version>
</dependency>
<!-- activemq依賴 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.1</version>
</dependency>
</dependencies>
</project>
24
1
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
2
<modelVersion>4.0.0</modelVersion>
3
<groupId>com.zxy</groupId>
4
<artifactId>spring-activemq</artifactId>
5
<version>1.0-SNAPSHOT</version>
6
<name>spring-activemq</name>
7
8
<dependencies>
9
<!-- spring-jms 依賴 -->
10
<dependency>
11
<groupId>org.springframework</groupId>
12
<artifactId>spring-jms</artifactId>
13
<version>4.3.10.RELEASE</version>
14
</dependency>
15
16
<!-- activemq依賴 -->
17
<dependency>
18
<groupId>org.apache.activemq</groupId>
19
<artifactId>activemq-all</artifactId>
20
<version>5.9.1</version>
21
</dependency>
22
</dependencies>
23
24
</project>
二、整合+寫代碼
先說明下,公司只用了ActiveMQ的點對點消息模式(queue),並沒有用發布訂閱模式(topic)。所以下文的整合也是按queue類型消息來配置的。
(1)配置applicationContext.xml
Spring整合ActiveMQ步驟如下:
- 注冊ActiveMQ連接工廠—— 配置與ActiveMQ相關的一些基本配置
- 注冊Spring Cache連接工廠—— 類似於數據庫連接池一樣的東西,用於提高效率。后續Connection和Session都是通過它來獲取,不直接和ActiveMQ發生關系
- 注冊JmsTemplate —— 主要用來發送MQ消息
- 注冊Queue監聽 —— 主要用來配置MQ消息的消費者
說明:因為JmsTemplate每次發送消息都需要創建Connection和Session,效率低。所以使用
Spring的CachingConnectionFactory連接工廠來管理Connection和Session,原理類似於數據庫連接池
具體配置代碼如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:c="http://www.springframework.org/schema/c"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.3.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd">
<!-- 開啟包掃描 (減少在xml中注冊bean)-->
<context:component-scan base-package="com.zxy.mq" />
<!-- #### ActiveMq配置 start ####-->
<!-- 1. ActiveMq連接工廠 -->
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<!--2. Spring Caching 連接工廠(類似數據庫線程池的東西,減少連接的創建。) -->
<!-- 由於jmsTemplate每次發送消息都需要創建連接和創建session了,所以引入這個類似連接池的連接工廠,優化Mq的性能 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目標連接工廠 指向 ActiveMq工廠 -->
<property name="targetConnectionFactory" ref="activeMQConnectionFactory" />
<!-- session緩存的最大個數-->
<property name="sessionCacheSize" value="100" />
</bean>
<!-- 3.配置jmsTemplate,用於發送發送mq消息 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<!-- 設置 jmsTemplate 不支持訂閱模式,即:只支持queue模式。
如果項目需要同時支持queue和topic,那么就需要另外注冊一個jmsTemplate(把pubSubDomain設為true)-->
<property name="pubSubDomain" value="false"></property>
</bean>
<!-- 4.定義Queue監聽器 -->
<jms:listener-container destination-type="queue" connection-factory="connectionFactory">
<!-- TODO 每添加一個queue的監聽,都需要在這里添加一個配置 -->
<!-- 這樣配置就可以方便的對多個隊列監聽 , 每增加一個隊列只需要添加一個 jms:listener -->
<!-- destination:隊列名稱, ref:指向對應的監聽器對象 -->
<!-- 示例: <jms:listener destination="queueName" ref="consumerBean" /> -->
</jms:listener-container>
<!-- #### ActiveMq配置 end ####-->
</beans>
48
1
2
<beans xmlns="http://www.springframework.org/schema/beans"
3
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4
xmlns:context="http://www.springframework.org/schema/context"
5
xmlns:jms="http://www.springframework.org/schema/jms"
6
xmlns:c="http://www.springframework.org/schema/c"
7
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.3.xsd
8
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
9
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd">
10
11
<!-- 開啟包掃描 (減少在xml中注冊bean)-->
12
<context:component-scan base-package="com.zxy.mq" />
13
14
<!-- #### ActiveMq配置 start ####-->
15
<!-- 1. ActiveMq連接工廠 -->
16
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
17
<property name="brokerURL" value="tcp://localhost:61616" />
18
</bean>
19
20
<!--2. Spring Caching 連接工廠(類似數據庫線程池的東西,減少連接的創建。) -->
21
<!-- 由於jmsTemplate每次發送消息都需要創建連接和創建session了,所以引入這個類似連接池的連接工廠,優化Mq的性能 -->
22
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
23
<!-- 目標連接工廠 指向 ActiveMq工廠 -->
24
<property name="targetConnectionFactory" ref="activeMQConnectionFactory" />
25
<!-- session緩存的最大個數-->
26
<property name="sessionCacheSize" value="100" />
27
</bean>
28
29
<!-- 3.配置jmsTemplate,用於發送發送mq消息 -->
30
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
31
<property name="connectionFactory" ref="connectionFactory" />
32
<!-- 設置 jmsTemplate 不支持訂閱模式,即:只支持queue模式。
33
如果項目需要同時支持queue和topic,那么就需要另外注冊一個jmsTemplate(把pubSubDomain設為true)-->
34
<property name="pubSubDomain" value="false"></property>
35
</bean>
36
37
<!-- 4.定義Queue監聽器 -->
38
<jms:listener-container destination-type="queue" connection-factory="connectionFactory">
39
<!-- TODO 每添加一個queue的監聽,都需要在這里添加一個配置 -->
40
<!-- 這樣配置就可以方便的對多個隊列監聽 , 每增加一個隊列只需要添加一個 jms:listener -->
41
<!-- destination:隊列名稱, ref:指向對應的監聽器對象 -->
42
<!-- 示例: <jms:listener destination="queueName" ref="consumerBean" /> -->
43
44
</jms:listener-container>
45
<!-- #### ActiveMq配置 end ####-->
46
47
</beans>
48
(2)寫一個通用的MQ消息生產者
一般我們傳輸mq消息都是json字符串。因此,文本類型的消息就能滿足我們常見的需求了。所以我們可以認為文本消息就是通用的MQ消息類型。
代碼如下:
package com.zxy.mq.producer;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
/**
* 通用的ActiveMQ queue消息生產者
* @author ZENG.XIAO.YAN
* @time 2018-11-13 10:48:20
* @version v1.0
*/
@Component
public class CommonMqQueueProducer {
@Autowired
private JmsTemplate jmsTemplate;
/**
* 發送點對點的文本類型的Mq消息
* @param queue 隊列的名字
* @param message 文本消息(一般直接傳輸json字符串,所以可以認為文本消息是最通用的)
*/
public void send(String queue, String message) {
jmsTemplate.send(queue, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}
35
1
package com.zxy.mq.producer;
2
import javax.jms.JMSException;
3
import javax.jms.Message;
4
import javax.jms.Session;
5
import org.springframework.beans.factory.annotation.Autowired;
6
import org.springframework.jms.core.JmsTemplate;
7
import org.springframework.jms.core.MessageCreator;
8
import org.springframework.stereotype.Component;
9
10
/**
11
* 通用的ActiveMQ queue消息生產者
12
* @author ZENG.XIAO.YAN
13
* @time 2018-11-13 10:48:20
14
* @version v1.0
15
*/
16
17
public class CommonMqQueueProducer {
18
19
private JmsTemplate jmsTemplate;
20
21
/**
22
* 發送點對點的文本類型的Mq消息
23
* @param queue 隊列的名字
24
* @param message 文本消息(一般直接傳輸json字符串,所以可以認為文本消息是最通用的)
25
*/
26
public void send(String queue, String message) {
27
jmsTemplate.send(queue, new MessageCreator() {
28
29
public Message createMessage(Session session) throws JMSException {
30
return session.createTextMessage(message);
31
}
32
});
33
}
34
35
}
(3)寫2個消費者
消費者類需要實現
MessageListener接口,然后重寫onMessage方法,且消費者需要交給Spring容器來實例化。
消費者A的代碼如下:
package com.zxy.mq.consumer;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;
@Component
public class TestAConsumer implements MessageListener {
@Override
public void onMessage(Message message) {
// myQueueA的消費者
try {
String text = ((TextMessage) message).getText();
System.out.println(this.getClass().getSimpleName() + "接受到消息---->" + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
22
1
package com.zxy.mq.consumer;
2
import javax.jms.JMSException;
3
import javax.jms.Message;
4
import javax.jms.MessageListener;
5
import javax.jms.TextMessage;
6
import org.springframework.stereotype.Component;
7
8
9
public class TestAConsumer implements MessageListener {
10
11
12
public void onMessage(Message message) {
13
// myQueueA的消費者
14
try {
15
String text = ((TextMessage) message).getText();
16
System.out.println(this.getClass().getSimpleName() + "接受到消息---->" + text);
17
} catch (JMSException e) {
18
e.printStackTrace();
19
}
20
}
21
22
}
消費者B的代碼如下:
package com.zxy.mq.consumer;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;
@Component
public class TestBConsumer implements MessageListener {
@Override
public void onMessage(Message message) {
// myQueueB的消費者
try {
String text = ((TextMessage) message).getText();
System.out.println(this.getClass().getSimpleName() + "接受到消息---->" + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
22
1
package com.zxy.mq.consumer;
2
import javax.jms.JMSException;
3
import javax.jms.Message;
4
import javax.jms.MessageListener;
5
import javax.jms.TextMessage;
6
import org.springframework.stereotype.Component;
7
8
9
public class TestBConsumer implements MessageListener {
10
11
12
public void onMessage(Message message) {
13
// myQueueB的消費者
14
try {
15
String text = ((TextMessage) message).getText();
16
System.out.println(this.getClass().getSimpleName() + "接受到消息---->" + text);
17
} catch (JMSException e) {
18
e.printStackTrace();
19
}
20
}
21
22
}
(4)queue監聽器中配置Listener並指向消費者
由於消費者需要監聽queue消息,因此
需要在applicationContext.xml的Queue監聽器里面添加Listener並ref對應消費者。
配置消費者A監聽的queue的名字為myQueueA,消費者B監聽的queue的名字為myQueueB。
具體的配置見下圖

三、測試
利用通用生產者來發送mq消息,然后消費者收到消息后會打印到控制台。
(1)測試代碼
發送10個mq消息,myQueueA隊列5個,myQueueB隊列5個
測試代碼如下:
package com.zxy.mq.test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.zxy.mq.producer.CommonMqQueueProducer;
/**
* MQ消息測試類
* @author ZENG.XIAO.YAN
* @time 2018-11-15 14:04:35
* @version v1.0
*/
public class MqTestDemo {
private static ApplicationContext applicationContext;
// 靜態代碼塊加載Spring容器
static {
applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
System.out.println("spring 容器已啟動。。。");
}
public static void main(String[] args) {
CommonMqQueueProducer mqQueueProducer = applicationContext.getBean(CommonMqQueueProducer.class);
for (int i = 1; i < 11; i++) {
// 奇數給myQueueA發,偶數給myQueueB發
if (i % 2 == 1) {
mqQueueProducer.send("myQueueA", "Mq消息A" + i);
} else {
mqQueueProducer.send("myQueueB", "Mq消息B" + i);
}
}
}
}
x
1
package com.zxy.mq.test;
2
import org.springframework.context.ApplicationContext;
3
import org.springframework.context.support.ClassPathXmlApplicationContext;
4
import com.zxy.mq.producer.CommonMqQueueProducer;
5
6
/**
7
* MQ消息測試類
8
* @author ZENG.XIAO.YAN
9
* @time 2018-11-15 14:04:35
10
* @version v1.0
11
*/
12
public class MqTestDemo {
13
private static ApplicationContext applicationContext;
14
// 靜態代碼塊加載Spring容器
15
static {
16
applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
17
System.out.println("spring 容器已啟動。。。");
18
}
19
20
public static void main(String[] args) {
21
CommonMqQueueProducer mqQueueProducer = applicationContext.getBean(CommonMqQueueProducer.class);
22
for (int i = 1; i < 11; i++) {
23
// 奇數給myQueueA發,偶數給myQueueB發
24
if (i % 2 == 1) {
25
mqQueueProducer.send("myQueueA", "Mq消息A" + i);
26
} else {
27
mqQueueProducer.send("myQueueB", "Mq消息B" + i);
28
}
29
}
30
}
31
32
}
(2)測試結果
通過控制台可以發現,對應queue的消費者接受到了消息,說明Spring整合ActiveMQ整合成功了。
控制台輸出結果見下圖

四、小結
(1)由於使用ActiveMQ官方原生的代碼來發送MQ消息的代碼比較復雜,因此采用JmsTemplate來發送MQ消息
(2)由於JmsTemplate發送MQ消息時每次都要創建Connection和Session。因此引入Spring提供的CachingConnectionFactory,起到類似於數據庫連接池的效果
(3)注冊JmsTemplate時,pubSubDomain這個屬性的值要特別注意。默認值是false,也就是說默認只是支持queue模式,不支持topic模式。
但是,如果將它改為true,則不支持queue模式。因此如果項目需要同時支持queue和topic模式,那么需要注冊2個JmsTemplate,同時監聽容器(
<jms:listener-container>
)也需要注冊2個