本文記錄學習在Spring Boot中使用MQ。
一 什么是MQ
MQ全稱(Message Queue)又名消息隊列,是一種異步通訊的中間件。它的作用類似於郵局,發信人(生產者)只需要將信(消息)交給郵局,然后由郵局再將信(消息)發送給具體的接收者(消費者),具體發送過程與時間發信人可以不關注,也不會影響發信人做其它事情。目前常見的MQ有activemq、kafka、rabbitmq、zeromq、rocketmq等。
使用MQ的優點主要有:
1 方法的異步執行 使用MQ可以將耗時的同步操作通過以發送消息的方式進行了異步化處理,減少了由於同步而等待的時間;
2 程序之間松耦合 使用MQ可以減少了服務之間的耦合性,不同的服務可以通過消息隊列進行通信,只要約定好消息的內容格式就行;
JMS(Java Message Service)即java消息服務,是一個Java平台中關於面向消息中間件(MOM)的 API,用於在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。JMS的消息機制有2種模型,一種是1對1(Point to Point)的隊列的消息,這種消息,只能被一個消費者消費;另一種是一對多的發布/訂閱(Topic)消息,一條消息可以被多個消費者消費。ActiveMq是對JMS的一個實現。
二 SpringBoot集成Active MQ
官網下載一個服務程序,解壓后直接啟動服務就可以了,下載地址:http://activemq.apache.org/activemq-5158-release.html
SpringBoot也對Active MQ提供了支持,我們使用時引入具體的依賴即可,修改pom.xml文件,添加依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
在application.properties文件中配置Active MQ服務器的連接信息
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
#消息模式 true:廣播(Topic),false:隊列(Queue),默認時false
#spring.jms.pub-sub-domain=true
完成以上配置信息后,當我們在啟動SpringBoot項目時,會自動幫我們完成初始化操作,並提供一個JmsMessagingTemplate,提提供了我們常用發送消息的各種方法供我們使用。我們只需要在使用的地方注入JmsMessagingTemplate即可使用。
發送隊列消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqApplicationTests {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Test
public void testQueueMsg(){
//創建名稱為zyQueue的隊列
Queue queue = new ActiveMQQueue("zyQueue");
//向隊列發送消息
jmsMessagingTemplate.convertAndSend(queue,"這是一個隊列消息!");
}
}
消息的接收方,監聽消息隊列,當隊列中有消息時就可以獲取到消息
@Component
public class Consumer {
private static DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,sss");
/**
* destination 目標地址即隊列
*/
@JmsListener(destination = "zyQueue")
public void receiveMessage(String text){
System.out.println("接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
}
}
執行測試方法發送消息可以看到,控制台輸出的消費者接受到消息
隊列消息只能有一個消費者,如果有多個消費者同時監聽一個隊列時,只能有一個拿到消息,我們測試,修改發送方法,循環發送10調消息
@Test
public void testQueueMsg(){
//創建名稱為zyQueue的隊列
Queue queue = new ActiveMQQueue("zyQueue");
//向隊列發送消息
for (int i=0;i<10;i++) {
jmsMessagingTemplate.convertAndSend(queue,"這是第"+i+"個隊列消息!");
}
}
在Consumer 類中再添加一個消費者,監聽隊列zyQueue
@JmsListener(destination = "zyQueue")
public void receiveMessage(String text){
System.out.println("接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
}
@JmsListener(destination = "zyQueue")
public void receiveMessage1(String text){
System.out.println("1接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
}
執行發送消息,看到控制台輸出的結果,2個消費者平分了這10條消息
如果希望監聽同一個隊列的多個消費者都能接收到所有消息,我們就只能發送Topic消息了,我們修改application.properties中的
#消息模式 true:廣播(Topic),false:隊列(Queue),默認時false
spring.jms.pub-sub-domain=true
表示要發送發布/訂閱消息,發送消息的隊列改用Topic發送消息,如下
@Test
public void testTopicMsg(){
Topic topic = new ActiveMQTopic("zyTopic");
for (int i=0;i<5;i++){
jmsMessagingTemplate.convertAndSend(topic,"這是第"+i+"個Topic消息!");
}
}
我們在Consumer 類中添加兩個消費者來監聽zyTopic隊列,接受消息
@JmsListener(destination = "zyTopic")
public void receiveTopicMessage1(String text){
System.out.println("消費者1接收消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
}
@JmsListener(destination = "zyTopic")
public void receiveTopicMessage2(String text){
System.out.println("消費者2接收消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
}
執行發消息方法,可以看到控制台輸出的內容,2個消費者都完整的接收到了5條消息
我們在測試發送消息時修改了屬性文件中的配置信息,才可以發送對應的類型的消息,這是由於SpringBoot中默認的是隊列消息(查看源碼可以知道,監聽器默認使用的DefaultJmsListenerContainerFactory),如果我們想在不修改配置信息的情況下可以同時發送Queue和Topic消息怎么辦呢,我們需要手動的更改初始的配置類,分別針對Queue和Topic消息提供JmsListenerContainerFactory
新建一個配置類,如下
@SpringBootConfiguration
public class ActiveMqConfig {
@Bean("queueListenerFactory")
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//設置消息模型為隊列
factory.setPubSubDomain(false);
return factory;
}
@Bean("topicListenerFactory")
public JmsListenerContainerFactory topicListenerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//設置消息模型為隊列
factory.setPubSubDomain(true);
return factory;
}
}
在容器啟動時會針對兩種消息類型,初始化得到兩個不同的JmsListenerContainerFactory。下來再修改消費者類,在 @JmsListener 注解中指定 containerFactory,如
@JmsListener(destination = "zyQueue", containerFactory = "queueListenerFactory")
public void receiveMessage(String text){
System.out.println("接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
}
@JmsListener(destination = "zyTopic", containerFactory = "topicListenerFactory")
public void receiveTopicMessage1(String text){
System.out.println("消費者1接收消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
}
Queue消息使用 queueListenerFactory,Topic消息使用 topicListenerFactory,然后注釋掉屬性文件中的消息模式配置就可以了。