SpringBoot入門 (九) MQ使用


本文記錄學習在Spring Boot中使用MQ。

一 什么是MQ

  MQ全稱(Message Queue)又名消息隊列,是一種異步通訊的中間件。它的作用類似於郵局,發信人(生產者)只需要將信(消息)交給郵局,然后由郵局再將信(消息)發送給具體的接收者(消費者),具體發送過程與時間發信人可以不關注,也不會影響發信人做其它事情。目前常見的MQ有activemq、kafka、rabbitmq、zeromq、rocketmq等。

  使用MQ的優點主要有:

  1 方法的異步執行 使用MQ可以將耗時的同步操作通過以發送消息的方式進行了異步化處理,減少了由於同步而等待的時間;

  2 程序之間松耦合 使用MQ可以減少了服務之間的耦合性,不同的服務可以通過消息隊列進行通信,只要約定好消息的內容格式就行;

  JMS(Java Message Service)即java消息服務,是一個Java平台中關於面向消息中間件(MOM)的 API,用於在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。JMS的消息機制有2種模型,一種是1對1(Point to Point)的隊列的消息,這種消息,只能被一個消費者消費;另一種是一對多的發布/訂閱(Topic)消息,一條消息可以被多個消費者消費。ActiveMq是對JMS的一個實現。

二 SpringBoot集成Active MQ

  官網下載一個服務程序,解壓后直接啟動服務就可以了,下載地址:http://activemq.apache.org/activemq-5158-release.html

  SpringBoot也對Active MQ提供了支持,我們使用時引入具體的依賴即可,修改pom.xml文件,添加依賴

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

  在application.properties文件中配置Active MQ服務器的連接信息

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
#消息模式 true:廣播(Topic),false:隊列(Queue),默認時false
#spring.jms.pub-sub-domain=true

  完成以上配置信息后,當我們在啟動SpringBoot項目時,會自動幫我們完成初始化操作,並提供一個JmsMessagingTemplate,提提供了我們常用發送消息的各種方法供我們使用。我們只需要在使用的地方注入JmsMessagingTemplate即可使用。

  發送隊列消息

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqApplicationTests {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Test
    public void testQueueMsg(){
        //創建名稱為zyQueue的隊列
        Queue queue = new ActiveMQQueue("zyQueue");
        //向隊列發送消息
        jmsMessagingTemplate.convertAndSend(queue,"這是一個隊列消息!");
    }
}

  消息的接收方,監聽消息隊列,當隊列中有消息時就可以獲取到消息

@Component
public class Consumer {

    private static DateFormat df =  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,sss");

    /**
     * destination 目標地址即隊列
     */
    @JmsListener(destination = "zyQueue")
    public void receiveMessage(String text){
        System.out.println("接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
    }
}

  執行測試方法發送消息可以看到,控制台輸出的消費者接受到消息

隊列消息只能有一個消費者,如果有多個消費者同時監聽一個隊列時,只能有一個拿到消息,我們測試,修改發送方法,循環發送10調消息

@Test
    public void testQueueMsg(){
        //創建名稱為zyQueue的隊列
        Queue queue = new ActiveMQQueue("zyQueue");
        //向隊列發送消息
        for (int i=0;i<10;i++) {
            jmsMessagingTemplate.convertAndSend(queue,"這是第"+i+"個隊列消息!");
        }
    }

  在Consumer 類中再添加一個消費者,監聽隊列zyQueue

@JmsListener(destination = "zyQueue")
    public void receiveMessage(String text){
        System.out.println("接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
    }

    @JmsListener(destination = "zyQueue")
    public void receiveMessage1(String text){
        System.out.println("1接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
    }

  執行發送消息,看到控制台輸出的結果,2個消費者平分了這10條消息

  如果希望監聽同一個隊列的多個消費者都能接收到所有消息,我們就只能發送Topic消息了,我們修改application.properties中的

#消息模式 true:廣播(Topic),false:隊列(Queue),默認時false
spring.jms.pub-sub-domain=true

  表示要發送發布/訂閱消息,發送消息的隊列改用Topic發送消息,如下

@Test
    public void testTopicMsg(){
        Topic topic = new ActiveMQTopic("zyTopic");
        for (int i=0;i<5;i++){
            jmsMessagingTemplate.convertAndSend(topic,"這是第"+i+"個Topic消息!");
        }
    }

  我們在Consumer 類中添加兩個消費者來監聽zyTopic隊列,接受消息

@JmsListener(destination = "zyTopic")
    public void receiveTopicMessage1(String text){
        System.out.println("消費者1接收消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
    }

    @JmsListener(destination = "zyTopic")
    public void receiveTopicMessage2(String text){
        System.out.println("消費者2接收消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
    }

  執行發消息方法,可以看到控制台輸出的內容,2個消費者都完整的接收到了5條消息

 

   我們在測試發送消息時修改了屬性文件中的配置信息,才可以發送對應的類型的消息,這是由於SpringBoot中默認的是隊列消息(查看源碼可以知道,監聽器默認使用的DefaultJmsListenerContainerFactory),如果我們想在不修改配置信息的情況下可以同時發送Queue和Topic消息怎么辦呢,我們需要手動的更改初始的配置類,分別針對Queue和Topic消息提供JmsListenerContainerFactory

  新建一個配置類,如下

@SpringBootConfiguration
public class ActiveMqConfig {

    @Bean("queueListenerFactory")
    public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //設置消息模型為隊列
        factory.setPubSubDomain(false);
        return factory;
    }
    
    @Bean("topicListenerFactory")
    public JmsListenerContainerFactory topicListenerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //設置消息模型為隊列
        factory.setPubSubDomain(true);
        return factory;
    }
}

  在容器啟動時會針對兩種消息類型,初始化得到兩個不同的JmsListenerContainerFactory。下來再修改消費者類,在 @JmsListener 注解中指定 containerFactory,如

@JmsListener(destination = "zyQueue", containerFactory = "queueListenerFactory")
    public void receiveMessage(String text){
        System.out.println("接收隊列消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
    }

@JmsListener(destination = "zyTopic", containerFactory = "topicListenerFactory")
    public void receiveTopicMessage1(String text){
        System.out.println("消費者1接收消息時間:"+ df.format(new Date()) +", 接收到消息內容:"+text);
    }

  Queue消息使用 queueListenerFactory,Topic消息使用 topicListenerFactory,然后注釋掉屬性文件中的消息模式配置就可以了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM