springBoot(11)---整合Active消息隊列


Springboot整合Active消息隊列

       簡單理解:

       Active是Apache公司旗下的一個消息總線,ActiveMQ是一個開源兼容Java Message Service(JMS) 面向消息的中件間. 是一個提供松耦合的應用程序架構.

       主要用來在服務與服務之間進行異步通信的。

一、搭建步驟

    1、相應jar包

 <!-- 整合消息隊列ActiveMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        
  <!-- 如果配置線程池則加入 -->
        <dependency>  
            <groupId>org.apache.activemq</groupId>  
            <artifactId>activemq-pool</artifactId>  
        </dependency>    

    2、application.properties文件

#整合jms測試,安裝在別的機器,防火牆和端口號記得開放
spring.activemq.broker-url=tcp://47.96.44.110:61616

spring.activemq.user=admin
spring.activemq.password=admin
#下列配置要增加依賴
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100

#集群配置(后續需要在配上)
#spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
#消息隊列默認是點對點的,如果需要發布/訂閱模式那么需要加上下面注解(如果同時需要點對點發布訂閱這里也需注釋掉)
# spring.jms.pub-sub-domain=true

 3、Springboot主類

<!-- 主類需要多加一個@EnableJms注解,不過貌似我沒有加的時候,也能運行,為安全起見姑且加上 -->
@SpringBootApplication
@EnableJms

4.5.......根據不同消息模式來寫了。

 

二、點對點案例

   我在這里案例中創建了兩個點對點隊列,所以他會有兩個queue對象,同樣對應每個queue對象,都會有單一對應的消費者。

      1、Springboot主類

@SpringBootApplication
@EnableJms public class Main {

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
    
    //新建一個的Queue對象,交給sringboot管理,這個queue的名稱叫"first.queue".
    @Bean
    public Queue queue(){
        return new ActiveMQQueue("first.queue");
    }
}

      2.1、first.queue對應消費者

@Component
public class FirstConsumer {

    //名為"first.queue"消息隊列的消費者,通過JmsListener進行監聽有沒有消息,有消息會立刻讀取過來
    @JmsListener(destination="first.queue")
    public void receiveQueue(String text){
        System.out.println("FirstConsumer收到的報文為:"+text);
    }
}

       2.2、two.queue對應消費者(后面會創建)

@Component
public class TwoConsumer {

    //名為"two.queue"消息隊列的消費者
    @JmsListener(destination="two.queue")
    public void receiveQueue(String text){
        System.out.println("TwoConsumer收到的報文為:"+text);
    }
}

      3、Service類

   /**
    * 功能描述:消息生產
    */
public interface ProducerService {

    // 功能描述:指定消息隊列,還有消息    
    public void sendMessage(Destination destination, final String message);
    

    // 功能描述:使用默認消息隊列, 發送消息
    public void sendMessage( final String message);

}

      4、ServiceImpl實現類

/**
 * 功能描述:消息生產者實現類
 */
@Service
public class ProducerServiceImpl implements ProducerService{

    //這個隊列就是Springboot主類中bean的對象
    @Autowired
    private Queue queue;
    
    //用來發送消息到broker的對象,可以理解連接數據庫的JDBC
    @Autowired
    private JmsMessagingTemplate jmsTemplate;    
    
    //發送消息,destination是發送到的隊列,message是待發送的消息
    @Override
    public void sendMessage(Destination destination, String message) {        
        jmsTemplate.convertAndSend(destination, message);    
    }
    
    //發送消息,queue是發送到的隊列,message是待發送的消息
    @Override
    public void sendMessage(final String message) { 
        jmsTemplate.convertAndSend(this.queue, message);    
    }        
}

     5.QueueController類

/**
 * 功能描述:點對點消息隊列控制層
 */
@RestController
@RequestMapping("/api/v1")
public class QueueController {
    
    @Autowired
    private ProducerService producerService;        

    // 這里后面調用的是Springboot主類的quene隊列
    @GetMapping("first")
    public Object common(String msg){
        producerService.sendMessage(msg);    
       return "Success";
    }        
    
    // 這個隊列是新建的一個名為two.queue的點對點消息隊列
    @GetMapping("two")
    public Object order(String msg){
        
        Destination destination = new ActiveMQQueue("two.queue");
        producerService.sendMessage(destination, msg);
        
       return "Success";
    }        
}

      6、案例演示:

從演示效果可以得出以下結論:

     1:當springboot啟動時候,就生成了這兩個隊列,而且他們都會有一個消費者

     2:當我通過頁面訪問的時候,就相當於生產者把消息放到隊列中,一旦放進去就會被消費者監聽到,就可以獲取生產者放進去的值並在后台打印出

順便對頁面中四個單詞進行解釋:

   Number Of Pending Messages :待處理消息的數量。我們每次都會被監聽處理掉,所以不存在待處理,如果存在就說這里面哪里出故障了,需要排查

   Number Of Consumers : 消費者數量

   Messages Enqueued:    消息排列,這個只增不見,代表已經處理多少消息

   Messages Dequeued:    消息出隊。

     

 三、發布/訂閱者模式

 在上面點對點代碼的基礎上,添加發布/訂閱相關代碼

     1.appliaction.properties文件

#消息隊列默認是點對點的,如果需要發布/訂閱模式那么需要加上下面注解(如果同時需要點對點發布訂閱這里也需注釋掉)
spring.jms.pub-sub-domain=true

      2.Springboot主類添加

//新建一個topic隊列
    @Bean
    public Topic topic(){
        return new ActiveMQTopic("video.topic");
    }

       3.添加多個消費者類

//這里定義了三個消費者
@Component
public class TopicSub {
    
    @JmsListener(destination="video.topic")
    public void receive1(String text){
        System.out.println("video.topic 消費者:receive1="+text);
    }
        
    @JmsListener(destination="video.topic")
    public void receive2(String text){
        System.out.println("video.topic 消費者:receive2="+text);
    }
        
    @JmsListener(destination="video.topic")
    public void receive3(String text){
        System.out.println("video.topic 消費者:receive3="+text);
    }   
}

       4.Service類

 //功能描述:消息發布者
    public void publish(String msg);

      5.ServiceImpl實現類

//=======發布訂閱相關代碼=========
    
        @Autowired
        private Topic topic;
                
         @Override
        public void publish(String msg) {
            this.jmsTemplate.convertAndSend(this.topic, msg);
            
        }

       6.Controller類

// 這個隊列是新建的一個名為two.queue的點對點消息隊列
        @GetMapping("topic")
        public Object topic(String msg){

            producerService.publish(msg);
            
           return "Success";
        }    

      7.演示效果:

    從演示效果總結如下:

     1:Springboot啟動的時候,在Topics目錄下,一共出現了5個消費者。first.queue一個消費者、two.queue一個消費者、video.topic三個消費者

      2:當我在控制台輸入信息后,video.topic的三個消費者都會監聽video.topic發布的消息,並在控制台打印。

 

四、如何讓點對點和發布訂閱同時有效

為什么這么說呢,因為當我向上面一樣同時開啟,會發現點對點模式已經失效了。

 效果演示

從演示效果,可以得出如下結論:

     1:我們發現我們在頁面輸入..../two?msg=555消息后,后台並沒有成功打印消息。再看Active界面發現,這個queue對象,確實有一條待處理的消息,但是我們發現,它對應的消費者數量是為0.

     2:然而我們在打開topic頁面發現,這里卻存在一個消費者。

所以我個人理解是,當同時啟動的時候,所產生的消費者默認都是Topic消費者,沒有Queue消費者,所以它監聽不到queue所待處理的消息。

當配置文件不加:spring.jms.pub-sub-domain=true  那么系統會默認支持quene(點對點模式),但一旦加上這段配置,系統又變成只支持發布訂閱模式。

 

那如何同時都可以成功呢?

 思路如下:

第一步:還是需要去掉配置文件中的:

#消息隊列默認是點對點的,如果需要發布/訂閱模式那么需要加上下面注解(如果同時需要點對點發布訂閱這里也需注釋掉)
#spring.jms.pub-sub-domain=true

第二步:在發布訂閱者的中消費者中指定獨立的containerFactory

因為你去掉上面的配置,那么系統就默認是queue,所以@JmsListener如果不指定獨立的containerFactory的話是只能消費queue消息

    @JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
    public void receive1(String text){
        System.out.println("video.topic 消費者:receive1="+text);
    }
    
    
    @JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
    public void receive2(String text){
        System.out.println("video.topic 消費者:receive2="+text);
    }
    
    //第三步我不添加containerFactory="jmsListenerContainerTopic"看等下是否會打印出
    @JmsListener(destination="video.topic")
    public void receive3(String text){
        System.out.println("video.topic 消費者:receive3="+text);
    }

第三步:定義獨立的topic定義獨立的JmsListenerContainer

在springboot主類中添加:

        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setPubSubDomain(true);
            bean.setConnectionFactory(activeMQConnectionFactory);
            return bean;
        }

效果:

得出結論:

    1:點對點,和發布訂閱都有用

   2:receive3沒有指定獨立的containerFactory一樣沒有打印出來。

 源碼

github地址:https://github.com/yudiandemingzi/springbootAcitveMQ

 

想太多,做太少,中間的落差就是煩惱。想沒有煩惱,要么別想,要么多做。上尉【14】

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM