SpringBoot整合ActiveMQ實現持久化


點對點(P2P)

  結構

    創建生產者和消費者兩個springboot工程

    

   導入依賴

    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

  生產者

    步驟一:application.properties文件

spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
server.port=8080

    步驟二:創建生產者類

package com.wn.p2p;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Point-to-point producer: sends a fixed text message to the
 * {@code boot_queue} queue with persistent delivery requested.
 */
@Component
public class P2Pprovider {
    // JmsTemplate injected by the container.
    @Resource
    private JmsTemplate jmsTemplate;

    /**
     * Requests persistent delivery on the template and sends the text
     * {@code "消息123"} to the queue {@code boot_queue}.
     */
    public void setMessage(){
        // The template only applies its own delivery mode when explicit QoS
        // is enabled.
        jmsTemplate.setExplicitQosEnabled(true);
        // Selects the PERSISTENT delivery mode (numeric value 2), so no
        // separate setDeliveryMode(2) call is needed.
        jmsTemplate.setDeliveryPersistent(true);
        // Point-to-point: address a queue destination.
        ActiveMQQueue queue=new ActiveMQQueue("boot_queue");
        jmsTemplate.convertAndSend(queue,"消息123");
    }
}

    步驟三:創建controller

package com.wn.p2p;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * REST endpoint that triggers the point-to-point producer.
 */
@RestController
public class P2PController {
    // Producer bean injected by the container.
    @Resource
    private P2Pprovider p2Pprovider;

    /**
     * Handles requests to {@code /setMessage}: asks the producer to send its
     * message and then reports success.
     *
     * @return the literal string {@code "success"}
     */
    @RequestMapping("/setMessage")
    public String setMessage(){
        final String reply = "success";
        p2Pprovider.setMessage();
        return reply;
    }
}

    步驟四:啟動

      

      

       這裡可以試一下讓ActiveMQ服務器宕機,然後讓服務器重啟,看看數據有沒有做持久化的操作

      結論:當服務器宕機,重啟服務器之後,沒有被消費的消息依然在數據庫中,這樣就做到了持久化操作。

  消費者

    步驟一:application.properties文件

spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
server.port=8081

    步驟二:創建消費者類

package com.wn;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.JmsListener;

import javax.jms.JMSException;
import javax.jms.TextMessage;

/**
 * Spring Boot entry point for the point-to-point consumer; also hosts the
 * listener for the {@code boot_queue} destination.
 */
@SpringBootApplication
public class P2pConsumerApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(P2pConsumerApplication.class, args);
    }

    /**
     * Prints the text of each message received from {@code boot_queue}.
     *
     * @param message the received text message
     * @throws JMSException if the message text cannot be read
     */
    @JmsListener(destination = "boot_queue")
    public void getMessage(TextMessage message) throws JMSException {
        final String body = message.getText();
        System.out.println("接收到的信息:" + body);
    }
}

    步驟三:啟動

      

      

      結論:消息一旦被消費,就會從隊列中清除,不再保留在持久化存儲中。

 

發布/訂閱(Pub/Sub)

  結構

    

  導入依賴

    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

  消費者

    步驟一:application.properties文件

spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
server.port=8082

#topic配置
#spring.jms.pub-sub-domain=true

    步驟二:創建消費者類

package com.wn;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.TextMessage;

/**
 * Spring Boot entry point for the publish/subscribe consumer. Declares a
 * listener container factory for durable topic subscriptions and a listener
 * on the {@code boot_topic} destination that uses it.
 */
@SpringBootApplication
public class PubSubConsumerApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(PubSubConsumerApplication.class, args);
    }

    /**
     * Builds the listener container factory used for durable topic
     * subscriptions.
     *
     * @param connectionFactory the JMS connection factory supplied by the container
     * @return a factory configured for pub/sub, durable subscription and client id {@code "A"}
     */
    @Bean(name = "topicListenerFactory")
    public JmsListenerContainerFactory<DefaultMessageListenerContainer> topicListenerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);

        // State the topic (pub/sub) domain explicitly so listeners built by
        // this factory resolve their destination as a topic without relying
        // on any other setting or on spring.jms.pub-sub-domain.
        factory.setPubSubDomain(true);

        // Register a durable subscription.
        factory.setSubscriptionDurable(true);

        // Client id used for the durable subscription.
        // NOTE(review): hard-coded; presumably each concurrently connected
        // consumer needs its own id - confirm before running several instances.
        factory.setClientId("A");

        return factory;
    }

    /**
     * Prints the text of each message received from {@code boot_topic}.
     *
     * @param message the received text message
     * @throws JMSException if the message text cannot be read
     */
    @JmsListener(destination = "boot_topic",containerFactory = "topicListenerFactory")
    public void getMessage(TextMessage message) throws JMSException {
        System.out.println("接收到消息:"+message.getText());
    }
}

  生產者

    步驟一:application.properties文件

spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
server.port=8083
spring.jms.pub-sub-domain=true

    步驟二:創建生產者類

package com.wn.pub_sub;

import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Publish/subscribe producer: sends a fixed text message to the
 * {@code boot_topic} topic.
 */
@Component
public class Pub_Sub_Provider {

    // JmsTemplate injected by the container.
    @Resource
    private JmsTemplate jmsTemplate;

    /**
     * Sends the text {@code "消息456"} to the topic {@code boot_topic}.
     */
    public void setMessage(){
        // Address a topic destination.
        ActiveMQTopic topic=new ActiveMQTopic("boot_topic");
        // Convert the payload to a JMS message and send it.
        jmsTemplate.convertAndSend(topic,"消息456");
    }

}

    步驟三:創建controller

package com.wn.pub_sub;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * REST endpoint that triggers the publish/subscribe producer.
 */
@RestController
public class Pub_Sub_Controller {
    // Producer bean injected by the container.
    @Resource
    private Pub_Sub_Provider provider;

    /**
     * Handles requests to {@code /setMessage}: asks the producer to publish
     * its message and then reports success.
     *
     * @return the literal string {@code "success"}
     */
    @RequestMapping("/setMessage")
    public String setMessage(){
        final String reply = "success";
        provider.setMessage();
        return reply;
    }
}

    步驟四:啟動(先啟動消費者,使其完成持久訂閱的註冊,再啟動生產者發送消息)

      

      

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM