I won't spend much time introducing ActiveMQ 5.x itself; this post is mainly about integrating it with Spring Boot.
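Features:
1) Supports a variety of cross-language clients and protocols from Java, C, C++, C#, Ruby, Perl, Python, and PHP
2) Supports many advanced features such as message groups, virtual destinations, wildcards, and composite destinations
3) Fully supports JMS 1.1 and J2EE 1.4, with support for transient, persistent, transactional, and XA messaging
4) Spring support: ActiveMQ can be easily embedded into Spring applications and configured using Spring's XML configuration mechanism
5) Tested inside popular J2EE servers such as TomEE, Geronimo, JBoss, GlassFish, and WebLogic
6) Supports very fast persistence using JDBC together with a high-performance journal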
Download:
http://activemq.apache.org/activemq-5153-release.html
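For real-world development, deploying to a Linux system is recommended; tutorials for that are easy to find online.
For convenience, I installed it directly on my local Windows machine.
To learn more, see the official documentation: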
http://activemq.apache.org/getting-started.html
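Go into the bin directory, then its win64 subdirectory, and run activemq.bat.
Open localhost:8161 to reach the home page.
Open http://localhost:8161/admin/ to reach the admin console; the default username and password are both admin.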
Integration:
Dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
Connection pool
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>
Basic configuration
# ActiveMQ
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100
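This post enables JMS in the Spring Boot main class with @EnableJms and declares the connection factory and the two templates there. Because the class declares its own ConnectionFactory bean, Spring Boot's auto-configured (pooled) factory should back off, so the spring.activemq.pool.* settings above likely have no effect on this hand-written factory.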
package org.dreamtech.avtivemq;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;

@SpringBootApplication
@EnableJms
public class AvtivemqApplication {

    public static void main(String[] args) {
        SpringApplication.run(AvtivemqApplication.class, args);
    }

    @Autowired
    private Environment env;

    // Connection factory built from the spring.activemq.* properties
    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(env.getProperty("spring.activemq.broker-url"));
        connectionFactory.setUserName(env.getProperty("spring.activemq.user"));
        connectionFactory.setPassword(env.getProperty("spring.activemq.password"));
        return connectionFactory;
    }

    @Bean
    public JmsTemplate genJmsTemplate() {
        return new JmsTemplate(connectionFactory());
    }

    @Bean
    public JmsMessagingTemplate jmsMessageTemplate() {
        return new JmsMessagingTemplate(connectionFactory());
    }
}
Point-to-point model:
First, implement message sending
package org.dreamtech.avtivemq.service;

import javax.jms.Destination;

/**
 * Message producer.
 *
 * @author Xu Yiqing
 */
public interface ProducerService {

    /**
     * Send a message to the specified destination.
     *
     * @param destination the destination to send to
     * @param message     the message text
     */
    void sendMsg(Destination destination, final String message);
}
package org.dreamtech.avtivemq.service.impl;

import javax.jms.Destination;

import org.dreamtech.avtivemq.service.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class ProducerServiceImpl implements ProducerService {

    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    @Override
    public void sendMsg(Destination destination, String message) {
        jmsTemplate.convertAndSend(destination, message);
    }
}
package org.dreamtech.avtivemq.controller;

import javax.jms.Destination;

import org.apache.activemq.command.ActiveMQQueue;
import org.dreamtech.avtivemq.service.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {

    @Autowired
    private ProducerService producerService;

    // Handler methods are conventionally public
    @GetMapping("/order")
    public Object order(String msg) {
        Destination destination = new ActiveMQQueue("order.queue");
        producerService.sendMsg(destination, msg);
        return "order";
    }
}
Open http://localhost:8080/order?msg=demo, then check the Queues page of the ActiveMQ admin console, where order.queue should now show a pending message.
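Where there is a producer, there is a consumer. The consumer listens on the message queue:
package org.dreamtech.avtivemq.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class OrderConsumer {

    /**
     * Listen on the specified message queue.
     *
     * @param text the message body
     */
    @JmsListener(destination = "order.queue")
    public void receiveQueue(String text) {
        System.out.println("[ OrderConsumer received : " + text + " ]");
    }
}
Because the listener is active from startup, the message already waiting in order.queue is consumed and printed as soon as Spring Boot starts:
[ OrderConsumer received : demo ]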
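The consumer can print demo right at startup because a queue holds each message until some consumer takes it, and each message goes to exactly one consumer. The following is a minimal plain-Java sketch of that behaviour, assuming an in-memory LinkedBlockingQueue as a stand-in for the broker. It is not ActiveMQ or Spring code, and PointToPointSketch and its methods are hypothetical names invented for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java model of point-to-point delivery; not ActiveMQ or Spring code.
class PointToPointSketch {

    // Stand-in for the broker-side queue: messages wait here until consumed.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Producer side: enqueue a message even if no consumer is running yet.
    void send(String message) {
        queue.add(message);
    }

    // Consumer side: take the next pending message, or null if none is waiting.
    String receive() {
        return queue.poll();
    }

    // Number of messages still waiting for a consumer.
    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        PointToPointSketch orderQueue = new PointToPointSketch();

        // The message is sent before any consumer exists and simply waits.
        orderQueue.send("demo");
        System.out.println("pending: " + orderQueue.pending());

        // The first consumer to ask gets the message.
        System.out.println("consumer A: " + orderQueue.receive());

        // A second consumer finds nothing: the message was delivered only once.
        System.out.println("consumer B: " + orderQueue.receive());
    }
}
```

In the real application the broker plays the role of the in-memory queue, which is why a message sent while the consumer is offline is still there when the listener connects.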
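Publish-subscribe model: think of short videos on Douyin, where an influencer publishes a new video and many followers receive the notification.
ActiveMQ itself supports both models, but Spring Boot's JMS support resolves destination names as queues (point-to-point) by default. To switch it to the publish-subscribe model, add this setting: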
spring.jms.pub-sub-domain=true
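Register the topic as a Spring-managed bean (this needs imports for javax.jms.Topic and org.apache.activemq.command.ActiveMQTopic):
@Bean
public Topic topic() {
    return new ActiveMQTopic("demo.topic");
}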
Publisher
// In ProducerService
/**
 * Publish a message to the topic.
 *
 * @param msg the message text
 */
void publish(String msg);

// In ProducerServiceImpl
@Autowired
private JmsMessagingTemplate jmsTemplate;

@Autowired
private Topic topic;

@Override
public void publish(String msg) {
    jmsTemplate.convertAndSend(topic, msg);
}

// In the controller
@Autowired
private ProducerService producerService;

@GetMapping("/topic")
public Object topic(String msg) {
    producerService.publish(msg);
    return "success";
}
Subscribers (consumers): one publisher, many subscribers
package org.dreamtech.avtivemq.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumer {

    @JmsListener(destination = "demo.topic")
    public void receiver1(String text) {
        System.out.println("TopicConsumer : receiver1 : " + text);
    }

    @JmsListener(destination = "demo.topic")
    public void receiver2(String text) {
        System.out.println("TopicConsumer : receiver2 : " + text);
    }

    @JmsListener(destination = "demo.topic")
    public void receiver3(String text) {
        System.out.println("TopicConsumer : receiver3 : " + text);
    }
}
Start the project and open:
http://localhost:8080/topic?msg=666
The output looks like this (the order of the three lines may vary between runs):
TopicConsumer : receiver1 : 666
TopicConsumer : receiver3 : 666
TopicConsumer : receiver2 : 666
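The output above reflects the defining property of a topic: every current subscriber receives its own copy of each message. A minimal plain-Java sketch of that fan-out follows. It is an in-memory model rather than ActiveMQ or Spring code, TopicSketch and its methods are hypothetical names, and it delivers to subscribers sequentially in registration order, whereas real listeners run independently. It also models only non-durable subscribers, which miss anything published before they subscribed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of publish-subscribe delivery; not ActiveMQ or Spring code.
class TopicSketch {

    // Currently registered subscribers (stand-ins for the topic listeners).
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Hand a copy of the message to every current subscriber.
    // Returns how many subscribers received it.
    int publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
        return subscribers.size();
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        List<String> received = new ArrayList<>();

        // Nobody is subscribed yet, so this message reaches no one.
        int early = topic.publish("early");

        topic.subscribe(text -> received.add("receiver1 : " + text));
        topic.subscribe(text -> received.add("receiver2 : " + text));
        topic.subscribe(text -> received.add("receiver3 : " + text));

        // All three subscribers get their own copy of this message.
        int delivered = topic.publish("666");

        System.out.println("early reached " + early + ", 666 reached " + delivered);
        received.forEach(System.out::println);
    }
}
```

Compare this with a queue, where the same message would have gone to only one of the three receivers.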
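So can the point-to-point and publish-subscribe models be used together?
Not with the configuration above, because spring.jms.pub-sub-domain is a global switch for the default listener container factory.
They can be combined with the following configuration: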
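1. Comment out the global setting: #spring.jms.pub-sub-domain=true
2. Add a bean that gives topics their own JmsListenerContainerFactory (JmsListenerContainerFactory and DefaultJmsListenerContainerFactory are both in org.springframework.jms.config):
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
    DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
    // Listeners created by this factory subscribe to topics instead of queues
    bean.setPubSubDomain(true);
    bean.setConnectionFactory(activeMQConnectionFactory);
    return bean;
}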
3. A @JmsListener that does not name a dedicated containerFactory can only consume queue messages, so point each topic listener at the new factory:
@JmsListener(destination = "demo.topic", containerFactory = "jmsListenerContainerTopic")
public void receiver1(String text) {
    System.out.println("TopicConsumer : receiver1 : " + text);
}
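A destination name such as demo.topic does not by itself say whether it is a queue or a topic; the container factory a listener uses decides that. The sketch below models this lookup in plain Java under stated assumptions. ContainerFactorySketch and resolve are hypothetical names, the map stands in for the Spring bean registry, and the queue:// and topic:// prefixes are only labels for the result. The one real detail is that a @JmsListener without a containerFactory falls back to the bean named jmsListenerContainerFactory.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of how a listener's container factory decides queue vs topic.
// Not Spring code: the map below stands in for the bean registry.
class ContainerFactorySketch {

    // Bean name Spring looks up when @JmsListener names no containerFactory.
    static final String DEFAULT_FACTORY = "jmsListenerContainerFactory";

    // Factory bean name -> its pubSubDomain flag.
    private final Map<String, Boolean> factories = new HashMap<>();

    ContainerFactorySketch() {
        factories.put(DEFAULT_FACTORY, false);             // default: queues
        factories.put("jmsListenerContainerTopic", true);  // the bean from step 2
    }

    // Returns a label describing what the listener would actually listen on.
    String resolve(String destination, String containerFactory) {
        String name = (containerFactory == null || containerFactory.isEmpty())
                ? DEFAULT_FACTORY
                : containerFactory;
        Boolean pubSubDomain = factories.get(name);
        if (pubSubDomain == null) {
            throw new IllegalArgumentException("No such factory: " + name);
        }
        return (pubSubDomain ? "topic://" : "queue://") + destination;
    }

    public static void main(String[] args) {
        ContainerFactorySketch sketch = new ContainerFactorySketch();

        // Queue listener: no containerFactory needed.
        System.out.println(sketch.resolve("order.queue", ""));

        // Topic listener pointing at the dedicated factory.
        System.out.println(sketch.resolve("demo.topic", "jmsListenerContainerTopic"));

        // Forgetting containerFactory silently listens on a queue named demo.topic.
        System.out.println(sketch.resolve("demo.topic", ""));
    }
}
```

The last call shows the failure mode step 3 warns about: the listener starts without error but never sees messages published to the topic.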