springboot 整合 ActiveMQ


springboot 整合 ActiveMQ

pom 坐標

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.6.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <!-- 整合 activemq -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>

    <!-- 這里我使用的是測試發送消息 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

yml 文件

server:
  port: 8080
spring:
  activemq:
    broker-url: tcp://localhost:61616
    packages:
      trusted: com.mozq.activemq.domain,com.mozq.activemq.entity,java.util
#      trust-all: true

消息生產者

使用 springboot 測試類實現

ObjectMessage 被發送的對象必須實現序列化。如果嵌套,都要實現序列化。Customer 對象有個成員是 LinkMan 對象,則 Customer 和 LinkMan 必須都實現序列化。發送集合對象。List<User> List 實現了序列化,其中的元素 User 也必須實現序列化。

package com.mozq.activemq;

import com.mozq.activemq.domain.Customer;
import com.mozq.activemq.domain.LinkMan;
import com.mozq.activemq.domain.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import javax.jms.*;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.*;

@RunWith(SpringRunner.class)
@SpringBootTest(classes= SmsApplication.class)
public class DemoTest {

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * TextMessage 常用,可以處理字符串和對象。如果是對象需要轉為 json 字符串。
     */
    @Test
    public void sendText(){
        /*
        String message = "這是測試短信";
        jmsTemplate.convertAndSend("mozq.queue.text", message);
        */
        jmsTemplate.send("mozq.queue.text", session -> {
            TextMessage textMessage = session.createTextMessage("這是測試短信");
            return textMessage;
        });
    }

    /**
     * ObjectMessage 不安全,也挺常用的。
     */
    @Test
    public void sendObject(){
        List<String> strList = new ArrayList<>();
        strList.add("長河");
        strList.add("落日");

        jmsTemplate.convertAndSend("mozq.queue.obj", strList);
        /*
        jmsTemplate.send("mozq.queue.obj", session -> {
            ObjectMessage objectMessage = session.createObjectMessage((Serializable) strList);
            return objectMessage;
        });
        */
    }

    @Test
    public void sendObject2(){
        User user = new User();
        user.setName("曹操");
        user.setAge(20);
        jmsTemplate.send("mozq.queue.user", session -> {
            ObjectMessage objectMessage = session.createObjectMessage((Serializable) user);
            return objectMessage;
        });
    }

    /**
     * Customer 對象中有成員 LinkMan 對象。則 Customer 和 LinkMan 必須都實現序列化。
     */
    @Test
    public void sendObject3(){
        LinkMan linkMan = new LinkMan();
        linkMan.setName("關羽");
        Customer customer = new Customer();
        customer.setName("諸葛");
        customer.setLinkMan(linkMan);

        jmsTemplate.send("mozq.queue.customer", session -> {
            ObjectMessage objectMessage = session.createObjectMessage((Serializable) customer);
            return objectMessage;
        });
    }

    @Test
    public void sendMap(){
        Map<String, Object> user = new HashMap<>();
        user.put("name", "劉備");
        user.put("age", 18);
        jmsTemplate.convertAndSend("mozq.queue.map", user);

//        jmsTemplate.send("mozq.queue.map", session -> {
//            MapMessage mapMessage = session.createMapMessage();
//            mapMessage.setObject("name", "劉備");
//            mapMessage.setInt("age", 18);
//            return mapMessage;
//        });
    }

    /**
     * 特點提供了各種數據類型讀寫方法,讀取順序必須和寫入順序一致
     */
    @Test
    public void testBytes(){

        jmsTemplate.send("mozq.queue.bytes", session -> {
            try {
                BytesMessage bytesMessage = session.createBytesMessage();
                bytesMessage.writeLong(100L);
                bytesMessage.writeBytes("字節流".getBytes("UTF-8"));
                return bytesMessage;
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            return null;
        });

    }

    /**
     * 特點提供了各種數據類型讀寫方法,讀取順序必須和寫入順序一致
     */
    @Test
    public void testStream(){
        jmsTemplate.send("mozq.queue.stream", session -> {
            try {
                StreamMessage streamMessage = session.createStreamMessage();
                streamMessage.writeString("孫權");
                streamMessage.writeString("大喬");
                streamMessage.writeBytes("赤壁".getBytes("UTF-8"));
                return streamMessage;
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            return null;
        });
    }

}

消息消費者

package com.mozq.activemq.listener;

import com.mozq.activemq.domain.Customer;
import com.mozq.activemq.domain.User;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.*;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import java.util.List;

@Component
public class JmsQueueListener {

    @JmsListener(destination = "mozq.queue.text")
    public void receiveQueue(TextMessage textMessage){
        try {
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @JmsListener(destination = "mozq.queue.obj")
    public void receiveQueue(ObjectMessage objectMessage){
        try {
            List<String>  list = (List<String>) objectMessage.getObject();
            System.out.println(list);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @JmsListener(destination = "mozq.queue.user")
    public void receiveQueue2(ObjectMessage objectMessage){
        try {
            User user = (User) objectMessage.getObject();
            System.out.println(user);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @JmsListener(destination = "mozq.queue.customer")
    public void receiveQueue3(ObjectMessage objectMessage){
        try {
            Customer customer = (Customer) objectMessage.getObject();
            System.out.println(customer);//Customer{name='諸葛', linkMan=LinkMan{name='關羽'}}
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @JmsListener(destination = "mozq.queue.map")
    public void receiveQueue(MapMessage mapMessage){
        try {
            Enumeration mapNames = mapMessage.getMapNames();
            while(mapNames.hasMoreElements()){
                Object key = mapNames.nextElement();
                Object value = mapMessage.getObject((String) key);
                System.out.println(key + ":" + value);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * BytesMessage 特點提供了各種數據類型讀寫方法,讀取順序必須和寫入順序一致
     * @param bytesMessage
     */
    @JmsListener(destination = "mozq.queue.bytes")
    public void receiveQueue(BytesMessage bytesMessage){
        try {
            long num = bytesMessage.readLong();
            byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
            bytesMessage.readBytes(bytes);
            String text = new String(bytes, "UTF-8");
            System.out.println(num + ":" + text);
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * StreamMessage 特點提供了各種數據類型讀寫方法,讀取順序必須和寫入順序一致
     * @param streamMessage
     */
    @JmsListener(destination = "mozq.queue.stream")
    public void receiveQueue(StreamMessage streamMessage){
        try {
            String name = streamMessage.readString();
            String wife = streamMessage.readString();
            byte[] bytes = new byte[1024];
            streamMessage.readBytes(bytes);
            String text = new String(bytes, "UTF-8");
            System.out.println(name + ":" + wife + ":" + text);
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

    }

}

實體類 User Customer LinkMan

public class User implements Serializable {
    private  String name;
    private  int age;
}
public class Customer implements Serializable {
    private String name;
    private LinkMan linkMan;
}
// 如果 LinkMan 不實現序列化,用 ObjectMessage 發送 Customer 對象會報錯。
public class LinkMan implements Serializable {
    private String name;
}

bugs

java.lang.ClassCastException: com.mozq.activemq.domain.User cannot be cast to java.io.Serializable
錯誤代碼:
ObjectMessage objectMessage = session.createObjectMessage((Serializable) user);
原因:
	對象消息,發送的對象必須實現序列化接口。
Caused by: java.lang.ClassNotFoundException: Forbidden class com.mozq.activemq.domain.User! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
錯誤代碼:
	消息消費者中,接收到消息報錯。
原因:
	http://activemq.apache.org/objectmessage.html
	ObjectMessage 對象基於 Java 序列化機制,這個過程是不安全的。這就是為什么從 5.12.2 和 5.13.0 開始,ActiveMQ 強制用戶必須顯式指定可以使用 ObjectMessage 的包。
Security
ObjectMessage objects depend on Java serialization of marshal/unmarshal object payload. This process is generally considered unsafe as malicious payload can exploit the host system. That’s why starting with versions 5.12.2 and 5.13.0, ActiveMQ enforces users to explicitly whitelist packages that can be exchanged using ObjectMessages.

ActiveMQConnectionFactory. There are two additional methods defined:

  • The setTrustedPackages() method allows you to set the list of trusted packages you want to be to unserialize, like

    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    factory.setTrustedPackages(new ArrayList(Arrays.asList("org.apache.activemq.test,org.apache.camel.test".split(","))));
    
  • The setTrustAllPackages() allows you to turn off security check and trust all classes. It’s useful for testing purposes.

    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    factory.setTrustAllPackages(true);
    

如果使用的是配置文件,在 ActiveMQConnectionFactory 中加入參數

<property name="trustAllPackages" value="true"/>

如果使用的是 application.properties

spring:
  activemq:
    broker-url: tcp://localhost:61616
    packages:
      trusted: com.mozq.activemq.domain,com.mozq.activemq.entity
#      trust-all: true
查看官方 application.properties 屬性說明
https://docs.spring.io/spring-boot/docs/2.1.7.RELEASE/reference/html/common-application-properties.html

spring.activemq.packages.trust-all= # Whether to trust all packages. 布爾值。
spring.activemq.packages.trusted= # Comma-separated list of specific packages to trust (when not trusting all packages). 逗號分隔的包名列表。
public class ActiveMQConnectionFactory{
    public void setTrustedPackages(List<String> trustedPackages) {
        this.trustedPackages = trustedPackages;
    }

    public void setTrustAllPackages(boolean trustAllPackages) {
        this.trustAllPackages = trustAllPackages;
    }
}
public class ActiveMQProperties { 
    private final ActiveMQProperties.Packages packages = new ActiveMQProperties.Packages();
    
    public static class Packages {
        private Boolean trustAll;
        private List<String> trusted = new ArrayList();

        public void setTrustAll(Boolean trustAll) {
            this.trustAll = trustAll;
        }

        public void setTrusted(List<String> trusted) {
            this.trusted = trusted;
        }
    }
}
Caused by: java.io.NotSerializableException: com.mozq.activemq.domain.LinkMan
原因:
	使用 ObjectMessage ,如果被發送的對象的引用鏈上的所有對象都必須實現序列化。Customer 對象中含有 LinkMan 對象,引用鏈有2個對象,只有 Customer 對象實現了序列化,而 LinkMan 對象沒有實現序列化,報錯。必須都實現序列化。

文章

MS五種消息的發送/接收的例子
https://chenjumin.iteye.com/blog/687124
application.properties 屬性
https://docs.spring.io/spring-boot/docs/2.1.7.RELEASE/reference/html/common-application-properties.html

ObjectMessage 消息的安全性

http://activemq.apache.org/objectmessage.html

關於MQ,你必須知道的 (大牛)

https://www.cnblogs.com/zhuoqingsen/p/MQ.html

springboot與ActiveMQ整合

https://www.cnblogs.com/elvinle/p/8457596.html

第十七章:springboot 整合 activeMQ

https://my.oschina.net/u/3387320/blog/3009301


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM