一、點對點
1、提供者目錄展示
2、導入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- Spring Boot web support: MVC, AOP, ... --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
3、生產者application.yml配置文件
4、生產者MyProvider,通過JMSTemplate模板發送消息
package com.zn.p2p; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * 生產者,通過JMSTemplate模板發送消息 */ @Component public class MyProvider { //注入JMSTemplate模板 @Resource private JmsTemplate jmsTemplate; //創建方法 public void sendMessage(){ //點對點,創建隊列 ActiveMQQueue queue=new ActiveMQQueue("SpringBoot_Queue"); //發送消息 jmsTemplate.convertAndSend(queue,"生產者產生的消息!"); } }
5、客戶端訪問ProvideController
package com.zn.p2p;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * HTTP entry point that lets a client trigger the point-to-point producer.
 */
@RestController
public class ProvideController {

    /** Producer that performs the actual JMS send. */
    @Resource
    private MyProvider provider;

    /**
     * Handles {@code /sendMessage}: asks the producer to send its message and
     * reports completion to the caller.
     *
     * @return a fixed confirmation string
     */
    @RequestMapping("/sendMessage")
    public String sendMessage() {
        provider.sendMessage();
        // Spelling corrected from "sucess!!".
        return "success!!";
    }
}
6、provider啟動類StartProvider
package com.zn;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Boot entry point for the provider (message-producing) application.
 */
@SpringBootApplication
public class StartProvider {

    /**
     * Launches the Spring Boot application using this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        Class<?> primarySource = StartProvider.class;
        SpringApplication.run(primarySource, args);
    }
}
7、消費者目錄展示
8、導入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- Spring Boot web support: MVC, AOP, ... --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
9、消費者application.yml配置文件
10、consumer啟動類StartP2PConsumer
package com.zn; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.annotation.JmsListeners; import javax.jms.JMSException; import javax.jms.TextMessage; @SpringBootApplication public class StartP2PConsumer { public static void main(String[] args) { SpringApplication.run(StartP2PConsumer.class,args); } //消費者消費 @JmsListener(destination = "SpringBoot_Queue") public void getMessage(TextMessage message) throws JMSException { System.out.println("消費者獲取到消息:"+message.getText()); } }
11、啟動提供者並訪問
12、啟動消費者
二、發布訂閱
1、消費者目錄展示
2、導入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- Spring Boot web support: MVC, AOP, ... --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
3、消費者application.yml配置文件
4、consumer啟動類StartTopicConsumer
package com.zn; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.TextMessage; @SpringBootApplication public class StartTopicConsumer { public static void main(String[] args) { SpringApplication.run(StartTopicConsumer.class,args); } //springboot默認只配置queue類型消息,如果要使用topic類型的消息,則需要配置該bean @Bean public JmsListenerContainerFactory jmsTopicListenerContainerFactory(ConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //這里必須設置為true,false則表示是queue類型 factory.setPubSubDomain(true); return factory; } //消費者消費 destination隊列或者主題的名字 @JmsListener(destination = "SpringBoot_Topic",containerFactory = "jmsTopicListenerContainerFactory") public void getMessage(TextMessage message) throws JMSException { System.out.println("消費者獲取到消息:"+message.getText()); } }
5、提供者目錄展示
6、導入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- Spring Boot web support: MVC, AOP, ... --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
7、TopicProvider
package com.zn.topic; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * 生產者,通過JMSTemplate模板發送消息 */ @Component public class TopicProvider { //注入JMSTemplate模板 @Resource private JmsTemplate jmsTemplate; //創建方法 public void sendMessage(){ //發布訂閱,創建主題 ActiveMQTopic topic=new ActiveMQTopic("SpringBoot_Topic"); //springboot默認是queue jmsTemplate.setPubSubDomain(true); //發送消息 jmsTemplate.convertAndSend(topic,"生產者產生topic的消息"); } }
8、ProvideController
package com.zn.controller; import com.zn.p2p.MyProvider; import com.zn.topic.TopicProvider; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * 客戶端訪問的方法 */ @RestController public class ProvideController { //topic調用 @Resource private TopicProvider topicProvider; @RequestMapping("/sendMessage") public String sendMessage(){ topicProvider.sendMessage(); return "success"; } }
9、啟動消費者訂閱消息
10、啟動生產者
11、消費者控制台效果
三、SpringBoot整合ActiveMQ開啟持久化---隊列持久化
1、步驟1、2、3同上p2p
4、生產者MyProvider,通過JMSTemplate模板發送消息
package com.zn.p2p; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * 生產者,通過JMSTemplate模板發送消息 */ @Component public class MyProvider { //注入JMSTemplate模板 @Resource private JmsTemplate jmsTemplate; //創建方法 public void sendMessage(){ //開啟持久化 jmsTemplate.setDeliveryMode(2); jmsTemplate.setExplicitQosEnabled(true); jmsTemplate.setDeliveryPersistent(true); //點對點,創建隊列 ActiveMQQueue queue=new ActiveMQQueue("SpringBoot_Queue"); //發送消息 jmsTemplate.convertAndSend(queue,"生產者產生的消息"); } }
5、步驟5、6、7、8、9、10同上
11、開啟生產者
12、開啟消費者
這裡可以試一下讓ActiveMQ服務器宕機,然後讓服務器重啟,看看數據有沒有做持久化的操作
13、結論:當服務器宕機,重啟服務器之後,沒有被消費的消息依然在數據庫中,這樣就做到了持久化操作。
四、SpringBoot整合ActiveMQ開啟持久化---主題持久化
不會進行數據消費的,但是數據可以持久化
1、步驟1、2、3同上
4、consumer啟動類StartTopicConsumer
package com.zn; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.listener.DefaultMessageListenerContainer; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Session; import javax.jms.TextMessage; @SpringBootApplication public class StartTopicConsumer { public static void main(String[] args) { SpringApplication.run(StartTopicConsumer.class,args); }
@Bean(name = "topicListenerFactory") public JmsListenerContainerFactory<DefaultMessageListenerContainer> topicListenerFactory(ConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setSubscriptionDurable(true);// Set this to "true" to register a durable subscription, factory.setClientId("A"); factory.setConnectionFactory(connectionFactory); return factory; } //消費者消費 destination隊列或者主題的名字 @JmsListener(destination = "boot_topic",containerFactory = "topicListenerFactory") public void getMessage(TextMessage message, Session session) throws JMSException { System.out.println("消費者獲取到消息:"+message.getText()); } }
9、步驟5、6、7、8同上
10、啟動消費者
11、啟動生產者
12、消費者控制台