點對點(P2P)
結構
創建生產者和消費者兩個springboot工程
導入依賴
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
生產者
步驟一:application.properties文件
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
server.port=8080
步驟二:創建生產者類
package com.wn.p2p; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; @Component public class P2Pprovider { //注入JMSTemplate @Resource private JmsTemplate jmsTemplate; public void setMessage(){ //開啟持久化操作 jmsTemplate.setDeliveryMode(2); jmsTemplate.setExplicitQosEnabled(true); jmsTemplate.setDeliveryPersistent(true); //點對點創建隊列 ActiveMQQueue queue=new ActiveMQQueue("boot_queue"); jmsTemplate.convertAndSend(queue,"消息123"); } }
步驟三:創建controller
package com.wn.p2p;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * REST controller exposing an endpoint that triggers the point-to-point
 * producer.
 */
@RestController
public class P2PController {

    /** Response body returned once the producer call has completed. */
    private static final String SUCCESS = "success";

    /** Injected producer that performs the actual send. */
    @Resource
    private P2Pprovider p2Pprovider;

    /**
     * Handles requests to {@code /setMessage} by delegating to the producer.
     *
     * @return the literal string {@code "success"}
     */
    @RequestMapping("/setMessage")
    public String setMessage() {
        p2Pprovider.setMessage();
        return SUCCESS;
    }
}
步驟四:啟動
這裡可以試一下讓ActiveMQ服務器宕機,然後讓服務器重啟,看看數據有沒有做持久化的操作
結論:當服務器宕機,重啟服務器之後,沒有被消費的消息依然保存在ActiveMQ的持久化存儲(默認為KahaDB)中,這樣就做到了持久化操作。
消費者
步驟一:application.properties文件
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
server.port=8081
步驟二:創建消費者類
package com.wn;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.JmsListener;

import javax.jms.JMSException;
import javax.jms.TextMessage;

/**
 * Spring Boot entry point for the point-to-point consumer; also hosts the
 * listener for the {@code boot_queue} destination.
 */
@SpringBootApplication
public class P2pConsumerApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(P2pConsumerApplication.class, args);
    }

    /**
     * Receives a text message from {@code boot_queue} and prints its body to
     * standard output.
     *
     * @param message the received JMS text message
     * @throws JMSException if the message body cannot be read
     */
    @JmsListener(destination = "boot_queue")
    public void getMessage(TextMessage message) throws JMSException {
        final String body = message.getText();
        System.out.println("接收到的信息:" + body);
    }
}
步驟三:啟動
結論:消息被消費者成功消費之後,就會從隊列(以及持久化存儲)中清除,不會再保留。
發布/訂閱(Pub/Sub)
結構
導入依賴
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
消費者
步驟一:application.properties文件
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
server.port=8082
#topic配置
#spring.jms.pub-sub-domain=true
步驟二:創建消費者類
package com.wn; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.listener.DefaultMessageListenerContainer; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.TextMessage; @SpringBootApplication public class PubSubConsumerApplication { public static void main(String[] args) { SpringApplication.run(PubSubConsumerApplication.class, args); } //不進行數據消費的,但是數據可以持久化 @Bean(name = "topicListenerFactory") public JmsListenerContainerFactory<DefaultMessageListenerContainer> topicListenerFactory(ConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setSubscriptionDurable(true);// Set this to "true" to register a durable subscription, factory.setClientId("A"); factory.setConnectionFactory(connectionFactory); return factory; } //消費者消費消息 @JmsListener(destination = "boot_topic",containerFactory = "topicListenerFactory") public void getMessage(TextMessage message) throws JMSException { System.out.println("接收到消息:"+message.getText()); } }
生產者
步驟一:application.properties文件
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
server.port=8083
spring.jms.pub-sub-domain=true
步驟二:創建生產者類
package com.wn.pub_sub; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; @Component public class Pub_Sub_Provider { @Resource private JmsTemplate jmsTemplate; public void setMessage(){ //創建主題 ActiveMQTopic topic=new ActiveMQTopic("boot_topic");; //發送消息 jmsTemplate.convertAndSend(topic,"消息456"); } }
步驟三:創建controller
package com.wn.pub_sub;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * REST controller exposing an endpoint that triggers the publish/subscribe
 * producer.
 */
@RestController
public class Pub_Sub_Controller {

    /** Response body returned once the producer call has completed. */
    private static final String SUCCESS = "success";

    /** Injected producer that performs the actual send. */
    @Resource
    private Pub_Sub_Provider provider;

    /**
     * Handles requests to {@code /setMessage} by delegating to the producer.
     *
     * @return the literal string {@code "success"}
     */
    @RequestMapping("/setMessage")
    public String setMessage() {
        provider.setMessage();
        return SUCCESS;
    }
}
步驟四:啟動