SpringBoot 2.x (13):整合ActiveMQ


ActiveMQ5.x不多做介紹了,主要是SpringBoot的整合

特點:
1)支持來自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各種跨語言客戶端和協議
2)支持許多高級功能,如消息組,虛擬目標,通配符和復合目標
3) 完全支持JMS 1.1和J2EE 1.4,支持瞬態,持久,事務和XA消息
4) Spring支持,ActiveMQ可以輕松嵌入到Spring應用程序中,並使用Spring的XML配置機制進行配置
5) 支持在流行的J2EE服務器(如TomEE,Geronimo,JBoss,GlassFish和WebLogic)中進行測試
6) 使用JDBC和高性能日志支持非常快速的持久化

 

下載:

http://activemq.apache.org/activemq-5153-release.html

實際開發推薦部署到Linux系統,具體操作網上也有教程

我這里為了方便,直接安裝在本地Windows機器上

 

如果想了解更多,查看官方文檔:

http://activemq.apache.org/getting-started.html

 

進入bin目錄win64目錄啟動activemq.bat即可

訪問localhost:8161進入首頁

訪問http://localhost:8161/admin/進入管理頁面,默認用戶名和密碼都是admin

 

整合:

依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

連接池

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>

基本的配置

# ActiveMQ
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100

使用ActiveMQ必須要在SpringBoot啟動類中開啟JMS,並進行配置

package org.dreamtech.avtivemq;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;

@SpringBootApplication
@EnableJms
public class AvtivemqApplication {

    public static void main(String[] args) {
        SpringApplication.run(AvtivemqApplication.class, args);
    }

    @Autowired
    private Environment env;

    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(env.getProperty("spring.activemq.broker-url"));
        connectionFactory.setUserName(env.getProperty("spring.activemq.user"));
        connectionFactory.setPassword(env.getProperty("spring.activemq.password"));
        return connectionFactory;
    }

    @Bean
    public JmsTemplate genJmsTemplate() {
        return new JmsTemplate(connectionFactory());

    }

    @Bean
    public JmsMessagingTemplate jmsMessageTemplate() {
        return new JmsMessagingTemplate(connectionFactory());
    }
}

點對點模型:

首先實現消息的發送

package org.dreamtech.avtivemq.service;

import javax.jms.Destination;

/**
 * 消息生產
 * 
 * @author Xu Yiqing
 *
 */
public interface ProducerService {
    /**
     * 使用指定消息隊列發送
     * 
     * @param destination
     * @param message
     */
    void sendMsg(Destination destination, final String message);
}
package org.dreamtech.avtivemq.service.impl;

import javax.jms.Destination;

import org.dreamtech.avtivemq.service.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class ProducerServiceImpl implements ProducerService {
    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    @Override
    public void sendMsg(Destination destination, String message) {
        jmsTemplate.convertAndSend(destination, message);
    }

}
package org.dreamtech.avtivemq.controller;

import javax.jms.Destination;

import org.apache.activemq.command.ActiveMQQueue;
import org.dreamtech.avtivemq.service.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {
    @Autowired
    private ProducerService producerService;
    
    @GetMapping("/order")
    private Object order(String msg) {
        Destination destination = new ActiveMQQueue("order.queue");
        producerService.sendMsg(destination,msg);
        return "order";
    }
}

訪問:http://localhost:8080/order?msg=demo,然后查看ActiveMQ界面:

有生產者就就有消費者:監聽消息隊列

package org.dreamtech.avtivemq.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class OrderConsumer {
    /**
     * 監聽指定消息隊列
     * 
     * @param text
     */
    @JmsListener(destination = "order.queue")
    public void receiveQueue(String text) {
        System.out.println("[ OrderConsumer收到的報文 : " + text + " ]");
    }
}

由於實時監聽,一啟動SpringBoot就會打印:

[ OrderConsumer收到的報文 : demo ]

 

發布訂閱模型:比如抖音小視頻,某網紅發布新視頻,多名粉絲收到消息

默認ActiveMQ只支持點對點模型,想要開啟發布訂閱模型,需要進行配置

spring.jms.pub-sub-domain=true

Spring管理主題對象

    @Bean
    public Topic topic() {
        return new ActiveMQTopic("demo.topic");
    }

發布者

    /**
     * 消息發布者
     * 
     * @param msg
     */
    void publish(String msg);
    @Autowired
    private JmsMessagingTemplate jmsTemplate;
    @Autowired
    private Topic topic;

    @Override
    public void publish(String msg) {
        jmsTemplate.convertAndSend(topic, msg);
    }
    @Autowired
    private ProducerService producerService;
    @GetMapping("/topic")
    private Object topic(String msg) {
        producerService.publish(msg);
        return "success";
    }

訂閱者(消費者):一人發布,多人訂閱

package org.dreamtech.avtivemq.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumer {
    @JmsListener(destination = "demo.topic")
    public void receiver1(String text) {
        System.out.println("TopicConsumer : receiver1 : " + text);
    }

    @JmsListener(destination = "demo.topic")
    public void receiver2(String text) {
        System.out.println("TopicConsumer : receiver2 : " + text);
    }

    @JmsListener(destination = "demo.topic")
    public void receiver3(String text) {
        System.out.println("TopicConsumer : receiver3 : " + text);
    }
}

啟動項目,訪問:

http://localhost:8080/topic?msg=666

打印如下

TopicConsumer : receiver1 : 666
TopicConsumer : receiver3 : 666
TopicConsumer : receiver2 : 666

 

那么點對點和發布訂閱模型可以一起使用嗎?

不可以

如何配置?

1.注釋掉 #spring.jms.pub-sub-domain=true

2.加入Bean:給topic定義獨立的JmsListenerContainer

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }

3.@JmsListener如果不指定獨立的containerFactory的話是只能消費queue消息

    @JmsListener(destination = "demo.topic", containerFactory = "jmsListenerContainerTopic")
    public void receiver1(String text) {
        System.out.println("TopicConsumer : receiver1 : " + text);
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM