SpringBoot進階教程(五十)集成RabbitMQ---MQ實戰演練


RabbitMQ是一個在AMQP基礎上完成的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。RabbitMQ是AMQP(高級消息隊列協議)的標准實現。

消息中間件的工作過程可以用生產者消費者模型來表示.即,生產者不斷的向消息隊列發送信息,而消費者從消息隊列中消費信息.

如果你還沒有安裝rabbitmq的,可以看看這篇《centos安裝MQ》

不說了不說了,來一張圖直截了當的看看MQ工作的具體過程:

SpringBoot進階教程(二十二)集成RabbitMQ---MQ實戰演練

開局一張圖 故事全靠編.從上圖可看出,對於消息隊列來說,生產者,消息隊列,消費者是最重要的三個概念,生產者發消息到消息隊列中去,消費者監聽指定的消息隊列,並且當消息隊列收到消息之后,接收消息隊列傳來的消息,並且給予相應的處理.消息隊列常用於分布式系統之間互相信息的傳遞.

v基礎概念

對於RabbitMQ來說,除了這三個基本模塊以外,還添加了一個模塊,即交換機(Exchange).它使得生產者和消息隊列之間產生了隔離,生產者將消息發送給交換機,而交換機則根據調度策略把相應的消息轉發給對應的消息隊列.那么RabitMQ的工作流程如下所示:

SpringBoot進階教程(二十二)集成RabbitMQ---MQ實戰演練

關於rabbitmq幾個基礎名詞的介紹:

Broker: 簡單來說就是消息隊列服務器實體。
Exchange: 消息交換機,它指定消息按什么規則,路由到哪個隊列。
Queue: 消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding: 綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key: 路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost: 虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
producer: 消息生產者,就是投遞消息的程序。
consumer: 消息消費者,就是接受消息的程序。
channel: 消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。

交換機的主要作用是接收相應的消息並且綁定到指定的隊列.交換機有四種類型,分別為Direct,topic,headers,Fanout:

Direct: 處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個隊列綁定到該交換機上要求路由鍵 “demo”,則只有被標記為“demo”的消息才被轉發,不會轉發demo.ooo,也不會轉發test.123,只會轉發demo。
Topic: 轉發信息主要是依據通配符,將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。
Headers: 根據一個規則進行匹配,在消息隊列和交換機綁定的時候會指定一組鍵值對規則,而發送消息的時候也會指定一組鍵值對規則,當兩組鍵值對規則相匹配的時候,消息會被發送到匹配的消息隊列中.
Fanout: 路由廣播的形式,將會把消息發給綁定它的全部隊列,即便設置了key,也會被忽略.

v實戰演練

♛ 2.1 創建MQ

SpringBoot進階教程(二十二)集成RabbitMQ---MQ實戰演練

注:若是現有工程引入MQ,則添加Maven引用。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

這里我們延續之前springboot系列博文中的例子hellospringboot,在已有項目中添加mq的Maven引用。

♛ 2.2 application.properties

在application.properties文件當中引入RabbitMQ基本的配置信息

# ----- MQ -------- #
spring.rabbitmq.host=192.168.11.108
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
♛ 2.3 添加實體類MyModel
package com.demo.mq.model;

import java.io.Serializable;
import java.util.UUID;

/**
 * Created by toutou on 2019/1/1.
 */
public class MyModel implements Serializable {
    private static final long serialVersionUID = 1L;
    private UUID id;
    private String info;

    public UUID getId() {
        return id;
    }

    public void setId(UUID id) {
        this.id = id;
    }

    public String getInfo() {
        return info;
    }

    public void setInfo(String info) {
        this.info = info;
    }
}
♛ 2.4 添加RabbitConfig
package com.demo.mq.common;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * Created by toutou on 2019/1/1.
 */
@Configuration
public class RabbitConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;


    public static final String EXCHANGE_A = "my-mq-exchange_A";
    public static final String EXCHANGE_B = "my-mq-exchange_B";


    public static final String QUEUE_A = "QUEUE_A";
    public static final String QUEUE_B = "QUEUE_B";

    public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
    public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    /**
     * 針對消費者配置
     * 1. 設置交換機類型
     * 2. 將隊列綁定到交換機
     FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念
     HeadersExchange :通過添加屬性key-value匹配
     DirectExchange:按照routingkey分發到指定隊列
     TopicExchange:多關鍵字匹配
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE_A);
    }

    /**
     * 獲取隊列A
     * @return
     */
    @Bean
    public Queue queueA() {
        return new Queue(QUEUE_A, true); //隊列持久
    }

    /**
     * 獲取隊列B
     * @return
     */
    @Bean
    public Queue queueB() {
        return new Queue(QUEUE_B, true); //隊列持久
    }

    /**
     * 把交換機,隊列,通過路由關鍵字進行綁定
     * @return
     */
    @Bean
    public Binding binding() {

        return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }

    /**
     * 一個交換機可以綁定多個消息隊列,也就是消息通過一個交換機,可以分發到不同的隊列當中去。
     * @return
     */
    @Bean
    public Binding bindingB(){
        return BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B);
    }

}
♛ 2.5 添加消息的生產者MyProducer
package com.demo.mq.producer;

import com.demo.mq.common.RabbitConfig;
import com.demo.mq.model.MyModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Created by toutou on 2019/1/1.
 */
@Component
public class MyProducer implements RabbitTemplate.ConfirmCallback {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    //由於rabbitTemplate的scope屬性設置為ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自動注入
    private RabbitTemplate rabbitTemplate;

    /**
     * 構造方法注入rabbitTemplate
     */
    @Autowired
    public MyProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果為單例的話,那回調就是最后設置的內容
    }

    public void sendMsg(MyModel model) {
        //把消息放入ROUTINGKEY_A對應的隊列當中去,對應的是隊列A
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, model);
    }

    /**
     * 回調
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        logger.info(" 回調id:" + correlationData);
        if (ack) {
            logger.info("消息成功消費");
        } else {
            logger.info("消息消費失敗:" + cause);
        }
    }
}
♛ 2.6 添加消息的消費者MyReceiver
package com.demo.mq.receiver;

import com.demo.mq.common.RabbitConfig;
import com.demo.mq.model.MyModel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Created by toutou on 2019/1/1.
 */
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MyReceiver {
    @RabbitHandler
    public void process(MyModel model) {
        System.out.println("接收處理隊列A當中的消息: " + model.getInfo());
    }
}
♛ 2.7 添加MyMQController
package com.demo.controller;

import com.demo.mq.model.MyModel;
import com.demo.mq.producer.MyProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * Created by toutou on 2019/1/1.
 */
@RestController
@Slf4j
public class MyMQController {
    @Autowired
    MyProducer myProducers;

    @GetMapping("/mq/producer")
    public String myProducer(String content){
        MyModel model = new MyModel();
        model.setId(UUID.randomUUID());
        model.setInfo(content);
        myProducers.sendMsg(model);
        return "已發送:" + content;
    }
}
♛ 2.8 項目整體目錄

SpringBoot進階教程(二十二)集成RabbitMQ---MQ實戰演練

 

♛ 2.9 調試

2.9.1 在頁面中請求http://localhost:8081/mq/producer?content=hello rabbitmq

SpringBoot進階教程(二十二)集成RabbitMQ---MQ實戰演練

2.9.2 查看http://ip:15672/#/queues的變化

關於RabbitMQ Management有疑問的,可以看上篇博文。《淺談RabbitMQ Management》

 

2.9.3 查看消費者日志記錄

SpringBoot進階教程(二十二)集成RabbitMQ---MQ實戰演練

這樣一個完整的rabbitmq實例就有了。

v源碼地址

https://github.com/toutouge/javademosecond/tree/master/hellospringboot


作  者:請叫我頭頭哥
出  處:http://www.cnblogs.com/toutou/
關於作者:專注於基礎平台的項目開發。如有問題或建議,請多多賜教!
版權聲明:本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。
特此聲明:所有評論和私信都會在第一時間回復。也歡迎園子的大大們指正錯誤,共同進步。或者直接私信
聲援博主:如果您覺得文章對您有幫助,可以點擊文章右下角推薦一下。您的鼓勵是作者堅持原創和持續寫作的最大動力!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM