https://blog.csdn.net/qincidong/article/details/76114434
Integrating ActiveMQ with Spring Boot
1. Add the dependencies:
    <!-- activemq -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <!-- ActiveMQ connection pooling -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
    </dependency>
2. Add the ActiveMQ configuration to application.properties:
    spring.activemq.broker-url=tcp://192.168.74.135:61616
    spring.activemq.user=admin
    spring.activemq.password=admin
    spring.activemq.pool.enabled=true
    spring.activemq.pool.max-connections=50
    spring.activemq.pool.expiry-timeout=10000
    spring.activemq.pool.idle-timeout=30000
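3. Create a message producer:
    import javax.jms.Destination;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Component;

    @Component
    public class JMSProducer {
        @Autowired
        private JmsTemplate jmsTemplate;

        // Converts the String payload to a JMS message and sends it to the given destination
        public void sendMessage(Destination destination, String message) {
            this.jmsTemplate.convertAndSend(destination, message);
        }
    }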
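4. Create a message consumer:
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;

    @Component
    public class JMSConsumer {
        private final static Logger logger = LoggerFactory.getLogger(JMSConsumer.class);

        // Invoked for every message that arrives on the queue "springboot.queue.test"
        @JmsListener(destination = "springboot.queue.test")
        public void receiveQueue(String msg) {
            logger.info("Received message: {}", msg);
        }
    }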
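5. Test class:
    import javax.jms.Destination;

    import org.apache.activemq.command.ActiveMQQueue;
    import org.junit.Test;
    import org.springframework.beans.factory.annotation.Autowired;

    public class JmsTest extends BaseTest {
        @Autowired
        private JMSProducer jmsProducer;

        @Test
        public void testJms() {
            // Same queue name the consumer listens on
            Destination destination = new ActiveMQQueue("springboot.queue.test");
            for (int i = 0; i < 10; i++) {
                jmsProducer.sendMessage(destination, "hello,world!" + i);
            }
        }
    }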
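The BaseTest code is as follows:
    import org.junit.runner.RunWith;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = com.sample.activity.web.Application.class)
    public abstract class BaseTest {
    }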
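The test in step 5 only sends messages, while the listener consumes them asynchronously on another thread, so the test method may return before every message has been logged. One common way to make a test wait is a CountDownLatch. The following is a minimal, JDK-only sketch of that idea and not part of the original project: the class AwaitDeliverySketch, the method sendAndAwait, and the executor that stands in for the listener thread are all hypothetical names introduced for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: waiting for asynchronous message handling in a test.
final class AwaitDeliverySketch {

    // "Sends" count messages to a simulated asynchronous consumer and waits until
    // all of them were handled or the timeout expires. Returns true if all arrived.
    static boolean sendAndAwait(int count, long timeoutMillis) {
        CountDownLatch received = new CountDownLatch(count);
        // Single worker thread standing in for the JMS listener thread (assumption).
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < count; i++) {
                String message = "hello,world!" + i;
                consumer.submit(() -> {
                    // In a real test, the listener method would call countDown().
                    System.out.println("received: " + message);
                    received.countDown();
                });
            }
            return received.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        } finally {
            consumer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("all delivered: " + sendAndAwait(10, 2000));
    }
}
```

In the Spring test, the same pattern would presumably mean sharing a latch between the test and the consumer bean and asserting on the result of await after the send loop.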
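6. Sending and receiving topic messages
By default, destination names are resolved as queues, so only queue messages can be sent and received. To send and receive topic messages, add the following to application.properties: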
spring.jms.pub-sub-domain=true
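For context, the two JMS domains differ in who receives a message: a queue delivers each message to exactly one consumer, while a topic delivers a copy to every subscriber. The toy model below illustrates that difference with plain JDK collections. It is not ActiveMQ code; the class MessagingDomainSketch and its methods are hypothetical, and the round-robin hand-out on the queue side is a simplification of how a real broker dispatches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model of the two JMS messaging domains.
final class MessagingDomainSketch {

    // Queue (point-to-point): each message goes to exactly one consumer.
    // Round-robin is a simplifying assumption about the dispatch order.
    static List<List<String>> deliverAsQueue(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumers).add(messages.get(i));
        }
        return inboxes;
    }

    // Topic (publish/subscribe): every subscriber receives its own copy.
    static List<List<String>> deliverAsTopic(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    // Creates one empty inbox per consumer or subscriber.
    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m0", "m1", "m2", "m3");
        System.out.println("queue: " + deliverAsQueue(messages, 2));
        // prints: queue: [[m0, m2], [m1, m3]]
        System.out.println("topic: " + deliverAsTopic(messages, 2));
        // prints: topic: [[m0, m1, m2, m3], [m0, m1, m2, m3]]
    }
}
```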
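The sending and receiving code is the same as for queues.
However, this introduces another problem: queue messages can no longer be sent and received. So how can queue and topic messages be supported at the same time?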
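7. Supporting queue and topic messages at the same time
i. Create a JMS configuration class:
    import javax.jms.ConnectionFactory;
    import javax.jms.Queue;
    import javax.jms.Topic;

    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.jms.config.JmsListenerContainerFactory;

    @Configuration
    public class JmsConfig {
        public final static String TOPIC = "springboot.topic.test";
        public final static String QUEUE = "springboot.queue.test";

        @Bean
        public Queue queue() {
            return new ActiveMQQueue(QUEUE);
        }

        @Bean
        public Topic topic() {
            return new ActiveMQTopic(TOPIC);
        }

        // Listener container factory for topics (pub/sub domain)
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setPubSubDomain(true);
            bean.setConnectionFactory(activeMQConnectionFactory);
            return bean;
        }

        // Listener container factory for queues (point-to-point domain, the default)
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setConnectionFactory(activeMQConnectionFactory);
            return bean;
        }
    }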
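ii. Change the message consumer code as follows:
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;

    @Component
    public class JMSConsumer {
        private final static Logger logger = LoggerFactory.getLogger(JMSConsumer.class);

        // Uses the topic container factory, so the destination name is resolved as a topic
        @JmsListener(destination = JmsConfig.TOPIC, containerFactory = "jmsListenerContainerTopic")
        public void onTopicMessage(String msg) {
            logger.info("Received topic message: {}", msg);
        }

        // Uses the queue container factory, so the destination name is resolved as a queue
        @JmsListener(destination = JmsConfig.QUEUE, containerFactory = "jmsListenerContainerQueue")
        public void onQueueMessage(String msg) {
            logger.info("Received queue message: {}", msg);
        }
    }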
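As you can see, each listener now specifies its own containerFactory, referring by bean name to one of the two factories defined in JmsConfig.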
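The effect of naming a container factory per listener can be pictured with a small JDK-only model: each factory carries its own pub/sub flag, and a listener's destination name is interpreted according to the factory it names. This is an illustrative sketch, not Spring's implementation; the class ContainerFactoryLookupSketch, the nested FactoryModel, and the "topic://" and "queue://" prefixes used as a readable notation are assumptions made for the example. Only the two bean names and the destination names come from the configuration above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of choosing a listener container factory by bean name.
final class ContainerFactoryLookupSketch {

    // Stand-in for a listener container factory; only the pub/sub flag is modeled.
    static final class FactoryModel {
        final boolean pubSubDomain;

        FactoryModel(boolean pubSubDomain) {
            this.pubSubDomain = pubSubDomain;
        }

        // Interprets a destination name as a topic or a queue, depending on the flag.
        String resolve(String destinationName) {
            return (pubSubDomain ? "topic://" : "queue://") + destinationName;
        }
    }

    // Registry keyed by bean name, mirroring the two @Bean methods in JmsConfig.
    static Map<String, FactoryModel> factories() {
        Map<String, FactoryModel> byName = new LinkedHashMap<>();
        byName.put("jmsListenerContainerTopic", new FactoryModel(true));
        byName.put("jmsListenerContainerQueue", new FactoryModel(false));
        return byName;
    }

    // Looks up the named factory and resolves the destination with its flag.
    static String resolve(String containerFactory, String destinationName) {
        FactoryModel factory = factories().get(containerFactory);
        if (factory == null) {
            throw new IllegalArgumentException("unknown container factory: " + containerFactory);
        }
        return factory.resolve(destinationName);
    }

    public static void main(String[] args) {
        System.out.println(resolve("jmsListenerContainerTopic", "springboot.topic.test"));
        // prints: topic://springboot.topic.test
        System.out.println(resolve("jmsListenerContainerQueue", "springboot.queue.test"));
        // prints: queue://springboot.queue.test
    }
}
```

Because the flag lives on each factory rather than in one global property, both domains can coexist in the same application.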
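iii. Test class:
    import javax.jms.Queue;
    import javax.jms.Topic;

    import org.junit.Test;
    import org.springframework.beans.factory.annotation.Autowired;

    public class JmsTest extends BaseTest {
        @Autowired
        private JMSProducer jmsProducer;

        @Autowired
        private Topic topic;

        @Autowired
        private Queue queue;

        @Test
        public void testJms() {
            // Send each message once to the queue and once to the topic
            for (int i = 0; i < 10; i++) {
                jmsProducer.sendMessage(queue, "queue,world!" + i);
                jmsProducer.sendMessage(topic, "topic,world!" + i);
            }
        }
    }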
For a reference of the ActiveMQ configuration properties available in Spring Boot, see: Spring Boot ActiveMQ configuration properties