1.springboot+ActiveMQ


1.項目結構如下

 

 

 pom.xml文件如下

 1  <dependencies>
 2     <dependency>
 3       <groupId>junit</groupId>
 4       <artifactId>junit</artifactId>
 5       <version>4.12</version>
 6     </dependency>
 7     <dependency>
 8       <groupId>org.springframework.boot</groupId>
 9       <artifactId>spring-boot-starter-activemq</artifactId>
10     </dependency>
11 
12     <!--消息隊列連接池-->
13     <!--<dependency>-->
14       <!--<groupId>org.apache.activemq</groupId>-->
15       <!--<artifactId>activemq-pool</artifactId>-->
16       <!--<version>5.15.0</version>-->
17     <!--</dependency>-->
18   </dependencies>
pom.xml

 

2.創建消息提供者

 

pom.xml文件如下

 1  <dependencies>
 2     <dependency>
 3       <groupId>junit</groupId>
 4       <artifactId>junit</artifactId>
 5       <version>4.12</version>
 6     </dependency>
 7     <dependency>
 8       <groupId>org.springframework.boot</groupId>
 9       <artifactId>spring-boot-starter-activemq</artifactId>
10     </dependency>
11 
12     <!--消息隊列連接池-->
13     <!--<dependency>-->
14       <!--<groupId>org.apache.activemq</groupId>-->
15       <!--<artifactId>activemq-pool</artifactId>-->
16       <!--<version>5.15.0</version>-->
17     <!--</dependency>-->
18   </dependencies>
pom.xml

 

 

 2.1 創建屬性文件如下:

 1 server.port=8080
 2 server.servlet.context-path=/pro
 3 spring.activemq.user=admin
 4 spring.activemq.password=admin
 5 #集群配置
 6 #spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
 7 #整合jms測試,安裝在別的機器,防火牆和端口號記得開放
 8 spring.activemq.broker-url=tcp://192.168.117.152:61617
 9 queueName=publish.queue
10 topicName=publish.topic
11 
12 
13 
14 #ActiveMQ是消息隊列技術,為解決高並發問題而生
15 #ActiveMQ生產者消費者模型(生產者和消費者可以跨平台、跨系統)
16 #ActiveMQ支持如下兩種消息傳輸方式
17 #點對點模式,生產者生產了一個消息,只能由一個消費者進行消費
18 #發布/訂閱模式,生產者生產了一個消息,可以由多個消費者進行消費
application.properties

 

2.2 定義消息隊列類ActiveMQConfig.java

 1 package cn.kgc.config;
 2 
 3 import org.apache.activemq.ActiveMQConnectionFactory;
 4 import org.apache.activemq.command.ActiveMQQueue;
 5 import org.apache.activemq.command.ActiveMQTopic;
 6 import org.springframework.beans.factory.annotation.Value;
 7 import org.springframework.context.annotation.Bean;
 8 import org.springframework.context.annotation.Configuration;
 9 import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
10 import org.springframework.jms.config.JmsListenerContainerFactory;
11 
12 import javax.jms.Queue;
13 import javax.jms.Topic;
14 //定義消息隊列
15 @Configuration
16 public class ActiveMQConfig {
17 
18     @Value("${queueName}")
19     private String queueName;
20 
21     @Value("${topicName}")
22     private String topicName;
23 
24     @Value("${spring.activemq.user}")
25     private String usrName;
26 
27     @Value("${spring.activemq.password}")
28     private  String password;
29 
30     @Value("${spring.activemq.broker-url}")
31     private  String brokerUrl;
32 
33     //定義存放消息的隊列
34     @Bean
35     public Queue queue(){
36         return new ActiveMQQueue(queueName);
37     }
38 
39     @Bean
40     public Topic topic(){
41         return new ActiveMQTopic(topicName);
42     }
43 
44     @Bean
45     public ActiveMQConnectionFactory connectionFactory() {
46         return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
47     }
48 
49     @Bean
50     public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
51         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
52         bean.setConnectionFactory(connectionFactory);
53         return bean;
54     }
55 
56     @Bean
57     public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
58         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
59         //設置為發布訂閱方式, 默認情況下使用的生產消費者方式
60         bean.setPubSubDomain(true);
61         bean.setConnectionFactory(connectionFactory);
62         return bean;
63     }
64 }
ActiveMQConfig.java

 

2.3 定義消息發送控制類PublishController.java

 1 package cn.kgc.controller;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.jms.annotation.JmsListener;
 5 import org.springframework.jms.core.JmsMessagingTemplate;
 6 import org.springframework.web.bind.annotation.RequestMapping;
 7 import org.springframework.web.bind.annotation.RequestParam;
 8 import org.springframework.web.bind.annotation.RestController;
 9 
10 import javax.jms.Queue;
11 import javax.jms.Topic;
12 
13 @RestController
14 @RequestMapping("/publish")
15 public class PublishController {
16 
17     //注入springboot封裝的工具類
18     @Autowired
19     private JmsMessagingTemplate jms;
20 
21     //注入存放消息的隊列,用於下列方法一
22     @Autowired
23     private Queue queue;
24 
25     @Autowired
26     private Topic topic;
27 
28     @RequestMapping("/queue")
29     public String queue(){
30 
31         for (int i = 0; i < 10 ; i++){
32 
33             //方法一:添加消息到消息隊列
34             jms.convertAndSend(queue, "queue"+i);
35         }
36 
37         return "queue 發送成功";
38     }
39 
40     @JmsListener(destination = "out.queue")
41     public void consumerMsg(String msg){
42         System.out.println(msg);
43     }
44 
45     @RequestMapping("/topic")
46     public String topic(){
47 
48         for (int i = 0; i < 10 ; i++){
49             jms.convertAndSend(topic, "topic"+i);
50         }
51 
52         return "topic 發送成功";
53     }
54 }
PublishController.java

 

2.4 啟動類

 1 package cn.kgc;
 2 
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5 import org.springframework.jms.annotation.EnableJms;
 6 //啟動消息隊列
 7 @EnableJms
 8 @SpringBootApplication
 9 public class ProviderApplication {
10 
11     public static void main(String[] args) {
12         SpringApplication.run(ProviderApplication.class, args);
13     }
14 
15 }
ProviderApplication.java

 

3.創建消費者(調用者)項目a

 

 

 

3.1 編輯屬性文件application.properties

1 server.port=8081
2 server.servlet.context-path=/cona
3 spring.activemq.user=admin
4 spring.activemq.password=admin
5 spring.activemq.broker-url=tcp://192.168.117.152:61617
6 queueName=publish.queue
7 topicName=publish.topic
application.properties

 

3.2 消息隊列參數類ActiveMQConfig.java

 1 package cn.kgc.config;
 2 
 3 import org.apache.activemq.ActiveMQConnectionFactory;
 4 import org.apache.activemq.command.ActiveMQQueue;
 5 import org.apache.activemq.command.ActiveMQTopic;
 6 import org.springframework.beans.factory.annotation.Value;
 7 import org.springframework.context.annotation.Bean;
 8 import org.springframework.context.annotation.Configuration;
 9 import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
10 import org.springframework.jms.config.JmsListenerContainerFactory;
11 
12 import javax.jms.Queue;
13 import javax.jms.Topic;
14 
15 @Configuration
16 public class ActiveMQConfig {
17 
18     @Value("${queueName}")
19     private String queueName;
20 
21     @Value("${topicName}")
22     private String topicName;
23 
24     @Value("${spring.activemq.user}")
25     private String usrName;
26 
27     @Value("${spring.activemq.password}")
28     private  String password;
29 
30     @Value("${spring.activemq.broker-url}")
31     private  String brokerUrl;
32 
33     @Bean
34     public Queue queue(){
35         return new ActiveMQQueue(queueName);
36     }
37 
38     @Bean
39     public Topic topic(){
40         return new ActiveMQTopic(topicName);
41     }
42 
43     @Bean
44     public ActiveMQConnectionFactory connectionFactory() {
45         return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
46     }
47 
48     @Bean
49     public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
50         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
51         bean.setConnectionFactory(connectionFactory);
52         return bean;
53     }
54 
55     @Bean
56     public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
57         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
58         //設置為發布訂閱方式, 默認情況下使用的生產消費者方式
59         bean.setPubSubDomain(true);
60         bean.setConnectionFactory(connectionFactory);
61         return bean;
62     }
63 }
ActiveMQConfig.java

 

3.3 隊列監聽

 1 package cn.kgc.listener;
 2 
 3 import org.springframework.jms.annotation.JmsListener;
 4 import org.springframework.messaging.handler.annotation.SendTo;
 5 import org.springframework.stereotype.Component;
 6 
 7 @Component
 8 public class QueueListener {
 9     @JmsListener(destination = "publish.queue", containerFactory = "jmsListenerContainerQueue")
10     @SendTo("out.queue")
11     public String receive(String text){
12         System.out.println("QueueListener: consumer-a 收到一條信息: " + text);
13         return "consumer-a received : " + text;
14     }
15 }
QueueListener.java

 

3.4 消息主題監聽

 1 package cn.kgc.listener;
 2 
 3 import org.springframework.jms.annotation.JmsListener;
 4 import org.springframework.stereotype.Component;
 5 
 6 @Component
 7 public class TopocListener {
 8 
 9     @JmsListener(destination = "publish.topic", containerFactory = "jmsListenerContainerTopic")
10     public void receive(String text){
11         System.out.println("TopicListener: consumer-a 收到一條信息: " + text);
12     }
13 }
TopocListener.java

 

3.5 控制類 PublishController.java

 1 package cn.kgc.controller;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.jms.annotation.JmsListener;
 5 import org.springframework.jms.core.JmsMessagingTemplate;
 6 import org.springframework.web.bind.annotation.RequestMapping;
 7 import org.springframework.web.bind.annotation.RestController;
 8 
 9 import javax.jms.Queue;
10 import javax.jms.Topic;
11 
12 @RestController
13 @RequestMapping("/publish")
14 public class PublishController {
15     @Autowired
16     private JmsMessagingTemplate jms;
17 
18     @Autowired
19     private Queue queue;
20 
21     @Autowired
22     private Topic topic;
23 
24     @RequestMapping("/queue")
25     public String queue(){
26 
27         for (int i = 0; i < 10 ; i++){
28             jms.convertAndSend(queue, "queue"+i);
29         }
30 
31         return "queue 發送成功";
32     }
33 
34     @JmsListener(destination = "out.queue")
35     public void consumerMsg(String msg){
36         System.out.println(msg);
37     }
38 
39     @RequestMapping("/topic")
40     public String topic(){
41 
42         for (int i = 0; i < 10 ; i++){
43             jms.convertAndSend(topic, "topic"+i);
44         }
45 
46         return "topic 發送成功";
47     }
48 }
PublishController.java

 

4.創建消費者(調用者)項目b

 

 

4.1 編輯屬性文件application.properties

1 server.port=8082
2 server.servlet.context-path=/conb
3 spring.activemq.user=admin
4 spring.activemq.password=admin
5 spring.activemq.broker-url=tcp://192.168.117.152:61617
6 queueName=publish.queue
7 topicName=publish.topic
application.properties

 

4.2 消息隊列參數類ActiveMQConfig.java(原文此處未展開代碼;從 4.3、4.4 引用的 containerFactory 名稱來看,內容應與 3.2 的 ActiveMQConfig.java 相同)

ActiveMQConfig.java

 

4.3 隊列監聽

 1 package cn.kgc.listener;
 2 
 3 import org.springframework.jms.annotation.JmsListener;
 4 import org.springframework.messaging.handler.annotation.SendTo;
 5 import org.springframework.stereotype.Component;
 6 
 7 @Component
 8 public class QueueListener {
 9     @JmsListener(destination = "publish.queue", containerFactory = "jmsListenerContainerQueue")
10     @SendTo("out.queue")
11     public String receive(String text){
12         System.out.println("QueueListener: consumer-b 收到一條信息: " + text);
13         return "consumer-a received : " + text;
14     }
15 }
QueueListener.java

 

4.4 消息主題監聽

 1 package cn.kgc.listener;
 2 
 3 import org.springframework.jms.annotation.JmsListener;
 4 import org.springframework.stereotype.Component;
 5 
 6 @Component
 7 public class TopocListener {
 8 
 9     @JmsListener(destination = "publish.topic", containerFactory = "jmsListenerContainerTopic")
10     public void receive(String text){
11         System.out.println("TopicListener: consumer-b 收到一條信息: " + text);
12     }
13 }
TopocListener.java

 

4.5 控制類 PublishController.java

 1 package cn.kgc.controller;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.jms.annotation.JmsListener;
 5 import org.springframework.jms.core.JmsMessagingTemplate;
 6 import org.springframework.web.bind.annotation.RequestMapping;
 7 import org.springframework.web.bind.annotation.RestController;
 8 
 9 import javax.jms.Queue;
10 import javax.jms.Topic;
11 
12 @RestController
13 @RequestMapping("/publish")
14 public class PublishController {
15     @Autowired
16     private JmsMessagingTemplate jms;
17 
18     @Autowired
19     private Queue queue;
20 
21     @Autowired
22     private Topic topic;
23 
24     @RequestMapping("/queue")
25     public String queue(){
26 
27         for (int i = 0; i < 10 ; i++){
28             jms.convertAndSend(queue, "queue"+i);
29         }
30 
31         return "queue 發送成功";
32     }
33 
34     @JmsListener(destination = "out.queue")
35     public void consumerMsg(String msg){
36         System.out.println(msg);
37     }
38 
39     @RequestMapping("/topic")
40     public String topic(){
41 
42         for (int i = 0; i < 10 ; i++){
43             jms.convertAndSend(topic, "topic"+i);
44         }
45 
46         return "topic 發送成功";
47     }
48 }
PublishController.java

 

請求:http://localhost:8080/pro/publish/queue

 

Number Of Pending Messages:消息隊列中待處理的消息
Number Of Consumers:消費者的數量
Messages Enqueued:累計進入過消息隊列的總量
Messages Dequeued:累計消費過的消息總量

 

 

 

 

 

 

 

 

 請求:http://localhost:8080/pro/publish/topic

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM