需求 : 工厂员工完成某道工序后,需要将消息推送给 检查人员
也可以使用 WebSockets ,前端更容易实现
思路: 使用activeMQ推送消息,前端实时接收消息
实现 :
1.基于springBoot的activeMQ 实现起来比较方便 配置文件如下
############ activemq
spring.activemq.broker-url=tcp://127.0.0.1:61616
# 账号
spring.activemq.user=xxx
# 密码
spring.activemq.password=xxx
# 等待消息发送响应的时间。设置为0等待永远。
spring.activemq.send-timeout=0
queueName=polling.queue
topicName=polling.topic
# 默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
spring.jms.pub-sub-domain=true
2.java代码 配置文件

package workstation.open.config; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.config.SimpleJmsListenerContainerFactory; import org.springframework.jms.core.JmsMessagingTemplate; import javax.jms.ConnectionFactory; import javax.jms.Queue; import javax.jms.Topic; /** * @author ThreeNut * @date 2021/5/18 13:55 */ @Configuration public class ActiveMQConfig { @Value("${queueName}") private String queueName; @Value("${topicName}") private String topicName; @Value("${spring.activemq.user}") private String usrName; @Value("${spring.activemq.password}") private String password; @Value("${spring.activemq.broker-url}") private String brokerUrl; @Bean public Queue queue(){ return new ActiveMQQueue(queueName); } @Bean public Topic topic(){ return new ActiveMQTopic(topicName); } @Bean public ActiveMQConnectionFactory connectionFactory() { return new ActiveMQConnectionFactory(usrName, password, brokerUrl); } @Bean public JmsMessagingTemplate jmsMessageTemplate(){ return new JmsMessagingTemplate(connectionFactory()); } // 在Queue模式中,对消息的监听需要对containerFactory进行配置 @Bean("queueListener") public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(false); return factory; } //在Topic模式中,对消息的监听需要对containerFactory进行配置 @Bean("topicListener") public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); return factory; } }
3.java代码 消息推送 消息生产者
3.1 Queue(点到点)模式在点对点的传输方式中,消息数据被持久化,每条消息都能被消费,没有监听QUEUE地址也能被消费,数据不会丢失,一对一的发布接受策略,保证数据完整。
3.2Topic主题模式,就是订阅模式,不保证消息都能被接收到 发送方式 :群发
package workstation.open.mq; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Component; import javax.jms.Destination; import javax.jms.Topic; /** * 消息生产 * @author BinPeng * @date 2021/5/18 13:47 */ @Component public class PollingQueueProducer { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; /** * 群发消息 */ @Autowired private Topic topic; public String sendQueue(String msg) { this.sendMessage(this.topic,msg); System.out.println("发送消息成功!!!"); return "success"; } private void sendMessage(Destination destination,String msg){ jmsMessagingTemplate.convertAndSend(destination, msg); } }
4.后端测试(仅用来在后端测试,看是否接收到消息)
package workstation.open.mq; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import workstation.common.util.ReqResult; import javax.servlet.http.Cookie; import java.util.LinkedList; import java.util.Queue; @Component public class QueueConsumer {
/** * queue模式的消费者 * @param message 消息 */ @JmsListener(destination="${topicName}", containerFactory="topicListener") public ReqResult readActiveQueue(String message) { Queue<String> queue = new LinkedList<String>(); queue.offer(message); System.out.println(message +"----------------------------------"); return new ReqResult().put("msg",queue); } }
5.基于前端VUE使用Stompjs接收ActiveMQ消息实现方法 -- 消息消费者
准备工作:
npm install stompjs
npm install net
5.1 linkparam.js 配置文件
export const MQ_SERVICE = 'ws://127.0.0.1:61614/stomp' // mq服务地址(默认监听消息端口)
export const MQ_USERNAME = 'xxx' // mq连接用户名
export const MQ_PASSWORD = 'xxx' //mq连接密码
5.2 sock.vue
<template> <div></div> </template> <script> import Stomp from 'stompjs' import { MQ_SERVICE, MQ_USERNAME, MQ_PASSWORD } from '../../utils/linkparam' export default { name: 'entry', data () { return { client: Stomp.client(MQ_SERVICE), topic : '/topic/iqc_check_list' } }, created () { this.connect() }, methods: { onConnected: function (frame) { console.log('Connected: ---------' + frame.body) // 主题模式 this.client.subscribe(this.topic, this.responseCallback, { id: 20210820 }) }, onFailed: function (frame) { }, responseCallback: function (frame) { if(frame.body != null){ this.$notify.info({ showClose: true, message: '有新的检验单 ' + frame.body, position: 'bottom-right', duration: '3000', onClose: () => { // 执行查询方法 this.$parent.query() }, }); } }, connect: function () { let headers = { 'login': MQ_USERNAME, 'passcode': MQ_PASSWORD } // 心跳发送频率 this.client.heartbeat.outgoing = 50000; // 心跳接收频率 this.client.heartbeat.incoming = 50000; this.client.connect(headers, this.onConnected, this.onFailed) } } } </script>
5.3 在APP.vue 文件下添加(因为要在此页面展示所以添加如下,展示页面按照你自己的实际情况配置即可)
import sock from './components/sock'
components:{
sock
}
至此前后端配置结束
总结: 需注意 如果监听的是远程服务 需要配置为外网ip地址,端口 61614和61616加入安全组. 监听端口为: 61614
activeMQ 客户端地址:http://localhost:8161/admin/topics.jsp