SpringBoot系列之RabbitMQ使用實用教程


SpringBoot系列之RabbitMQ使用實用教程
@

1. 消息隊列概述

1.1 MQ的概述

消息隊列(Message Queue,簡稱MQ),其本質是個隊列,FIFO(First In First OUT,先入先出),MQ主要用於不同線程之間的線程通信。大多應用中,可通過消息服務中間件來提升系統異步通信、擴展解耦能力

兩個重要概念:

  • 消息代理(message broker)和目的地(destination)
    (消息發送者發送消息以后,將由消息代理broker接管,然后再傳遞到指定目的地)

1.2 MQ目的地形式

主要兩種形式的目的地:

  • 1.隊列(queue):也可以稱作為點對點式,即點對點消息通信(point-to-point),主要特點是消息只有唯一的發送者和接收者,但是不能說只有一個接收者,因為有可能是主從模式

  • 2.主題(topic):也可以稱作發布訂閱式,發送者(發布者)發送消息到主題,多個接收者(訂閱者)監聽(訂閱)這個主題

2. 消息隊列實現方式

2.1 常見MQ框架

MQ框架很多,比較流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里開源的RocketMQ等等

2.2 MQ實現方式

MQ框架的實現方式有多種,比如jms、amqp、mqtt等等,本文主要對比一下JMS和AMQP

JMS(Java Message Service)JAVA消息服務:

  • 基於JVM消息代理的規范。ActiveMQ、HornetMQ是JMS實現

在這里插入圖片描述
圖來自:https://www.javatpoint.com/jms-tutorial

AMQP(Advanced Message Queuing Protocol)

  • 高級消息隊列協議,也是一個消息代理的規范,兼容JMS, RabbitMQ是AMQP的實現
    在這里插入圖片描述
    引用尚硅谷視頻教程的總結圖示:
    在這里插入圖片描述

3. RabbitMQ簡介

3.1 RabbitMQ簡介

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。

開發語言:Erlang – 面向並發的編程語言。

3.2 核心概念

引用尚硅谷的視頻教程的歸納:

  • Message
    消息由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-
    該消息可能需要持久性存儲)等。
  • Publisher
    消息的生產者,也是一個向交換器發布消息的客戶端應用程序。
  • Exchange
    交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
    Exchange有4種類型:direct(默認),fanout, topic, 和headers,不同類型的Exchange轉發消息的策略有所區別
  • Queue
    消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
  • Binding
    綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。Exchange 和Queue的綁定可以是多對多的關系。
  • Connection
    網絡連接,比如一個TCP連接。
  • Channel
    信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內的虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對於操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接
  • Consumer
    消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
  • Virtual Host
    虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加
    密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有
    自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,
    RabbitMQ 默認的 vhost 是 / 。
  • Broker
    表示消息隊列服務器實體

學習尚硅谷課件的這些理論知識后,就可以很容易地理解RabbitMQ的體系結構如圖:
在這里插入圖片描述

3.3 RabbitMQ運行機制

RabbitMQ是基於AMQP協議,AMQP 中增加了Exchange 和 Binding這兩種角色,生產者發布消息后,發給代理Broker,主要還是由Exchange交換器處理,決定將消息發往那個消費者隊列
在這里插入圖片描述

3.4 Exchange類型

RabbitMQ目前共四種交換器類型:direct、fanout、topic、headers。headers 交換器和 direct 交換器完全一致,但性能差很多,用的比較少,所以只介紹三種類型

Direct Exchange:
在這里插入圖片描述
圖片來源:https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_MRG/2/html-single/Messaging_Programming_Reference/index.html

這種模式根據路由鍵(routing key)去匹配Bindings中的 binding key,如果完全一致,就發送消息到對應Queue

Fanout Exchange:
在這里插入圖片描述

這種模式是常見的發布訂閱模式,發消息方式類似於子網廣播,隊列只要綁定到對應的Exchange,生產者發送消息過來,有綁定的隊列都能接收消息

Topic Exchange:

在這里插入圖片描述
這種模式和Direct exchange有點像,不過Direct exchange是完全匹配,這種匹配方式是,先將路由鍵、bindings鍵根據點號隔開,# 表示匹配 0 個或多個單詞, “*”表示匹配一個單詞

4. RabbitMQ安裝部署

本文介紹基於Docker系統的RabbitMQ安裝部署

4.1 Docker版本部署RabbitMQ

查詢rabbitMQ鏡像:management版本,不指定默認為最新版本latest

 docker search rabbitmq:management

在這里插入圖片描述
拉取RabbitMQ鏡像:

docker pull rabbitmq:management

查看docker鏡像列表:

docker images

在這里插入圖片描述

啟動RabbitMQ:做下端口隱射

docker run -d -p 15672:15672  -p  5672:5672  -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest --name rabbitmq --hostname=rabbitmqhostone  rabbitmq:management
  • -d 后台運行
  • -p 隱射端口
  • --name 指定rabbitMQ名稱
  • RABBITMQ_DEFAULT_USER 指定用戶賬號
  • RABBITMQ_DEFAULT_PASS 指定賬號密碼

執行如上命令后訪問:http://ip:15672/

輸入默認賬號密碼:guest/guest
在這里插入圖片描述
在這里插入圖片描述

4.2 Admin新增用戶

用戶管理和權限管理都在Admin頁簽里
在這里插入圖片描述

  • 1、超級管理員(administrator)
    可登陸管理控制台,可查看所有的信息,並且可以對用戶,策略(policy)進行操作。
  • 2、監控者(monitoring)
    可登陸管理控制台,同時可以查看rabbitmq節點的相關信息(進程數,內存使用情況,磁盤使用情況等)
  • 3、策略制定者(policymaker)
    可登陸管理控制台, 同時可以對policy進行管理。但無法查看節點的相關信息
  • 4、普通管理者(management)
    僅可登陸管理控制台,無法看到節點信息,也無法對策略進行管理。
  • 5、其他
    無法登陸管理控制台,通常就是普通的生產者和消費者。

4.3 設置用戶權限

默認是Vitual host如圖所示
在這里插入圖片描述
設置topic permissions
在這里插入圖片描述

4.4 創建Virtual Hosts

在這里插入圖片描述
新增后,記得對應用戶也要設置權限,SpringBoot的yaml配置文件也得修改

4.5 其它管理配置

在這里插入圖片描述

5. SpringBoot集成RabbitMQ

5.1 引入spring-boot-starter-amqp

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

5.2 RabbitMQ YAML配置

注意spring-boot-starter-amqp有自動配置類,有些配置可以不需要配,詳情跟一下源碼

spring:
  rabbitmq:
    host: 192.168.7.135
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    #  支持發布確認
    publisher-confirms: true
    #  支持發布返回
    publisher-returns: true
    listener:
      simple:
        #  采用手動應答
        acknowledge-mode: manual
        #  當前監聽容器數
        concurrency: 1
        #  最大數
        max-concurrency: 1
        #  是否支持重試
        retry:
          enabled: true

5.3 RabbitMQ Boot支持

開啟支持RabbitMQ @EnableRabbit,同時配置自定義的AmqpTemplate Bean


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;


/**
 * <pre>
 *      RabbitMQ配置類
 * </pre>
 *
 * <pre>
 * @author mazq
 * 修改記錄
 *    修改后版本:     修改人:  修改日期: 2020/04/07 11:48  修改內容:
 * </pre>
 */
@Configuration
@EnableRabbit
public class RabbitMQConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    //@Primary
    public AmqpTemplate amqpTemplate(){
        Logger LOG = LoggerFactory.getLogger(AmqpTemplate.class);
        //使用jackson 消息轉換器(發送對象時候才開啟)
        //rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(true);
        // 開啟returncallback    yml 需要配置publisher-returns: true
        rabbitTemplate.setReturnCallback(((message,  replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationId();
            LOG.info("消息:{} 發送失敗, 應答碼:{} 原因:{} 交換機: {}  路由鍵: {}", correlationId, replyCode, replyText, exchange, routingKey);
        }));
        //開啟消息確認  yml 需要配置   publisher-returns: true
        rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) ->{
            if (ack) {
               LOG.info("消息發送到交換機成功,correlationId:{}",correlationData.getId());
            } else {
                LOG.info("消息發送到交換機失敗,原因:{}",cause);
            }
        } ));
        return rabbitTemplate;
    }
}

5.4 Direct Exchange例子

 /**
     * 聲明直連交換機 支持持久化.
     * @return the exchange
     */
    @Bean("directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange("amq.direct").durable(true).build();
    }

    @Bean("directQueue")
    public Queue directQueue(){
        return new Queue("directQueue", true, true, true);
        //return QueueBuilder.durable("directQueue").build();
    }

    @Bean
    public Binding directBinding(@Qualifier("directQueue")Queue queue,@Qualifier("directExchange")Exchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("direct_routingKey").noargs();
    }

在RabbitMQ管理平台,新增對應隊列,並新增綁定如圖所示:
在這里插入圖片描述
消息生產者:

package com.example.springboot.rabbitmq.component.direct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;

/**
 * <pre>
 *   消息生產者
 * </pre>
 *
 * <pre>
 * @author mazq
 * 修改記錄
 *    修改后版本:     修改人:  修改日期: 2020/04/07 13:42  修改內容:
 * </pre>
 */
@Component
public class DirectSender {

    Logger LOG = LoggerFactory.getLogger(DirectSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send(int i) {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        String content = i+":hello!"+date;
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        LOG.info("class:{},message:{}","DirectSender",content);
        this.rabbitTemplate.convertAndSend("amq.direct","direct_routingKey",content,correlationData);
    }
}

消息接收者:

package com.example.springboot.rabbitmq.component.direct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * <pre>
 *   消息消費者
 * </pre>
 *
 * <pre>
 * @author mazq
 * 修改記錄
 *    修改后版本:     修改人:  修改日期: 2020/04/07 13:47  修改內容:
 * </pre>
 */
@Component
@RabbitListener(queues = {"directQueue"})
public class DirectReceiver {
    Logger LOG = LoggerFactory.getLogger(DirectReceiver.class);

    @RabbitHandler
    public void receiverMsg(String msg){
        LOG.info("class:{},message:{}","DirectReceiver",msg);
    }
}

Junit測試:

@Test
    void directSend(){
        directSender.send(1);
    }

在這里插入圖片描述

在這里插入圖片描述
查詢一下message:
在這里插入圖片描述

5.5 Fanout Exchange例子

配置開啟

@Bean("fanoutQueueA")
    public Queue fanoutQueueA(){
        return new Queue("fanoutQueueA", true, true, true);
    }

    @Bean("fanoutQueueB")
    public Queue fanoutQueueB(){
        return new Queue("fanoutQueueB", true, true, true);
    }

    @Bean("fanoutQueueC")
    public Queue fanoutQueueC(){
        return new Queue("fanoutQueueC", true, true, true);
    }

    /**
     * 聲明一個Fanout類型的交換器
     * @Author mazq
     * @Date 2020/04/08 11:25
     * @Param []
     * @return org.springframework.amqp.core.FanoutExchange
     */
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    public Binding fanoutABinding(@Qualifier("fanoutQueueA")Queue queue,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    @Bean
    public Binding fanoutBBinding(@Qualifier("fanoutQueueB")Queue queue,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    @Bean
    public Binding fanoutCBinding(@Qualifier("fanoutQueueC")Queue queue,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

新增3個接收者A、B、C:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = {"fanoutQueueA"})
public class FanoutReceiverA {

    Logger LOG = LoggerFactory.getLogger(FanoutReceiverA.class);

    @RabbitHandler
    public void process(String hello) {
        LOG.info("AReceiver  : " + hello + "/n");
    }
}

FanoutReceiverB、FanoutReceiverC代碼類似,不貼代碼

Fanout模式是發布訂閱模式,不需要綁定路由鍵,this.rabbitTemplate.convertAndSend("amq.fanout","",content,correlationData);,只要和fanout exchange綁定就可以,只要隊列綁定了fanout exchange,發送者發消息后,exchange都會將消息發給對應消費者隊列

import com.example.springboot.rabbitmq.component.direct.DirectSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;

@Component
public class FanoutSender {

    Logger LOG = LoggerFactory.getLogger(DirectSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        String content = "hello!"+date;
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        LOG.info("class:{},message:{}","FanoutSender",content);
        this.rabbitTemplate.convertAndSend("amq.fanout","",content,correlationData);
    }

}

同理在RabbitMQ管理新增對應隊列和綁定

在這里插入圖片描述
在這里插入圖片描述
用Junit進行測試消息發送,ReceiverA、B、C都可以接收到消息

5.6 Topic Exchange例子

新增兩個隊列,規則為topic.msg和topic.#,#表示匹配0或多個字符

@Bean("topicQueueA")
    public Queue topicQueueA(){
        return new Queue("topicQueueA",true, true, true);
    }

    @Bean("topicQueueB")
    public Queue topicQueueB(){
        return new Queue("topicQueueB",true, true, true);
    }

    @Bean("topicExchange")
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

    @Bean
    public Binding topicABinding(@Qualifier("topicQueueA")Queue queue,TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("topic.msg");
    }

    @Bean
    public Binding topicBBinding(@Qualifier("topicQueueB")Queue queue,TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("topic.#");
    }

接收者A代碼:

import com.example.springboot.rabbitmq.component.direct.DirectReceiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = {"topicQueueA"})
public class TopicReceiverA {

    Logger LOG = LoggerFactory.getLogger(DirectReceiver.class);

    @RabbitHandler
    public void receiverMsg(String msg){
        LOG.info("class:{},message:{}","TopicReceiverA",msg);
    }
}

TopicB代碼類似,不貼代碼,給出兩個發送者代碼:

import com.example.springboot.rabbitmq.component.direct.DirectSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;


@Component
public class TopicSender {

    Logger LOG = LoggerFactory.getLogger(DirectSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send1() {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        String content = "hello!"+date;
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        LOG.info("class:{},message:{}","TopicSender",content);
        this.rabbitTemplate.convertAndSend("amq.topic","topic.msg",content,correlationData);
    }

    public void send2() {
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        String content = "hello!"+date;
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        LOG.info("class:{},message:{}","TopicSender",content);
        this.rabbitTemplate.convertAndSend("amq.topic","topic.msg1",content,correlationData);
    }
}

同理進行隊列綁定
在這里插入圖片描述

TopicA:
在這里插入圖片描述
topicB:
在這里插入圖片描述
路由鍵是topic.msg、topic.msg1,所以send1方法執行后,兩個綁定鍵分別為topic.msg、topic.#的都可以收到消息,send2方法執行后,只有綁定鍵為topic.#的隊列能收到消息

在這里插入圖片描述

5.7 MQ對象支持例子

上面例子都是基於字符串的發送,接着可以進行對象數據的發送

import lombok.*;

import java.io.Serializable;

/**
 * User信息類
 * @Author mazq
 * @Date 2020/04/08 15:12
 */
@Data
@AllArgsConstructor
@ToString
public class User implements Serializable{

    private String name;

    private String pwd;

//    @Override
//    public String toString() {
//        return "User{" +
//                "name='" + name + '\'' +
//                ", pwd='" + pwd + '\'' +
//                '}';
//    }
}
//發送者
    public void send(User user) {
        LOG.info("Sender object: " + user.toString());
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend("amq.direct","direct_routingKey",user,correlationData);
    }

發送者:

import com.example.springboot.rabbitmq.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * <pre>
 *   消息消費者
 * </pre>
 *
 * <pre>
 * @author mazq
 * 修改記錄
 *    修改后版本:     修改人:  修改日期: 2020/04/07 13:47  修改內容:
 * </pre>
 */
@Component
@RabbitListener(queues = {"directQueue"})
public class DirectReceiver {
    Logger LOG = LoggerFactory.getLogger(DirectReceiver.class);

    //接收者
    @RabbitHandler
    public void process(User user) {
        LOG.info("Receiver object : " + user);
    }
}

修改配置類,需要換消息轉換器
在這里插入圖片描述

在這里插入圖片描述

5.8 參考資料和代碼例子

參考博客:
CSDN RabbitMQ教程
Springboot:RabbitMQ 詳解

代碼下載:github下載鏈接


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM