SpringBoot:消息隊列(集成RabbitMQ)


 簡介

使用Erlang語言編寫的一種消息中間件。消息中間件是一種數據傳送的消息傳遞機制,換句話說,是一種軟件應用之間的通訊方式。

舉個栗子

消息中間件的作用之一是應用解耦。
拿取快遞為例,前幾年的快遞收取方式通常是由快遞員上門派件,那么也就是說快遞員需要與顧客建立直接聯系;近年來,快遞行業蒸蒸日上,大量的快件如果由人工派送,不僅效率低,資源耗費也相當大。
而現在不管是小區,還是學校,都專門設置了快遞櫃,菜鳥驛站。這個快遞櫃就可以類比消息中間件,快遞員無需把快件送到顧客手中,只要放入對應區域設置的快遞櫃中,顧客就可以自己來取。這樣就消除了快遞員與顧客的直接聯,也就是應用解耦。

基礎概念

Broker:簡單來說就是消息隊列服務器實體
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列,Exchange有4種類型:direct(默認點對點),fanout(發布訂閱), topic(匹配規則發布訂閱), 和headers,不同類型的Exchange轉發消息的策略有所區別
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列
Binding:綁定,它的作用就是把 exchange和 queue按照路由規則綁定起來
Routing Key:路由關鍵字, exchange根據這個關鍵字進行消息投遞
vhost:虛擬主機,一個 broker里可以開設多個 vhost,用作不同用戶的權限分離
producer:消息生產者,就是投遞消息的程序
consumer:消息消費者,就是接受消息的程序
channel:消息通道,在客戶端的每個連接里,可建立多個 channel,每個 channel代表一個會話任務
Message:消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。

消息發送、接受流程

首先生產者把消息投遞到exchange交換機,
exchange接受到消息后,根據消息的key和已經設置好的binging,進行消息路由,將消息投遞到一個或者多個queue里面,
消費者從queue中取出消息,開始執行相應的代碼

 

RabbitMQ三種分發策略

1.direct 單播

根據消息體中的路由鍵指定分發的隊列

2.fanout 廣播

忽略路由鍵,分發消息到所有隊列

3.topic 多播(模式匹配)

路由鍵可包含模式匹配字符(*代表一個字符,#代表一個或多個字符)

搭建RabbitMQ環境

 1.下載安裝RabbitMQ鏡像

docker pull rabbitmq:3.8.1-management  #下載鏡像
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 68055d63a993 #啟動鏡像

-d :后台運行

-p:暴露端口

訪問:http://192.168.0.146:15672/(146服務器地址)  進入rabbitMQ管理頁面,默認用戶名和密碼為guest。

 


 

SpringBoot集成RabbitMQ

1.Pom(引入starter)

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置application.properties

spring.rabbitmq.host=192.168.0.146
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3.測試單播(點對點),多播(發布訂閱)

  @Autowired
    RabbitTemplate rabbitTemplate;

    //單播發送
    @Test
    void directTest() {
        Person person = new Person("mengY",25,"男");
        rabbitTemplate.convertAndSend("exchange.direct","queue_01", person);
    }


    //單播接收
    @Test
    void directReceive(){
        Person person = (Person)rabbitTemplate.receiveAndConvert("queue_01");
        System.out.println(person.getClass());
        System.out.println(person.toString());
    }



    //廣播發送
    @Test
    void fanoutTest() {
        Person person = new Person("mengY",25,"男");
        rabbitTemplate.convertAndSend("exchange.fanout","", person);
    }


    //廣播接收
    @Test
    void fanoutReceive(){
        Object obj = rabbitTemplate.receiveAndConvert("queue_03");
        System.out.println(obj.getClass());
        System.out.println(obj);
    }

rabboitMQ默認的是一java的方式序列化消息對象

 

 

 上面的展示方式不太直觀,有時我們需要用json的方式來序列化消息對象,可以通過配置MessageConverter來實現

@Configuration
public class RabbitConfig {
    /**
     * 消息轉換器
     * @return
     */
    @Bean
    public MessageConverter getMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

 

 

 消息監聽

@EnableRabbit
@SpringBootApplication
public class DemoMqApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoMqApplication.class, args);
    }

}
@EnableRabbit:開啟基於注解的rabbitMQ模式
@Service
public class PersonService {
    /**
     * 監聽接受消息
     */
    @RabbitListener(queues = "queue_01")
    public void receive(Person person){
        System.out.println("收到消息:"+person.toString());
    }
}

@RabbitListener:監聽消息隊列
@RabbitListener(queues = {"queue_03","queue_04"}):監聽多個隊列

AmqpAdmin管理組件

創建,刪除交換器,隊列

參考:https://blog.csdn.net/Leon_Jinhai_Sun/article/details/99692190

參考資料:

SpringBoot整合RabbitMQ之 典型應用場景實戰一

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM