springboot 整合 ActiveMQ
pom 坐標
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.6.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
    <java.version>1.8</java.version>
</properties>
<dependencies>
    <!-- ActiveMQ integration (provides JmsTemplate and @JmsListener support) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <!-- Messages are sent from a Spring Boot test class, so the test starter is
         only needed on the test classpath -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
yml 文件
server:
  port: 8080
spring:
  activemq:
    broker-url: tcp://localhost:61616
    packages:
      # Comma-separated list of packages whose classes may be carried by an ObjectMessage.
      # java.util is listed as well because a List payload is sent to mozq.queue.obj.
      trusted: com.mozq.activemq.domain,com.mozq.activemq.entity,java.util
      # Alternative: trust every package (turns the check off; intended for testing only).
      # trust-all: true
消息生產者
使用 springboot 測試類實現
使用 ObjectMessage 時,被發送的對象必須實現序列化(Serializable)。如果對象有嵌套成員,這些成員也都要實現序列化:例如 Customer 對象有個成員是 LinkMan 對象,則 Customer 和 LinkMan 必須都實現序列化。發送集合對象(例如 List<User>)時同理:
List 的實現類(如 ArrayList)實現了序列化,其中的元素 User 也必須實現序列化。
package com.mozq.activemq;
import com.mozq.activemq.domain.Customer;
import com.mozq.activemq.domain.LinkMan;
import com.mozq.activemq.domain.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import javax.jms.*;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.*;
/**
 * Message producer: each test sends one of the JMS message types
 * (Text, Object, Map, Bytes, Stream) to its own queue through {@link JmsTemplate}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SmsApplication.class)
public class DemoTest {

    /**
     * Charset used for every byte payload. Passing a Charset (instead of the name "UTF-8")
     * means getBytes() throws no checked exception, so the message creators below never
     * need a catch block that falls through to {@code return null}.
     */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Sends a TextMessage. TextMessage is the most commonly used type: it carries strings,
     * and objects can be sent with it after converting them to a JSON string.
     */
    @Test
    public void sendText() {
        /*
        String message = "這是測試短信";
        jmsTemplate.convertAndSend("mozq.queue.text", message);
        */
        jmsTemplate.send("mozq.queue.text", session -> session.createTextMessage("這是測試短信"));
    }

    /**
     * Sends a list of strings to "mozq.queue.obj" via convertAndSend.
     * ObjectMessage is not safe (it relies on Java serialization) but is still widely used.
     */
    @Test
    public void sendObject() {
        List<String> strList = new ArrayList<>();
        strList.add("長河");
        strList.add("落日");
        jmsTemplate.convertAndSend("mozq.queue.obj", strList);
        /*
        jmsTemplate.send("mozq.queue.obj", session -> {
        ObjectMessage objectMessage = session.createObjectMessage((Serializable) strList);
        return objectMessage;
        });
        */
    }

    /**
     * Sends a single User as an ObjectMessage.
     * The object is passed without a (Serializable) cast on purpose: if User ever stops
     * implementing Serializable, the compiler reports it instead of a ClassCastException
     * at run time.
     */
    @Test
    public void sendObject2() {
        User user = new User();
        user.setName("曹操");
        user.setAge(20);
        jmsTemplate.send("mozq.queue.user", session -> session.createObjectMessage(user));
    }

    /**
     * Sends a Customer that holds a LinkMan member.
     * Both Customer and LinkMan must implement Serializable for this to work.
     */
    @Test
    public void sendObject3() {
        LinkMan linkMan = new LinkMan();
        linkMan.setName("關羽");
        Customer customer = new Customer();
        customer.setName("諸葛");
        customer.setLinkMan(linkMan);
        jmsTemplate.send("mozq.queue.customer", session -> session.createObjectMessage(customer));
    }

    /**
     * Sends a map of simple values to "mozq.queue.map" via convertAndSend.
     */
    @Test
    public void sendMap() {
        Map<String, Object> user = new HashMap<>();
        user.put("name", "劉備");
        user.put("age", 18);
        jmsTemplate.convertAndSend("mozq.queue.map", user);
        // jmsTemplate.send("mozq.queue.map", session -> {
        // MapMessage mapMessage = session.createMapMessage();
        // mapMessage.setObject("name", "劉備");
        // mapMessage.setInt("age", 18);
        // return mapMessage;
        // });
    }

    /**
     * Sends a BytesMessage made of a long followed by UTF-8 text.
     * BytesMessage offers typed read/write methods; the reader must read the values
     * in the same order they were written.
     */
    @Test
    public void testBytes() {
        jmsTemplate.send("mozq.queue.bytes", session -> {
            BytesMessage bytesMessage = session.createBytesMessage();
            bytesMessage.writeLong(100L);
            bytesMessage.writeBytes("字節流".getBytes(UTF_8));
            return bytesMessage;
        });
    }

    /**
     * Sends a StreamMessage made of two strings followed by a UTF-8 byte field.
     * StreamMessage offers typed read/write methods; the reader must read the values
     * in the same order they were written.
     */
    @Test
    public void testStream() {
        jmsTemplate.send("mozq.queue.stream", session -> {
            StreamMessage streamMessage = session.createStreamMessage();
            streamMessage.writeString("孫權");
            streamMessage.writeString("大喬");
            streamMessage.writeBytes("赤壁".getBytes(UTF_8));
            return streamMessage;
        });
    }
}
消息消費者
package com.mozq.activemq.listener;
import com.mozq.activemq.domain.Customer;
import com.mozq.activemq.domain.User;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.*;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import java.util.List;
/**
 * Message consumer: one {@link JmsListener} method per queue used by the producer,
 * each printing the received payload to standard output.
 * A JMSException raised while reading a message is only printed, not rethrown.
 */
@Component
public class JmsQueueListener {

    /** Charset of every byte payload; using a Charset avoids the checked UnsupportedEncodingException. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Size of the chunk used when draining a StreamMessage byte field. */
    private static final int CHUNK_SIZE = 1024;

    /**
     * Prints the text of a message from "mozq.queue.text".
     *
     * @param textMessage the received text message
     */
    @JmsListener(destination = "mozq.queue.text")
    public void receiveQueue(TextMessage textMessage) {
        try {
            System.out.println(textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Prints the list carried by a message from "mozq.queue.obj".
     *
     * @param objectMessage the received object message; its payload is cast to a list of strings
     */
    @JmsListener(destination = "mozq.queue.obj")
    public void receiveQueue(ObjectMessage objectMessage) {
        try {
            // The producer in this article sends a List<String> to this queue; the element
            // type cannot be checked at run time, hence the scoped suppression.
            @SuppressWarnings("unchecked")
            List<String> list = (List<String>) objectMessage.getObject();
            System.out.println(list);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Prints the User carried by a message from "mozq.queue.user".
     *
     * @param objectMessage the received object message; its payload is cast to User
     */
    @JmsListener(destination = "mozq.queue.user")
    public void receiveQueue2(ObjectMessage objectMessage) {
        try {
            User user = (User) objectMessage.getObject();
            System.out.println(user);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Prints the Customer carried by a message from "mozq.queue.customer".
     *
     * @param objectMessage the received object message; its payload is cast to Customer
     */
    @JmsListener(destination = "mozq.queue.customer")
    public void receiveQueue3(ObjectMessage objectMessage) {
        try {
            Customer customer = (Customer) objectMessage.getObject();
            // Sample output: Customer{name='諸葛', linkMan=LinkMan{name='關羽'}}
            System.out.println(customer);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Prints every "key:value" entry of a message from "mozq.queue.map".
     *
     * @param mapMessage the received map message
     */
    @JmsListener(destination = "mozq.queue.map")
    public void receiveQueue(MapMessage mapMessage) {
        try {
            Enumeration<?> mapNames = mapMessage.getMapNames();
            while (mapNames.hasMoreElements()) {
                String key = (String) mapNames.nextElement();
                Object value = mapMessage.getObject(key);
                System.out.println(key + ":" + value);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads a long followed by UTF-8 text from a message on "mozq.queue.bytes" and prints
     * them as "num:text". BytesMessage offers typed read/write methods; values must be read
     * in the same order they were written.
     *
     * @param bytesMessage the received bytes message
     */
    @JmsListener(destination = "mozq.queue.bytes")
    public void receiveQueue(BytesMessage bytesMessage) {
        try {
            long num = bytesMessage.readLong();
            // getBodyLength() counts the whole body, including the long already consumed,
            // so it is only an upper bound for what is left.
            byte[] buffer = new byte[(int) bytesMessage.getBodyLength()];
            int read = bytesMessage.readBytes(buffer);
            // Decode only the bytes actually read; decoding the full buffer would append
            // NUL characters for the unused tail.
            String text = read > 0 ? new String(buffer, 0, read, UTF_8) : "";
            System.out.println(num + ":" + text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads two strings followed by a UTF-8 byte field from a message on "mozq.queue.stream"
     * and prints them as "name:wife:text". StreamMessage offers typed read/write methods;
     * values must be read in the same order they were written.
     *
     * @param streamMessage the received stream message
     */
    @JmsListener(destination = "mozq.queue.stream")
    public void receiveQueue(StreamMessage streamMessage) {
        try {
            String name = streamMessage.readString();
            String wife = streamMessage.readString();
            String text = new String(readByteField(streamMessage), UTF_8);
            System.out.println(name + ":" + wife + ":" + text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Drains the current byte field of a stream message.
     * readBytes is called repeatedly until it returns fewer bytes than the chunk size,
     * which is how the end of the field is signalled; only the bytes actually read are kept.
     *
     * @param message the stream message positioned at a byte field
     * @return the complete content of the byte field (empty if the field is null or empty)
     * @throws JMSException if the provider fails to read the message
     */
    private static byte[] readByteField(StreamMessage message) throws JMSException {
        java.io.ByteArrayOutputStream collected = new java.io.ByteArrayOutputStream();
        byte[] chunk = new byte[CHUNK_SIZE];
        int read;
        do {
            read = message.readBytes(chunk);
            if (read > 0) {
                collected.write(chunk, 0, read);
            }
        } while (read == chunk.length);
        return collected.toByteArray();
    }
}
實體類 User Customer LinkMan
/**
 * Simple payload sent as an ObjectMessage; it must be Serializable for that to work.
 */
public class User implements Serializable {

    /** Fixed version id so producer and consumer agree on the serialized form. */
    private static final long serialVersionUID = 1L;

    private String name;
    private int age;

    /** @return the user's name, or null if none was set */
    public String getName() {
        return name;
    }

    /** @param name the user's name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the user's age */
    public int getAge() {
        return age;
    }

    /** @param age the user's age */
    public void setAge(int age) {
        this.age = age;
    }

    /** @return a readable form, e.g. {@code User{name='x', age=1}} */
    @Override
    public String toString() {
        return "User{name='" + name + "', age=" + age + '}';
    }
}
/**
 * Payload with a nested LinkMan member, sent as an ObjectMessage.
 * Both this class and LinkMan must be Serializable for that to work.
 */
public class Customer implements Serializable {

    /** Fixed version id so producer and consumer agree on the serialized form. */
    private static final long serialVersionUID = 1L;

    private String name;
    private LinkMan linkMan;

    /** @return the customer's name, or null if none was set */
    public String getName() {
        return name;
    }

    /** @param name the customer's name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the customer's contact person, or null if none was set */
    public LinkMan getLinkMan() {
        return linkMan;
    }

    /** @param linkMan the customer's contact person */
    public void setLinkMan(LinkMan linkMan) {
        this.linkMan = linkMan;
    }

    /** @return a readable form, e.g. {@code Customer{name='x', linkMan=LinkMan{name='y'}}} */
    @Override
    public String toString() {
        return "Customer{name='" + name + "', linkMan=" + linkMan + '}';
    }
}
/**
 * Contact person nested inside Customer.
 * If LinkMan did not implement Serializable, sending a Customer as an ObjectMessage would fail.
 */
public class LinkMan implements Serializable {

    /** Fixed version id so producer and consumer agree on the serialized form. */
    private static final long serialVersionUID = 1L;

    private String name;

    /** @return the contact's name, or null if none was set */
    public String getName() {
        return name;
    }

    /** @param name the contact's name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return a readable form, e.g. {@code LinkMan{name='x'}} */
    @Override
    public String toString() {
        return "LinkMan{name='" + name + "'}";
    }
}
bugs
java.lang.ClassCastException: com.mozq.activemq.domain.User cannot be cast to java.io.Serializable
錯誤代碼:
ObjectMessage objectMessage = session.createObjectMessage((Serializable) user);
原因:
對象消息,發送的對象必須實現序列化接口。
Caused by: java.lang.ClassNotFoundException: Forbidden class com.mozq.activemq.domain.User! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
錯誤代碼:
消息消費者中,接收到消息報錯。
原因:
http://activemq.apache.org/objectmessage.html
ObjectMessage 依賴 Java 序列化機制來編組/解組對象負載,這個過程通常被認為是不安全的,因為惡意負載可以攻擊宿主系統。這就是為什麼從 5.12.2 和 5.13.0 開始,ActiveMQ 強制用戶必須顯式指定(白名單)可以通過 ObjectMessage 交換的包。
Security
ObjectMessage objects depend on Java serialization of marshal/unmarshal object payload. This process is generally considered unsafe as malicious payload can exploit the host system. That’s why starting with versions 5.12.2 and 5.13.0, ActiveMQ enforces users to explicitly whitelist packages that can be exchanged using ObjectMessages.
Clients can configure this through ActiveMQConnectionFactory. There are two additional methods defined:
- The setTrustedPackages() method allows you to set the list of trusted packages you want to be able to unserialize, like:
  ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
  factory.setTrustedPackages(new ArrayList(Arrays.asList("org.apache.activemq.test,org.apache.camel.test".split(","))));
- The setTrustAllPackages() method allows you to turn off the security check and trust all classes. It’s useful for testing purposes:
  ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
  factory.setTrustAllPackages(true);
如果使用的是 Spring XML 配置文件,在 ActiveMQConnectionFactory 的 bean 定義中加入屬性
<property name="trustAllPackages" value="true"/>
如果使用的是 Spring Boot 的 application.yml(application.properties 中對應的屬性名見下文)
spring:
  activemq:
    broker-url: tcp://localhost:61616
    packages:
      # Comma-separated list of packages whose classes may be carried by an ObjectMessage.
      # java.util is included to match the yml shown at the top of this article;
      # presumably it is needed for the List payload sent to mozq.queue.obj.
      trusted: com.mozq.activemq.domain,com.mozq.activemq.entity,java.util
      # Alternative: trust every package (turns the check off; intended for testing only).
      # trust-all: true
查看官方 application.properties 屬性說明
https://docs.spring.io/spring-boot/docs/2.1.7.RELEASE/reference/html/common-application-properties.html
spring.activemq.packages.trust-all= # Whether to trust all packages. 布爾值。
spring.activemq.packages.trusted= # Comma-separated list of specific packages to trust (when not trusting all packages). 逗號分隔的包名列表。
// Abbreviated excerpt of ActiveMQConnectionFactory: only the two setters that control
// which classes may be carried by an ObjectMessage are shown (field declarations omitted).
public class ActiveMQConnectionFactory{
// Stores the list of package names that are trusted.
public void setTrustedPackages(List<String> trustedPackages) {
this.trustedPackages = trustedPackages;
}
// Stores the flag that, when true, trusts all packages.
public void setTrustAllPackages(boolean trustAllPackages) {
this.trustAllPackages = trustAllPackages;
}
}
// Abbreviated excerpt of Spring Boot's ActiveMQProperties, showing only the nested
// "packages" group. Presumably these fields back the spring.activemq.packages.trust-all
// and spring.activemq.packages.trusted properties listed above.
public class ActiveMQProperties {
private final ActiveMQProperties.Packages packages = new ActiveMQProperties.Packages();
public static class Packages {
// Boxed Boolean, so "not configured" (null) can be told apart from false.
private Boolean trustAll;
// List of trusted package names; empty by default.
private List<String> trusted = new ArrayList();
public void setTrustAll(Boolean trustAll) {
this.trustAll = trustAll;
}
public void setTrusted(List<String> trusted) {
this.trusted = trusted;
}
}
}
Caused by: java.io.NotSerializableException: com.mozq.activemq.domain.LinkMan
原因:
使用 ObjectMessage 時,被發送對象的引用鏈上的所有對象都必須實現序列化。Customer 對象中含有 LinkMan 對象,引用鏈上有 2 個對象;如果只有 Customer 實現了序列化,而 LinkMan 沒有實現序列化,就會報這個錯。兩者必須都實現序列化。
文章
JMS 五種消息的發送/接收的例子
https://chenjumin.iteye.com/blog/687124
application.properties 屬性
https://docs.spring.io/spring-boot/docs/2.1.7.RELEASE/reference/html/common-application-properties.html
ObjectMessage 消息的安全性
http://activemq.apache.org/objectmessage.html
https://www.cnblogs.com/zhuoqingsen/p/MQ.html
https://www.cnblogs.com/elvinle/p/8457596.html
第十七章:springboot 整合 activeMQ