一、本文章包含的內容
1、列舉了ActiveMQ中通過Queue方式發送、消費隊列的代碼(普通文本、json/xml字符串、對象數據)
2、spring+activemq方式
二、配置信息
1、activemq的pom.xml信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
<!--activemq Begin-->
<
dependency
>
<
groupId
>org.springframework</
groupId
>
<
artifactId
>spring-jms</
artifactId
>
<
version
>${spring.version}</
version
>
</
dependency
>
<!-- <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>${spring.version}</version>
</dependency>-->
<
dependency
>
<
groupId
>org.apache.activemq</
groupId
>
<
artifactId
>activemq-all</
artifactId
>
<
version
>5.14.0</
version
>
</
dependency
>
<!--activemq End-->
|
2、activemq的配置文件:spring-jms.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
<!-- 啟用spring mvc 注解 -->
<
context:component-scan
base-package
=
"org.soa.test.activemq"
/>
<!-- 配置JMS連接工廠 -->
<
bean
id
=
"connectionFactory"
class
=
"org.apache.activemq.ActiveMQConnectionFactory"
>
<!--解決接收消息拋出異常:javax.jms.JMSException: Failed to build body from content. Serializable class not available to broke-->
<
property
name
=
"trustAllPackages"
value
=
"true"
/>
<!-- 是否異步發送 -->
<
property
name
=
"useAsyncSend"
value
=
"true"
/>
</
bean
>
<!-- Queue模式 Begin -->
<!-- 定義消息隊列(Queue) -->
<
bean
id
=
"queueDestination"
class
=
"org.apache.activemq.command.ActiveMQQueue"
>
<!-- 設置消息隊列的名字 -->
<
constructor-arg
>
<
value
>defaultQueueName</
value
>
</
constructor-arg
>
</
bean
>
<!-- 配置JMS模板,Spring提供的JMS工具類,它發送、接收消息。(Queue) -->
<
bean
id
=
"jmsTemplate"
class
=
"org.springframework.jms.core.JmsTemplate"
>
<
property
name
=
"connectionFactory"
ref
=
"connectionFactory"
/>
<
property
name
=
"defaultDestination"
ref
=
"queueDestination"
/>
<
property
name
=
"pubSubDomain"
value
=
"false"
/>
<!--接收超時時間-->
<!--<property name="receiveTimeout" value="10000" />-->
</
bean
>
<!-- Queue模式 End -->
|
三、隊列發送端及測試程序
1、發送代碼
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
package
org.soa.test.activemq.queues;
import
org.soa.test.activemq.StudentInfo;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.beans.factory.annotation.Qualifier;
import
org.springframework.jms.core.JmsTemplate;
import
org.springframework.jms.core.MessageCreator;
import
org.springframework.stereotype.Component;
import
javax.jms.Destination;
import
javax.jms.JMSException;
import
javax.jms.Message;
import
javax.jms.Session;
import
java.util.List;
/**
* Created by JamesC on 16-9-22.
*/
@Component
public
class
ProduceMsg {
@Autowired
private
JmsTemplate jmsTemplate;
/**
* 向指定隊列發送消息
*/
public
void
sendMessage(Destination destination,
final
String msg) {
System.out.println(
"向隊列"
+ destination.toString() +
"發送了消息------------"
+ msg);
jmsTemplate.send(destination,
new
MessageCreator() {
public
Message createMessage(Session session)
throws
JMSException {
return
session.createTextMessage(msg);
}
});
}
/**
* 向默認隊列發送消息(默認隊列名稱在bean:queueDestination配置)
*/
public
void
sendMessage(
final
String msg) {
String destination = jmsTemplate.getDefaultDestination().toString();
System.out.println(
"向隊列"
+ destination +
"發送了消息------------"
+ msg);
jmsTemplate.send(
new
MessageCreator() {
public
Message createMessage(Session session)
throws
JMSException {
return
session.createTextMessage(msg);
}
});
}
/**
* 向默認隊列發送消息
*/
public
void
sendMessageConvertAndSend(
final
Object msg) {
String destination = jmsTemplate.getDefaultDestination().toString();
System.out.println(
"向隊列"
+ destination +
"發送了消息------------"
+ msg);
//使用內嵌的MessageConverter進行數據類型轉換,包括xml(JAXB)、json(Jackson)、普通文本、字節數組
jmsTemplate.convertAndSend(destination, msg);
}
/**
* 向指定隊列發送消息
*/
public
void
sendStudentInfo(Destination destination,
final
StudentInfo msg) {
System.out.println(
"向隊列"
+ destination.toString() +
"發送了消息------------"
+ msg);
jmsTemplate.send(destination,
new
MessageCreator() {
public
Message createMessage(Session session)
throws
JMSException {
return
session.createObjectMessage(msg);
}
});
}
}
|
2、測試程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
package
org.soa.test.activemq.queues;
import
com.alibaba.fastjson.JSON;
import
org.apache.activemq.command.ActiveMQQueue;
import
org.junit.Test;
import
org.junit.runner.RunWith;
import
org.soa.test.activemq.StudentInfo;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.context.ApplicationContext;
import
org.springframework.test.context.ContextConfiguration;
import
org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import
org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import
javax.jms.Destination;
import
java.util.Date;
/**
* Created by JamesC on 16-9-22.
*/
@RunWith
(SpringJUnit4ClassRunner.
class
)
@ContextConfiguration
(
"/spring-jms.xml"
)
public
class
ProduceMsgTest
extends
AbstractJUnit4SpringContextTests {
@Autowired
protected
ApplicationContext ctx;
/**
* 隊列名queue1 這里使用jms配置文件中的數據
*/
@Autowired
private
Destination queueDestination;
/**
* 隊列消息生產者
*/
@Autowired
private
ProduceMsg produceMessage;
//向默認隊列發消息(文本)
@Test
public
void
produceMsg_DefaultQueue() {
String msg =
"這里是向默認隊列發送的消息"
+
new
Date().toString();
produceMessage.sendMessage(msg);
}
//向默認隊列發消息(Json字符串)
@Test
public
void
produceMsg_Json() {
StudentInfo info =
new
StudentInfo();
info.setId(
1
);
info.setStdName(
"李磊"
);
info.setStdNo(
"001"
);
info.setEnterDate(
new
Date());
//隊列存放的是時間戳
String alibabaJson = JSON.toJSONString(info);
produceMessage.sendMessage(alibabaJson);
}
//向默認隊列發消息(使用convertAndSend發送對象)
@Test
public
void
produceMsg_ConvertAndSend() {
StudentInfo info =
new
StudentInfo();
info.setId(
1
);
info.setStdName(
"李磊"
);
info.setStdNo(
"001"
);
info.setEnterDate(
new
Date());
produceMessage.sendMessageConvertAndSend(info);
}
//向指定隊列發消息(文本)
@Test
public
void
produceMsg_CustomQueue() {
for
(
int
i =
0
; i <
20
; i++) {
ActiveMQQueue myDestination =
new
ActiveMQQueue(
"queueCustom"
);
produceMessage.sendMessage(myDestination,
"----發送消息給queueCustom"
);
}
}
//向指定隊列發消息(隊列名稱從XML讀取)
@Test
public
void
produceMsg_XmlQueue() {
for
(
int
i =
0
; i <
20
; i++) {
ActiveMQQueue destinationQueue = (ActiveMQQueue) applicationContext.getBean(
"queueDestination"
);
produceMessage.sendMessage(destinationQueue,
"----send my msg to queueXml"
);
}
}
//向指定隊列發消息(發送對象)
@Test
public
void
produceMsg_StudentInfo() {
StudentInfo info =
new
StudentInfo();
info.setId(
1
);
info.setStdName(
"李磊"
);
info.setStdNo(
"001"
);
info.setEnterDate(
new
Date());
ActiveMQQueue destination =
new
ActiveMQQueue(
"StudentInfo"
);
produceMessage.sendStudentInfo(destination, info);
}
}
|
四、隊列消費端及測試程序
1、消費代碼
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
package
org.soa.test.activemq.queues;
import
org.soa.test.activemq.StudentInfo;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.jms.core.JmsTemplate;
import
org.springframework.jms.support.JmsUtils;
import
org.springframework.stereotype.Component;
import
javax.jms.Destination;
import
javax.jms.JMSException;
import
javax.jms.ObjectMessage;
import
javax.jms.TextMessage;
/**
* Created by JamesC on 16-9-22.
*/
@Component
public
class
ConsumeMsg {
@Autowired
private
JmsTemplate jmsTemplate;
/**
* 接受消息
*/
public
String receive(Destination destination) {
TextMessage tm = (TextMessage) jmsTemplate.receive(destination);
String msg =
""
;
try
{
msg = tm.getText();
System.out.println(
"從隊列"
+ destination.toString() +
"收到了消息:\t"
+ msg);
}
catch
(JMSException e) {
e.printStackTrace();
return
""
;
}
return
msg;
}
/**
* 接受消息
*/
public
StudentInfo receiveStudentInfo() {
try
{
String destination = jmsTemplate.getDefaultDestination().toString();
ObjectMessage msg=(ObjectMessage)jmsTemplate.receive(destination);
return
(StudentInfo)msg.getObject();
}
catch
(JMSException e) {
//檢查性異常轉換為非檢查性異常
throw
JmsUtils.convertJmsAccessException(e);
}
}
/**
* 接受消息
*/
public
Object receiveConvertAndReceive() {
String destination = jmsTemplate.getDefaultDestination().toString();
Object msg = jmsTemplate.receiveAndConvert(destination);
return
msg;
}
}
|
2、測試程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package
org.soa.test.activemq.queues;
import
org.apache.activemq.command.ActiveMQQueue;
import
org.junit.Test;
import
org.junit.runner.RunWith;
import
org.soa.test.activemq.StudentInfo;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.test.context.ContextConfiguration;
import
org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* Created by JamesC on 16-9-22.
*/
@RunWith
(SpringJUnit4ClassRunner.
class
)
@ContextConfiguration
(
"/spring-jms.xml"
)
public
class
ConsumeMsgTest {
@Autowired
private
ConsumeMsg consumeMsg;
//從指定隊列接收消息(文本)
@Test
public
void
receiveMsg() {
//沒有消息阻塞一段時間后會拋異常
//java.lang.NullPointerException
ActiveMQQueue destination =
new
ActiveMQQueue(
"defaultQueueName"
);
consumeMsg.receive(destination);
}
//從指定隊列接收消息(StudentInfo對象消息)
@Test
public
void
receiveStudentInfo() {
StudentInfo msg = consumeMsg.receiveStudentInfo();
System.out.println(msg.getStdName());
}
//從指定隊列接收消息(Json對象)
@Test
public
void
receiveConvertAndReceive() {
StudentInfo msg =(StudentInfo) consumeMsg.receiveConvertAndReceive();
System.out.println(msg.getStdName());
}
}
|