引入依賴:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.1.5.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-client -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.15.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-pool -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.10</version>
</dependency>
yml配置:
spring:
activemq:
broker-url: tcp://localhost:61616
user: admin
password: admin
pool:
enabled: true
packages:
trust-all: true
jms:
pub-sub-domain: false
mq:
queue: q
topic: t
配置類:
package com.dc.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;
/**
* @author :llf
* @date :Created in 2019-11-23 17:14
* @description:${description}
* @version: v1.0
*/
@Component
public class ActiveMqConfig {
@Autowired
private Environment env;
private String mqqueuename;
private String mqtopicname;
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(env.getProperty("spring.activemq.broker-url"));
connectionFactory.setUserName(env.getProperty("spring.activemq.user"));
connectionFactory.setPassword(env.getProperty("spring.activemq.password"));
return connectionFactory;
}
@Bean
public JmsTemplate genJmsTemplate() {
return new JmsTemplate(connectionFactory());
}
@Bean
public JmsMessagingTemplate jmsMessageTemplate() {
return new JmsMessagingTemplate(connectionFactory());
}
@Bean
public Topic topic() {
if(mqtopicname == null){
mqtopicname = "topic";
}
return new ActiveMQTopic(mqtopicname);
}
@Bean
public Queue queue() {
if (mqqueuename == null){
mqqueuename = "queue";
}
return new ActiveMQQueue(mqqueuename);
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
public String getMqqueuename() {
return mqqueuename;
}
public void setMqqueuename(String mqqueuename) {
this.mqqueuename = mqqueuename;
}
public String getMqtopicname() {
return mqtopicname;
}
public void setMqtopicname(String mqtopicname) {
this.mqtopicname = mqtopicname;
}
}
發送和訂閱類
@Autowired
private ActiveMqConfig activeMqConfig;
/**
* queue生產者
* @return
*/
@RequestMapping("/sendQueue")
public String sendQueue() {
String message = "queue生產者信息。。。。。。。。。。。";
activeMqConfig.setMqqueuename("q");
activeMqConfig.jmsMessageTemplate().convertAndSend(activeMqConfig.queue(), message);
return "消息發送成功!message=" + message;
}
/*
* 發送 主題消息
*/
@RequestMapping("/sendTopic")
public String sendTopic() {
String message = "topic生產者信息。。。。。。。。。。。";
activeMqConfig.setMqtopicname("t");
activeMqConfig.jmsMessageTemplate().convertAndSend(activeMqConfig.topic(),message);
return "topic生產者消息發送成功!message=" + message;
}
/**
* queue消費者
* @param message
*/
@JmsListener(destination = "${mq.queue}" )
public void readActiveQueue(String message) {
System.out.println("queue消費者接受到:" + message);
}
@JmsListener(destination = "${mq.topic}",containerFactory = "jmsListenerContainerTopic")
public void readActiveTopic1(String message) {
System.out.println("topic消費者接受到:" + message);
}
總結:
queue支持存在多個消費者,對一個消息而言只能被消費一次,如果這個消息未被消費,依舊會保存,直到被消費。
topic可被多個消費者消費,生產者發送一條消息,不管是否被消費都不會被保存。
