springboot/springcloud集成ActiveMQ實現消息發送及消費消息


一、ActiveMQ下載安裝

1.進入官網https://activemq.apache.org/下載后解壓即可

2.打開解壓后的文件,根據自己的電腦位數打開activemq.bat即可啟動成功

 3.看到下面界面就表示啟動成功了

 4.在瀏覽器輸入上面網址http://127.0.0.1:8161/admin/進行訪問,默認賬號密碼為 admin  admin 之后就可以看到下面界面:

二、創建Springboot/Springcloud項目實現消息的消費者。

發送消息的兩種方式:

queue:一對一發送

topic:一對多發送

消息的消費者用來監聽消息生產者發來的消息,在已有的項目集成也可以,其結構如下:

1.導入依賴

除Springboot/Springcloud基本必要依賴以外應引入下面兩個依賴

需要注意的是如果你使用的Springboot版本是2.0則引入activemq-pool,如果使用Springboot2.0以上的版本就要引入pooled-jms,我這里使用的Springboot2.2.13,引入的是pooled-jms,它們兩個的區別在於工廠的不同。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
        
<!--springboot2.0  PooledConnectionFactory-->
<!--        <dependency>-->
<!--            <groupId>org.apache.activemq</groupId>-->
<!--            <artifactId>activemq-pool</artifactId>-->
<!--        </dependency>-->

<!--        springboot2.1+  JmsPooledConnectionFactory-->
<dependency>
    <groupId>org.messaginghub</groupId>
    <artifactId>pooled-jms</artifactId>
</dependency>
<dependency>

2.配置文件application.yml

需要注意的是默認端口號為61616,並非顯示網頁的8161端口

spring: activemq: broker-url: tcp://127.0.0.1:61616
 user: admin password: admin pool: enabled: true #是否使用PooledConnectionFactory max-connections: 30 #默認為1 idle-timeout: 10000 #空閑的連接過期時間 默認30s #強制的連接過期時間,與idleTimeout的區別在於: #idleTimeout是在連接空閑一段時間失效, #而expiryTimeout不管當前連接的情況,只要達到指定時間就失效。默認為0 expiry-timeout: 20000

3.編寫配置類Config

import javax.jms.ConnectionFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; @EnableJms //開啟監聽
@Configuration public class Config { public final static String TOPIC = "springboot.topic.test"; public final static String QUEUE = "springboot.queue.test"; // topic模式的ListenerContainer
 @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(activeMQConnectionFactory); bean.setPubSubDomain(true); return bean; } // queue模式的ListenerContainer
 @Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(activeMQConnectionFactory); return bean; } }

4.編寫消費者類JmsConsumer,接收消息並處理(一般操作數據庫,這里只將其做打印處理)

import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.TextMessage; @Component public class JmsConsumer { @JmsListener(destination = Config.TOPIC,containerFactory = "jmsListenerContainerTopic") public void onMessageTopic(Message message) { //按照自己需求轉換
        TextMessage textMessage = (TextMessage) message; //ObjectMessage objectMessage= (ObjectMessage) message; //String msg= String.valueOf(objectMessage);

        try { String text = textMessage.getText(); System.out.println("接收到的topic消息是:"+text); } catch (JMSException e) { e.printStackTrace(); } } @JmsListener(destination = Config.QUEUE,containerFactory = "jmsListenerContainerQueue") public  void onMessageQueue(String msg){ System.out.println("接收到的queue消息是:"+msg); } }

5.編寫啟動類ActiveMQConsumerApp

import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class ActiveMQConsumerApp { public static void main(String[] args) { SpringApplication.run(ActiveMQConsumerApp.class,args); } }

三、創建Springboot/Springcloud項目實現消息的生產者。

發送消息的兩種方式:

queue:一對一發送

topic:一對多發送

消息的生產者用來產生消息並發送消息,在已有的項目集成也可以,其結構如下:

1.添加依賴

除Springboot/Springcloud基本必要依賴以外應引入下面兩個依賴,注意對應自己Springboot版本

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
        
<!--springboot2.0  PooledConnectionFactory-->
<!--        <dependency>-->
<!--            <groupId>org.apache.activemq</groupId>-->
<!--            <artifactId>activemq-pool</artifactId>-->
<!--        </dependency>-->

<!--        springboot2.1+  JmsPooledConnectionFactory-->
<dependency>
    <groupId>org.messaginghub</groupId>
    <artifactId>pooled-jms</artifactId>
</dependency>
<dependency>

2.編寫配置文件application.yml,注意為61616端口

spring: activemq: broker-url: tcp://127.0.0.1:61616
 user: admin password: admin pool: enabled: true #是否使用PooledConnectionFactory max-connections: 30 #默認為1 idle-timeout: 10000 #空閑的連接過期時間 默認30s #強制的連接過期時間,與idleTimeout的區別在於: #idleTimeout是在連接空閑一段時間失效, #而expiryTimeout不管當前連接的情況,只要達到指定時間就失效。默認為0 expiry-timeout: 20000 server: port: 4455

3.編寫配置類MyConfig

import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.jms.Queue; import javax.jms.Topic; @Configuration public class MyConfig { public final static String TOPIC = "springboot.topic.test"; public final static String QUEUE = "springboot.queue.test"; @Bean public Queue queue() { return new ActiveMQQueue(MyConfig.QUEUE); } @Bean public Topic topic() { return new ActiveMQTopic(MyConfig.TOPIC); } }

4.編寫類MyProvider,創建方法用於發送消息

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; @Component public class MyProvider { @Autowired private JmsTemplate jmsTemplate; public void sendMessage(Destination destination, final String text) { this.jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(text); } }); } }

5.編寫Controller類SendMessage調用其發送方法分別用queue和topic方式進行發送消息

import com.star.jms.MyConfig; import com.star.jms.MyProvider; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.jms.Queue; import javax.jms.Topic; @RestController public class SendMessage { @Autowired private MyProvider myProvider; @Autowired private Queue queue; @Autowired private Topic topic; @RequestMapping("/queuesend") public String sendMsg_queue(){ //也可以不寫配置類MyConfig使用下面三種方式獲取queue //1.ActiveMQQueue queue = new ActiveMQQueue(MyConfig.QUEUE); //還得寫配置類MyConfig //2.ActiveMQQueue queue = new ActiveMQQueue("Config.QUEUE"); //Config是消息的消費者里面寫的配置類 //3.ActiveMQQueue queue = new ActiveMQQueue("springboot.queue.test"); //直接寫死
 myProvider.sendMessage(queue,"使用queue發送消息"); return "queue消息已發出,等待消費"; } @RequestMapping("/topicsend") public String sendMsg_topic(){ //也可以不寫配置類MyConfig使用下面三種方式獲取topic //1.ActiveMQQueue topic = new ActiveMQTopic(MyConfig.TOPIC); //還得寫配置類MyConfig //2.ActiveMQQueue topic = new ActiveMQTopic("Config.TOPIC"); //Config是消息的消費者里面寫的配置類 //3.ActiveMQQueue topic = new ActiveMQTopic("springboot.topic.test"); //直接寫死
 myProvider.sendMessage(topic,"使用topic發送消息"); return "topic消息已發出,等待消費"; } }

6.編寫啟動類ActiveMQProduceApp

@SpringBootApplication public class ActiveMQProduceApp { public static void main(String[] args) { SpringApplication.run(ActiveMQProduceApp.class,args); } }

四、啟動測試

1.啟動ActiveMq服務

2.啟動消息消費者ActiveMQConsumerApp進行監聽

3.啟動消息的生產者ActiveMQProduceApp

4.訪問Controller接口進行發送消息

5.在消息消費者ActiveMQConsumerApp可以看到接收到了消息,並做了打印處理

 6.在ActiveMq可視化界面http://127.0.0.1:8161/admin/可以看到queue和topic消息發送和處理情況,賬號密碼都為admin

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM