SpringBoot筆記十四:消息隊列


什么是消息隊列

消息隊列就是消息存儲的容器,Java里面有兩種

  1. JMS:Sun公司出品,有兩種模式,點對點和發布訂閱。
  2. AMQP:消息隊列的一個協議,其實現有RabbitMQ,stormMQ等

我們會重點講解RabbitMQ

消息隊列的作用

異步通信

例如下面的案例,用戶注冊之后,我需要存儲用戶的信息,還要發郵件,發短信給用戶。傳統的方式呢,像第一個圖,一步一步來,需要150ms

第二幅圖呢好點了,用戶信息存儲到數據庫之后,發送郵件和發送短信兩個同時進行。需要100ms

第三幅圖使用了消息隊列,用戶信息存儲到數據庫之后,直接存入消息隊列,然后直接返回。是的,你沒看錯,直接返回了,這樣用戶等待的時間大概只需要55ms,真的是超級快了。那你可能會問了,消息隊列的方式郵件和短信不計算時間了?肯定計算啊,但是你發郵件和發短信和用戶有什么關系?用戶注冊之后提示注冊成功就可以了,郵件和短信可以異步的去執行發送。

總結:

  1. 用戶可以選擇等待150ms看到注冊成功的消息,有郵件和短信
  2. 用戶可以選擇等待100ms看到注冊成功的消息,有郵件和短信
  3. 用戶可以選擇等待55ms看到成功的消息,郵件和短信稍后回來

應用解耦

比如一個商城系統,用戶下了一個訂單,我們肯定要去庫存系統里面查一下,還有沒有貨了。這兩個系統寫在一起的話耦合度肯定是很高的。

現在采用消息隊列的方式,訂單系統和庫存系統可以獨立出來。用戶下了訂單,就存到消息隊列里面,庫存系統訂閱了消息隊列,里面一有內容就會去讀取。

流量削峰

所謂的流量削峰,就類似於現在的1元秒殺活動。假如1萬個人去秒殺一個1元商品,肯定不可能我1萬的人一個一個的去判斷,這樣負載太大了。原理就是1萬個用戶請求進消息隊列,我消息隊列就一個位置,誰進去了誰就1元秒殺了,后面沒進去的就不用判斷了。秒殺業務處理就對消息隊列里面的那一個幸運兒進行處理就可以了。所以,秒殺活動,你沒進去,就別傻傻的在刷新了,因為你已經沒有判斷的資格了.......

RabbitMQ

RabbitMQ流程簡介

講一下,RabbitMQ的流程,如下圖,首先是發布者發一個消息到RabbitMQ

我們可以看到RabbitMQ里面有很多的交換器路由和消息隊列。很多。

交換器路由可以綁定多個消息隊列,每個消息隊列可以被多個交換器路由綁定

發布者發布的消息選擇一個交換器路由,然后交換器路由會通過 模式 發給消息隊列,這個模式下面講。然后客戶可以去獲取隊列的消息。

RabbitMQ的三種模式

上面講了,交換器路由給隊列發消息是通過模式篩選的,模式有三種

  1. Direct:點對點模式,交換器路由只會給路由鍵為XXX的隊列發消息
  2. Topic:模糊匹配模式,# 匹配多個單詞,* 匹配一個單詞
  3. Fanout:廣播模式,交換器路由的每個綁定的隊列都會收到消息

這個模糊匹配模式,我需要講解一下,挺有意思的,#是多個單詞匹配,例如 Vae.#可以表示為 Vae.Music.com 。而是一個單詞,Vae.就只能跟一個單詞,例如 Vae.Music 后面不能再加了
假如我有一個交換器路由,綁定了4個隊列,分別為 Vae.com,Vae.Music.com,shuyunquan.com,shuyunquan.Music.com
現在我的交換器路由發一個消息,路由鍵是Vae.#,消息內容是:哈哈哈 。那么請問,這四個隊列,哪幾個可以收到消息呢?答案肯定是Vae.com,Vae.Music.com可以收到消息了,那么路由鍵是 #.com呢?又會是哪幾個隊列收到消息?簡單吧

學了RabbitMQ的流程和三種模式,我們要開始實戰一下了

安裝RabbitMQ

我們還是使用Docker安裝,你如果沒學過Docker,你就不會知道Docker有多爽,我寫的有Docker的博客,自己去翻閱學習。

先下載RabbitMQ,注意了,有latest版本和3-management版本的,兩個版本,這個也是我的血淚史啊,latest沒有后台網頁,3-management版本有后台網頁,別下錯了

docker pull docker.io/rabbitmq:3-management

運行鏡像,生成容器

docker run --name myrabbiymq -d  -p 5672:5672 -p 15672:15672  rabbitmq:3-management

這里可以參考我的Docker一文的血淚史,還需要講解一下,第一個 -p是5672,這個是RabbitMQ自己的端口,第二個 -p是15672,這個是給后台網頁使用的

啟動完成之后,我們打開瀏覽器,輸入ip+15672,成功訪問了

RabbitMQ交換器路由和隊列的創建與綁定

我們來新建3個交換器路由,Direct,Fanout,Topic,新建如下圖所示,訪問你的RabbitMQ的網頁,直接添加,type類型選擇一致的,然后Durability要選擇持久化的Durable,這樣我們下次打開RabbitMQ的時候,交換器路由還是存在的

照葫蘆畫瓢,我3個交換器路由全部新建完畢了。接下來新建3個隊列吧,如下圖:

現在把交換器路由和隊列進行綁定,Direct和Fanout的隊列名和key都是一樣的

但是Topic就不一樣了,這個是模糊匹配,Vae.Music,Vae.com的key都寫成Vae.#

RabbitMQ測試

綁定完成之后,發個消息測試一下,先來Direct的

果然是一對一,Direct一對一

再來試試Fanout

完美啊,加上我們Direct的一個,現在隊列的消息是2 1 1了,我就不點進去看了

最后,試試我們的模糊匹配,我現在發一個Vae.JJ,看看效果咋樣

3 2 1了,Nice啊!

RabbitMQ在Spring Boot中實現

引入RabbitMQ的Maven依賴

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.1.3.RELEASE</version>
</dependency>

這個Maven依賴沒有去找RabbitMQ,而是找的SpringBoot的啟動器,叫amqp

配置yml配置文件

指定一下我們RabbitMQ的服務器ip,用戶名和密碼都是默認的,我沒改

spring:
  rabbitmq:
    host: 193.112.28.104
    username: guest
    password: guest

RabbitMQ Direct寫入和讀取

先寫入RabbitMQ,我寫的是Object類型的數據,默認是會序列化的

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public  void testRabbitMQ(){
        //寫入消息,Object會自動序列化
        Map<String,Object> map=new HashMap<>();
        map.put("msg","這是一個消息標題");
        map.put("data", Arrays.asList("許嵩",123,true));
        rabbitTemplate.convertAndSend("exchange.direct","Vae.com",map);
    }

可以看到,都被序列化了

這個時候,我們的Vae.com隊列里面是有3個消息的,我們獲取一下消息

    @Test
    public void getRabbitMQ(){
        Object msg = rabbitTemplate.receiveAndConvert("Vae.com");
        System.out.println(msg.getClass());
        System.out.println(msg);
    }

你執行一下,會發現出來的消息不能看,因為Vae.com我們上面存了兩個不是Object類型的數據,我們執行3次這個獲取方法,可以發現,輸出內容是

class java.util.HashMap
{msg=這是一個消息標題, data=[許嵩, 123, true]}

這個時候,你再去RabbitMQ的網頁里面看Vae.com這個隊列的消息數目,發現已經變成0了,說明只要獲取方法一執行,隊列里的消息就被讀取了,就沒了

RabbitMQ使用json序列化Object

上面的Object序列化總歸不好看,為了好看,我們也可以使用json來序列化,替換Java本身的序列化就行,新建一個類,config文件夾下的MyRabbitMQConfig

package com.cache.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyAMQPConfig {
    @Bean
    public MessageConverter messageConverter(){
        return  new Jackson2JsonMessageConverter();
    }
}

就這一個方法就可以了,我們再次執行上面的寫入方法,然后來網頁上看看

讀取也是一樣的,讀出來也是json格式的,一目了然

自定義類類型上傳

這次不使用Object了,我們自己定義一個類型,我新建一個類,叫Book

package com.cache.bean;

public class Book {

    private String booName;
    private String author;

    public Book() {
    }

    public Book(String booName, String author) {
        this.booName = booName;
        this.author = author;
    }

    public String getBooName() {
        return booName;
    }

    public void setBooName(String booName) {
        this.booName = booName;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }
        @Override
    public String toString() {
        return "Book{" +
                "booName='" + booName + '\'' +
                ", author='" + author + '\'' +
                '}';
    }
}

我們接下來就傳這個類型的數據

    @Test
    public  void testRabbitMQ(){
        rabbitTemplate.convertAndSend("exchange.direct","Vae.com",new Book("海上靈光","許嵩"));
    }

看看RabbitMQ的網頁后台

很不錯,類型Book都標出來了

RabbitMQ Fanout寫入和讀取

廣播模式其實很簡單,寫入

    @Test
    public  void FanoutWrite(){
        //廣播的routingKey填不填都無所謂,沒用
        rabbitTemplate.convertAndSend("exchange.fanout","",new Book("三國演義","羅貫中"));
    }

我就不截圖了,RabbitMQ后台網頁都是OK的

剩下的讀取還有Topic都不講了,都一樣

發布者和訂閱者的監聽

發布者發布一個圖書的消息,我的訂閱者呢,可以立即的收到發布的消息,來寫一下代碼,發布者我們還使用上面的測試方法,訂閱者寫一個service

package com.cache.service;

import com.cache.bean.Book;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class BookService {
    @RabbitListener(queues = "Vae.com")
    public void receive(Book book){
        System.out.println("收到消息了,發布的書籍是:" + book);
    }
}

還不行,還需要在主方法里面開啟一下RabbitMQ的監聽注解

@EnableRabbit

然后啟動主方法,在啟動一下發布的方法,你會看到主方法的輸出框里已經有輸出內容了

如果你想看消息的消息頭信息,你可以這樣寫

//記住,Message是org.springframework.amqp.core.Message;包下的
    @RabbitListener(queues = "Vae.com")
    public void receiveHead(Message message){
        System.out.println(message.getBody());
        System.out.println(message.getMessageProperties());
    }

AmqpAdmin

AmqpAdmin就是使用代碼來創建交換器路由和隊列的,我們上面是自己在RabbitMQ的后台網頁創建的,現在通過Amqp來使用代碼進行創建

 @Autowired
    AmqpAdmin amqpAdmin;
    @Test
    public void amqp(){
        //創建交換器路由,還有很多參數,比如持久化我就不寫了
        amqpAdmin.declareExchange(new DirectExchange("exchange.DirectTest"));
        //創建隊列,可持久化為true
        amqpAdmin.declareQueue(new Queue("Vae.Vae+",true));
        //綁定交換器路由和隊列
        amqpAdmin.declareBinding(new Binding("Vae.Vae+",Binding.DestinationType.QUEUE,"exchange.DirectTest","Vae.Vae+",null));
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM