Springboot整合RabbitMQ


說明該篇文章內容包括有rabbitMq相關的一些簡單理論介紹,provider消息推送實例,consumer消息消費實例,Direct、Topic、Fanout的使用。

 

  在安裝完rabbitMq后,輸入http://ip:15672/ ,是可以看到一個簡單后台管理界面的。

 

在這個界面里面我們可以做些什么?
可以手動創建虛擬host,創建用戶,分配權限,創建交換機,創建隊列等等,還有查看隊列消息,消費效率,推送效率等等。

以上這些管理界面的操作在這篇暫時不做擴展描述,我想着重介紹后面實例里會使用到的。

首先先介紹一個簡單的一個消息推送到接收的流程,提供一個簡單的圖:

 

黃色的圈圈就是我們的消息推送服務,將消息推送到 中間方框里面也就是 rabbitMq的服務器,然后經過服務器里面的交換機、隊列等各種關系(后面會詳細講)將數據處理入列后,最終右邊的藍色圈圈消費者獲取對應監聽的消息。

常用的交換機有以下三種,因為消費者是從隊列獲取信息的,隊列是綁定交換機的(一般),所以對應的消息推送/接收模式也會有以下幾種:
Direct Exchange

直連型交換機,根據消息攜帶的路由鍵將消息投遞給對應隊列。

大致流程,有一個隊列綁定到一個直連交換機上,同時賦予一個路由鍵 routing key 。
然后當一個消息攜帶着路由值為X,這個消息通過生產者發送給交換機時,交換機就會根據這個路由值X去尋找綁定值也是X的隊列。

Fanout Exchange

扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。 這個交換機在接收到消息后,會直接轉發到綁定到它上面的所有隊列。

Topic Exchange

主題交換機,這個交換機其實跟直連交換機流程差不多,但是它的特點就是在它的路由鍵和綁定鍵之間是有規則的。
簡單地介紹下規則:

* (星號) 用來表示一個單詞 (必須出現的)
# (井號) 用來表示任意數量(零個或多個)單詞
通配的綁定鍵是跟隊列進行綁定的,舉個小例子
隊列Q1 綁定鍵為 *.TT.* 隊列Q2綁定鍵為 TT.#
如果一條消息攜帶的路由鍵為 A.TT.B,那么隊列Q1將會收到;
如果一條消息攜帶的路由鍵為TT.AA.BB,那么隊列Q2將會收到;

主題交換機是非常強大的,為啥這么膨脹?
當一個隊列的綁定鍵為 "#"(井號) 的時候,這個隊列將會無視消息的路由鍵,接收所有的消息。
當 * (星號) 和 # (井號) 這兩個特殊字符都未在綁定鍵中出現的時候,此時主題交換機就擁有的直連交換機的行為。
所以主題交換機也就實現了扇形交換機的功能,和直連交換機的功能。

另外還有 Header Exchange 頭交換機 ,Default Exchange 默認交換機,Dead Letter Exchange 死信交換機,這幾個該篇暫不做講述。

本次實例教程需要創建2個springboot項目,一個生產者,一個消費者。

首先創建 生產者,

pom.xml里用到的jar依賴:

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

  application.yml:

#本機端口
server:
  port: 8082
spring:
  #給項目來個名字
  application:
    name: rabbitmq-provider
  #配置rabbitMq 服務器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

  然后先使用下direct exchange(直連型交換機),創建DirectRabbitConfig.java

package com.wx.test.rtmdemo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author : laz
 * @CreateTime : 2021/11/14
 * @Description :
 **/
@Configuration
public class DirectRabbitConfig {

    //隊列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        //隊列的三個參數講解
        // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
        // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable
        // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
        //   return new Queue("TestDirectQueue",true,true,false);

        //一般設置一下隊列的持久化就好,其余兩個就是默認false
        return new Queue("TestDirectQueue",true);
    }

    //Direct交換機 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
      //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("TestDirectExchange",true,false);
    }

    //綁定  將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting
    @Bean
    Binding bindingDirect() {

        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }


}

  然后寫個簡單的接口進行消息推送:

    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        //將消息攜帶綁定鍵值:TestDirectRouting 發送到交換機TestDirectExchange
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
        return "ok";
    }

  將項目運行起來,調用這個接口:

 

 

 此時,在rabbitMq管理頁面可以看到,有一條推送的消息。

 然后有一個消息等待被消費。

 

 

 

 此時,說明我們的生產者的消息已經生產成功,只需要等待消費者消費即可!

接下來創建消費者服務:

首先是pom.xml文件的依賴:

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

  然后是application.yml:

server:
  port: 8083
spring:
  #給項目來個名字
  application:
    name: rabbitmq-consumer
  #配置rabbitMq 服務器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

然后是創建消息接收監聽類,DirectReceiver.java:

package com.wx.test.consumer.config;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = "TestDirectQueue")//監聽的隊列名稱 TestDirectQueue
public class DirectReceiver {

    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("DirectReceiver消費者收到消息  : " + testMessage.toString());
    }

}

  然后啟動項目,可以看到生產者的消息,在這邊被成功消費了。

 

 

 

 

 接下來是Topic Exchange交換機,配置文件以及依賴包不變,新建一個TopicRabbitConfig.java:

package com.wx.test.rtmdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author : laz
 * @CreateTime : 2021/11/14
 * @Description :
 **/

@Configuration
public class TopicRabbitConfig {
    //綁定鍵
    public final static String man = "topic.man";
    public final static String woman = "topic.woman";

    @Bean
    public Queue firstQueue() {
        return new Queue(TopicRabbitConfig.man);
    }

    @Bean
    public Queue secondQueue() {
        return new Queue(TopicRabbitConfig.woman);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }


    //將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man
    //這樣只要是消息攜帶的路由鍵是topic.man,才會分發到該隊列
    @Bean
    Binding bindingExchangeMessage() {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
    }

    //將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規則topic.#
    // 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發到該隊列
    @Bean
    Binding bindingExchangeMessage2() {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }

}

  然后在創建兩個接口,用於測試:

    @GetMapping("/sendTopicMessage1")
    public String sendTopicMessage1() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: M A N ";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> manMap = new HashMap<>();
        manMap.put("messageId", messageId);
        manMap.put("messageData", messageData);
        manMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
        return "ok";
    }

    @GetMapping("/sendTopicMessage2")
    public String sendTopicMessage2() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: woman is all ";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> womanMap = new HashMap<>();
        womanMap.put("messageId", messageId);
        womanMap.put("messageData", messageData);
        womanMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "topic.woman123", womanMap);
        return "ok";
    }

  然后在創建一個消費者TopicManReceiver.java::

package com.wx.test.consumer.config;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;

/**
 * @Author : laz
 * @CreateTime : 2021/11/14
 * @Description :
 **/
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {

    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("TopicManReceiver消費者收到消息  : " + testMessage.toString());
    }
}

  然后再創建一個消費者TopicTotalReceiver.java::

package com.wx.test.consumer.config;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;

/**
 * @Author : laz
 * @CreateTime : 2021/11/14
 * @Description :
 **/

@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {

    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("TopicTotalReceiver消費者收到消息  : " + testMessage.toString());
    }
}

  啟動項目,調用sendTopicMessage1這個接口:

 

 

 

可以看到,兩個消費者都消費了這條消息。

注意:

TopicManReceiver監聽隊列1,綁定鍵為:topic.man
TopicTotalReceiver監聽隊列2,綁定鍵為:topic.#
而當前推送的消息,攜帶的路由鍵為:topic.man

再調用sendTopicMessage2這個接口:

 

 這時,只有TopicTotalReceiver成功消費了。

 

 

然后是Fanout Exchang交換機:

首先,創建生產者:

package com.wx.test.rtmdemo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Author : laz
 * @CreateTime : 2021/11/14
 * @Description :
 **/

@Configuration
public class FanoutRabbitConfig {

    /**
     *  創建三個隊列 :fanout.A   fanout.B  fanout.C
     *  將三個隊列都綁定在交換機 fanoutExchange 上
     *  因為是扇型交換機, 路由鍵無需配置,配置也不起作用
     */


    @Bean
    public Queue queueA() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue queueB() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue queueC() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeA() {
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeB() {
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeC() {
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }
}

  同樣寫一個接口,用於生產一條消息:

    @GetMapping("/sendFanoutMessage")
    public String sendFanoutMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: testFanoutMessage ";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);
        rabbitTemplate.convertAndSend("fanoutExchange", null, map);
        return "ok";
    }

  接下來在消費者里面創建一個類,用於接收消息:

FanoutReceiverA.java:

package com.wx.test.consumer.config;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * @Author : laz
 * @CreateTime : 2021/11/14
 * @Description :
 **/
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("FanoutReceiverA消費者收到消息  : " +testMessage.toString());
    }

}

  

FanoutReceiverB.java:

package com.wx.test.consumer.config;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * @Author : laz
 * @CreateTime : 2021/11/14
 * @Description :
 **/
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("FanoutReceiverB消費者收到消息  : " +testMessage.toString());
    }

}

  

FanoutReceiverC.java:

package com.wx.test.consumer.config;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;

/**
 * @Author : laz
 * @CreateTime : 2021/11/14
 * @Description :
 **/
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("FanoutReceiverC消費者收到消息  : " +testMessage.toString());
    }

}

  最后啟動項目,調用我們生產者的接口,查看消息消費情況:

 

 可以看到,這三個類都消費到消息。

 

 

好了,這篇Springboot整合rabbitMq教程就暫且到此。本文若有錯誤,還請各路大佬指正指正!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM