需求 : 工廠員工完成某道工序后,需要將消息推送給 檢查人員
也可以使用 WebSockets ,前端更容易實現
思路: 使用activeMQ推送消息,前端實時接收消息
實現 :
1.基於springBoot的activeMQ 實現起來比較方便 配置文件如下
############ activemq
spring.activemq.broker-url=tcp://127.0.0.1:61616
# 賬號
spring.activemq.user=xxx
# 密碼
spring.activemq.password=xxx
# 等待消息發送響應的時間。設置為0等待永遠。
spring.activemq.send-timeout=0
queueName=polling.queue
topicName=polling.topic
# 默認情況下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
spring.jms.pub-sub-domain=true
2.java代碼 配置文件

package workstation.open.config; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.config.SimpleJmsListenerContainerFactory; import org.springframework.jms.core.JmsMessagingTemplate; import javax.jms.ConnectionFactory; import javax.jms.Queue; import javax.jms.Topic; /** * @author ThreeNut * @date 2021/5/18 13:55 */ @Configuration public class ActiveMQConfig { @Value("${queueName}") private String queueName; @Value("${topicName}") private String topicName; @Value("${spring.activemq.user}") private String usrName; @Value("${spring.activemq.password}") private String password; @Value("${spring.activemq.broker-url}") private String brokerUrl; @Bean public Queue queue(){ return new ActiveMQQueue(queueName); } @Bean public Topic topic(){ return new ActiveMQTopic(topicName); } @Bean public ActiveMQConnectionFactory connectionFactory() { return new ActiveMQConnectionFactory(usrName, password, brokerUrl); } @Bean public JmsMessagingTemplate jmsMessageTemplate(){ return new JmsMessagingTemplate(connectionFactory()); } // 在Queue模式中,對消息的監聽需要對containerFactory進行配置 @Bean("queueListener") public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(false); return factory; } //在Topic模式中,對消息的監聽需要對containerFactory進行配置 @Bean("topicListener") public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); return factory; } }
3.java代碼 消息推送 消息生產者
3.1 Queue(點到點)模式在點對點的傳輸方式中,消息數據被持久化,每條消息都能被消費,沒有監聽QUEUE地址也能被消費,數據不會丟失,一對一的發布接受策略,保證數據完整。
3.2Topic主題模式,就是訂閱模式,不保證消息都能被接收到 發送方式 :群發
package workstation.open.mq; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Component; import javax.jms.Destination; import javax.jms.Topic; /** * 消息生產 * @author BinPeng * @date 2021/5/18 13:47 */ @Component public class PollingQueueProducer { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; /** * 群發消息 */ @Autowired private Topic topic; public String sendQueue(String msg) { this.sendMessage(this.topic,msg); System.out.println("發送消息成功!!!"); return "success"; } private void sendMessage(Destination destination,String msg){ jmsMessagingTemplate.convertAndSend(destination, msg); } }
4.后端測試(僅用來在后端測試,看是否接收到消息)
package workstation.open.mq; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import workstation.common.util.ReqResult; import javax.servlet.http.Cookie; import java.util.LinkedList; import java.util.Queue; @Component public class QueueConsumer {
/** * queue模式的消費者 * @param message 消息 */ @JmsListener(destination="${topicName}", containerFactory="topicListener") public ReqResult readActiveQueue(String message) { Queue<String> queue = new LinkedList<String>(); queue.offer(message); System.out.println(message +"----------------------------------"); return new ReqResult().put("msg",queue); } }
5.基於前端VUE使用Stompjs接收ActiveMQ消息實現方法 -- 消息消費者
准備工作:
npm install stompjs
npm install net
5.1 linkparam.js 配置文件
export const MQ_SERVICE = 'ws://127.0.0.1:61614/stomp' // mq服務地址(默認監聽消息端口)
export const MQ_USERNAME = 'xxx' // mq連接用戶名
export const MQ_PASSWORD = 'xxx' //mq連接密碼
5.2 sock.vue
<template> <div></div> </template> <script> import Stomp from 'stompjs' import { MQ_SERVICE, MQ_USERNAME, MQ_PASSWORD } from '../../utils/linkparam' export default { name: 'entry', data () { return { client: Stomp.client(MQ_SERVICE), topic : '/topic/iqc_check_list' } }, created () { this.connect() }, methods: { onConnected: function (frame) { console.log('Connected: ---------' + frame.body) // 主題模式 this.client.subscribe(this.topic, this.responseCallback, { id: 20210820 }) }, onFailed: function (frame) { }, responseCallback: function (frame) { if(frame.body != null){ this.$notify.info({ showClose: true, message: '有新的檢驗單 ' + frame.body, position: 'bottom-right', duration: '3000', onClose: () => { // 執行查詢方法 this.$parent.query() }, }); } }, connect: function () { let headers = { 'login': MQ_USERNAME, 'passcode': MQ_PASSWORD } // 心跳發送頻率 this.client.heartbeat.outgoing = 50000; // 心跳接收頻率 this.client.heartbeat.incoming = 50000; this.client.connect(headers, this.onConnected, this.onFailed) } } } </script>
5.3 在APP.vue 文件下添加(因為要在此頁面展示所以添加如下,展示頁面按照你自己的實際情況配置即可)
import sock from './components/sock'
components:{
sock
}
至此前后端配置結束
總結: 需注意 如果監聽的是遠程服務 需要配置為外網ip地址,端口 61614和61616加入安全組. 監聽端口為: 61614
activeMQ 客戶端地址:http://localhost:8161/admin/topics.jsp