一、ActiveMQ下载安装
1.进入官网https://activemq.apache.org/下载后解压即可
2.打开解压后的文件,根据自己的电脑位数打开activemq.bat即可启动成功
3.看到下面界面就表示启动成功了
4.在浏览器输入上面网址http://127.0.0.1:8161/admin/进行访问,默认账号密码为 admin admin 之后就可以看到下面界面:
二、创建Springboot/Springcloud项目实现消息的消费者。
发送消息的两种方式:
queue:一对一发送
topic:一对多发送
消息的消费者用来监听消息生产者发来的消息,在已有的项目集成也可以,其结构如下:
1.导入依赖
除Springboot/Springcloud基本必要依赖以外应引入下面两个依赖
需要注意的是如果你使用的Springboot版本是2.0则引入activemq-pool,如果使用Springboot2.0以上的版本就要引入pooled-jms,我这里使用的Springboot2.2.13,引入的是pooled-jms,它们两个的区别在于工厂的不同。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--springboot2.0 PooledConnectionFactory-->
<!-- <dependency>-->
<!-- <groupId>org.apache.activemq</groupId>-->
<!-- <artifactId>activemq-pool</artifactId>-->
<!-- </dependency>-->
<!-- springboot2.1+ JmsPooledConnectionFactory-->
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>
<dependency>
2.配置文件application.yml
需要注意的是默认端口号为61616,并非显示网页的8161端口
spring: activemq: broker-url: tcp://127.0.0.1:61616
user: admin password: admin pool: enabled: true #是否使用PooledConnectionFactory max-connections: 30 #默认为1 idle-timeout: 10000 #空闲的连接过期时间 默认30s #强制的连接过期时间,与idleTimeout的区别在于: #idleTimeout是在连接空闲一段时间失效, #而expiryTimeout不管当前连接的情况,只要达到指定时间就失效。默认为0 expiry-timeout: 20000
3.编写配置类Config
import javax.jms.ConnectionFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; @EnableJms //开启监听
@Configuration public class Config { public final static String TOPIC = "springboot.topic.test"; public final static String QUEUE = "springboot.queue.test"; // topic模式的ListenerContainer
@Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(activeMQConnectionFactory); bean.setPubSubDomain(true); return bean; } // queue模式的ListenerContainer
@Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(activeMQConnectionFactory); return bean; } }
4.编写消费者类JmsConsumer,接收消息并处理(一般操作数据库,这里只将其做打印处理)
import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.TextMessage; @Component public class JmsConsumer { @JmsListener(destination = Config.TOPIC,containerFactory = "jmsListenerContainerTopic") public void onMessageTopic(Message message) { //按照自己需求转换
TextMessage textMessage = (TextMessage) message; //ObjectMessage objectMessage= (ObjectMessage) message; //String msg= String.valueOf(objectMessage);
try { String text = textMessage.getText(); System.out.println("接收到的topic消息是:"+text); } catch (JMSException e) { e.printStackTrace(); } } @JmsListener(destination = Config.QUEUE,containerFactory = "jmsListenerContainerQueue") public void onMessageQueue(String msg){ System.out.println("接收到的queue消息是:"+msg); } }
5.编写启动类ActiveMQConsumerApp
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class ActiveMQConsumerApp { public static void main(String[] args) { SpringApplication.run(ActiveMQConsumerApp.class,args); } }
三、创建Springboot/Springcloud项目实现消息的生产者。
发送消息的两种方式:
queue:一对一发送
topic:一对多发送
消息的生产者用来产生消息并发送消息,在已有的项目集成也可以,其结构如下:
1.添加依赖
除Springboot/Springcloud基本必要依赖以外应引入下面两个依赖,注意对应自己Springboot版本
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--springboot2.0 PooledConnectionFactory-->
<!-- <dependency>-->
<!-- <groupId>org.apache.activemq</groupId>-->
<!-- <artifactId>activemq-pool</artifactId>-->
<!-- </dependency>-->
<!-- springboot2.1+ JmsPooledConnectionFactory-->
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>
<dependency>
2.编写配置文件application.yml,注意为61616端口
spring: activemq: broker-url: tcp://127.0.0.1:61616
user: admin password: admin pool: enabled: true #是否使用PooledConnectionFactory max-connections: 30 #默认为1 idle-timeout: 10000 #空闲的连接过期时间 默认30s #强制的连接过期时间,与idleTimeout的区别在于: #idleTimeout是在连接空闲一段时间失效, #而expiryTimeout不管当前连接的情况,只要达到指定时间就失效。默认为0 expiry-timeout: 20000 server: port: 4455
3.编写配置类MyConfig
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.jms.Queue; import javax.jms.Topic; @Configuration public class MyConfig { public final static String TOPIC = "springboot.topic.test"; public final static String QUEUE = "springboot.queue.test"; @Bean public Queue queue() { return new ActiveMQQueue(MyConfig.QUEUE); } @Bean public Topic topic() { return new ActiveMQTopic(MyConfig.TOPIC); } }
4.编写类MyProvider,创建方法用于发送消息
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; @Component public class MyProvider { @Autowired private JmsTemplate jmsTemplate; public void sendMessage(Destination destination, final String text) { this.jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(text); } }); } }
5.编写Controller类SendMessage调用其发送方法分别用queue和topic方式进行发送消息
import com.star.jms.MyConfig; import com.star.jms.MyProvider; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.jms.Queue; import javax.jms.Topic; @RestController public class SendMessage { @Autowired private MyProvider myProvider; @Autowired private Queue queue; @Autowired private Topic topic; @RequestMapping("/queuesend") public String sendMsg_queue(){ //也可以不写配置类MyConfig使用下面三种方式获取queue //1.ActiveMQQueue queue = new ActiveMQQueue(MyConfig.QUEUE); //还得写配置类MyConfig //2.ActiveMQQueue queue = new ActiveMQQueue("Config.QUEUE"); //Config是消息的消费者里面写的配置类 //3.ActiveMQQueue queue = new ActiveMQQueue("springboot.queue.test"); //直接写死
myProvider.sendMessage(queue,"使用queue发送消息"); return "queue消息已发出,等待消费"; } @RequestMapping("/topicsend") public String sendMsg_topic(){ //也可以不写配置类MyConfig使用下面三种方式获取topic //1.ActiveMQQueue topic = new ActiveMQTopic(MyConfig.TOPIC); //还得写配置类MyConfig //2.ActiveMQQueue topic = new ActiveMQTopic("Config.TOPIC"); //Config是消息的消费者里面写的配置类 //3.ActiveMQQueue topic = new ActiveMQTopic("springboot.topic.test"); //直接写死
myProvider.sendMessage(topic,"使用topic发送消息"); return "topic消息已发出,等待消费"; } }
6.编写启动类ActiveMQProduceApp
@SpringBootApplication public class ActiveMQProduceApp { public static void main(String[] args) { SpringApplication.run(ActiveMQProduceApp.class,args); } }
四、启动测试
1.启动ActiveMq服务
2.启动消息消费者ActiveMQConsumerApp进行监听
3.启动消息的生产者ActiveMQProduceApp
4.访问Controller接口进行发送消息
5.在消息消费者ActiveMQConsumerApp可以看到接收到了消息,并做了打印处理
6.在ActiveMq可视化界面http://127.0.0.1:8161/admin/可以看到queue和topic消息发送和处理情况,账号密码都为admin