一、本文章包含的内容
1、列举了ActiveMQ中通过Queue方式发送、消费队列消息的代码(普通文本、json/xml字符串、对象数据)
2、spring+activemq方式
二、配置信息
1、activemq的pom.xml信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
<!-- ActiveMQ dependencies: begin -->
<!-- Spring JMS support (provides JmsTemplate); version follows the project's spring.version property -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring.version}</version>
</dependency>
<!-- spring-messaging is left disabled by the original author -->
<!-- <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-messaging</artifactId>
    <version>${spring.version}</version>
</dependency>-->
<!-- ActiveMQ client classes (ActiveMQConnectionFactory, ActiveMQQueue) -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.14.0</version>
</dependency>
<!-- ActiveMQ dependencies: end -->
|
2、activemq的配置文件:spring-jms.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
<!-- Scan this package for annotated components (@Component) so they are registered as beans -->
<context:component-scan base-package="org.soa.test.activemq"/>

<!-- JMS connection factory -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <!--
        Works around the exception raised when receiving object messages:
        javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker
        NOTE(review): trusting all packages lets any Serializable class on the classpath be
        deserialized from an ObjectMessage. Acceptable for a demo; for real deployments
        restrict this to an explicit trustedPackages list.
    -->
    <property name="trustAllPackages" value="true"/>
    <!-- Whether to send asynchronously -->
    <property name="useAsyncSend" value="true"/>
</bean>

<!-- Queue mode: begin -->
<!-- Define the message queue (Queue) -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- Name of the message queue -->
    <constructor-arg>
        <value>defaultQueueName</value>
    </constructor-arg>
</bean>

<!-- JMS template: Spring's JMS helper class used to send and receive messages (Queue) -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="defaultDestination" ref="queueDestination"/>
    <property name="pubSubDomain" value="false"/>
    <!-- Receive timeout -->
    <!--<property name="receiveTimeout" value="10000" />-->
</bean>
<!-- Queue mode: end -->
|
三、队列发送端及测试程序
1、发送代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
package
org.soa.test.activemq.queues;
import
org.soa.test.activemq.StudentInfo;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.beans.factory.annotation.Qualifier;
import
org.springframework.jms.core.JmsTemplate;
import
org.springframework.jms.core.MessageCreator;
import
org.springframework.stereotype.Component;
import
javax.jms.Destination;
import
javax.jms.JMSException;
import
javax.jms.Message;
import
javax.jms.Session;
import
java.util.List;
/**
* Created by JamesC on 16-9-22.
*/
@Component
public
class
ProduceMsg {
@Autowired
private
JmsTemplate jmsTemplate;
/**
* 向指定队列发送消息
*/
public
void
sendMessage(Destination destination,
final
String msg) {
System.out.println(
"向队列"
+ destination.toString() +
"发送了消息------------"
+ msg);
jmsTemplate.send(destination,
new
MessageCreator() {
public
Message createMessage(Session session)
throws
JMSException {
return
session.createTextMessage(msg);
}
});
}
/**
* 向默认队列发送消息(默认队列名称在bean:queueDestination配置)
*/
public
void
sendMessage(
final
String msg) {
String destination = jmsTemplate.getDefaultDestination().toString();
System.out.println(
"向队列"
+ destination +
"发送了消息------------"
+ msg);
jmsTemplate.send(
new
MessageCreator() {
public
Message createMessage(Session session)
throws
JMSException {
return
session.createTextMessage(msg);
}
});
}
/**
* 向默认队列发送消息
*/
public
void
sendMessageConvertAndSend(
final
Object msg) {
String destination = jmsTemplate.getDefaultDestination().toString();
System.out.println(
"向队列"
+ destination +
"发送了消息------------"
+ msg);
//使用内嵌的MessageConverter进行数据类型转换,包括xml(JAXB)、json(Jackson)、普通文本、字节数组
jmsTemplate.convertAndSend(destination, msg);
}
/**
* 向指定队列发送消息
*/
public
void
sendStudentInfo(Destination destination,
final
StudentInfo msg) {
System.out.println(
"向队列"
+ destination.toString() +
"发送了消息------------"
+ msg);
jmsTemplate.send(destination,
new
MessageCreator() {
public
Message createMessage(Session session)
throws
JMSException {
return
session.createObjectMessage(msg);
}
});
}
}
|
2、测试程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
package
org.soa.test.activemq.queues;
import
com.alibaba.fastjson.JSON;
import
org.apache.activemq.command.ActiveMQQueue;
import
org.junit.Test;
import
org.junit.runner.RunWith;
import
org.soa.test.activemq.StudentInfo;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.context.ApplicationContext;
import
org.springframework.test.context.ContextConfiguration;
import
org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import
org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import
javax.jms.Destination;
import
java.util.Date;
/**
 * Integration tests that push messages onto ActiveMQ queues through {@link ProduceMsg}.
 * The Spring context is loaded from {@code /spring-jms.xml}.
 *
 * <p>Created by JamesC on 16-9-22.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/spring-jms.xml")
public class ProduceMsgTest extends AbstractJUnit4SpringContextTests {

    @Autowired
    protected ApplicationContext ctx;

    /** Queue destination injected from the JMS configuration file (bean {@code queueDestination}). */
    @Autowired
    private Destination queueDestination;

    /** Queue message producer under test. */
    @Autowired
    private ProduceMsg produceMessage;

    /** Sends a plain-text message to the default queue. */
    @Test
    public void produceMsg_DefaultQueue() {
        String text = "这里是向默认队列发送的消息" + new Date().toString();
        produceMessage.sendMessage(text);
    }

    /** Sends a JSON string to the default queue. */
    @Test
    public void produceMsg_Json() {
        StudentInfo student = sampleStudent();
        // Per the original author: the date is stored in the queue as a timestamp.
        String json = JSON.toJSONString(student);
        produceMessage.sendMessage(json);
    }

    /** Sends an object to the default queue via convertAndSend. */
    @Test
    public void produceMsg_ConvertAndSend() {
        produceMessage.sendMessageConvertAndSend(sampleStudent());
    }

    /** Sends 20 text messages to an explicitly named queue. */
    @Test
    public void produceMsg_CustomQueue() {
        ActiveMQQueue customQueue = new ActiveMQQueue("queueCustom");
        int remaining = 20;
        while (remaining-- > 0) {
            produceMessage.sendMessage(customQueue, "----发送消息给queueCustom");
        }
    }

    /** Sends 20 text messages to the queue whose name is defined in the XML configuration. */
    @Test
    public void produceMsg_XmlQueue() {
        ActiveMQQueue xmlQueue = (ActiveMQQueue) applicationContext.getBean("queueDestination");
        int remaining = 20;
        while (remaining-- > 0) {
            produceMessage.sendMessage(xmlQueue, "----send my msg to queueXml");
        }
    }

    /** Sends a StudentInfo object message to an explicitly named queue. */
    @Test
    public void produceMsg_StudentInfo() {
        StudentInfo student = sampleStudent();
        ActiveMQQueue studentQueue = new ActiveMQQueue("StudentInfo");
        produceMessage.sendStudentInfo(studentQueue, student);
    }

    /** Builds the StudentInfo fixture shared by the object/JSON tests. */
    private static StudentInfo sampleStudent() {
        StudentInfo student = new StudentInfo();
        student.setId(1);
        student.setStdName("李磊");
        student.setStdNo("001");
        student.setEnterDate(new Date());
        return student;
    }
}
|
四、队列消费端及测试程序
1、消费代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
package
org.soa.test.activemq.queues;
import
org.soa.test.activemq.StudentInfo;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.jms.core.JmsTemplate;
import
org.springframework.jms.support.JmsUtils;
import
org.springframework.stereotype.Component;
import
javax.jms.Destination;
import
javax.jms.JMSException;
import
javax.jms.ObjectMessage;
import
javax.jms.TextMessage;
/**
 * Queue message consumer built on Spring's {@link JmsTemplate}.
 *
 * <p>Created by JamesC on 16-9-22.
 */
@Component
public class ConsumeMsg {

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Receives one text message from the given queue.
     *
     * @param destination queue to receive from
     * @return the message text; an empty string when no message was received or when
     *         reading the text failed with a {@link JMSException}
     */
    public String receive(Destination destination) {
        TextMessage tm = (TextMessage) jmsTemplate.receive(destination);
        if (tm == null) {
            // JmsTemplate.receive returns null when no message arrives before the receive
            // timeout; previously this fell through to tm.getText() and threw an NPE.
            return "";
        }
        try {
            String msg = tm.getText();
            System.out.println("从队列" + destination.toString() + "收到了消息:\t" + msg);
            return msg;
        } catch (JMSException e) {
            e.printStackTrace();
            return "";
        }
    }

    /**
     * Receives one object message from the template's default queue and returns its payload.
     *
     * @return the received {@code StudentInfo}, or {@code null} when no message was received
     */
    public StudentInfo receiveStudentInfo() {
        try {
            // Receive from the default Destination itself. Passing its toString() as a
            // destination name (as before) makes ActiveMQ resolve the qualified form
            // (e.g. "queue://defaultQueueName") as the literal name of a different queue.
            ObjectMessage msg = (ObjectMessage) jmsTemplate.receive();
            if (msg == null) {
                return null;
            }
            return (StudentInfo) msg.getObject();
        } catch (JMSException e) {
            // Convert the checked exception into an unchecked one.
            throw JmsUtils.convertJmsAccessException(e);
        }
    }

    /**
     * Receives one message from the template's default queue and converts it with the
     * template's {@code MessageConverter}.
     *
     * @return the converted payload; may be {@code null} when no message was received
     */
    public Object receiveConvertAndReceive() {
        // Use the default Destination directly, for the same reason as in receiveStudentInfo().
        return jmsTemplate.receiveAndConvert();
    }
}
|
2、测试程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package
org.soa.test.activemq.queues;
import
org.apache.activemq.command.ActiveMQQueue;
import
org.junit.Test;
import
org.junit.runner.RunWith;
import
org.soa.test.activemq.StudentInfo;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.test.context.ContextConfiguration;
import
org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Integration tests that pull messages from ActiveMQ queues through {@link ConsumeMsg}.
 * The Spring context is loaded from {@code /spring-jms.xml}.
 *
 * <p>Created by JamesC on 16-9-22.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/spring-jms.xml")
public class ConsumeMsgTest {

    @Autowired
    private ConsumeMsg consumeMsg;

    /** Receives a text message from an explicitly named queue. */
    @Test
    public void receiveMsg() {
        // Original author's note: with no message available the call blocks for a while and
        // was then observed to fail with java.lang.NullPointerException (whether it still
        // does depends on how ConsumeMsg.receive treats a missing message).
        ActiveMQQueue sourceQueue = new ActiveMQQueue("defaultQueueName");
        consumeMsg.receive(sourceQueue);
    }

    /** Receives a StudentInfo object message and prints the student's name. */
    @Test
    public void receiveStudentInfo() {
        StudentInfo student = consumeMsg.receiveStudentInfo();
        System.out.println(student.getStdName());
    }

    /** Receives a message through receiveAndConvert and prints the student's name. */
    @Test
    public void receiveConvertAndReceive() {
        Object payload = consumeMsg.receiveConvertAndReceive();
        StudentInfo student = (StudentInfo) payload;
        System.out.println(student.getStdName());
    }
}
|